# Basics

In [None]:
#eda stacks
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import s3fs
import os
import sys

# Notebook display settings (pandas rendering + seaborn theme).
pd.options.display.max_columns = 200      # render up to 200 columns before truncating
pd.options.display.max_rows = 20          # frames longer than 20 rows are shown truncated
pd.options.display.width = 1200           # allow wide output lines
pd.options.display.max_colwidth = 100     # cut text cells after 100 characters
pd.options.display.float_format = "{:.2f}".format  # floats shown with two decimals
sns.set_theme(style="whitegrid")

# Function to flatten JSON

In [None]:
def get_event_param(params, key):
    """Return the value stored under ``key`` in a collection of event params.

    Each entry of ``params`` is read as a mapping of the form
    ``{"key": <name>, "value": {"string_value": ..., "int_value": ...,
    "float_value": ..., "double_value": ...}}``.

    Args:
        params: A list or numpy array of parameter mappings. Any other type
            (including ``None``) is treated as "no parameters".
        key: Name of the parameter to look up.

    Returns:
        For the first entry whose ``"key"`` equals ``key``: the first typed
        value that is not ``None``, checked in the order string, int, float,
        double. ``None`` when nothing matches or the match carries no value.
    """
    # handle list OR numpy array; everything else has nothing to search
    if not isinstance(params, (list, np.ndarray)):
        return None

    for p in params:
        # Tolerate null entries inside the collection.
        if p is None or p.get("key") != key:
            continue

        val = p.get("value")
        if val is None:
            return None

        # Compare against None explicitly: chaining with `or` would discard
        # legitimate falsy values such as 0, 0.0 or "".
        for field in ("string_value", "int_value", "float_value", "double_value"):
            candidate = val.get(field)
            if candidate is not None:
                return candidate
        return None

    return None

# Function to Transform (Bronze Tier)

In [None]:
def bronze_transform(df:pd.DataFrame) -> pd.DataFrame:
    """Collapse raw event rows into one bronze-tier row per session (per date).

    Columns read from ``df``: ``date_ist``, ``user_id``, ``session_id``,
    ``time_ist``, ``event``, ``event_params``, ``page_location``,
    ``page_type`` and ``device``.

    Args:
        df: Raw event-level frame. It is not modified.

    Returns:
        A frame with ``date_ist``, ``user_id``, ``session_id`` and three
        dict-valued columns:

        * ``dimensions`` - landing page / landing page type / source /
          medium / campaign / device.
        * ``funnels`` - ``{step: {"flag": 0|1, "first_timestamp": ...}}`` for
          every funnel step seen in ``df``; a step the session never reached
          has ``flag`` 0 and ``first_timestamp`` 0.
        * ``purchase`` - ``orders``, ``orders_dups_inclusive``, ``rev`` and
          ``aov`` (all 0 for sessions without a purchase).

        Sessions without any funnel event are dropped by the inner join on
        the funnel table.

    NOTE(review): rows are de-duplicated on (date_ist, user_id, session_id)
    but joined on (user_id, session_id) only, so a session with events on two
    dates yields two rows carrying the same funnels/purchase - confirm this
    grain is intended.
    """
    session_keys = ['user_id', 'session_id']

    #identifiers
    # time_ist is part of the sort so the row kept per session is its earliest
    # event (the landing page) instead of whatever row happened to come first.
    identifiers = df.sort_values(by=['date_ist', 'user_id', 'session_id', 'time_ist'])
    identifiers = identifiers.drop_duplicates(subset=['date_ist', 'user_id', 'session_id'])
    identifiers = identifiers[['date_ist', 'user_id', 'session_id', 'page_location', 'page_type', 'device']]
    identifiers = identifiers.rename(
        columns={'page_location': 'landing_page', 'page_type': 'landing_page_type'})

    #dimensions
    start = df.loc[df["event"] == "session_start"].copy()
    for param in ("source", "medium", "campaign"):
        start[param] = start["event_params"].apply(
            lambda x, name=param: get_event_param(x, name))
    # Keep only the earliest session_start per session; duplicates would
    # otherwise multiply rows in the left join below.
    start = start.sort_values(by='time_ist').drop_duplicates(subset=session_keys)
    dimensions = identifiers.merge(
        start[['source', 'medium', 'campaign'] + session_keys],
        how='left', on=session_keys)

    #funnel
    # Raw event name -> canonical funnel step.
    funnel_steps = {
        'view_item': 'product_view',
        'view_product_page_loaded': 'product_view',
        'add_to_cart': 'add_to_cart',
        'add_to_cart_custom_event': 'add_to_cart',
        'begin_checkout': 'begin_checkout',
        'gokwik_checkout_initiated': 'begin_checkout',
        'add_shipping_info': 'add_shipping_info',
        'add_payment_info': 'add_payment_info',
        'purchase': 'purchase',
    }
    # Explicit copy: assigning into a filtered slice is chained assignment.
    focus = df[df['event'].isin(list(funnel_steps))].copy()
    focus['event'] = focus['event'].map(funnel_steps)
    funnel = focus.groupby(session_keys + ['event']).agg(
        first_timestamp=('time_ist', 'min')).reset_index()

    #cartesian product
    # Every session gets a row for every step present in this frame.
    sessions = funnel[session_keys].drop_duplicates()
    steps = pd.DataFrame({"event": funnel['event'].unique()})
    grid = sessions.merge(steps, how='cross')

    #beautifying funnel
    funnel_new = grid.merge(funnel, how='left', on=session_keys + ['event'])
    # Flag is derived from "was a timestamp found" before the gaps are
    # replaced with the 0 placeholder.
    funnel_new['flag'] = funnel_new['first_timestamp'].notna().astype(int)
    funnel_new.fillna(0, inplace=True)

    #purchases
    purchases = df.loc[df["event"] == "purchase"].copy()
    purchases["transaction_id"] = purchases["event_params"].apply(
        lambda x: get_event_param(x, "transaction_id"))
    # get_event_param may hand back strings or mixed objects; make sure the
    # revenue column is numeric before summing/dividing.
    purchases["value"] = pd.to_numeric(
        purchases["event_params"].apply(lambda x: get_event_param(x, "value")),
        errors="coerce")
    purchases = purchases[session_keys + ['time_ist', 'transaction_id', 'value']]
    purchases = purchases.groupby(session_keys).agg(
        orders=('transaction_id', 'nunique'),
        orders_dups_inclusive=('transaction_id', 'count'),
        rev=('value', 'sum')).reset_index()
    # orders is 0 when every transaction_id is missing; divide by NaN there so
    # aov ends up as 0 after the fillna below instead of inf.
    purchases['aov'] = purchases['rev'] / purchases['orders'].where(purchases['orders'] > 0)

    #final merging and nesting
    final = dimensions.copy()
    dimension_cols = [
        "landing_page",
        "landing_page_type",
        "source",
        "medium",
        "campaign",
        "device"]
    final["dimensions"] = final[dimension_cols].to_dict(orient="records")
    final = final.drop(columns=dimension_cols)

    # One single-key dict per (session, step), merged per session below.
    funnel_new["event_payload"] = [
        {step: {"flag": flag, "first_timestamp": first_ts}}
        for step, flag, first_ts in zip(
            funnel_new["event"], funnel_new["flag"], funnel_new["first_timestamp"])
    ]
    funnel_df = (
        funnel_new
        .groupby(session_keys, as_index=False)
        .agg({
            "event_payload": lambda payloads: {
                k: v
                for d in payloads
                for k, v in d.items()
            }
        })
        .rename(columns={"event_payload": "funnels"}))

    final = final.merge(funnel_df, how='inner', on=session_keys)
    final = final.merge(purchases, how='left', on=session_keys)
    # Sessions without a purchase get 0 for every purchase metric.
    final.fillna(0, inplace=True)
    purchase_cols = [
        "orders",
        "orders_dups_inclusive",
        "rev",
        "aov"]
    final["purchase"] = final[purchase_cols].to_dict(orient="records")
    final = final.drop(columns=purchase_cols)
    return final

# Transforming all Parquet Files

In [None]:
# Location of the raw event exports on S3.
bucket = "kerala-ayurveda-s3"
prefix = "raw-data/"

fs = s3fs.S3FileSystem()
dfs = []

# Files are numbered 0..31 with a 12-digit zero-padded suffix.
for i in range(32):
    file_key = "{}events_{:012d}.parquet".format(prefix, i)
    s3_path = "s3://{}/{}".format(bucket, file_key)

    print(f"Reading {s3_path}")
    df = pd.read_parquet(s3_path, filesystem=fs)
    print(f"{file_key} was of shape: {df.shape}")

    # Reduce the raw events to the bronze-tier session rows and keep them.
    df = bronze_transform(df)
    dfs.append(df)
    print(f"{file_key} transformed to shape: {df.shape}")

    # Visual separator between files: eleven lines holding a single "*".
    print("\n".join("*" * 11))

# Stack every per-file result into one frame with a fresh 0..n-1 index.
df = pd.concat(dfs, ignore_index=True)
print(f"Final df's shape: {df.shape}")

In [None]:
# QA: distinct date_ist values in the combined frame, in ascending order.
dates = df["date_ist"].sort_values(ascending=True).unique()
dates

In [None]:
# Spot-check: display one randomly chosen row of the combined frame.
df.sample(n=1)

# Saving in the S3

In [None]:
# Write the combined bronze frame to the bronze-tier prefix of the same bucket.
# NOTE(review): `prefix` is rebound here, so re-running the read cell after
# this one would look under "bronze-tier/" unless `prefix` is reset first.
# NOTE(review): the dict-valued columns produced by bronze_transform are
# written as their Python string form in a CSV - confirm downstream readers
# can parse that.
prefix = "bronze-tier/"
s3_path = "s3://" + bucket + "/" + prefix + "bronze_data.csv"
df.to_csv(s3_path, index=False)