# MarketTech: The Truth Engine

Engineering data products for growth strategy.

This notebook mirrors the code in `markettech_workshop.py`.

In [None]:
import datetime as dt
import duckdb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

np.random.seed(2026)


## Phase 1: Ingestion

In [None]:
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class Channel:
    """Immutable record describing one marketing channel."""

    # Channel key; session rows point at it through their channel_id column.
    id: str
    # Human-readable channel name.
    name: str
    # Cost per acquisition for the channel (currency unit is not stated here).
    cac: float

def generate_stream(days: int = 60, start_date: dt.date = dt.date(2025, 9, 1)) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Simulate a marketing clickstream.

    All randomness comes from NumPy's global generator, so call
    ``np.random.seed`` beforehand for reproducible output.

    Args:
        days: Number of consecutive days to simulate.
        start_date: First calendar day of the simulation.

    Returns:
        A ``(sessions, conversions, channels)`` tuple of DataFrames:

        * ``sessions`` - one row per session with ``event_type``,
          ``session_id``, ``channel_id``, ``ts`` and ``engagement_sec``.
          ``session_id`` is unique across the frame.
        * ``conversions`` - one row per purchase with ``event_type``,
          ``session_id``, ``ts`` and ``revenue``.
        * ``channels`` - the channel dimension (``id``, ``name``, ``cac``).
    """
    from dataclasses import asdict

    channels = [
        Channel("CH_ORG", "Organic", 0.00),
        Channel("CH_SOC", "Paid Social", 12.50),
        Channel("CH_EML", "Email", 0.50),
    ]
    df_chan = pd.DataFrame([asdict(c) for c in channels])

    # Midnight of every simulated day, kept as a DatetimeIndex so that all
    # later date arithmetic stays vectorised.
    day_idx = np.arange(days)
    day_start = pd.Timestamp(start_date).normalize() + pd.to_timedelta(day_idx, unit="D")

    # Weekends get a higher expected volume; every day has at least 300 sessions.
    is_weekend = np.asarray(day_start.dayofweek >= 5)
    base = np.where(is_weekend, 1500, 800)
    vol = np.clip(np.random.normal(loc=base, scale=100).astype(int), 300, None)
    total_sessions = int(vol.sum())

    # A shuffled arithmetic range guarantees unique ids. Drawing ids with
    # replacement produces duplicates at this volume, and duplicated ids fan
    # out any join on session_id, inflating conversion counts.
    session_id = (100000 + np.random.permutation(total_sessions)).astype(str)
    day_for_session = np.repeat(day_idx, vol)

    # Channel mix: 40% organic, 40% paid social, 20% email (same order as `channels`).
    chan_ids = np.random.choice([c.id for c in channels], p=[0.40, 0.40, 0.20], size=total_sessions)

    hour = np.random.randint(0, 24, size=total_sessions)
    minute = np.random.randint(0, 60, size=total_sessions)
    second = np.random.randint(0, 60, size=total_sessions)

    # Session timestamp = midnight of its day + a random time of day.
    seconds_into_day = hour * 3600 + minute * 60 + second
    session_ts = day_start[day_for_session] + pd.to_timedelta(seconds_into_day, unit="s")

    # Paid-social sessions are short (mean 30 s); other channels average 120 s.
    engagement = np.where(
        chan_ids == "CH_SOC",
        np.clip(np.random.normal(loc=30, scale=20, size=total_sessions).astype(int), 1, None),
        np.clip(np.random.normal(loc=120, scale=60, size=total_sessions).astype(int), 5, None),
    )

    sessions = pd.DataFrame({
        "event_type": "session_start",
        "session_id": session_id,
        "channel_id": chan_ids,
        "ts": session_ts,
        "engagement_sec": engagement,
    })

    # Sessions with more than 60 s of engagement convert at 5%, the rest at 0.5%.
    base_prob = np.where(engagement > 60, 0.05, 0.005)
    is_convert = np.random.random(size=total_sessions) < base_prob

    conv_sessions = sessions.loc[is_convert, ["session_id", "ts"]]
    n_conversions = len(conv_sessions)

    # Purchases trail their session by 0-10 days plus 5-120 minutes.
    lag_days = np.random.randint(0, 11, size=n_conversions)
    lag_minutes = np.random.randint(5, 121, size=n_conversions)
    conv_ts = (
        conv_sessions["ts"].to_numpy()
        + pd.to_timedelta(lag_days, unit="D")
        + pd.to_timedelta(lag_minutes, unit="m")
    )
    revenue = np.round(np.random.uniform(50, 200, size=n_conversions), 2)

    conversions = pd.DataFrame({
        "event_type": "purchase",
        "session_id": conv_sessions["session_id"].to_numpy(),
        "ts": pd.to_datetime(conv_ts),
        "revenue": revenue,
    })

    return sessions, conversions, df_chan

# Generate the 60-day synthetic stream and report how many rows came back.
df_sess, df_conv, df_chan = generate_stream(days=60)
print(f"Stream Online: {len(df_sess):,} sessions | {len(df_conv):,} purchases")
# Preview the first three session rows.
df_sess.head(3)


## Phase 2: Storage (DuckDB)

In [None]:
# In-memory DuckDB database; each DataFrame is exposed to SQL under a fixed name.
con = duckdb.connect(database=":memory:")
for _relation, _frame in {
    "raw_sessions": df_sess,
    "raw_conversions": df_conv,
    "dim_channels": df_chan,
}.items():
    con.register(_relation, _frame)

# Sanity check: number of session rows visible through SQL.
con.execute("SELECT COUNT(*) AS sessions FROM raw_sessions").df()


## Phase 3: Metric contract (Semantic Layer)

In [None]:
def build_semantic_view(con: duckdb.DuckDBPyConnection, window_days: int = 7) -> None:
    """Create or replace the ``f_attribution`` view on ``con``.

    The view has one row per session/conversion pair (sessions without a
    purchase keep NULL conversion columns) enriched with channel attributes,
    plus ``days_to_convert`` and an ``is_attributed`` flag (1/0).

    A purchase is attributed when it exists and its ``days_to_convert`` lies
    between 0 and ``window_days`` inclusive; a purchase timestamped before its
    session is never attributed.

    NOTE(review): ``DATE_DIFF('day', ...)`` counts calendar-day boundaries
    crossed rather than elapsed 24-hour periods (per the DuckDB docs) -
    confirm that this is the intended definition of the window.

    Args:
        con: Connection on which ``raw_sessions``, ``raw_conversions`` and
            ``dim_channels`` are available.
        window_days: Attribution window in days; coerced with ``int()``.

    Raises:
        ValueError: If ``window_days`` is negative.
    """
    # The value is interpolated into the DDL text below; int() ensures that
    # only an integer literal can ever reach the SQL.
    window_days = int(window_days)
    if window_days < 0:
        raise ValueError(f"window_days must be non-negative, got {window_days}")

    con.execute(f"""
    CREATE OR REPLACE VIEW f_attribution AS
    SELECT
        s.session_id,
        c.name AS channel_name,
        c.cac AS cost_per_acquisition,
        CAST(s.ts AS TIMESTAMP) AS session_ts,
        conv.revenue,
        CAST(conv.ts AS TIMESTAMP) AS conversion_ts,
        DATE_DIFF('day', CAST(s.ts AS TIMESTAMP), CAST(conv.ts AS TIMESTAMP)) AS days_to_convert,
        CASE
            WHEN conv.session_id IS NOT NULL
             AND DATE_DIFF('day', CAST(s.ts AS TIMESTAMP), CAST(conv.ts AS TIMESTAMP))
                 BETWEEN 0 AND {window_days}
            THEN 1 ELSE 0
        END AS is_attributed
    FROM raw_sessions s
    LEFT JOIN raw_conversions conv
        ON s.session_id = conv.session_id
    JOIN dim_channels c
        ON s.channel_id = c.id
    """)

# Build the attribution view with a 7-day window, then preview rows that have a purchase.
build_semantic_view(con, window_days=7)
con.execute("SELECT * FROM f_attribution WHERE revenue IS NOT NULL LIMIT 5").df()


## Phase 4: Truth vs Fiction

In [None]:
def channel_summary(con: duckdb.DuckDBPyConnection) -> pd.DataFrame:
    """Compare naive and window-attributed conversion counts per channel.

    Reads the ``f_attribution`` view, which must already exist on ``con``.

    Args:
        con: Connection on which ``f_attribution`` is defined.

    Returns:
        One row per channel with ``total_traffic`` (rows in the view),
        ``naive_conversions`` (rows with a revenue value),
        ``trusted_conversions`` (rows flagged ``is_attributed``) and
        ``out_of_window`` (naive minus trusted). Rows are ordered by
        ``trusted_conversions`` descending, ties broken by channel name.
    """
    # SUM over an integer column is widened by DuckDB; casting back to BIGINT
    # keeps the count columns integral when they are converted to pandas.
    # The channel_name tie-breaker makes the row order deterministic.
    sql = """
    SELECT
        channel_name,
        COUNT(*) AS total_traffic,
        COUNT(revenue) AS naive_conversions,
        CAST(SUM(is_attributed) AS BIGINT) AS trusted_conversions,
        CAST(COUNT(revenue) - SUM(is_attributed) AS BIGINT) AS out_of_window
    FROM f_attribution
    GROUP BY channel_name
    ORDER BY trusted_conversions DESC, channel_name
    """
    return con.execute(sql).df()

# Show the per-channel comparison of naive vs. attributed conversions.
channel_summary(con)


## Phase 5: Data Quality Gates

In [None]:
def run_quality_checks(con: duckdb.DuckDBPyConnection) -> pd.DataFrame:
    """Run the data-quality gates and tabulate their outcome.

    Each gate is a SQL query returning a single count of offending rows.

    Args:
        con: Connection on which ``raw_sessions`` and ``raw_conversions``
            are available.

    Returns:
        One row per gate with columns ``check`` (gate name), ``errors``
        (offending row count) and ``status`` (``"PASS"`` when the count is
        zero, otherwise ``"FAIL"``).
    """
    # Gate name -> query producing the number of violating rows.
    gate_queries = {
        "Negative revenue": "SELECT COUNT(*) AS n FROM raw_conversions WHERE revenue < 0",
        "Orphaned conversions": """
            SELECT COUNT(*) AS n
            FROM raw_conversions c
            LEFT JOIN raw_sessions s ON c.session_id = s.session_id
            WHERE s.session_id IS NULL
        """,
        "Future session timestamps": "SELECT COUNT(*) AS n FROM raw_sessions WHERE CAST(ts AS TIMESTAMP) > NOW()",
    }

    def _evaluate(label, query):
        # The first column of the single result row is the violation count.
        offending = int(con.execute(query).fetchone()[0])
        return {"check": label, "errors": offending, "status": "FAIL" if offending else "PASS"}

    return pd.DataFrame([_evaluate(label, query) for label, query in gate_queries.items()])

# Run the data-quality gates against the registered tables and show the results.
run_quality_checks(con)


## Phase 6: Visualization

In [None]:
# Rebuild the view so the chart is guaranteed to use the 7-day window.
build_semantic_view(con, window_days=7)

# Attributed conversions per channel, bucketed by the week of the session.
trend_sql = """
SELECT
    DATE_TRUNC('week', session_ts) AS week,
    channel_name,
    SUM(is_attributed) AS sales
FROM f_attribution
GROUP BY 1, 2
ORDER BY 1, 2
"""
df_viz = con.execute(trend_sql).df()

# Wide layout: one row per week, one column per channel; missing cells become 0.
pivot = df_viz.pivot(index="week", columns="channel_name", values="sales").fillna(0)
pivot.plot.line(figsize=(10, 5), title="Weekly Attributed Sales by Channel").set(
    xlabel="Week",
    ylabel="Trusted conversions (count)",
)
plt.show()
