# 01 — Ingest raw interactions → processed parquet

This notebook:
- Loads the raw interactions CSV
- Performs minimal cleaning / type standardization
- Writes `data/processed/interactions.parquet`

> The pipeline equivalent is: `python -m src.pipelines.01_make_processed`


In [1]:
from pathlib import Path
import pandas as pd
import numpy as np

# Notebook convention: the kernel starts in repo_root/notebooks, so the
# repository root is one level above the current working directory.
PROJECT_ROOT = Path.cwd().parent

# Input CSV and output parquet locations, both under repo_root/data
RAW_PATH = PROJECT_ROOT.joinpath("data", "raw", "customer_interactions_fact_2_years.csv")
OUT_PATH = PROJECT_ROOT.joinpath("data", "processed", "interactions.parquet")

# Create data/processed up front (no error if it already exists)
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)

# Echo the resolved input path as the cell output
RAW_PATH

PosixPath('/Users/ahmedelgantiry/Documents/churn 1/data/raw/customer_interactions_fact_2_years.csv')

In [2]:
# Read the complete raw interactions CSV into memory (the file is large).
# `event_time` is parsed to datetime at load time; low_memory=False turns off
# pandas' chunked parsing, which can otherwise yield mixed-type columns.
df_raw = pd.read_csv(RAW_PATH, low_memory=False, parse_dates=["event_time"])

# Cell output: (shape, first rows) of the raw frame
(df_raw.shape, df_raw.head())

((4765402, 7),
                        external_customerkey          event_time  \
 0  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 1  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 2  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 3  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 4  fNOp7nbns5kCFT8pM8tlcL3y1v3ih7u6Gjm5aQ== 2025-06-14 08:08:37   
 
                     interaction_type incoming_outgoing  \
 0  emarsys_sessions_content_category          incoming   
 1  emarsys_sessions_content_category          incoming   
 2       emarsys_sessions_content_url          incoming   
 3       emarsys_sessions_content_url          incoming   
 4  emarsys_sessions_content_category          incoming   
 
                              channel  amount          shop  
 0  emarsys_sessions_content_pageview     0.0   meinfoto.de  
 1  emarsys_sessions_content_pageview     0.0   meinfoto.de  
 2  emarsys_sessions_content_page

In [3]:
# Minimal cleaning: validate the schema, coerce types, normalise strings and
# drop rows whose critical fields are missing. Produces `df` from `df_raw`
# without modifying `df_raw`.

# Required columns check. Runs before any column is touched so that a schema
# problem is reported with this message instead of a KeyError further down.
required = {"external_customerkey", "event_time", "interaction_type"}
missing_cols = required - set(df_raw.columns)
assert not missing_cols, f"Missing columns: {missing_cols}"

df = df_raw.copy()

# Ensure datetime; unparseable values become NaT and are dropped below
df["event_time"] = pd.to_datetime(df["event_time"], errors="coerce")

# Standardize string cols (the optional ones only when they are present)
for c in ["external_customerkey", "interaction_type", "channel", "shop", "incoming_outgoing"]:
    if c in df.columns:
        df[c] = df[c].astype("string").str.strip()

# Keys that were empty or whitespace-only are "" after stripping. Mark them
# as missing so the dropna below removes them instead of keeping blank keys.
for c in ["external_customerkey", "interaction_type"]:
    is_blank = df[c].eq("").fillna(False)
    df[c] = df[c].mask(is_blank)

# Drop rows missing critical values (done after stripping, see above)
df = df.dropna(subset=["external_customerkey", "event_time", "interaction_type"]).copy()

df.shape

(4765402, 7)

In [4]:
# Headline statistics of the cleaned frame, printed one "label value" per line
event_times = df["event_time"]
summary = [
    ("rows:", len(df)),
    ("customers:", df["external_customerkey"].nunique()),
    ("event types:", df["interaction_type"].nunique()),
]
for label, value in summary:
    print(label, value)
print("time range:", event_times.min(), "→", event_times.max())

# Cell output: the 15 most frequent interaction types
type_counts = df["interaction_type"].value_counts()
type_counts.head(15)

rows: 4765402
customers: 74067
event types: 18
time range: 2025-01-18 00:01:46 → 2026-01-18 03:01:46


interaction_type
emarsys_open                         2669813
emarsys_sessions_content_url          733879
emarsys_sessions_content_category     732270
emarsys_cancel                        199787
emarsys_sessions_content_tag          146504
emarsys_click                          77053
emarsys_sessions_view                  65785
emarsys_sessions_purchase              58115
order                                  33052
emarsys_sessions_cart_update           15802
emarsys_soft_bounce                    10904
emarsys_unsub                           8219
emarsys_block_bounce                    5134
emarsys_sessions_category_view          4300
emarsys_webchannel_show                 2016
Name: count, dtype: Int64

In [5]:
# Fraction of missing values per column, highest first (top 20 shown)
null_share = df.isna().mean()
null_share.sort_values(ascending=False).head(20)

amount                  0.624106
shop                    0.022556
channel                 0.003633
external_customerkey    0.000000
event_time              0.000000
interaction_type        0.000000
incoming_outgoing       0.000000
dtype: float64

In [6]:
# Persist the cleaned frame as parquet (row index not stored) and report
# the destination together with the number of rows written.
row_count = len(df)
df.to_parquet(path=OUT_PATH, index=False)
print("Wrote:", OUT_PATH, "rows:", row_count)

Wrote: /Users/ahmedelgantiry/Documents/churn 1/data/processed/interactions.parquet rows: 4765402


In [7]:
# Quick read-back check: reload the file that was just written and verify it
# has the same shape and column order as the in-memory frame.
df_check = pd.read_parquet(OUT_PATH)

assert df_check.shape == df.shape, f"Read-back shape {df_check.shape} != written shape {df.shape}"
assert list(df_check.columns) == list(df.columns), "Read-back columns differ from written columns"

df_check.shape, df_check.head()

((4765402, 7),
                        external_customerkey          event_time  \
 0  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 1  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 2  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 3  5sXTJJW621Y4f3IjB+GIlDfAd7O2RWqEjW5HJA== 2025-06-14 07:58:35   
 4  fNOp7nbns5kCFT8pM8tlcL3y1v3ih7u6Gjm5aQ== 2025-06-14 08:08:37   
 
                     interaction_type incoming_outgoing  \
 0  emarsys_sessions_content_category          incoming   
 1  emarsys_sessions_content_category          incoming   
 2       emarsys_sessions_content_url          incoming   
 3       emarsys_sessions_content_url          incoming   
 4  emarsys_sessions_content_category          incoming   
 
                              channel  amount          shop  
 0  emarsys_sessions_content_pageview     0.0   meinfoto.de  
 1  emarsys_sessions_content_pageview     0.0   meinfoto.de  
 2  emarsys_sessions_content_page