# Days 1, 2 & 3 Experiment

This notebook walks through the major milestones of the PySpark coding challenge.
It is intended for data scientists and engineers who wish to understand how the
pipeline transforms raw impression and action logs into fixed‑length sequences,
how to compute data quality metrics, and how to analyse the resulting training
data.


## Day 1 – Core pipeline

Day 1 focuses on building the core transformation logic.  We define strict
Spark schemas for each input dataset (impressions, clicks, add‑to‑carts,
previous orders) and normalise heterogeneous actions into a single table.
Each impression row is exploded so that there is one output row per
impression item, and we gather up to a fixed number of historical actions
per customer–impression pair with a configurable look‑back window.  The
resulting sequences are padded or truncated to a fixed length and stored in
`actions` and `action_types` array columns.
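The look-back filter and the pad-or-truncate step can be sketched in plain Python for a single customer–impression pair. This is only an illustration of the semantics, not the code in `src.transformations`. The helper name `build_sequence`, the padding value `0`, the integer action-type codes, the most-recent-first ordering and the "strictly before the impression date" rule are all assumptions made for the example.

```python
from datetime import date, timedelta

PAD = 0  # assumed padding value; the real pipeline may use a different sentinel


def build_sequence(history, as_of, lookback_days, seq_len):
    """Return (actions, action_types) lists with exactly seq_len entries each.

    history: list of (item_id, action_type, action_date) tuples for one customer.
    Only actions strictly before `as_of` and inside the look-back window are
    kept, ordered most recent first (an assumed ordering).
    """
    earliest = as_of - timedelta(days=lookback_days)
    eligible = [h for h in history if earliest <= h[2] < as_of]
    eligible.sort(key=lambda h: h[2], reverse=True)

    kept = eligible[:seq_len]            # truncate to the fixed length
    pad = [PAD] * (seq_len - len(kept))  # pad short histories

    actions = [h[0] for h in kept] + pad
    action_types = [h[1] for h in kept] + pad
    return actions, action_types


# Hypothetical action-type codes: 1 = click, 2 = add-to-cart, 3 = order
history = [
    (101, 1, date(2024, 1, 5)),
    (102, 2, date(2024, 1, 9)),
    (103, 3, date(2023, 1, 1)),   # older than the 30-day window
    (104, 1, date(2024, 1, 10)),  # same day as the impression, excluded
]

actions, action_types = build_sequence(
    history, as_of=date(2024, 1, 10), lookback_days=30, seq_len=4
)
print(actions)       # [102, 101, 0, 0]
print(action_types)  # [2, 1, 0, 0]
```

Keeping both arrays the same length and aligned by position means the item at index `i` of `actions` always pairs with the type at index `i` of `action_types`.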

Below we demonstrate how to load the synthetic Day 1 data and run the
pipeline end‑to‑end using the functions from `src.transformations`.


In [None]:
from pyspark.sql import SparkSession
from src import config, schema
from src.utils import read_csv_with_schema, coerce_impressions_from_json
from src.transformations import build_training_inputs

# Start a Spark session (requires Java to be installed)
spark = SparkSession.builder.appName('Day123Experiment').getOrCreate()

# Load the small Day 1 datasets
imp = read_csv_with_schema(spark, config.IMPRESSIONS_PATH, schema.impressions_schema)
clk = read_csv_with_schema(spark, config.CLICKS_PATH, schema.clicks_schema)
atc = read_csv_with_schema(spark, config.ADD_TO_CARTS_PATH, schema.add_to_carts_schema)
# Named `orders` rather than `ord` to avoid shadowing Python's built-in ord()
orders = read_csv_with_schema(spark, config.PREVIOUS_ORDERS_PATH, schema.previous_orders_schema)

# Ensure impressions are parsed if stored as JSON strings
imp = coerce_impressions_from_json(imp)

# Build the training inputs with default look‑back (365 days) and sequence length (1000)
train_df = build_training_inputs(imp, clk, atc, orders)

# Display schema and a sample row
train_df.printSchema()
train_df.show(3, truncate=False)

# Stop the session when done
spark.stop()


## Day 2 – CLI, Parquet export and metrics

On Day 2 we extended the pipeline with a lightweight command‑line
interface, a Parquet writer and a basic set of data quality metrics.  The
CLI can be used to run the entire pipeline from raw CSV files to a
Parquet dataset with optional JSON metrics output.  In code, you can call
`calculate_dq_metrics` directly on the training DataFrame to obtain a
dictionary of summary statistics.
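To give a feel for what such a dictionary might contain, here is a minimal pure-Python sketch that summarises a handful of zero-padded sequences and serialises the result to JSON. The function `toy_dq_metrics` and its metric names are hypothetical and chosen for illustration; the keys returned by `calculate_dq_metrics` may well differ.

```python
import json

PAD = 0  # assumed padding value


def toy_dq_metrics(sequences):
    """Summarise a list of fixed-length, zero-padded action sequences."""
    # Real history length = number of non-padding entries in each row
    lengths = [sum(1 for a in seq if a != PAD) for seq in sequences]
    total = len(sequences)
    return {
        "row_count": total,
        "empty_history_rate": sum(1 for n in lengths if n == 0) / total if total else 0.0,
        "mean_history_length": sum(lengths) / total if total else 0.0,
        "max_history_length": max(lengths, default=0),
    }


sequences = [
    [102, 101, 0, 0],
    [0, 0, 0, 0],      # customer with no usable history
    [7, 8, 9, 0],
    [5, 0, 0, 0],
]

metrics = toy_dq_metrics(sequences)
print(json.dumps(metrics, indent=2))
```

A plain dictionary of numbers and strings like this can be passed straight to `json.dumps`, which is convenient when metrics need to be written next to the exported dataset.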


In [None]:
from pyspark.sql import SparkSession
from src import config, schema
from src.utils import read_csv_with_schema, coerce_impressions_from_json
from src.transformations import build_training_inputs
from src.metrics import calculate_dq_metrics

spark = SparkSession.builder.appName('Day2Metrics').getOrCreate()

# Load and process the Day 1 synthetic data
imp = coerce_impressions_from_json(read_csv_with_schema(spark, config.IMPRESSIONS_PATH, schema.impressions_schema))
clk = read_csv_with_schema(spark, config.CLICKS_PATH, schema.clicks_schema)
atc = read_csv_with_schema(spark, config.ADD_TO_CARTS_PATH, schema.add_to_carts_schema)
# Named `orders` rather than `ord` to avoid shadowing Python's built-in ord()
orders = read_csv_with_schema(spark, config.PREVIOUS_ORDERS_PATH, schema.previous_orders_schema)

train_df = build_training_inputs(imp, clk, atc, orders)

# Compute data quality metrics
metrics = calculate_dq_metrics(train_df)
print(metrics)

spark.stop()


## Day 3 – Scaling and deeper analysis

Day 3 introduces two enhancements:

1. **Larger synthetic dataset**: The project now includes CSV files in the
   `data/` directory with the suffix `_1000` (e.g. `impressions_1000.csv`) to
   simulate a dataset with 1000 impression rows.  You can point the CLI
   at these files or load them directly in Python to test how the pipeline
   scales.
2. **Advanced analysis helpers**: A new module `src.analysis` exposes
   functions for computing the distribution of historical sequence lengths
   and the per‑position frequency of action types.  These tools help
   diagnose sparsity and bias in the training inputs.
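The two analyses described above can be illustrated on a few hand-written `action_types` rows in plain Python. This is a sketch of the idea only. The helper names `toy_length_distribution` and `toy_type_frequency_by_position`, the padding value `0` and the integer type codes are assumptions, and the real functions in `src.analysis` may return differently shaped results.

```python
from collections import Counter

PAD = 0  # assumed padding value


def toy_length_distribution(type_sequences):
    """Map history length -> number of rows with that many non-padding entries."""
    lengths = (sum(1 for t in seq if t != PAD) for seq in type_sequences)
    return dict(Counter(lengths))


def toy_type_frequency_by_position(type_sequences, max_positions):
    """For each of the first `max_positions` slots, count action types.

    Padding entries are skipped, so sparse positions show small totals.
    """
    result = []
    for pos in range(max_positions):
        counts = Counter(
            seq[pos]
            for seq in type_sequences
            if pos < len(seq) and seq[pos] != PAD
        )
        result.append(dict(counts))
    return result


# Hypothetical action-type codes: 1 = click, 2 = add-to-cart, 3 = order
type_sequences = [
    [2, 1, 0, 0],
    [0, 0, 0, 0],
    [1, 1, 3, 0],
    [1, 0, 0, 0],
]

length_dist = toy_length_distribution(type_sequences)
type_freq = toy_type_frequency_by_position(type_sequences, max_positions=2)

print(length_dist)
for i, freq in enumerate(type_freq):
    print(f"Position {i}:", freq)
```

In this toy data one row in four has no history at all, and clicks dominate both of the first two positions. Patterns like these are the kind of sparsity and bias the helpers are meant to surface.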

Below we illustrate how to run the pipeline on the larger dataset and
compute these analysis metrics.  Note that running on 1000 impressions
may require more memory than the tiny Day 1 examples.


In [None]:
from pyspark.sql import SparkSession
from src import config, schema
from src.utils import read_csv_with_schema, coerce_impressions_from_json
from src.transformations import build_training_inputs
from src.analysis import sequence_length_distribution, action_type_frequency_by_position

spark = SparkSession.builder.appName('Day3Analysis').getOrCreate()

# Load the larger datasets
imp_big = coerce_impressions_from_json(read_csv_with_schema(spark, config.DATA_DIR + '/impressions_1000.csv', schema.impressions_schema))
clk_big = read_csv_with_schema(spark, config.DATA_DIR + '/clicks_1000.csv', schema.clicks_schema)
atc_big = read_csv_with_schema(spark, config.DATA_DIR + '/add_to_carts_1000.csv', schema.add_to_carts_schema)
ord_big = read_csv_with_schema(spark, config.DATA_DIR + '/previous_orders_1000.csv', schema.previous_orders_schema)

train_big = build_training_inputs(imp_big, clk_big, atc_big, ord_big)

# Compute distribution of non‑zero sequence lengths
length_dist = sequence_length_distribution(train_big)
print('Sequence length distribution:', length_dist)

# Compute action type frequencies for the first 5 positions
type_freq = action_type_frequency_by_position(train_big, max_positions=5)
for i, freq in enumerate(type_freq):
    print(f'Position {i}:', freq)

spark.stop()


## Conclusion

The PySpark coding challenge evolves from a simple transformation
pipeline in Day 1 to a more production‑ready workflow in Day 2, and
finally to a scalable and analysable solution in Day 3.  Using the
helpers provided in `src.analysis` you can gain insight into how
customers interact with products over time and whether the model is
seeing enough meaningful history.  Feel free to extend the analysis
functions or adjust the look‑back window and sequence length to suit
your particular domain.
