<a href="https://colab.research.google.com/github/cbonnin88/VifStream/blob/main/VifStreaming_Data_Cleaning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import polars as pl
import gdown as gd
from datetime import datetime

In [None]:
# Google Drive direct-download links for the three raw CSV exports
# (events, subscriptions, users) fetched in the cells below.
drive_links = {
    'events': 'https://drive.google.com/uc?id=1BUewVLD_DkGvyBj-xpUPW1FoVR7015JK',
    'subscriptions': 'https://drive.google.com/uc?id=1bwusEYKsAIzGAH9QiPXRPGsCL1TTPwws',
    'users': 'https://drive.google.com/uc?id=1KLIbrW__PuqxyN43gSvN3-BnGbCBKVeF',
}
url_events = drive_links['events']
url_subscriptions = drive_links['subscriptions']
url_users = drive_links['users']

In [None]:
# Fetch the raw events export to a local CSV; as the cell's last expression,
# the value returned by gd.download is what the cell displays.
gd.download(url=url_events, output='vifstream_raw_events.csv', quiet=True)

'vifstream_raw_events.csv'

In [None]:
# Fetch the raw subscriptions export to a local CSV; as the cell's last
# expression, the value returned by gd.download is what the cell displays.
gd.download(url=url_subscriptions, output='vifstream_raw_subscriptions.csv', quiet=True)

'vifstream_raw_subscriptions.csv'

In [None]:
# Fetch the raw users export to a local CSV; as the cell's last expression,
# the value returned by gd.download is what the cell displays.
gd.download(url=url_users, output='vifstream_raw_users.csv', quiet=True)

'vifstream_raw_users.csv'

In [None]:
# Load the downloaded events CSV with inferred dtypes and preview the first five rows.
df_raw_events = pl.read_csv(source='vifstream_raw_events.csv')
df_raw_events.head(n=5)

event_id,user_id,event_name,timestamp,session_id
i64,i64,str,str,i64
1,48877,"""app_open""","""2025-02-28 11:34:09.352745""",385338
2,6834,"""paywall_view""","""2025-08-31 06:50:20.919241""",632518
3,45336,"""channel_tune_in""","""2025-06-09 09:58:59.059098""",834341
4,32146,"""channel_tune_in""","""2026-01-17 00:00:06.068916""",763891
5,22376,"""app_open""","""2025-02-16 08:27:44.384627""",352259


In [None]:
# Load the downloaded subscriptions CSV with inferred dtypes and preview the first five rows.
df_raw_subscriptions = pl.read_csv(source='vifstream_raw_subscriptions.csv')
df_raw_subscriptions.head(n=5)

transaction_id,user_id,amount,status,payment_method,transaction_date
i64,i64,f64,str,str,str
1,21773,15.99,"""completed""","""PayPal""","""2026-01-17 08:05:43.426932"""
2,9131,15.99,"""completed""","""Visa""","""2025-05-26 01:12:28.003434"""
3,2895,9.99,"""completed""","""Visa""","""2025-12-02 04:03:04.978442"""
4,39842,15.99,"""completed""","""Mastercard""","""2025-07-05 08:43:41.163240"""
5,21124,9.99,"""completed""","""Mastercard""","""2025-10-31 20:44:36.704798"""


In [None]:
# Load the downloaded users CSV with inferred dtypes and preview the first five rows.
df_raw_users = pl.read_csv(source='vifstream_raw_users.csv')
df_raw_users.head(n=5)

user_id,signup_date,plan_type,region,device,age_group
i64,str,str,str,str,str
1,"""2024-10-14""","""free""","""Hauts-de-France""","""AppleTV""","""35-44"""
2,"""2025-02-19""","""free""","""Brittany""","""SmartTV""","""25-34"""
3,"""2025-12-07""","""premium""","""Hauts-de-France""","""Mobile""","""45+"""
4,"""2024-08-11""","""basic""","""Nouvelle-Aquitiaine""","""Mobile""","""25-34"""
5,"""2025-11-26""","""basic""","""Grand Est""","""Web""","""25-34"""


# **Clean 'Users' dataset**

- Handling date formats and age segments

In [None]:
# Build the cleaned users table from the raw export:
#   - signup_date: parse the date strings into a Date column
#   - age_group:   label missing values as 'Unknown' instead of leaving nulls
#   - region:      correct the misspelling 'Nouvelle-Aquitiaine' that appears in
#                  the raw data so the region is not split across two spellings
df_users_clean = df_raw_users.with_columns([
    pl.col('signup_date').str.to_date(),
    pl.col('age_group').fill_null('Unknown'),
    # literal=True: match the exact text rather than treating it as a regex.
    pl.col('region').str.replace_all('Nouvelle-Aquitiaine', 'Nouvelle-Aquitaine', literal=True),
])

# **Clean Events**
 - Timestamp parsing and deduplication by `event_id`

In [None]:
# Build the cleaned events table: parse the timestamp strings into Datetime
# values, then drop rows that repeat an event_id.
# keep='first' + maintain_order=True make the deduplication deterministic:
# with polars' defaults (keep='any', maintain_order=False) the surviving
# duplicate and the output row order can differ from run to run, which would
# make the exported CSV non-reproducible.
df_events_clean = df_raw_events.with_columns([
    pl.col('timestamp').str.to_datetime(),
]).unique(subset=['event_id'], keep='first', maintain_order=True)

# **Clean Subscriptions**

- Revenue Formatting

In [None]:
# Build the cleaned subscriptions table: parse transaction_date into Datetime,
# make sure amount is Float64, and keep only rows whose status is exactly
# 'completed' (only successful revenue is kept for now).
is_completed = pl.col('status') == 'completed'
df_subs_clean = (
    df_raw_subscriptions
    .with_columns(
        pl.col('transaction_date').str.to_datetime(),
        pl.col('amount').cast(pl.Float64),
    )
    .filter(is_completed)
)

# **Product Engineering**
- Join Users + Events to calculate 'Days Since Signup'

In [None]:
# Enrich each cleaned event with its user's signup_date (left join, so events
# without a matching user are kept with a null signup_date), then derive
# days_since_signup as the whole-day gap between the event's calendar date
# and the signup date.
signup_lookup = df_users_clean.select(['user_id','signup_date'])
event_date = pl.col('timestamp').cast(pl.Date)
df_final_events = (
    df_events_clean
    .join(signup_lookup, on='user_id', how='left')
    .with_columns(
        (event_date - pl.col('signup_date')).dt.total_days().alias('days_since_signup')
    )
)

# **Execute and Save**

In [None]:
# Persist the three cleaned tables as CSV files in the working directory.
df_final_events.write_csv('vifstream_events_clean.csv')
df_users_clean.write_csv('vifstream_users_clean.csv')
df_subs_clean.write_csv('vifstream_subscriptions_clean.csv')

# Status message (fixed typo: "dready" -> "ready").
print('Cleaned files ready for BigQuery')

Cleaned files dready for BigQuery
