In [None]:
import os, re, warnings, joblib
from pathlib import Path
from math import log1p

from dotenv import load_dotenv
import polars as pl
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest

import numpy as np

import sqlalchemy as sa
import tldextract, rapidjson as rj
# Suppress the DeprecationWarning about `registered_domain` when it is
# attributed to the tldextract module.
warnings.filterwarnings(
    action="ignore",
    category=DeprecationWarning,
    module=tldextract.__name__,
    message="The 'registered_domain' property is deprecated",
)

# Load environment variables (e.g. from a .env file), then build the
# SQLAlchemy engine from the connection URL; a missing variable raises KeyError.
load_dotenv()
engine = sa.create_engine(
    os.environ["WAREHOUSE_COOLIFY_URL"],
    pool_pre_ping=True,
)

In [None]:
import time
import polars as pl

DAYS_BACK = 28
# Look-back cut-off as seconds since the epoch, computed in Python so the
# query needs no DB-side date arithmetic.
cutoff = int(time.time() - DAYS_BACK * 86_400)

# `time` is stored in mixed units. The query classifies each row by magnitude:
#   time <= 32503680000            -> seconds
#   time <= 32503680000 * 1000     -> milliseconds
#   otherwise                      -> microseconds
# NOTE(review): 32503680000 looks like "year 3000 in epoch seconds" — confirm.
#
# No ORDER BY is issued: ordering by the raw mixed-unit `time` column does not
# correspond to chronological (`ts_sec`) order, so a server-side sort would be
# wasted work. The authoritative sort happens client-side below.
query = f"""
SELECT
    user_id::int,
    /* convert only the rows that survive the WHERE filter */
    CASE
        WHEN time > 32503680000*1000 THEN time/1000000               -- μs → s
        WHEN time > 32503680000      THEN time/1000                  -- ms → s
        ELSE                           time                          -- s
    END::double precision                AS ts_sec,

    is_write, line_additions, line_deletions,
    branch, category, dependencies,
    editor, language, machine, operating_system,
    entity, project, type, user_agent,
    lineno, lines, cursorpos, project_root_count,
    source_type, ysws_program, ip_address
FROM hackatime.heartbeats
/* pre‑filter in native units so the index on `time` is usable */
WHERE (
        (time <= 32503680000                  AND time >= {cutoff})           -- seconds
     OR (time >  32503680000  AND time <= 32503680000*1000
                                         AND time >= {cutoff*1000})          -- milliseconds
     OR (time >  32503680000*1000          AND time >= {cutoff*1000000})      -- microseconds
);
"""

with engine.begin() as conn:
    heartbeats = pl.read_database(query, connection=conn, infer_schema_length=None)

# Required (not just defensive): the per-user delta_t computed downstream
# relies on rows being ordered by the normalised timestamp within each user.
heartbeats = heartbeats.sort(["user_id", "ts_sec"])

In [None]:
# Shared extractor instance used by ua_dom() below.
# NOTE(review): cache_dir=False presumably disables tldextract's on-disk
# suffix-list cache — confirm against the installed tldextract version.
ext = tldextract.TLDExtract(cache_dir=False)

def dep_len(j: object) -> int:
    """Return the number of dependencies encoded in ``j``.

    ``j`` may be a JSON document (``str``/``bytes``) or an already-decoded
    ``list``/``tuple``/``dict``. Only containers are counted: a JSON scalar
    such as ``"react"`` or ``42`` yields 0 rather than its character length.
    Empty, ``None`` or unparseable input yields 0.
    """
    if j is None:
        return 0
    if isinstance(j, (str, bytes)):
        if not j:
            return 0
        try:
            parsed = rj.loads(j)
        except Exception:
            # Best-effort: malformed JSON counts as "no dependencies".
            return 0
    else:
        parsed = j
    return len(parsed) if isinstance(parsed, (list, tuple, dict)) else 0

def ua_dom(ua: str) -> str:
    """Extract a domain-like token from a user-agent string.

    If the string contains an ``http(s)://`` URL, its host part is used;
    otherwise the last whitespace-separated token is used. The candidate is
    passed through the shared ``ext`` extractor and its
    ``top_domain_under_public_suffix`` is returned, falling back to the raw
    candidate when that attribute is empty.

    Returns ``""`` for ``None``, empty, or whitespace-only input.
    """
    # Whitespace-only strings are truthy but split() to an empty list, so
    # they must be rejected here to avoid an IndexError below.
    if not ua or not ua.strip():
        return ""
    m = re.search(r"https?://([^ /]+)", ua)
    host = m.group(1) if m else ua.split()[-1]
    td = ext(host)
    return td.top_domain_under_public_suffix or host

# Per-heartbeat feature frame. Relies on `heartbeats` being sorted by
# (user_id, ts_sec) so that shift(1) within a user yields the previous event.
hb = (
    heartbeats
    # numeric safe-casts: nulls become 0 before the Int32 cast
    .with_columns(
        [pl.col(c).fill_null(0).cast(pl.Int32) for c in [
            "line_additions","line_deletions","lineno","lines",
            "cursorpos","project_root_count","source_type"
        ]]
    )
    # json / UA parsing (Python callbacks, applied element-wise)
    .with_columns([
        pl.col("dependencies").map_elements(dep_len, return_dtype=pl.Int32).alias("dep_count"),
        pl.col("user_agent").map_elements(ua_dom, return_dtype=pl.String).alias("ua_domain"),
    ])
    # log-scaled big counts, computed with the native vectorised log1p rather
    # than a per-row Python call. Unlike math.log1p, invalid inputs (<= -1)
    # produce -inf/NaN instead of raising and aborting the whole pipeline.
    .with_columns([
        pl.col("lines").cast(pl.Float64).log1p().cast(pl.Float32).alias("log_lines"),
        pl.col("cursorpos").cast(pl.Float64).log1p().cast(pl.Float32).alias("log_cursor"),
    ])
    # delta-t: seconds since the same user's previous heartbeat. The first
    # heartbeat of each user has no predecessor (null) and is set to 0.
    .with_columns(
        (pl.col("ts_sec") - pl.col("ts_sec").shift(1))
        .over("user_id")
        .fill_null(0)
        .alias("delta_t")
    )
    # per-heartbeat flag: 1 when the event changed no lines at all
    .with_columns(
        ((pl.col("line_additions") == 0) & (pl.col("line_deletions") == 0))
        .cast(pl.Int8)
        .alias("hb_zero_change_flag")
    )
)

hb.head()

In [None]:
BUCKET = 3600  # width of one aggregation window, in seconds

# Tag every heartbeat with the index of the fixed-width window it falls into.
hb_with_bucket = hb.with_columns(bucket_id=pl.col("ts_sec") // BUCKET)

# (source column, output column) pairs. Order matters: it fixes the column
# order of `base`, and therefore of the feature matrix built from it.
_SUMMED = [
    ("is_write", "write_hb_count"),
    ("line_additions", "tot_add"),
    ("line_deletions", "tot_del"),
    ("dep_count", "tot_dep"),
    ("hb_zero_change_flag", "sum_hb_zero_change"),
]
_DISTINCT = [
    ("entity", "file_diversity"),
    ("language", "lang_diversity"),
    ("editor", "editor_switches"),
    ("machine", "machine_switches"),
    ("operating_system", "os_switches"),
    ("ip_address", "ip_switches"),
    ("branch", "branch_switches"),
    ("ua_domain", "ua_diversity"),
    ("source_type", "src_switches"),
]

_delta = pl.col("delta_t")

# One row per (user, window) holding raw counts, sums and distinct counts.
base = hb_with_bucket.group_by(["user_id", "bucket_id"]).agg(
    # timing / volume
    [
        pl.len().alias("hb_count"),
        _delta.mean().alias("mean_dt"),
        _delta.std().fill_null(0).alias("std_dt"),
        _delta.n_unique().alias("unique_dt"),
        # how often the most frequent delta_t value occurs in the window
        _delta.value_counts(sort=True)
              .struct.field("count")
              .first()
              .fill_null(0)
              .alias("dominant_dt_count"),
    ]
    # plain sums, turned into ratios further down
    + [pl.col(src).sum().alias(dst) for src, dst in _SUMMED]
    # distinct-value counts
    + [pl.col(src).n_unique().alias(dst) for src, dst in _DISTINCT]
    + [pl.col("project_root_count").sum().alias("sum_root_count")]
)


def _per_heartbeat(total: pl.Expr) -> pl.Expr:
    """Divide a window total by the window's heartbeat count, mapping NaN to 0."""
    return (total / pl.col("hb_count")).fill_nan(0)


# Derived ratios and flags; the intermediate totals are dropped afterwards.
features = (
    base.with_columns(
        # coefficient of variation of delta_t (a zero mean is swapped for 1e-9)
        (pl.col("std_dt") / pl.col("mean_dt").replace(0, 1e-9))
            .fill_nan(0)
            .fill_null(0)
            .alias("dt_cv"),
        # 1 when every delta_t in the window has the same value
        (pl.col("unique_dt") == 1).cast(pl.Int8).alias("uniform_dt_flag"),
        # lines added + deleted per heartbeat
        _per_heartbeat(pl.col("tot_add") + pl.col("tot_del")).alias("lines_per_hb"),
        # 1 when exactly one distinct entity appears in the window
        (pl.col("file_diversity") == 1).cast(pl.Int8).alias("same_file_flag"),
        # share of heartbeats that were writes
        _per_heartbeat(pl.col("write_hb_count")).alias("pct_write"),
        # share of heartbeats with no line changes
        _per_heartbeat(pl.col("sum_hb_zero_change")).alias("zero_change_ratio"),
        # mean project_root_count per heartbeat
        _per_heartbeat(pl.col("sum_root_count")).alias("avg_root_count"),
        # share of heartbeats carrying the dominant delta_t
        _per_heartbeat(pl.col("dominant_dt_count"))
            .fill_null(0)
            .alias("dominant_dt_freq_ratio"),
    )
    .drop([
        "write_hb_count", "tot_add", "tot_del", "sum_root_count",
        "sum_hb_zero_change", "dominant_dt_count"
    ])
    .sort(["user_id", "bucket_id"])
)

# Optional: Display head/tail to verify new features
# print(features.head())
# print(features.filter(pl.col("user_id") == 1613).select(["user_id", "bucket_id", "hb_count", "mean_dt", "std_dt", "dt_cv", "uniform_dt_flag", "dominant_dt_freq_ratio", "zero_change_ratio", "lines_per_hb", "same_file_flag"]))

In [None]:
# Columns that must never be fed to the model: the two identifier columns,
# plus the model-output columns that the scoring step adds to `features`.
# Excluding the latter keeps this cell correct when it is re-run after
# scoring (otherwise the model's own outputs would leak into X).
NON_FEATURE_COLS = ("user_id", "bucket_id", "anomaly_score", "is_anomaly")

NUMERIC_COLS = [c for c in features.columns if c not in NON_FEATURE_COLS]
print(f"Using {len(NUMERIC_COLS)} numeric features:")
print(NUMERIC_COLS)

# Feature matrix for all (user, window) rows, in the row order of `features`.
X = features.select(NUMERIC_COLS).to_numpy()

# Verify shape
print(f"Feature matrix X shape: {X.shape}")

In [None]:
# User ids treated as trusted: their windows form the training set for the
# anomaly model and are excluded from the final "suspicious users" listing.
GOOD_USERS = [
    1, # max
    2, # zrl
    104, # acon
    69, # malted
    864, # thomas
    664, # lux
    10, # annabel

    # hack clubbers that look ok
    1256, 
    1309,
    1460,
    1561,
    40,
    48,
    1729,
    1591
]

# User ids flagged as bad. They are not used for training; they are only used
# to filter the per-user summary in the inspection cells below.
BAD_USERS = [
    1613,
    1728,
    18
]

# Windows belonging to trusted users; these define "normal" behaviour.
trusted_windows = features.filter(pl.col("user_id").is_in(GOOD_USERS))
TRUSTED = trusted_windows.select(NUMERIC_COLS).to_numpy()

# Fail early with a clear message instead of an opaque scikit-learn error
# when none of the trusted users has data in the look-back window.
if TRUSTED.shape[0] == 0:
    raise ValueError("No windows found for any user in GOOD_USERS; cannot fit the model.")

# Report how many trusted users actually contributed data, which can be
# fewer than len(GOOD_USERS).
n_trusted_present = trusted_windows["user_id"].n_unique()
print(
    f"Training on {TRUSTED.shape[0]} windows from "
    f"{n_trusted_present} of {len(GOOD_USERS)} trusted users."
)

# Standardise the features, then fit an IsolationForest on trusted data only.
model = Pipeline([
    ("scale", StandardScaler()),
    ("iso", IsolationForest(
        n_estimators=800, contamination=0.01, # contamination on trusted set
        bootstrap=True, random_state=42
    ))
]).fit(TRUSTED)

print("Model training complete.")

In [None]:
# The bottom PCTL percent of trusted-window scores defines "too weird".
PCTL = 5
# Threshold is derived from the scores of the TRUSTED (training) windows.
scores_trusted = model.decision_function(TRUSTED)
threshold = np.percentile(scores_trusted, PCTL)
print(f"cut‑off at {PCTL}th percentile of trusted scores → {threshold:.4f}")

# Score every window. X has the same row order as `features`.
scores = model.decision_function(X)
# with_columns (rather than hstack) overwrites existing columns of the same
# name, so re-running this cell does not fail on a duplicate "anomaly_score".
features = (
    features.with_columns(pl.Series("anomaly_score", scores))
            .with_columns((pl.col("anomaly_score") < threshold).alias("is_anomaly"))
)

# Per-user summary: window count, share of anomalous windows, score statistics.
user_stats = (
    features.group_by("user_id")
            .agg([
                pl.len().alias("windows"),
                pl.col("is_anomaly").mean().alias("anomaly_rate"),
                pl.col("anomaly_score").mean().alias("avg_score"),
                pl.col("anomaly_score").min().alias("min_score"),
                pl.col("anomaly_score").max().alias("max_score"),
            ])
    .sort("user_id") # Sort for consistency
)
print(user_stats)

In [None]:
# Summary rows for the trusted users.
_is_trusted = pl.col("user_id").is_in(GOOD_USERS)
user_stats.filter(_is_trusted)

In [None]:
# Summary rows for the users listed in BAD_USERS.
_is_known_bad = pl.col("user_id").is_in(BAD_USERS)
user_stats.filter(_is_known_bad)

In [None]:
# Unlabelled users with a high anomaly rate over more than a few windows.
_MIN_ANOMALY_RATE = 0.4
_MIN_WINDOWS = 3
_already_labelled = pl.col("user_id").is_in(BAD_USERS + GOOD_USERS)
user_stats.filter(
    pl.col("anomaly_rate") > _MIN_ANOMALY_RATE,
    pl.col("windows") > _MIN_WINDOWS,
    ~_already_labelled,
)