Ashray Jaya Mani

%md
## Barstool Sports — Data Engineering Challenge  

### Objective  
Build data models from the source files and surface actionable content‑consumption insights.

*Data volume*: 3 raw files  
*Tooling*: Databricks Community, Delta Lake, Python, PySpark, pandas, SQL  
*Run time on DBR 14.3 small cluster*: < 5 minutes end‑to‑end  


### Source files
- Content Catalog (CSV): Contains published content information with titles, types, URLs, and related metadata
- Talent/Brand Lookup (HTML table): Serves as a dimension table to map IDs to human-readable names
- User Events (JSONL): Contains user interaction data with geo-location and content consumption information

## Raw Data 

*Sources*  
| File | Format | Key columns | Size | Notes |
|------|--------|-------------|------|-------|
| `BARSTOOL_PROD_CONTENT_PROD_CONTENTS_3.csv` | CSV | `content_id`, metadata | 1706 rows | Tags/talent/franchise come as JSON‑like strings |
| `brands_talent_franchise.html` | HTML | `_ID`, `TYPE`  | 164 rows | contains talent and franchise details, no brand details |
| `event_logs.jsonl` | JSONL | `ANONID`, `LOG` (nested) | 10 K rows | Contains OS, app version, events, timestamps |






#### Python Imports required

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import (
    col, from_json, explode, explode_outer, split, expr,
    regexp_extract, to_date, hour, dayofweek, month, year,
    count, sum as sum_, avg, desc, struct, collect_list, lit,
    array_distinct
)
from bs4 import BeautifulSoup
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 1
**Content Catalog (CSV)**: Contains published content information with titles, types, URLs, and related metadata

**Fields**
- content_title - Title of the content
- content_type - Type of the content (article, gallery, podcast-episode, viral-original, viral-ugc, vod)
- share_url - Link to the content
- tags - Tags attached to the content
- talent - List of talent IDs (creators)
- franchise - Franchise ID
- published_at - Publication timestamp


#### Notes
**CONTENT_TYPE**
- `article` and `gallery` rows have talent IDs.
- `viral-original` and `viral-ugc` rows have no talent IDs.
- `podcast-episode` rows have a list of multiple talent IDs and a franchise ID.
- `vod` rows have either talent IDs or a franchise ID.

Enforce the schema before loading into `contents_raw`.

In [0]:
content_schema = (
    StructType()
        .add("content_title", StringType())
        .add("content_type", StringType())
        .add("share_url", StringType())
        .add("tags", StringType())  # Will parse to array later
        .add("talent", StringType())  # Will parse to array later
        .add("franchise", IntegerType())  # Integer ID; nulls are replaced during cleaning
        .add("published_at", TimestampType())
)

# Load content with proper handling of multiline CSV
contents_raw = (spark.read.format("csv")
        .schema(content_schema)
        .option("header", "true")
        .option("multiLine", "true")
        .option("quote", '"')
        .option("escape", '"')
        .load("/FileStore/tables/BARSTOOL_PROD_CONTENT_PROD_CONTENTS_3.csv")
)

contents_raw.show(10)

+--------------------+---------------+--------------------+--------------------+--------------------+---------+-------------------+
|       content_title|   content_type|           share_url|                tags|              talent|franchise|       published_at|
+--------------------+---------------+--------------------+--------------------+--------------------+---------+-------------------+
|Disc Golfer Brodi...|podcast-episode|https://s.barstoo...|                  []|[\n  "23435882",\...|       11|2021-03-12 01:15:14|
|The 13th Round: F...|            vod|https://s.barstoo...|[\n  "13th",\n  "...|  [\n  "22342033"\n]| 76029778|2021-05-13 16:10:01|
|Dumping Them Out:...|        article|https://s.barstoo...|[\n  "smokeshow",...|   [\n  "7416138"\n]|     null|2023-01-02 02:00:46|
|I Have To Take My...|        article|https://s.barstoo...|[\n  "driving",\n...|  [\n  "40092034"\n]|     null|2023-01-23 18:50:00|
|Dumping Them Out:...|        article|https://s.barstoo...|[\n  "hot",\n  "g

# 2
### Loading the talent/franchise lookup data (brands_talent_franchise.html) and creating the talent_dim & franchise_dim models
`brands_talent_franchise.html` serves as a dimension table that maps IDs to human-readable names.

Fields:
- _ID
- NAME
- SHORT_NAME
- TYPE


Create separate dimension tables, **talent_dim** and **franchise_dim**, from the main lookup table.

In [0]:
%pip install lxml

Python interpreter will be restarted.


In [0]:
# Load talent/franchise lookup data
html_path = "dbfs:/FileStore/tables/brands_talent_franchise.html"
# Note: dbutils.fs.head returns at most the first 65,536 bytes by default
html_content = dbutils.fs.head(html_path)
soup = BeautifulSoup(html_content, "html.parser")
lookup_pdf = pd.read_html(str(soup))[0]
lookup_df = spark.createDataFrame(lookup_pdf)

# Create separate dimension tables
talent_dim = lookup_df.filter(col("TYPE") == "talent")
franchise_dim = lookup_df.filter(col("TYPE") == "franchise")
#brand_dim = lookup_df.filter(col("TYPE") == "brand")

talent_dim.show(5)
franchise_dim.show(5)

+--------+-------------+----------+------+
|     _ID|         NAME|SHORT_NAME|  TYPE|
+--------+-------------+----------+------+
|13906103|  Uncle Chaps|     Chaps|talent|
|81445381|   Ohios Tate|Ohios Tate|talent|
|55089177|Francis Ellis|   Francis|talent|
|91417281|         Kate|      Kate|talent|
|62319293|        Hubbs|     Hubbs|talent|
+--------+-------------+----------+------+
only showing top 5 rows

+--------+-------------------+----------+---------+
|     _ID|               NAME|SHORT_NAME|     TYPE|
+--------+-------------------+----------+---------+
|25141828|Sundae Conversation|      null|franchise|
|      63|       The Dog Walk|   DogWalk|franchise|
|      12|           One Bite|  One Bite|franchise|
|      50|        The Stretch|TheStretch|franchise|
|       7|           Gametime|      null|franchise|
+--------+-------------------+----------+---------+
only showing top 5 rows



# 3
##### User Events (JSONL): user interaction data with geo-location and content consumption information, covering 2025-05-05 to 2025-05-06

Fields - 
- ANONID - unique client_ids (String)
- GEO - Geographical metadata (JSON String)
- LOG - User Interaction Application Logs (JSON String)

"GEO": Encodes the user’s geolocation and network context. Once parsed, it yields fields like:
- area_code, metro_code, postal_code (integers/strings)
- city, region, country_name (strings)
- latitude, longitude (floats)
- conn_type, conn_speed, proxy_type, proxy_description (strings)
- Autonomous System metadata (as_name, as_number), time offsets (gmt_offset, utc_offset), etc.

"LOG" : Captures the app‑level event payload. When parsed, it contains:

- app struct: os (e.g. iOS/Android), title (bundle ID), version
- content struct: barstoolContentID (e.g. article‑3544468)
- event (e.g. ContentStarted), type (e.g. track)
- page struct: title (page title, if any)
- sha1 (event fingerprint), source (app + OS), timestamp (UTC ISO 8601)
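
To make the shape of a `LOG` payload concrete, here is a minimal plain-Python sketch that parses one record and pulls out the fields used later. The sample payload is hand-built from the field descriptions above (it is not a verbatim record), and `flatten_log` is a hypothetical helper, not part of the notebook's Spark pipeline.

```python
import json

# Hand-built sample that mirrors the LOG fields described above (not a verbatim record).
sample_log = json.dumps({
    "app": {"os": "iOS", "title": "com.Design-Menace.Barstool-Sports", "version": "60.0.3"},
    "content": {"barstoolContentID": "article-3544468"},
    "event": "ContentStarted",
    "type": "track",
    "page": {"title": None},
    "timestamp": "2025-05-06T02:33:52.164Z",
})


def flatten_log(log_json: str) -> dict:
    """Parse one LOG string and return the flat fields used downstream."""
    log = json.loads(log_json)
    # Any nested struct may be missing or null, so fall back to an empty dict.
    app = log.get("app") or {}
    content = log.get("content") or {}
    page = log.get("page") or {}
    return {
        "app_os": app.get("os"),
        "app_version": app.get("version"),
        "content_id": content.get("barstoolContentID"),
        "event_name": log.get("event"),
        "event_type": log.get("type"),
        "page_title": page.get("title"),
        "event_timestamp": log.get("timestamp"),
    }


flat = flatten_log(sample_log)
print(flat)
```

Missing structs resolve to `None` rather than raising, which is roughly how absent nested fields surface as nulls once the column is parsed with a schema.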




In [0]:
# Load JSONL event data
events_raw = (
    spark.read
    .option("multiLine", True)
    .json("dbfs:/FileStore/tables/sampled_data.jsonl")
)

In [0]:
events_raw.show(5)

+--------------------+--------------------+--------------------+
|              ANONID|                 GEO|                 LOG|
+--------------------+--------------------+--------------------+
|953e33302aba6c3e1...|{\n  "area_code":...|{\n  "app": {\n  ...|
|b3e1d16f345f443b1...|{\n  "area_code":...|{\n  "app": {\n  ...|
|d404e5735bd468492...|{\n  "area_code":...|{\n  "app": {\n  ...|
|f8775432f5c9531f8...|{\n  "area_code":...|{\n  "app": {\n  ...|
|1b513ebadcecc178d...|{\n  "area_code":...|{\n  "app": {\n  ...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



> --------------------------------------------------------------------------------------------------------------------------------------------------------------


%md
# 4
##   Clean & Transform (Silver)  

| Transformation | Rationale |
|----------------|-----------|
| `from_json(LOG…)` + typed struct | Guarantees nested fields (`app.os`, `event`) are typed. |
| `regexp_extract(share_url, '.*/c/(.*)', 1)` → `content_id` | Takes everything after `/c/` in `share_url` as `content_id`, the join key to the events. |
| Load `FRANCHISE` as `INT`, then fill nulls with `UNKNOWN` | Makes missing franchises explicit. The fill turns the column into a string, so the join to `franchise_dim._ID` relies on an implicit cast. |
| `explode_outer(TALENT)` then regroup | Preserves **multi‑talent** rows while keeping fact grain on (`anon_id`, `event_ts`). |

**Join strategy**  
The dimension tables (`talent_dim`, `franchise_dim`) are small enough to be broadcast, which avoids shuffling the larger side of each join.


# 5
### Data Cleaning & Transformation  - Content Catalog

In [0]:
# Count null values per column
nulls_df = contents_raw.select([
    F.sum(col(c).isNull().cast("int")).alias(c)
    for c in contents_raw.columns
])
print("Null counts before cleaning:")
nulls_df.show()

Null counts before cleaning:
+-------------+------------+---------+----+------+---------+------------+
|content_title|content_type|share_url|tags|talent|franchise|published_at|
+-------------+------------+---------+----+------+---------+------------+
|            4|           4|        4|   4|     4|      459|           7|
+-------------+------------+---------+----+------+---------+------------+




- Four rows are entirely null and will be removed; `franchise` has 459 nulls, which are replaced with `UNKNOWN`.
- Drop rows with nulls in the subset ["content_title", "content_type", "share_url"].
- Convert the `tags` and `talent` columns to ArrayType to work with the nested data.
- Extract `content_id` from `share_url` using a regex.
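
As a quick sanity check of the last two steps, the sketch below applies the same regex and the same JSON-array parsing to single sample values in plain Python, outside Spark. The helper names `extract_content_id` and `parse_id_list` are hypothetical. Returning an empty string on no match mirrors `regexp_extract`; returning `None` for unparseable input is an assumption about how the parsed column should behave, not a statement about `from_json` in every mode.

```python
import json
import re

# Same pattern as the Spark regexp_extract call: capture everything after "/c/".
CONTENT_ID_PATTERN = re.compile(r".*/c/(.*)")


def extract_content_id(share_url):
    """Return the content_id from a share URL, or "" when the pattern does not match."""
    match = CONTENT_ID_PATTERN.match(share_url or "")
    return match.group(1) if match else ""


def parse_id_list(raw):
    """Parse a JSON-like string such as '[\\n  "22342033"\\n]' into a list of strings."""
    if raw is None:
        return None
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return None  # assumption: treat malformed input as missing
    return [str(item) for item in parsed] if isinstance(parsed, list) else None


print(extract_content_id("https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51"))
print(parse_id_list('[\n  "22342033"\n]'))
```

Running a handful of real `share_url` values through a check like this before the Spark job helps confirm that the extracted IDs have the same format as `content_id` in the events data.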

In [0]:
clean_content_df = contents_raw.dropna(
    subset=["content_title", "content_type", "share_url"]
)

In [0]:
# Parse TAGS column to array
tags_schema = ArrayType(StringType())
clean_content_df = clean_content_df.withColumn(
    "TAGS",
    from_json(col("TAGS"), tags_schema)
)

# Parse TALENT column to array
talent_schema = ArrayType(StringType())
clean_content_df = clean_content_df.withColumn(
    "TALENT",
    from_json(col("TALENT"), talent_schema)
)

# Replace missing franchise IDs with "UNKNOWN" (the column becomes a string)
clean_content_df = clean_content_df.withColumn(
    "FRANCHISE",
    F.when(
        col("FRANCHISE").isNull(), F.lit("UNKNOWN")
    ).otherwise(col("FRANCHISE").cast("string"))
)


# Extract content_id from SHARE_URL for easier joining
contents_silver = clean_content_df.withColumn(
    "content_id", 
    F.regexp_extract(col("SHARE_URL"), r".*\/c\/(.*)", 1)
)

In [0]:
contents_silver.show(10)

+--------------------+---------------+--------------------+--------------------+--------------------+---------+-------------------+--------------------+
|       content_title|   content_type|           share_url|                TAGS|              TALENT|FRANCHISE|       published_at|          content_id|
+--------------------+---------------+--------------------+--------------------+--------------------+---------+-------------------+--------------------+
|Disc Golfer Brodi...|podcast-episode|https://s.barstoo...|                  []|[23435882, 289887...|       11|2021-03-12 01:15:14|podcast-episode-4...|
|The 13th Round: F...|            vod|https://s.barstoo...|[13th, round, mma...|          [22342033]| 76029778|2021-05-13 16:10:01|vod-jGkXQfGoyUSiY...|
|Dumping Them Out:...|        article|https://s.barstoo...|[smokeshow, hot, ...|           [7416138]|  UNKNOWN|2023-01-02 02:00:46|     article-3450350|
|I Have To Take My...|        article|https://s.barstoo...|[driving, covid, ...|  

In [0]:
contents_silver.count()

Out[57]: 1702

`--------------------------------------------------------------------------------------------------------------------------------------------------------------`

# 6
### Data Cleaning & Transformation  - Events Data

- Infer schemas for the nested JSON columns (GEO, LOG) and parse them into structured data
- Flatten the nested structures with clear naming conventions
- Add time dimensions for easier analysis
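
The time dimensions can be checked by hand on a single timestamp. The sketch below is a plain-Python approximation of what `to_date`, `hour`, `dayofweek`, `month` and `year` produce, assuming the Spark session time zone is UTC. `time_dimensions` is a hypothetical helper; note that Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday).

```python
from datetime import datetime, timezone


def time_dimensions(iso_ts: str) -> dict:
    """Derive date parts from an ISO 8601 UTC timestamp such as '2025-05-06T02:31:56.709Z'."""
    # Older Python versions reject a trailing "Z" in fromisoformat(), so normalise it first.
    ts = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")).astimezone(timezone.utc)
    return {
        "event_date": ts.date().isoformat(),
        "event_hour": ts.hour,
        # isoweekday() is 1 = Monday ... 7 = Sunday; shift to 1 = Sunday ... 7 = Saturday.
        "event_day": ts.isoweekday() % 7 + 1,
        "event_month": ts.month,
        "event_year": ts.year,
    }


dims = time_dimensions("2025-05-06T02:31:56.709Z")
print(dims)
```

For this timestamp the result is date 2025-05-06, hour 2, day 3 (a Tuesday), month 5 and year 2025.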

In [0]:
# Infer schemas for nested JSON
geo_schema = spark.read.json(
    events_raw.select("GEO").rdd.map(lambda r: r.GEO)
).schema

log_schema = spark.read.json(
    events_raw.select("LOG").rdd.map(lambda r: r.LOG)
).schema
# Parse the JSON columns into structured data
parsed_events = (
    events_raw
    .withColumn("geo_struct", from_json(col("GEO"), geo_schema))
    .withColumn("log_struct", from_json(col("LOG"), log_schema))
    .drop("GEO", "LOG")
)
parsed_events.show(5)

+--------------------+--------------------+--------------------+
|              ANONID|          geo_struct|          log_struct|
+--------------------+--------------------+--------------------+
|953e33302aba6c3e1...|{952, comcast cab...|{{iOS, com.Design...|
|b3e1d16f345f443b1...|{703, amazon.com ...|{{iOS, com.Design...|
|d404e5735bd468492...|{703, amazon.com ...|{{iOS, com.Design...|
|f8775432f5c9531f8...|{703, amazon.com ...|{{iOS, com.Design...|
|1b513ebadcecc178d...|{703, amazon.com ...|{{iOS, com.Design...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [0]:
# Flatten the nested structures with clear naming conventions
flattened_events = parsed_events.select(
    col("ANONID"),
    # Geo fields - add 'geo_' prefix
    *[col(f"geo_struct.{f.name}").alias(f"geo_{f.name}") for f in geo_schema.fields],
    # App fields
    col("log_struct.app.os").alias("app_os"),
    col("log_struct.app.title").alias("app_title"),
    col("log_struct.app.version").alias("app_version"),
    # Content fields
    col("log_struct.content.barstoolContentID").alias("content_id"),
    col("log_struct.content.duration").alias("content_duration"),
    # Event info
    col("log_struct.event").alias("event_name"),
    col("log_struct.page.title").alias("page_title"),
    col("log_struct.timestamp").alias("event_timestamp"),
    col("log_struct.type").alias("event_type")
)

In [0]:
# Add time dimensions for easier analysis
from pyspark.sql.functions import to_date, hour, dayofweek, month, year

events_flattened_silver = flattened_events.withColumn(
    "event_date", to_date(col("event_timestamp"))
).withColumn(
    "event_hour", hour(col("event_timestamp"))
).withColumn(
    "event_day", dayofweek(col("event_timestamp"))
).withColumn(
    "event_month", month(col("event_timestamp"))
).withColumn(
    "event_year", year(col("event_timestamp"))
)

In [0]:
display(events_flattened_silver.limit(10))

ANONID,geo_area_code,geo_as_name,geo_as_number,geo_city,geo_conn_speed,geo_conn_type,geo_continent,geo_country_code,geo_country_code3,geo_country_name,geo_gmt_offset,geo_latitude,geo_longitude,geo_metro_code,geo_postal_code,geo_proxy_description,geo_proxy_type,geo_region,geo_utc_offset,app_os,app_title,app_version,content_id,content_duration,event_name,page_title,event_timestamp,event_type,event_date,event_hour,event_day,event_month,event_year
953e33302aba6c3e1e028e18ab9ec28e,952,comcast cable communications llc,7922,minnetonka,cable,wifi,,US,USA,united states,-500,44.92,-93.48,613,55345,?,?,MN,-500,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544478,,,The Easton Dub Is the New Youth Baseball Bat Sweeping the Nation and It Seems Like It May Actually Be One of the GOATs,2025-05-06T02:31:56.709Z,page,2025-05-06,2,3,5,2025
b3e1d16f345f443b1686ec4bd35c0c97,703,amazon.com inc.,14618,ashburn,broadband,wired,,US,USA,united states,-400,39.03,-77.49,511,20147,cloud,hosting,VA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,viral-original-U1hSASkeqkYrrDAviUT61Z5D,0.0,ContentStarted,,2025-05-06T02:32:25.703Z,track,2025-05-06,2,3,5,2025
d404e5735bd46849285809e62d71efd2,703,amazon.com inc.,14618,ashburn,broadband,wired,,US,USA,united states,-400,39.03,-77.49,511,20147,cloud,hosting,VA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,viral-original-aScmLHvhad1E8KTdQoXbNDzD,0.0,ContentStarted,,2025-05-06T02:33:01.076Z,track,2025-05-06,2,3,5,2025
f8775432f5c9531f858cab342ea709a6,703,amazon.com inc.,14618,ashburn,broadband,wired,,US,USA,united states,-400,39.03,-77.49,511,20147,cloud,hosting,VA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,viral-original-wvAZOxIHCqfNzWKmTVBuc2jj,3.968261135,ContentStarted,,2025-05-06T02:33:21.105Z,track,2025-05-06,2,3,5,2025
1b513ebadcecc178dd96f07f197ff2ef,703,amazon.com inc.,14618,ashburn,broadband,wired,,US,USA,united states,-400,39.03,-77.49,511,20147,cloud,hosting,VA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544468,,ContentStarted,,2025-05-06T02:33:52.164Z,track,2025-05-06,2,3,5,2025
6c665a3f27622f759f2e7747cc89c034,703,amazon.com inc.,14618,ashburn,broadband,wired,,US,USA,united states,-400,39.03,-77.49,511,20147,cloud,hosting,VA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544468,,ContentStarted,,2025-05-06T02:34:24.656Z,track,2025-05-06,2,3,5,2025
e08b00ab61cee4c368c4bdbb7f047128,406,charter communications llc,33588,butte,cable,wifi,,US,USA,united states,-600,45.99,-112.5,754,59701,?,?,MT,-600,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544483,,,"An Ex-CIA Scientist Tells Joe Rogan the US Has 'More Than 10' Recovered UFOs, Some of Which are 'Donations'",2025-05-06T02:34:33.171Z,page,2025-05-06,2,3,5,2025
1cafe3c6a31670e12c327f6a92906025,703,amazon.com inc.,14618,ashburn,broadband,wired,,US,USA,united states,-400,39.03,-77.49,511,20147,cloud,hosting,VA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544455,,ContentStarted,,2025-05-06T02:34:48.170Z,track,2025-05-06,2,3,5,2025
e265d62d6937f461626e951e25991bb2,0,deutsche telekom ag,3320,berlin,broadband,wifi,EU,DE,DEU,germany,200,52.53,13.38,276006,10115,?,?,BE,200,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544480,,,"Ben Affleck Confirmed His 'Armageddon' DVD Commentary Was The Best Work Of His Career, So Let's Give The Whole Thing A Listen Again",2025-05-06T02:34:59.927Z,page,2025-05-06,2,3,5,2025
5bad6ad2919480238ed6f83da0323db7,508,comcast cable communications llc,7922,brewster,cable,wifi,,US,USA,united states,-400,41.77,-70.04,506,2631,?,?,MA,-400,iOS,com.Design-Menace.Barstool-Sports,60.0.3,article-3544448,,,I just got in a screaming match with ABC Philadelphia scumbag reporter who tried to blame me and Barstool and white men for the “Fuck the Jews” sign,2025-05-06T02:35:54.095Z,page,2025-05-06,2,3,5,2025


`-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------`


# 7
### Modelling the consumption (gold) table

`contents_silver` is enriched with `talent_dim` and `franchise_dim`, and the result is joined with `events_flattened_silver` to form the consumption table.



Logic:
- Explode the `talent` array into a separate row for each talent ID
- Join the exploded `talent_id` to `talent_dim._ID`
- Join `franchise` to `franchise_dim._ID`
- Group back to collect all talents as an array of structs
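
The explode, join and regroup steps can be traced on a toy example in plain Python. The two lookup entries are copied from the `talent_dim` sample output; the content rows and the helper names (`explode_outer`, `enrich`) are made up for illustration and are not the Spark functions themselves.

```python
# Lookup values taken from the talent_dim sample output; the content rows are made up.
talent_dim = {
    "13906103": {"talent_name": "Uncle Chaps", "talent_short_name": "Chaps"},
    "81445381": {"talent_name": "Ohios Tate", "talent_short_name": "Ohios Tate"},
}
contents = [
    {"content_id": "podcast-episode-demo", "talent": ["13906103", "81445381"]},
    {"content_id": "viral-ugc-demo", "talent": []},
]

# What a left join yields when there is no matching talent row.
NO_MATCH = {"talent_name": None, "talent_short_name": None}


def explode_outer(rows):
    """Yield one row per talent ID; empty or missing lists still yield one row with None."""
    for row in rows:
        for talent_id in (row["talent"] or [None]):
            yield {**row, "talent_id": talent_id}


def enrich(rows, dim):
    """Left-join each exploded row to the lookup, then regroup to one entry per content_id."""
    grouped = {}
    for row in explode_outer(rows):
        details = dim.get(row["talent_id"], NO_MATCH)
        bucket = grouped.setdefault(row["content_id"], [])
        if details not in bucket:  # plays the role of array_distinct
            bucket.append(details)
    return grouped


talent_details = enrich(contents, talent_dim)
print(talent_details)
```

The second row shows why content without talent ends up with a single all-null struct (`List(List(null, null))` in the final table): the outer explode keeps the row, the left join finds nothing, and the null struct is still collected. Filtering out null structs before collecting is one way to get an empty array instead.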

In [0]:
from pyspark.sql.functions import col, explode_outer, struct, collect_list

# to avoid ambiguity
clean_content_alias = contents_silver.alias("cc")
talent_dim_alias = talent_dim.alias("td")
franchise_dim_alias = franchise_dim.alias("fd")

# Join content with talent dimension and franchise dimension
content_with_talent_franchise = clean_content_alias.withColumn(
    "talent_id", explode_outer(col("cc.TALENT")) # Use alias for TALENT column
).join(
    talent_dim_alias,
    col("talent_id") == col("td._ID"), # Qualify with alias 'td'
    "left"
).join(
    franchise_dim_alias,
    col("franchise") == col("fd._ID"), # Qualify with alias 'fd'
    "left"
).select(
    # Select all columns from the original clean_content_df
    col("cc.*"),
    # Qualify all selected columns with their aliases
    col("td.NAME").alias("talent_name"),
    col("td.SHORT_NAME").alias("talent_short_name"),
    col("fd.NAME").alias("franchise_name"),
    col("fd.SHORT_NAME").alias("franchise_short_name")
)



In [0]:
# Group back to get all talents as an array of structs
grouping_columns = [col(c) for c in contents_silver.columns]
content_with_all_talent_franchise = content_with_talent_franchise.groupBy(
    grouping_columns
).agg(
    # Aggregating talent details, using array_distinct
    array_distinct(collect_list(
        struct(
            col("talent_name"),
            col("talent_short_name")
        )
    )).alias("talent_details"),

    # Aggregating franchise details, using array_distinct
    array_distinct(collect_list(
        struct(
            col("franchise_name"),
            col("franchise_short_name")
        )
    )).alias("franchise_details")
)



Join **content_with_all_talent_franchise** with **events_flattened_silver** on **content_id**.

In [0]:
# Final analytical model
complete_model = events_flattened_silver.join(
    content_with_all_talent_franchise,
    events_flattened_silver.content_id == content_with_all_talent_franchise.content_id
)
complete_model = complete_model.drop(content_with_all_talent_franchise.content_id)
#complete_model.printSchema()

In [0]:
consumption_events_logs = (
    complete_model.select(
        col("ANONID").alias("anon_id"),
        col("content_id"),          
        col("content_title"),
        col("content_type"),
        col("content_duration"),
        col("share_url"),
        col("tags"),
        col("talent"),
        col("franchise"),
        col("published_at"),
        col("talent_details"),
        col("franchise_details"),
        col("event_name"),
        col("event_timestamp"),
        col("event_type"),
        col("event_date"),
        col("event_month"),
        col("event_year"),
        col("event_hour"),
        col("event_day"),
        col("page_title"),
        col("geo_area_code"),
        col("geo_city"),
        col("geo_country_code"),
        col("geo_country_code3"),
        col("geo_postal_code"),
        col("geo_region"),
        col("app_os"),
        col("app_version")
    )
)

In [0]:
consumption_events_logs.count()

Out[116]: 93415

### Final Table

In [0]:
display(consumption_events_logs.limit(20))

anon_id,content_id,content_title,content_type,content_duration,share_url,tags,talent,franchise,published_at,talent_details,franchise_details,event_name,event_timestamp,event_type,event_date,event_month,event_year,event_hour,event_day,page_title,geo_area_code,geo_city,geo_country_code,geo_country_code3,geo_postal_code,geo_region,app_os,app_version
df5872a8b302aa5982907a456761faa0,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,0.0,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentStarted,2025-05-06T02:33:48.388Z,track,2025-05-06,5,2025,2,3,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
1da5aa3be1c46cdf400c2d62f86d46ed,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,0.0,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentStarted,2025-05-06T03:46:00.659Z,track,2025-05-06,5,2025,3,3,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
9a97eca68687abec9b8406db737542b8,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentCompleted,2025-05-06T03:38:13.360Z,track,2025-05-06,5,2025,3,3,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
9a97eca68687abec9b8406db737542b8,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentCompleted,2025-05-06T03:38:13.472Z,track,2025-05-06,5,2025,3,3,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
c3b3a5fb16ba506cd39bd44c520948aa,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,0.0,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentStarted,2025-05-05T10:53:31.202Z,track,2025-05-05,5,2025,10,2,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
672fc7f6dcbd53c5421538cbd0e4fd49,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,0.0,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentStarted,2025-05-05T12:03:04.137Z,track,2025-05-05,5,2025,12,2,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
4d3387e1b96030a6c4c59b398f24f35c,viral-original-SAgRt5xBCx201ooxB5VXAQl8,Rory Wins 2025 Masters In Wild Fashion,viral-original,0.0,https://s.barstool.link/c/viral-original-SAgRt5xBCx201ooxB5VXAQl8,List(),List(),5,2025-04-14T09:15:33.299+0000,"List(List(null, null))","List(List(Fore Play, Fore Play))",ContentStarted,2025-05-05T13:42:20.820Z,track,2025-05-05,5,2025,13,2,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
47bb5be8a5bd70059e4c7eb59e175aee,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentCompleted,2025-05-05T15:32:11.564Z,track,2025-05-05,5,2025,15,2,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
b2b0606a73221a34f9a5fa0f3a230d22,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,0.0,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentStarted,2025-05-05T15:50:34.645Z,track,2025-05-05,5,2025,15,2,,703,ashburn,US,USA,20147,VA,iOS,60.0.3
2ac9ad800adb64b770bed5327ce2daab,viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,ja3scobr,viral-ugc,,https://s.barstool.link/c/viral-ugc-LWLSI2y6366AVa6Mvbyu5c51,List(),List(),83,2025-03-28T19:08:36.748+0000,"List(List(null, null))","List(List(null, null))",ContentCompleted,2025-05-05T10:01:21.868Z,track,2025-05-05,5,2025,10,2,,703,ashburn,US,USA,20147,VA,iOS,60.0.3


In [0]:
(consumption_events_logs
  .write
  .format("delta")
  .mode("overwrite")            
  .option("mergeSchema", "true") 
  .partitionBy("event_date", "content_type") 
  .save("/mnt/consumption/barstool/consumption_events_logs")
)


The table is saved in Delta format and partitioned by `event_date` and `content_type`, so queries that filter on those columns can skip unrelated partitions.

In [0]:
# Checking for missing content links
missing_content_links = events_flattened_silver.filter(
    col("content_id").isNotNull()
).join(
    contents_silver,
    "content_id",
    "left_anti"  # Only keep records without matches
).count()

print(f"Event records with missing content links: {missing_content_links}")

Event records with missing content links: 5611


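These events reference content IDs that have no match in `contents_silver`, possibly because they are livestreams, a type that is not among the catalog's content types.
Since the final model uses an inner join on `content_id`, these events are not included in `consumption_events_logs`.

Example: content_id `livestream-8w41-HCk6T9GOrXlJf8n9Z4XHIxhb`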
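
One way to test the livestream hypothesis is to count the unmatched IDs by their type prefix. The sketch below does this in plain Python on a small hand-made list: only the first ID comes from the example above, and the other two are placeholders. `content_type_prefix` is a hypothetical helper that assumes IDs follow the `<content_type>-<id>` pattern seen in the data.

```python
from collections import Counter

# Content types listed in the catalog description; multi-word types come first.
KNOWN_TYPES = ["podcast-episode", "viral-original", "viral-ugc", "article", "gallery", "vod"]


def content_type_prefix(content_id: str) -> str:
    """Return the type prefix of a content_id, falling back to the text before the first hyphen."""
    for known in KNOWN_TYPES:
        if content_id.startswith(known + "-"):
            return known
    return content_id.split("-", 1)[0]


unmatched_ids = [
    "livestream-8w41-HCk6T9GOrXlJf8n9Z4XHIxhb",  # from the example above
    "livestream-hypothetical-id",                 # placeholder
    "article-0000000",                            # placeholder
]
prefix_counts = Counter(content_type_prefix(cid) for cid in unmatched_ids)
print(prefix_counts.most_common())
```

If most of the 5611 unmatched events carry the `livestream` prefix, the hypothesis holds; a large share of `article` or `vod` prefixes would instead point to gaps in the content catalog.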