# Offline Feature Generation (DuckDB + Parquet)

Goal: build an **offline feature store** pipeline end-to-end: generate raw events, compute features, and materialize them as Parquet partitions.

This notebook is designed as an industrial baseline: deterministic data generation, schema validation, reproducible feature computation, and artifact outputs.

In [None]:
import os
import json
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

import numpy as np
import pandas as pd
import duckdb

SEED = 1337
rng = np.random.default_rng(SEED)

OUT_DIR = os.path.abspath(os.path.join('..', 'registry', 'offline'))
os.makedirs(OUT_DIR, exist_ok=True)
OUT_RAW = os.path.join(OUT_DIR, 'raw_events.parquet')
OUT_FEAT = os.path.join(OUT_DIR, 'features_daily.parquet')

print('OUT_DIR=', OUT_DIR)

## 1) Generate raw events

In [None]:
n_users = 2000
days = 60
events_per_user = rng.poisson(lam=25, size=n_users)

start = datetime.now(timezone.utc) - timedelta(days=days)

rows = []
for user_id in range(1, n_users + 1):
    n = int(events_per_user[user_id - 1])
    if n == 0:
        continue

    base_value = rng.lognormal(mean=2.0, sigma=0.6)
    for _ in range(n):
        ts = start + timedelta(seconds=int(rng.integers(0, days * 24 * 3600)))
        event_type = rng.choice(['login', 'purchase', 'view', 'support'], p=[0.25, 0.08, 0.62, 0.05])
        amount = 0.0
        if event_type == 'purchase':
            amount = float(base_value * rng.lognormal(mean=0.0, sigma=0.8))
        rows.append((user_id, ts.isoformat(), event_type, amount))

df = pd.DataFrame(rows, columns=['user_id', 'event_ts', 'event_type', 'amount'])
df['event_ts'] = pd.to_datetime(df['event_ts'], utc=True)
df['event_date'] = df['event_ts'].dt.date
df = df.sort_values(['user_id', 'event_ts']).reset_index(drop=True)

df.head(), df.shape

In [None]:
df.to_parquet(OUT_RAW, index=False)
print('wrote', OUT_RAW, os.path.getsize(OUT_RAW), 'bytes')

## 2) Feature computation with DuckDB
We compute daily features per user: counts by event type, rolling 7-day purchase amount, and activity recency.

In [None]:
con = duckdb.connect(database=':memory:')
con.execute('PRAGMA threads=4')
con.execute(f"CREATE VIEW raw AS SELECT * FROM read_parquet('{OUT_RAW.replace('\\','\\\\')}')")

query = r'''
WITH daily AS (
  SELECT
    user_id,
    CAST(event_date AS DATE) AS event_date,
    COUNT(*) AS events_total,
    SUM(CASE WHEN event_type='login' THEN 1 ELSE 0 END) AS c_login,
    SUM(CASE WHEN event_type='view' THEN 1 ELSE 0 END) AS c_view,
    SUM(CASE WHEN event_type='support' THEN 1 ELSE 0 END) AS c_support,
    SUM(CASE WHEN event_type='purchase' THEN 1 ELSE 0 END) AS c_purchase,
    SUM(CASE WHEN event_type='purchase' THEN amount ELSE 0 END) AS purchase_amount
  FROM raw
  GROUP BY user_id, CAST(event_date AS DATE)
),
roll AS (
  SELECT
    *,
    SUM(purchase_amount) OVER (PARTITION BY user_id ORDER BY event_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
      AS purchase_amount_7d
  FROM daily
)
SELECT
  user_id,
  event_date,
  events_total, c_login, c_view, c_support, c_purchase,
  purchase_amount, purchase_amount_7d,
  DATE_DIFF('day', event_date, (SELECT MAX(event_date) FROM daily)) AS days_since_last_activity
FROM roll
ORDER BY user_id, event_date
'''

feat = con.execute(query).df()
feat.head(), feat.shape

In [None]:
feat.to_parquet(OUT_FEAT, index=False)
print('wrote', OUT_FEAT, os.path.getsize(OUT_FEAT), 'bytes')

## 3) Sanity checks

In [None]:
assert feat['user_id'].min() >= 1
assert feat['events_total'].ge(0).all()
assert feat['purchase_amount_7d'].ge(0).all()

feat.describe(include='all').T.head(12)