# 03 — Feature Engineering in PySpark


In [29]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('CustomerRecommendationEngine')
    .master('local[*]')
    .getOrCreate()
)

# local[*] means: run Spark locally, using all available CPU cores
print(f'Spark version: {spark.version}')
print('Spark session created.')

Spark version: 4.1.1
Spark session created.


## Loading CSVs

PySpark: spark.read.csv(file, header=True, inferSchema=True)

In [30]:
RAW = '../data/raw/'

customers = spark.read.csv(RAW + 'olist_customers_dataset.csv', header=True, inferSchema=True)
orders = spark.read.csv(RAW + 'olist_orders_dataset.csv', header=True, inferSchema=True)
items = spark.read.csv(RAW + 'olist_order_items_dataset.csv', header=True, inferSchema=True)
payments = spark.read.csv(RAW + 'olist_order_payments_dataset.csv', header=True, inferSchema=True)
reviews = spark.read.csv(RAW + 'olist_order_reviews_dataset.csv', header=True, inferSchema=True)
products = spark.read.csv(RAW + 'olist_products_dataset.csv', header=True, inferSchema=True)
sellers = spark.read.csv(RAW + 'olist_sellers_dataset.csv', header=True, inferSchema=True)
categories = spark.read.csv(RAW + 'product_category_name_translation.csv', header=True, inferSchema=True)

print('All files loaded into Spark DataFrames.')

All files loaded into Spark DataFrames.


## Joining Tables
Pandas: pd.merge(df1, df2, on=key)

PySpark: df1.join(df2, on=key, how=left)

In [31]:
# Start from orders (the central table)
df = (
    orders
    .join(customers, on='customer_id', how='left')
    .join(items, on='order_id', how='left')
    .join(payments, on='order_id', how='left')
    .join(reviews, on='order_id', how='left')
    .join(products, on='product_id', how='left')
    .join(categories, on='product_category_name', how='left')
)

print(f'Joined: {df.count():,} rows x {len(df.columns)} cols')
df.printSchema()


Joined: 119,143 rows x 37 cols
root
 |-- product_category_name: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- payment_sequen

## Prep: Fix Column Types
Before aggregating, cast `review_score` from string to integer so you can compute averages.

In [32]:
from pyspark.sql import functions as F

# Cast review_score to integer (it loaded as string)
df = df.withColumn('review_score', F.col('review_score').cast('integer'))

## Customer Feature Aggregation

Group by `customer_unique_id` and compute one row per customer.

Use `df.groupBy('customer_unique_id').agg(...)` with multiple aggregation functions inside `.agg()`.

Useful PySpark functions: `F.countDistinct()`, `F.sum()`, `F.avg()`, `F.max()`, `F.datediff()`, `F.lit()`

In [33]:
#df.groupBy('customer_unique_id').agg()
df.show(5)

+---------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------+--------------+-------------+--------------------+-------------------+-----+-------------+------------------+------------+--------------------+-------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|product_category_name|          product_id|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|  customer_unique_id|custo

In [34]:
# First, find the max date in the dataset (you'll need this for recency)
# YOUR CODE HERE: reference_date = ...
reference_date = df.select(F.max('order_purchase_timestamp')).collect()[0][0]


customer_features = (
    df.groupBy('customer_unique_id')
    .agg(

        # RECENCY: days between the customer's most recent order and the reference_date
        # Hint: F.datediff(reference_date, F.max('order_purchase_timestamp'))
        F.datediff(F.lit(reference_date), F.max('order_purchase_timestamp')).alias('recency'),


        # FREQUENCY: number of distinct orders this customer placed
        # Hint: F.countDistinct('order_id')
        F.countDistinct('order_id').alias('frequency'),

        # MONETARY: total amount spent (sum of price)
        # Watch out: the join inflated rows, so use sum on distinct order-item combos
        # Hint: F.sum('price') — this works if each item row has a unique price entry
        F.sum('price').alias('monetary'),

        # AVG_ORDER_VALUE: monetary / frequency
        # Hint: compute it after the .agg() using .withColumn()
        # Or: F.sum('price') / F.countDistinct('order_id')
        (F.sum('price') / F.countDistinct('order_id')).alias('avg_order_value'),

        # NUM_PRODUCTS: how many distinct products this customer bought
        # Hint: F.countDistinct('product_id')
        F.countDistinct('product_id').alias('num_products'),

        # AVG_REVIEW_SCORE: average rating this customer gives
        # Hint: F.avg('review_score')
        F.avg('review_score').alias('avg_review_score'),

        # DELIVERY_DIFF: avg days between estimated and actual delivery (negative = early, positive = late)
        # Hint: F.avg(F.datediff('order_delivered_customer_date', 'order_estimated_delivery_date'))
        F.avg(F.datediff('order_delivered_customer_date', 'order_estimated_delivery_date')).alias('delivery_diff')

    )
)

customer_features.show(5)
print(f'Customer features: {customer_features.count():,} rows')

+--------------------+-------+---------+--------+---------------+------------+----------------+-------------+
|  customer_unique_id|recency|frequency|monetary|avg_order_value|num_products|avg_review_score|delivery_diff|
+--------------------+-------+---------+--------+---------------+------------+----------------+-------------+
|a439177407290b4a5...|    212|        1|   64.89|          64.89|           1|             4.0|         -5.0|
|1234ccd4e0b359525...|    189|        1|    49.1|           49.1|           1|             5.0|        -15.0|
|216ab90e27f18940c...|     87|        2|   259.8|          129.9|           2|             5.0|         -8.5|
|0e1a29e5ec2a8b40b...|    604|        1|   113.0|          113.0|           1|             5.0|        -20.0|
|a6408b4025cdea6e8...|    521|        1|    69.9|           69.9|           1|             5.0|        -21.0|
+--------------------+-------+---------+--------+---------------+------------+----------------+-------------+
only showi

## Save to Parquet
Save the final customer features table to `data/processed/` as a parquet file.

In [None]:
# Fill missing values with 0
customer_features = customer_features.fillna(0)

# Save to CSV
customer_features.toPandas().to_csv('../data/processed/customer_features.csv', index=False)
print('Saved to data/processed/customer_features.csv')

In [None]:
spark.stop()
print('Spark session stopped.')