# **Day 5 - Production-grade feature engineering**

### Create Dataset Directly

In [0]:
from pyspark.sql import Row

# Hand-written event rows, one tuple per event, laid out in the order given
# by `columns` below: (user_id, event_type, age, country, session_time).
data = [
    (1, "view", 25, "India", 5),
    (1, "purchase", 25, "India", 10),
    (2, "view", 30, "India", 3),
    (2, "cart", 30, "India", 8),
    (3, "view", 22, "USA", 4),
    (3, "purchase", 22, "USA", 12),
    (4, "view", 28, "UK", 6),
    (5, "view", 35, "India", 7),
    (6, "purchase", 40, "USA", 15),
    (7, "view", 29, "UK", 3),
    (8, "cart", 31, "India", 9),
    (9, "view", 26, "USA", 4),
    (10, "purchase", 33, "India", 11),
    (11, "view", 27, "UK", 5),
    (12, "cart", 24, "USA", 6),
    (13, "view", 38, "India", 7),
    (14, "purchase", 21, "USA", 13),
    (15, "view", 23, "UK", 2),
    (16, "view", 32, "India", 8),
    (17, "purchase", 36, "USA", 14),
]

# Column names, applied positionally to each tuple in `data`.
columns = ["user_id", "event_type", "age", "country", "session_time"]

# Only names are supplied here, so the column types are left for Spark to
# infer from the Python values.
events = spark.createDataFrame(data, schema=columns)

display(events)

user_id,event_type,age,country,session_time
1,view,25,India,5
1,purchase,25,India,10
2,view,30,India,3
2,cart,30,India,8
3,view,22,USA,4
3,purchase,22,USA,12
4,view,28,UK,6
5,view,35,India,7
6,purchase,40,USA,15
7,view,29,UK,3


## 1) Create Purchase Label

In [0]:
from pyspark.sql import functions as F

# Per-event indicator: 1 on a "purchase" row, 0 on every other row.
purchase_flag = F.when(F.col("event_type") == "purchase", 1).otherwise(0)

# Collapse to one row per user. Taking the max of the 0/1 flags yields 1 when
# the user has at least one purchase event and 0 when they have none.
label_df = (
    events
    .groupBy("user_id")
    .agg(F.max(purchase_flag).alias("purchased"))
)

display(label_df)

user_id,purchased
1,1
2,0
3,1
4,0
5,0
6,1
7,0
8,0
9,0
10,1


## 2) Create Feature Table

In [0]:
# Aggregates computed over every event row of a group (no event_type filter).
# NOTE(review): "purchase" rows contribute to these aggregates, and the
# `purchased` label is derived from those same rows — confirm this is not
# leaking the label into the features before training on them.
session_aggregates = [
    F.avg("session_time").alias("avg_session_time"),
    F.count("*").alias("total_events"),
]

# age and country are part of the grouping key so they are carried through to
# the output; a user_id appearing with more than one (age, country) pair would
# therefore produce more than one row.
features_df = (
    events
    .groupBy("user_id", "age", "country")
    .agg(*session_aggregates)
)

display(features_df)

user_id,age,country,avg_session_time,total_events
1,25,India,7.5,2
2,30,India,5.5,2
3,22,USA,8.0,2
4,28,UK,6.0,1
5,35,India,7.0,1
6,40,USA,15.0,1
7,29,UK,3.0,1
8,31,India,9.0,1
9,26,USA,4.0,1
10,33,India,11.0,1


## 3) Join Features + Label

In [0]:
# Left join: every row of features_df is kept and the `purchased` label with
# the same user_id is attached to it.
training_data = features_df.join(
    other=label_df,
    on="user_id",
    how="left",
)

display(training_data)

user_id,age,country,avg_session_time,total_events,purchased
1,25,India,7.5,2,1
2,30,India,5.5,2,0
3,22,USA,8.0,2,1
4,28,UK,6.0,1,0
5,35,India,7.0,1,0
6,40,USA,15.0,1,1
7,29,UK,3.0,1,0
9,26,USA,4.0,1,0
8,31,India,9.0,1,0
11,27,UK,5.0,1,0


In [0]:
# Bare expression: the notebook echoes the DataFrame's repr, which lists each
# column name with its type (handy for checking the joined schema).
training_data

DataFrame[user_id: bigint, age: bigint, country: string, avg_session_time: double, total_events: bigint, purchased: int]

## 4) Train / Test Split

In [0]:
# Random split with 0.8 / 0.2 weights; the fixed seed is there so the split
# can be repeated. The weights steer a random per-row assignment, so the
# resulting sizes are approximate rather than an exact 80/20 cut.
train_df, test_df = training_data.randomSplit(
    weights=[0.8, 0.2],
    seed=42,
)

# Report how many rows landed in each split.
train_rows = train_df.count()
print("Train Count:", train_rows)
test_rows = test_df.count()
print("Test Count:", test_rows)

Train Count: 14
Test Count: 3


In [0]:
# A cell only echoes the value of its final expression, so a bare `train_df`
# on an earlier line is evaluated and silently discarded. Putting both splits
# in one tuple makes the cell show the repr (column names and types) of each.
train_df, test_df

DataFrame[user_id: bigint, age: bigint, country: string, avg_session_time: double, total_events: bigint, purchased: int]

## 5) Validate Distribution

In [0]:
# Number of users in each label class, computed on the full (pre-split)
# training_data to show how balanced the classes are.
label_counts = training_data.groupBy("purchased").count()

display(label_counts)

purchased,count
1,6
0,11
