In [None]:
#!/usr/bin/env python3
import os
import re
import argparse
import logging
from datetime import datetime, timedelta, timezone
import urllib.request
from typing import Optional, List, Tuple, Dict, Any

import pandas as pd
from user_agents import parse as parse_ua
from jinja2 import Template
import geoip2.database
import plotly.express as px
import plotly.graph_objects as go
import pycountry

# Constants
# Where the GeoLite2 country database is fetched from, and the local file it is cached in.
GEO_DB_URL = "https://github.com/P3TERX/GeoLite.mmdb/releases/latest/download/GeoLite2-Country.mmdb"
LOCAL_GEO_DB = "GeoLite2-Country.mmdb"
# One access-log line: client ip, [timestamp], "METHOD url HTTP/x.y", status,
# response size, "referrer", "user agent".
# The size field accepts "-" as well as digits so that responses logged without
# a byte count are still parsed instead of being silently dropped.
LOG_RE = re.compile(
    r'(?P<ip>\S+) - - \[(?P<ts>.*?)\] '
    r'"(?P<method>\w+) (?P<url>\S+) HTTP/\d\.\d" '
    r'(?P<status>\d+) (?:\d+|-) "(?P<ref>.*?)" "(?P<ua>.*?)"'
)

In [37]:
# Jinja2 source of the dashboard page; generate_html() renders it.
# Placeholders used below: generated, total_visits, unique_ips, avg_len
# (formatted with "%.1f"), base_url (prefix of every chart <img> src),
# top5_ref (iterated as (ref, sessions) pairs) and domain.
# NOTE(review): the markup uses the classes "mb-4", "text-muted" and "table",
# none of which is defined by the inline <style> block below.
HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <title>B&amp;B Website Dashboard</title>
  <link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;700&display=swap" rel="stylesheet" />
  <style>
    body {
      font-family: 'Inter', sans-serif;
      max-width: 900px;
      margin: auto;
      padding: 1rem;
      background: #f8f9fa;
      color: #333;
    }
    h1, h2 {
      font-weight: 700;
      color: #2c3e50;
      margin-bottom: 0.5rem;
    }
    .summary {
      display: flex;
      justify-content: space-around;
      flex-wrap: wrap;
      gap: 1.5rem;
      margin-bottom: 2rem;
    }
    .summary .card {
      flex: 1 1 200px;
      background: #fff;
      color: #2c3e50;
      padding: 1.2rem 1.5rem;
      border-radius: 12px;
      box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
      text-align: center;
      min-height: 120px;
    }
    .summary .card .value {
      font-size: 2rem;
      font-weight: 900;
      margin-top: 0.2rem;
    }
    img {
      max-width: 100%;
      border-radius: 8px;
      box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
      margin-bottom: 1.5rem;
    }
    table {
      width: 100%;
      border-radius: 8px;
      overflow: hidden;
      box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
      margin-bottom: 1.5rem;
      background: #fff;
    }
    th, td {
      padding: 0.75rem 1rem;
      text-align: left;
      border-bottom: 1px solid #e9ecef;
    }
    thead th {
      background: #2c3e50;
      color: white;
      font-weight: 700;
      font-size: 1rem;
    }
    tbody tr:nth-child(even) td {
      background: #f8f9fa;
    }
    tbody tr:hover td {
      background: #e9ecef;
    }
    a {
      color: #3498db;
      text-decoration: none;
    }
    a:hover {
      text-decoration: underline;
    }
    p.small {
      font-size: 0.85rem;
      color: #7f8c8d;
      margin-top: 2rem;
      text-align: center;
    }
    @media (max-width: 576px) {
      .summary {
        flex-direction: column;
      }
      .summary .card {
        flex: unset;
      }
    }
  </style>
</head>
<body>
  <h1 class="mb-4">Website Dashboard <small class="text-muted">({{ generated }})</small></h1>

  <div class="summary">
    <div class="card">
      <div>Total Sessions</div>
      <div class="value">{{ total_visits }}</div>
    </div>
    <div class="card">
      <div>Unique Visitors</div>
      <div class="value">{{ unique_ips }}</div>
    </div>
    <div class="card">
      <div>Avg. Session Duration</div>
      <div class="value">{{ "%.1f"|format(avg_len) }} min</div>
    </div>
  </div>

  <h2>Sessions by Day of Week</h2>
  <img src="{{ base_url }}/sessions_dow.png" alt="Sessions per weekday" loading="lazy" />

  <h2>Top 5 Landing Pages</h2>
  <img src="{{ base_url }}/top5_pages.png" alt="Top landing pages" loading="lazy" />

  <h2>Avg. Session Duration by Day</h2>
  <img src="{{ base_url }}/avg_len_dow.png" alt="Avg session length per weekday" loading="lazy" />

  <h2>Sessions by Hour of Day</h2>
  <img src="{{ base_url }}/sessions_by_hour.png" alt="Sessions by hour" loading="lazy" />

  <h2>Top 5 External Referrers</h2>
  <table class="table">
    <thead><tr><th>Referrer URL</th><th>Sessions</th></tr></thead>
    <tbody>
    {% for ref,sessions in top5_ref %}
      <tr><td><a href="{{ ref }}" target="_blank" rel="noopener">{{ ref }}</a></td><td>{{ sessions }}</td></tr>
    {% endfor %}
    </tbody>
  </table>

  <h2>Top 5 Countries</h2>
  <img src="{{ base_url }}/top5_countries.png" alt="Top countries" loading="lazy" />

  <p class="small">
    * GeoIP via MaxMind GeoLite2 (free).<br />
    * Bots filtered out; your domain "{{ domain }}" excluded from referrers.<br />
    * All metrics are session-based (30-minute timeout).
  </p>
</body>
</html>"""


In [38]:
def download_geodb(path: str):
    """Download the GeoLite2 country database to `path` unless it already exists.

    The file is fetched into ``<path>.part`` and only moved into place once the
    download has completed, so an interrupted transfer never leaves a truncated
    database that later runs would mistake for a valid one.

    Args:
        path: Destination file for the database.

    Raises:
        Exception: Whatever the download or the final rename raised; it is
            reported on stdout and then re-raised unchanged.
    """
    if os.path.isfile(path):
        return

    print(f"Downloading GeoLite2 DB to {path} ...")
    partial_path = f"{path}.part"
    try:
        urllib.request.urlretrieve(GEO_DB_URL, partial_path)
        os.replace(partial_path, path)
        print("GeoLite2 DB downloaded successfully.")
    except Exception as e:
        print(f"Failed to download GeoLite2 DB: {e}")
        # Best-effort cleanup of the incomplete download.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        raise


def is_bot_or_suspicious(ua: str, ref: str, domain: str, url: str, method: str):
    """Tell whether a request looks automated or otherwise not like a real visit.

    A request is flagged when any of these holds, checked in this order:
      1. the lower-cased user agent contains a known bot/tool marker;
      2. the URL contains an admin/login style path, or the method is
         POST, PUT or DELETE;
      3. the referrer is '-' or contains `domain`.

    Returns:
        True when the request is flagged, False otherwise.
    """
    agent = ua.lower()
    bot_markers = (
        "bot", "crawler", "spider", "crawl", "slurp", "search",
        "archive", "transcoder", "monitor", "fetch", "loader",
        "python-requests", "httpclient", "java", "wget", "curl",
        "lighthouse", "axios", "scrapy", "httpx", "phantomjs",
        "headless", "libwww", "mechanize", "apachebench",
    )
    for marker in bot_markers:
        if marker in agent:
            return True

    # Paths that are typically probed by scanners rather than visited by people.
    for probed_path in ('/wp-admin/', '/admin/', '/login/', '/phpmyadmin/'):
        if probed_path in url:
            return True
    if method in ('POST', 'PUT', 'DELETE'):
        return True

    # No referrer, or a referrer that mentions the site's own domain.
    return ref == '-' or domain in ref


def parse_log_line(line: str, domain: str):
    """Turn one access-log line into a record dict.

    Args:
        line: Raw log line, matched against LOG_RE.
        domain: Accepted for signature compatibility; not used here.

    Returns:
        A dict with the keys 'ip', 'dt', 'url', 'status', 'ref', 'ua' and
        'method' (in that order), or None when the line does not match LOG_RE
        or its timestamp cannot be parsed.
    """
    match = LOG_RE.match(line)
    if match is None:
        return None

    fields = match.groupdict()
    try:
        # e.g. 10/Oct/2023:13:55:36 +0000 -> timezone-aware datetime
        stamp = datetime.strptime(fields['ts'], '%d/%b/%Y:%H:%M:%S %z')
    except ValueError:
        return None

    record = {'ip': fields['ip'], 'dt': stamp, 'url': fields['url']}
    record['status'] = int(fields['status'])
    for key in ('ref', 'ua', 'method'):
        record[key] = fields[key]
    return record


def load_and_clean(logpath: str, domain: str, start_date: datetime):
    """Read an access log into a DataFrame of requests on or after `start_date`.

    Args:
        logpath: Path of the access log; undecodable bytes are ignored.
        domain: Passed through to parse_log_line().
        start_date: Lower bound (inclusive) for the request timestamp.

    Returns:
        A DataFrame with the columns 'ip', 'dt', 'url', 'status', 'ref', 'ua'
        and 'method', ordered by 'dt', with a trailing "index.html" removed
        from 'url'. When no line qualifies, an empty frame with the same
        columns is returned.

    Raises:
        Exception: Any error raised while reading or parsing is reported on
            stdout and re-raised unchanged.
    """
    columns = ['ip', 'dt', 'url', 'status', 'ref', 'ua', 'method']
    rows = []
    try:
        with open(logpath, errors='ignore') as f:
            for line in f:
                rec = parse_log_line(line, domain)
                if rec and rec['dt'] >= start_date:
                    rows.append(rec)
    except Exception as e:
        print(f"Failed to read log file: {e}")
        raise

    df = pd.DataFrame(rows, columns=columns)
    if df.empty:
        # Keep 'dt' datetime-typed so downstream `.dt` accessors still work.
        df['dt'] = pd.to_datetime(df['dt'], utc=True)
        return df

    # NOTE(review): if the log spans a UTC-offset change, 'dt' presumably ends
    # up as object dtype and later `.dt` accessors would fail — confirm.
    # A stable sort keeps same-second requests in file order, so the first
    # request of a session stays first.
    df = df.sort_values('dt', kind='mergesort')
    df['url'] = df['url'].str.replace(r'index\.html$', '', regex=True)
    return df


def identify_sessions(df: pd.DataFrame, session_timeout_min: float = 30):
    """Group requests into per-IP sessions separated by inactivity gaps.

    A new session starts at the first request of an IP and whenever the gap to
    that IP's previous request exceeds `session_timeout_min` minutes. The rows
    are expected to be ordered by 'dt' already; no sorting is done here.

    Note:
        `df` is modified in place: the helper columns 'prev', 'gap_m',
        'new_sess' and 'sess_id' are added to it.

    Args:
        df: Requests with at least the columns 'ip', 'dt', 'url', 'ref', 'ua'
            and 'method'.
        session_timeout_min: Inactivity gap, in minutes, that ends a session.

    Returns:
        A tuple ``(sess_summary, df)``. `sess_summary` has one row per session
        with the columns 'sess_id', 'start', 'end', 'ip', 'landing_page',
        'referrer', 'ua', 'method' and 'duration' (minutes); `df` is the input
        frame with the helper columns added.
    """
    df['prev'] = df.groupby('ip')['dt'].shift()
    # The first request of an IP has no predecessor; give it a gap just above
    # the timeout so that it always opens a session.
    df['gap_m'] = (
        (df['dt'] - df['prev']).dt.total_seconds().div(60).fillna(session_timeout_min + 1)
    )
    df['new_sess'] = df['gap_m'] > session_timeout_min
    # Session id = "<ip>_<running session number for that ip>".
    df['sess_id'] = df.groupby('ip')['new_sess'].cumsum().astype(
        str).radd(df['ip'] + '_')

    # Named aggregation yields the final column names directly instead of
    # renaming a MultiIndex by position.
    sess_summary = df.groupby('sess_id').agg(
        start=('dt', 'min'),
        end=('dt', 'max'),
        ip=('ip', 'first'),
        landing_page=('url', 'first'),
        referrer=('ref', 'first'),
        ua=('ua', 'first'),
        method=('method', 'first'),
    ).reset_index()

    sess_summary['duration'] = (
        sess_summary['end'] - sess_summary['start']).dt.total_seconds().div(60)

    return sess_summary, df


def filter_sessions(sess_summary: pd.DataFrame, domain: str):
    """Drop the sessions that is_bot_or_suspicious() flags.

    Each session is judged on its 'ua', 'referrer', 'landing_page' and
    'method' columns.

    Args:
        sess_summary: One row per session.
        domain: The site's own domain, forwarded to is_bot_or_suspicious().

    Returns:
        A new DataFrame (an independent copy, safe to add columns to) holding
        only the sessions that were not flagged. An empty input yields an
        empty result.
    """
    flagged = [
        is_bot_or_suspicious(ua, ref, domain, url, method)
        for ua, ref, url, method in zip(
            sess_summary['ua'],
            sess_summary['referrer'],
            sess_summary['landing_page'],
            sess_summary['method'],
        )
    ]
    # An explicit bool dtype keeps the mask valid when there are no rows.
    keep = ~pd.Series(flagged, index=sess_summary.index, dtype=bool)
    # Copy so that callers assigning new columns do not write into a slice.
    return sess_summary.loc[keep].copy()


def ensure_dirs(base: str = 'output', period: str = 'w'):
    """Create ``<base>/<folder>/images`` for the current report and return its parts.

    The folder name is derived from the current local date:
    'w' -> ``w-YYYY-MM-DD``, 'm' -> ``m-YYYY-MM``, 'y' -> ``y-YYYY``;
    any other period -> ``YYYY-MM-DD``.

    Returns:
        A tuple ``(base, folder, img_dir)``.
    """
    folder_formats = (
        ('w', "w-%Y-%m-%d"),
        ('m', "m-%Y-%m"),
        ('y', "y-%Y"),
    )
    chosen_format = next(
        (fmt for code, fmt in folder_formats if period == code),
        "%Y-%m-%d",
    )
    fld = datetime.now().strftime(chosen_format)

    img_dir = os.path.join(base, fld, 'images')
    os.makedirs(img_dir, exist_ok=True)
    return base, fld, img_dir


def save_plotly(fig: go.Figure, out_dir: str, fname: str):
    """Apply the shared dashboard layout to `fig` and write it to ``out_dir/fname``.

    Args:
        fig: Figure to style and export; its layout is updated in place.
        out_dir: Directory the image is written to.
        fname: File name of the image inside `out_dir`.

    Raises:
        Exception: Whatever ``fig.write_image`` raised; it is reported on
            stdout and re-raised unchanged.
    """
    fig.update_layout(
        template='plotly_white',
        margin=dict(t=40, b=20, l=30, r=20),
        font=dict(family="Inter, sans-serif", size=14)
    )
    try:
        fig.write_image(os.path.join(out_dir, fname))
    except Exception as e:
        print(f"Failed to save Plotly figure: {e}")
        raise


def _save_blue_bar(img_dir: str, fname: str, *, x, y, labels, title, **bar_kwargs):
    """Draw one bar chart on the shared blue colour scale and save it as `fname`."""
    fig = px.bar(
        x=x, y=y,
        labels=labels,
        title=title,
        color_continuous_scale=px.colors.sequential.Blues,
        **bar_kwargs
    )
    save_plotly(fig, img_dir, fname)


def generate_visualizations(sess: pd.DataFrame, df: pd.DataFrame, img_dir: str, domain: str):
    """Write all dashboard charts to `img_dir` and return the headline figures.

    Charts written: sessions_dow.png, top5_pages.png, avg_len_dow.png,
    sessions_by_hour.png and top5_countries.png.

    Note:
        A 'country' column (ISO code or 'Unknown') is added to `sess`.

    Args:
        sess: One row per session, with at least 'ip', 'start', 'duration'
            and 'referrer'.
        df: Request-level frame; only its 'url' column is read.
        img_dir: Directory the PNG files are written to.
        domain: The site's own domain; referrers containing it are not
            counted as external.

    Returns:
        A dict with 'total_visits', 'unique_ips', 'avg_len' (minutes, 0.0 when
        there are no sessions) and 'top5_ref' (list of (referrer, count)).
    """
    weekdays = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
                'Friday', 'Saturday', 'Sunday']

    total_visits = len(sess)
    unique_ips = sess['ip'].nunique()
    mean_duration = sess['duration'].mean()
    # mean() of an empty column is NaN, which would render as "nan min".
    avg_len = 0.0 if pd.isna(mean_duration) else mean_duration

    # Sessions per weekday.
    dow = sess['start'].dt.day_name().value_counts().reindex(weekdays).fillna(0)
    _save_blue_bar(
        img_dir, "sessions_dow.png",
        x=dow.index, y=dow.values,
        labels={'x': 'Day', 'y': 'Sessions'},
        title="Sessions per Weekday",
        color=dow.values,
    )

    # Most requested pages.
    # NOTE(review): this counts page hits over every row of `df`, not only the
    # sessions kept in `sess`, although the axis is labelled 'Sessions' —
    # confirm that this is intended.
    landing_pages = df['url'].copy()
    landing_pages = landing_pages[
        landing_pages.str.endswith('.html') |
        landing_pages.str.endswith('/') |
        (landing_pages == '')
    ]
    landing_pages = landing_pages.str.replace(r'index\.html$', '', regex=True)
    top5_pages = landing_pages.value_counts().iloc[:5]
    _save_blue_bar(
        img_dir, "top5_pages.png",
        x=top5_pages.values, y=top5_pages.index,
        labels={'x': 'Sessions', 'y': 'Landing Page'},
        title="Top 5 Landing Pages",
        color=top5_pages.values,
        orientation='h',
    )

    # Average session length per weekday.
    avg_by_dow = sess.groupby(sess['start'].dt.day_name())['duration'].mean().reindex(
        weekdays).fillna(0)
    _save_blue_bar(
        img_dir, "avg_len_dow.png",
        x=avg_by_dow.index, y=avg_by_dow.values,
        labels={'x': 'Day', 'y': 'Avg Duration (min)'},
        title="Avg Session Length by Weekday",
        color=avg_by_dow.values,
    )

    # Sessions per hour of day.
    hrs = sess['start'].dt.hour.value_counts().sort_index()
    _save_blue_bar(
        img_dir, "sessions_by_hour.png",
        x=hrs.index, y=hrs.values,
        labels={'x': 'Hour of Day', 'y': 'Sessions'},
        title="Sessions by Hour of Day",
        color=hrs.values,
    )

    # External referrers. regex=False: the domain is a literal string, so its
    # dots must not act as regex wildcards.
    is_external = (sess['referrer'] != '-') & (
        ~sess['referrer'].str.contains(domain, na=False, regex=False))
    ext_ref_counts = sess.loc[is_external, 'referrer'].value_counts().iloc[:5]
    top5_ref = list(ext_ref_counts.items())

    # Country per session. The reader is closed even if a lookup fails, and
    # each distinct IP is resolved only once.
    with geoip2.database.Reader(LOCAL_GEO_DB) as reader:

        def lookup_country(ip: str):
            try:
                return reader.country(ip).country.iso_code or 'Unknown'
            except (geoip2.errors.AddressNotFoundError, ValueError):
                # ValueError: the logged client field is not a valid IP address.
                return 'Unknown'

        country_by_ip = {ip: lookup_country(ip) for ip in sess['ip'].unique()}

    sess['country'] = sess['ip'].map(country_by_ip)

    top5c = sess['country'].value_counts().iloc[:5]
    _save_blue_bar(
        img_dir, "top5_countries.png",
        x=top5c.index, y=top5c.values,
        labels={'x': 'Country', 'y': 'Sessions'},
        title="Top 5 Visitor Countries",
        color=top5c.values,
    )

    return {
        'total_visits': total_visits,
        'unique_ips': unique_ips,
        'avg_len': avg_len,
        'top5_ref': top5_ref
    }


def generate_html(template_data: Dict[str, Any], base_url: str, domain: str, output_path: str):
    """Render HTML_TEMPLATE and write the resulting page to `output_path`.

    Autoescaping is enabled because some rendered values (the referrer URLs)
    come straight from the access log and must not be able to inject markup.
    NOTE(review): escaping does not restrict the URL scheme of the referrer
    links; consider rejecting non-http(s) referrers upstream.

    Args:
        template_data: Extra template variables, passed as keyword arguments
            to the template.
        base_url: URL prefix of the chart images.
        domain: The site's own domain, shown in the footnote.
        output_path: File the page is written to (UTF-8).

    Raises:
        Exception: Any rendering or I/O error is reported on stdout and
            re-raised unchanged.
    """
    try:
        tpl = Template(HTML_TEMPLATE, autoescape=True)
        # Render first so that a template error does not leave a truncated file.
        page = tpl.render(
            generated=datetime.now().strftime("%Y-%m-%d %H:%M"),
            base_url=base_url,
            domain=domain,
            **template_data
        )
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(page)
    except Exception as e:
        print(f"Failed to generate HTML: {e}")
        raise

In [None]:
import statistics
import re
from datetime import datetime, timezone, timedelta
from collections import Counter

# Ad-hoc cross-check of the session statistics, done with the standard library only.

# One access-log line; the size may be "-" and the user agent must be non-empty.
log_pattern = re.compile(
    r'(?P<ip>\d+\.\d+\.\d+\.\d+)\s+-\s+-\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<method>[A-Z]+)\s(?P<url>\S+)\s(?P<protocol>[^"]+)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+|-)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]+)"'
)
# Matches a referrer that is empty, "-", or mentions the site's own domain.
referrer_regex = re.compile(r"^(-|.*loucantou\.yvelin\.net.*)?$")

now = datetime.now(timezone.utc)
start_date = now - timedelta(days=7)
# ip -> list of sessions; each session is a list of parsed request dicts.
sessions = {}

with open("logs/loucantou-access.log", errors='ignore') as f:
    for raw_line in f:
        match = log_pattern.match(raw_line)
        if not match:
            print("NO MATCH", raw_line)
            continue

        data = match.groupdict()
        # Ignore query strings on both the requested URL and the referrer.
        data['url'] = data['url'].split('?', 1)[0]
        data['referrer'] = data['referrer'].split('?', 1)[0]

        if '/logs/' in data['url'] or '/logs/' in data['referrer']:
            continue

        try:
            dt = datetime.strptime(data['timestamp'], '%d/%b/%Y:%H:%M:%S %z')
            data['timestamp'] = dt
        except ValueError:
            print("BAD TIMESTAMP", data['timestamp'])
            continue

        if dt < start_date:
            continue

        ip_sessions = sessions.setdefault(data['ip'], [])
        # More than 30 minutes since this IP's previous request starts a new session.
        if ip_sessions and (dt - ip_sessions[-1][-1]['timestamp']).total_seconds() <= 1800:
            ip_sessions[-1].append(data)
        else:
            ip_sessions.append([data])

user_sessions = []
for ip, session_list in sessions.items():
    # An IP is dropped only when every request in every one of its sessions
    # has an internal or empty referrer; otherwise all its sessions are kept.
    # NOTE(review): the verdict is per IP, not per session. The original
    # re-evaluated this same IP-wide check once per session; confirm whether
    # per-session filtering was intended.
    only_internal_referrers = all(
        referrer_regex.match(line['referrer'])
        for session in session_list
        for line in session
    )
    if only_internal_referrers:
        continue
    user_sessions.extend((ip, session) for session in session_list)

print(len(user_sessions), "sessions found")

# statistics.mean() raises on empty input, so report 0 when nothing was kept.
if user_sessions:
    avg_session_duration = statistics.mean(
        (session[-1]['timestamp'] - session[0]['timestamp']).total_seconds()
        for _, session in user_sessions
    )
else:
    avg_session_duration = 0.0
print("Avg session duration:", f"{int(avg_session_duration // 60)} minutes")


# Page hits (URLs ending in ".html" or "/") with "index.html" stripped.
all_pages = [
    line['url'].replace('index.html', '')
    for _, session in user_sessions
    for line in session
    if line['url'].endswith('.html') or line['url'].endswith('/')
]
print(Counter(all_pages))


16 sessions found
Avg session duration: 10 minutes
Counter({'/': 62, '/r%C3%A9server.html': 18, '/en/': 8, '/assets/': 6, '/assets/olivier/': 1, '/mentions-l%C3%A9gales.html': 1})


In [None]:

def main(logpath: str = "logs/loucantou-access.log",
         domain: str = "loucantou.yvelin.net",
         period: str = "w") -> None:
    """Build the traffic dashboard (charts + HTML page) for one reporting period.

    Args:
        logpath: Path of the access log to analyse.
        domain: The site's own domain, excluded from the referrers.
        period: 'w' (last 7 days), 'm' (last 30 days) or 'y' (last 365 days);
            any other value falls back to 7 days.
    """
    days_back = {'w': 7, 'm': 30, 'y': 365}.get(period, 7)
    start_date = datetime.now(timezone.utc) - timedelta(days=days_back)

    download_geodb(LOCAL_GEO_DB)

    base, folder, img_dir = ensure_dirs('output', period)
    # The page references its charts through the repository's raw-content URL.
    raw_base = "https://raw.githubusercontent.com/L-Yvelin/loucantou/refs/heads/main/output"
    base_url = f"{raw_base}/{folder}/images"

    df = load_and_clean(logpath, domain, start_date)
    sess, _ = identify_sessions(df)
    sess = filter_sessions(sess, domain)

    template_data = generate_visualizations(sess, df, img_dir, domain)

    html_out = os.path.join(base, folder, "dashboard.html")
    generate_html(template_data, base_url, domain, html_out)


main()