# 05 Feature Engineering

This notebook performs feature engineering for machine learning models using Spark MLlib.

## Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, year, month, dayofmonth, dayofweek, 
    quarter, when, lit
)
from pyspark.ml.feature import (
    StringIndexer, VectorAssembler, StandardScaler, 
    OneHotEncoder
)
from pyspark.ml import Pipeline
import pandas as pd

## Initialize Spark Session

In [2]:
spark = SparkSession.builder \
    .appName("FeatureEngineering") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("Spark session initialized successfully!")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/06 11:09:56 WARN Utils: Your hostname, pc-ThinkPad-P15-Gen-1, resolves to a loopback address: 127.0.1.1; using 10.42.101.241 instead (on interface wlp0s20f3)
26/01/06 11:09:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/06 11:09:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session initialized successfully!


## Load Data

In [18]:
amazon_sales = spark.read.parquet("../processed/amazon_sales.parquet")
pricing = spark.read.parquet("../processed/pricing_may2022.parquet")
inventory = spark.read.parquet("../processed/inventory.parquet")

print("Data loaded successfully!")
print(f"Amazon Sales: {amazon_sales.count():,} rows")
print(f"Pricing: {pricing.count():,} rows")
print(f"Inventory: {inventory.count():,} rows")

Data loaded successfully!
Amazon Sales: 128,975 rows
Pricing: 1,330 rows
Inventory: 9,271 rows


## Time-Based Features

In [19]:
amazon_sales = amazon_sales.filter(col("date").isNotNull())

amazon_sales = amazon_sales.withColumn("year", year(col("date"))) \
    .withColumn("month", month(col("date"))) \
    .withColumn("day", dayofmonth(col("date"))) \
    .withColumn("dayofweek", dayofweek(col("date"))) \
    .withColumn("quarter", quarter(col("date")))

amazon_sales = amazon_sales.withColumn(
    "is_weekend",
    when(col("dayofweek").isin([1, 7]), 1).otherwise(0)
)

amazon_sales = amazon_sales.withColumn(
    "is_holiday_season",
    when(col("month").isin([11, 12]), 1).otherwise(0)
)

print("Time-based features created!")

Time-based features created!


## Product-Based Features

In [20]:
inventory_features = inventory.groupBy("SKU Code").agg({
    "stock": "first"
})

amazon_sales = amazon_sales.join(
    inventory_features,
    amazon_sales["SKU"] == inventory_features["SKU Code"],
    "left"
)

category_counts = amazon_sales.groupBy("Category").count()
category_avg_price = amazon_sales.groupBy("Category").avg("amount")

print("Product-based features created!")

Product-based features created!


## Customer-Based Features

In [21]:
state_order_count = amazon_sales.groupBy("ship-state").count()
state_avg_amount = amazon_sales.groupBy("ship-state").avg("amount")

city_order_count = amazon_sales.groupBy("ship-city").count()

b2b_avg_amount = amazon_sales.groupBy("b2b").avg("amount")

print("Customer-based features created!")

Customer-based features created!


## Pricing-Based Features

In [22]:
amazon_sales = amazon_sales.withColumn(
    "discount_pct",
    when(
        (col("amount") > 0) & (col("amount").isNotNull()) & (col("qty") > 0),
        ((col("qty") * 500) - col("amount")) / (col("qty") * 500) * 100
    ).otherwise(0)
)

print("Pricing-based features created!")

Pricing-based features created!


## Categorical Feature Encoding

In [23]:
categorical_cols = ["Category", "Size", "Status", "ship-state"]

indexers = []
for col_name in categorical_cols:
    indexer = StringIndexer(
        inputCol=col_name,
        outputCol=col_name + "_idx",
        handleInvalid="keep"
    )
    indexers.append(indexer)

print("Categorical columns to encode:", categorical_cols)

Categorical columns to encode: ['Category', 'Size', 'Status', 'ship-state']


## Numerical Feature Assembly

In [24]:
numerical_cols = [
    "year", "month", "day", "dayofweek", "quarter",
    "is_weekend", "is_holiday_season", "b2b", "qty"
]

numeric_assembler = VectorAssembler(
    inputCols=numerical_cols,
    outputCol="numerical_features"
)

scaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="scaled_numerical_features",
    withStd=True,
    withMean=True
)

print("Numerical feature assembler created!")

Numerical feature assembler created!


## Final Feature Assembly

In [25]:
final_feature_cols = ["scaled_numerical_features"] + \
    [col + "_idx" for col in categorical_cols]

final_assembler = VectorAssembler(
    inputCols=final_feature_cols,
    outputCol="features"
)

print("Final feature assembler created!")
print(f"Total features: {len(final_feature_cols)}")

Final feature assembler created!
Total features: 5


## Build Pipeline

In [26]:
pipeline = Pipeline(stages=indexers + [numeric_assembler, scaler, final_assembler])

print("Feature engineering pipeline created!")

Feature engineering pipeline created!


## Fit and Transform

In [27]:
model = pipeline.fit(amazon_sales)
featured_df = model.transform(amazon_sales)

print("Feature engineering completed!")
print(f"Transformed dataset: {featured_df.count():,} rows")
print(f"Features vector dimension: {len(featured_df.select('features').first()['features'])}")

Feature engineering completed!
Transformed dataset: 128,975 rows
Features vector dimension: 13


## Save Feature-Engineered Data

In [29]:
featured_df.write.parquet(
    "processed/sales_new_features.parquet",
    mode="overwrite"
)

print("Feature-engineered data saved to processed/sales_features.parquet")



Feature-engineered data saved to processed/sales_features.parquet


                                                                                

## Clean Up

In [30]:
spark.stop()
print("Spark session stopped!")

Spark session stopped!
