In [1]:
import polars as pl
from pathlib import Path

In [10]:

DATA_DIR = Path("data")

RETAILER_PATH = DATA_DIR / "retailer.csv"
TV_PATH = DATA_DIR / "tv_publisher.csv"
PROG_PATH = DATA_DIR / "programmatic_publisher.csv"
MAP_PATH = DATA_DIR / "mapping_transac_publisher_tv.csv"

OUT_PATH = DATA_DIR / "customer_events.parquet"

In [11]:
# =========================
# 1. Mapping (résolution d'identité)
# =========================
mapping = (
    pl.scan_csv(MAP_PATH)
    .select(["customer_id", "device_id", "dsp_id"])
    .unique()
)


In [12]:

# =========================
# 2. Retailer events
# =========================
retailer_events = (
    pl.scan_csv(RETAILER_PATH)
    .with_columns(
        pl.col("timestamp_utc").str.to_datetime(strict=False),
        pl.lit("RETAIL").alias("event_type"),
        pl.lit(None).cast(pl.Utf8).alias("lever"),
        pl.lit(None).cast(pl.Utf8).alias("campaign_name"),
        pl.lit(None).cast(pl.Utf8).alias("device_type"),
        pl.lit(None).cast(pl.Float64).alias("cost_milli_cent"),
        pl.lit(None).cast(pl.Utf8).alias("source_id"),
    )
    .select([
        "customer_id",
        "timestamp_utc",
        "event_type",
        "lever",
        "campaign_name",
        "device_type",
        "product_name",
        "brand",
        "sales",
        "quantity",
        "cost_milli_cent",
        "source_id",
        "event_name",
    ])
)

In [13]:


# =========================
# 3. TV exposures
# =========================
tv_events = (
    pl.scan_csv(TV_PATH)
    .with_columns(
        pl.col("timestamp_utc").str.to_datetime(strict=False),
        pl.lit("EXPOSURE").alias("event_type"),
        pl.lit("TV").alias("lever"),
        pl.lit(None).cast(pl.Utf8).alias("campaign_name"),
        pl.lit("TV").alias("device_type"),
        pl.col("cost_milli_cent").cast(pl.Float64),
    )
    .join(
        mapping.select(["customer_id", "device_id"]),
        on="device_id",
        how="inner",
    )
    .with_columns(
        pl.col("device_id").alias("source_id"),
        pl.lit(None).cast(pl.Utf8).alias("product_name"),
        pl.lit(None).cast(pl.Utf8).alias("brand"),
        pl.lit(None).cast(pl.Float64).alias("sales"),
        pl.lit(None).cast(pl.Int64).alias("quantity"),
        pl.lit(None).cast(pl.Utf8).alias("event_name"),
    )
    .select(retailer_events.columns)
)


  .select(retailer_events.columns)


In [14]:

# =========================
# 4. Programmatic exposures
# =========================
prog_events = (
    pl.scan_csv(PROG_PATH)
    .with_columns(
        pl.col("timestamp_utc").str.to_datetime(strict=False),
        pl.lit("EXPOSURE").alias("event_type"),
        pl.lit("PROGRAMMATIC").alias("lever"),
        pl.col("campaign_name").cast(pl.Utf8),
        pl.col("device_type").cast(pl.Utf8),
        pl.col("cost_milli_cent").cast(pl.Float64),
    )
    .join(
        mapping.select(["customer_id", "dsp_id"]),
        on="dsp_id",
        how="inner",
    )
    .with_columns(
        pl.col("dsp_id").alias("source_id"),
        pl.lit(None).cast(pl.Utf8).alias("product_name"),
        pl.lit(None).cast(pl.Utf8).alias("brand"),
        pl.lit(None).cast(pl.Float64).alias("sales"),
        pl.lit(None).cast(pl.Int64).alias("quantity"),
        pl.lit(None).cast(pl.Utf8).alias("event_name"),
    )
    .select(retailer_events.columns)
)


  .select(retailer_events.columns)


In [15]:

# =========================
# 5. UNION ALL → customer_events
# =========================
customer_events = (
    pl.concat(
        [retailer_events, tv_events, prog_events],
        how="vertical",
    )
    .filter(
        pl.col("customer_id").is_not_null()
        & pl.col("timestamp_utc").is_not_null()
    )
    .sort(["customer_id", "timestamp_utc"])
)


In [16]:
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)  # <-- IMPORTANT

OUT_PATH = DATA_DIR / "customer_events.parquet"
print("Writing to:", OUT_PATH.resolve())      # <-- DEBUG utile

customer_events.sink_parquet(OUT_PATH)

Writing to: C:\Users\adamc\Business_Data_Case\Business_Data_Case\data\customer_events.parquet


InvalidOperationError: 'union'/'concat' inputs should all have the same schema,got
Schema:
name: customer_id, field: String
name: timestamp_utc, field: Datetime('μs')
name: event_type, field: String
name: lever, field: String
name: campaign_name, field: String
name: device_type, field: String
name: product_name, field: String
name: brand, field: String
name: sales, field: Float64
name: quantity, field: Float64
name: cost_milli_cent, field: Float64
name: source_id, field: String
name: event_name, field: String
 and 
Schema:
name: customer_id, field: String
name: timestamp_utc, field: Datetime('μs')
name: event_type, field: String
name: lever, field: String
name: campaign_name, field: String
name: device_type, field: String
name: product_name, field: String
name: brand, field: String
name: sales, field: Float64
name: quantity, field: Int64
name: cost_milli_cent, field: Float64
name: source_id, field: String
name: event_name, field: String


Resolved plan until failure:

	---> FAILED HERE RESOLVING 'sink' <---
SELECT [col("customer_id"), col("timestamp_utc"), col("event_type"), col("lever"), col("campaign_name"), col("device_type"), col("product_name"), col("brand"), col("sales"), col("quantity"), col("cost_milli_cent"), col("source_id"), col("event_name")]
   WITH_COLUMNS:
   [col("dsp_id").alias("source_id"), null.cast(String).alias("product_name"), null.cast(String).alias("brand"), null.cast(Float64).alias("sales"), null.cast(Int64).alias("quantity"), null.cast(String).alias("event_name")] 
    INNER JOIN:
    LEFT PLAN ON: [col("dsp_id")]
       WITH_COLUMNS:
       [col("timestamp_utc").str.strptime(["raise"]), "EXPOSURE".alias("event_type"), "PROGRAMMATIC".alias("lever"), col("campaign_name"), col("device_type"), col("cost_milli_cent")] 
        Csv SCAN [data\programmatic_publisher.csv]
        PROJECT */5 COLUMNS
        ESTIMATED ROWS: 16946774
    RIGHT PLAN ON: [col("dsp_id")]
      SELECT [col("customer_id"), col("dsp_id")]
        UNIQUE[maintain_order: false, keep_strategy: Any] BY None
          SELECT [col("customer_id"), col("device_id"), col("dsp_id")]
            Csv SCAN [data\mapping_transac_publisher_tv.csv]
            PROJECT */3 COLUMNS
            ESTIMATED ROWS: 7931402
    END INNER JOIN

In [22]:
mapping_rows = pl.scan_csv("data/mapping_transac_publisher_tv.csv").select(pl.len()).collect()
tv_rows = pl.scan_csv("data/tv_publisher.csv").select(pl.len()).collect()
prog_rows = pl.scan_csv("data/programmatic_publisher.csv").select(pl.len()).collect()

print("mapping:", mapping_rows.item())
print("tv:", tv_rows.item())
print("prog:", prog_rows.item())


mapping: 7984411
tv: 5827133
prog: 17493428


In [None]:
tv_join_rows = (
    pl.scan_csv("data/tv_publisher.csv")
    .select(["device_id"])
    .join(
        pl.scan_csv("data/mapping_transac_publisher_tv.csv").select(["customer_id","device_id"]).unique(),
        on="device_id",
        how="inner"
    )
    .select(pl.len())
    .collect()
)
print("tv join rows:", tv_join_rows.item())
