# Part A: Big Data Platform Setup and Data Preprocessing

**Course**: DSC3108 - Big Data Mining and Analytics  
**Scenario**: Large-Scale Retail Recommendation System

## 1. Big Data Justification (50 words)

The retail scenario involves **high-volume transactional data** (millions of rows) at **high velocity** (real-time purchases). Single-node relational databases struggle with the iterative computations recommendation models require at this scale. A Big Data platform like **Apache Spark** is needed to distribute processing across nodes, enabling scalable collaborative filtering and near-real-time recommendations.

## 2. Tool Selection: Apache Spark (PySpark)

We use **PySpark** for distributed data processing because:
- In-memory computation for fast iterative algorithms
- Built-in MLlib for scalable machine learning
- Handles data partitioning across nodes automatically
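
Automatic partitioning means each row is assigned to a partition by a function of a key, so all rows that share a key end up together and can be aggregated or joined without gathering everything on one machine. Below is a minimal plain-Python sketch of hash partitioning. It assumes integer `user_id` keys and a hypothetical `num_partitions` of 4. Spark's DataFrame hash partitioning uses its own Murmur3-based hash rather than Python's `hash`, so the actual partition assignments differ.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Group rows into partitions by hash(key) % num_partitions.

    Simplified model of hash partitioning: every row with the same key
    value lands in the same partition.
    """
    partitions = defaultdict(list)
    for row in rows:
        # Python's % returns a non-negative result for a positive divisor
        partitions[hash(row[key]) % num_partitions].append(row)
    return dict(partitions)


# Hypothetical sample transactions
rows = [
    {"user_id": 1, "product_id": 10},
    {"user_id": 2, "product_id": 11},
    {"user_id": 1, "product_id": 12},
    {"user_id": 5, "product_id": 10},
]
parts = hash_partition(rows, "user_id", num_partitions=4)
for pid in sorted(parts):
    print(pid, [r["user_id"] for r in parts[pid]])
```

Both rows for user 1 share a partition. This co-location of keys is what makes per-user work such as building user-item rating vectors efficient on a cluster.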

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

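# Initialize Spark Session
# Note: spark.driver.memory only takes effect if this call launches the JVM;
# it is ignored when a SparkContext is already running (e.g. in the pyspark shell)
spark = SparkSession.builder \
    .appName("RetailRecommendation_Preprocessing") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()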

spark.sparkContext.setLogLevel("ERROR")
print(f"✓ Spark {spark.version} initialized")

## 3. Data Acquisition

Load the generated `transactions.csv` file into a Spark DataFrame.

In [None]:
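# Load transactions data
# header=True uses the first line as column names; inferSchema=True makes
# Spark scan the file an extra time to guess each column's type
df = spark.read.csv("transactions.csv", header=True, inferSchema=True)

print(f"Initial raw count: {df.count():,}")
df.show(5)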

In [None]:
# Check schema
df.printSchema()
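
`inferSchema=True` makes Spark read the input an extra time to guess each column's type before loading it. The plain-Python model below sketches that idea and is simplified. It assumes only three candidate types (int, float, string) and treats empty fields as nulls, while Spark's real inference also considers types such as long, double, boolean, and timestamp (so Spark would type the timestamp column below as a timestamp, not a string). The sample rows are hypothetical.

```python
import csv
import io


def infer_type(values):
    """Return the narrowest of 'int', 'float', 'string' that parses every non-empty value."""
    non_empty = [v for v in values if v != ""]  # empty fields are treated as nulls
    if not non_empty:
        return "string"
    for name, parse in (("int", int), ("float", float)):
        try:
            for v in non_empty:
                parse(v)
            return name
        except ValueError:
            pass  # some value failed to parse; try the next, wider type
    return "string"


def infer_schema(csv_text):
    """Inference pass: collect each column's values, then pick a type per column."""
    reader = csv.DictReader(io.StringIO(csv_text))
    columns = {name: [] for name in reader.fieldnames}
    for row in reader:
        for name, value in row.items():
            columns[name].append(value)
    return {name: infer_type(vals) for name, vals in columns.items()}


sample = """user_id,product_id,rating,timestamp
1,10,4.5,2024-01-01 10:00:00
2,11,,2024-01-02 11:30:00
"""
print(infer_schema(sample))
```

Because this pass must touch every row before the real load begins, passing an explicit schema through the `schema` argument of `spark.read.csv` avoids the extra scan on large inputs.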

## 4. Distributed Processing: Data Cleaning and Transformation

### 4.1 Remove Duplicates

In [None]:
# With no arguments, dropDuplicates() removes rows that are identical in every column
df_clean = df.dropDuplicates()
print(f"Removed {df.count() - df_clean.count():,} duplicate rows")
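
With no arguments, `dropDuplicates()` treats two rows as duplicates only if every column matches. Passing a list of column names, for example `df.dropDuplicates(["user_id", "product_id"])`, deduplicates on those columns only. The plain-Python model below shows both behaviours on hypothetical rows. Unlike this sketch, Spark does not guarantee which row of a duplicate group survives or in what order rows come back.

```python
def drop_duplicates(rows, subset=None):
    """Keep the first row seen for each distinct key.

    The key is the whole row when subset is None, otherwise only the listed columns.
    """
    seen = set()
    result = []
    for row in rows:
        cols = subset if subset is not None else sorted(row)
        key = tuple(row[c] for c in cols)
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


rows = [
    {"user_id": 1, "product_id": 10, "rating": 4.0},
    {"user_id": 1, "product_id": 10, "rating": 4.0},  # exact duplicate
    {"user_id": 1, "product_id": 10, "rating": 5.0},  # same user/product, different rating
]
print(len(drop_duplicates(rows)))
print(len(drop_duplicates(rows, subset=["user_id", "product_id"])))
```

Which variant is appropriate depends on the data. Exact-duplicate removal only catches repeated ingestion of the same record, while a subset key also collapses repeat ratings of the same product by the same user.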

### 4.2 Handle Missing Values

In [None]:
# dropna() defaults to how="any": drop a row if any column is null (or NaN)
df_clean = df_clean.dropna()
print(f"Cleaned count: {df_clean.count():,}")
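
`dropna()` defaults to `how="any"`, which drops a row if any column is missing. Spark's `na.drop` counts NaN in floating-point columns as missing too. Setting `how="all"` drops only rows where every checked column is missing, and `subset` restricts the check to the listed columns. The plain-Python model below applies these rules to hypothetical rows. It does not model the `thresh` parameter.

```python
import math


def is_missing(value):
    """None (null) or a float NaN counts as missing, mirroring Spark's na.drop."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_missing(rows, how="any", subset=None):
    """Keep rows that do not meet the drop condition for the checked columns."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        flags = [is_missing(row[c]) for c in cols]
        drop = any(flags) if how == "any" else all(flags)
        if not drop:
            kept.append(row)
    return kept


rows = [
    {"user_id": 1, "rating": 4.0},
    {"user_id": 2, "rating": None},
    {"user_id": 3, "rating": float("nan")},
    {"user_id": None, "rating": None},
]
print([r["user_id"] for r in drop_missing(rows)])
print([r["user_id"] for r in drop_missing(rows, how="all")])
```

The default `how="any"` is the strictest option. When only some fields are essential for recommendations (for example the user and product IDs), `subset` keeps rows that have gaps in less important columns.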

### 4.3 Data Type Transformations

In [None]:
# Convert timestamp to proper datetime type
# With ANSI mode off, values that cannot be parsed become null instead of raising
df_clean = df_clean.withColumn("timestamp", to_timestamp(col("timestamp")))

# Ensure correct data types (with ANSI mode off, failed casts also yield null)
df_clean = df_clean.withColumn("user_id", col("user_id").cast("integer")) \
                   .withColumn("product_id", col("product_id").cast("integer")) \
                   .withColumn("rating", col("rating").cast("float"))

print("✓ Data types corrected")
df_clean.printSchema()
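
Type conversion can re-introduce missing values. With ANSI mode disabled (`spark.sql.ansi.enabled=false`, the default in Spark 3.x), `cast` and `to_timestamp` return null for values they cannot parse rather than failing. Rows that survived `dropna()` can therefore hold nulls after this step, and the null check in Section 5 is there to catch them. The plain-Python model below sketches this lenient-cast behaviour. The helper names `lenient_cast` and `parse_ts` and the sample values are hypothetical.

```python
from datetime import datetime


def lenient_cast(value, parse):
    """Return parse(value), or None when the value cannot be converted
    (modelling non-ANSI Spark casts, where failures become null)."""
    try:
        return parse(value)
    except (TypeError, ValueError):
        return None


def parse_ts(text):
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# No field is null here, so every row would pass dropna()
raw = [
    {"user_id": "1", "rating": "4.5", "timestamp": "2024-01-01 10:00:00"},
    {"user_id": "2", "rating": "five", "timestamp": "2024-01-02 11:30:00"},
    {"user_id": "x3", "rating": "3.0", "timestamp": "yesterday"},
]
typed = [
    {
        "user_id": lenient_cast(r["user_id"], int),
        "rating": lenient_cast(r["rating"], float),
        "timestamp": lenient_cast(r["timestamp"], parse_ts),
    }
    for r in raw
]
new_nulls = {c: sum(row[c] is None for row in typed) for c in typed[0]}
print(new_nulls)
```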

## 5. Data Quality Verification

In [None]:
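# sum is aliased to avoid shadowing Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum

# Summary statistics (count, mean, stddev, min, quartiles, max) for ratings
df_clean.select("rating").summary().show()

# Count remaining nulls per column in a single aggregation:
# isNull() gives a boolean, cast("int") turns it into 1/0, and the sum counts the 1s
null_counts = df_clean.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_clean.columns])
null_counts.show()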
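
The null-count expression turns each `isNull()` flag into 1 or 0 and sums it per column. Spark can therefore compute every column's count in one aggregation job instead of running a separate `filter(...).count()` job per column. The plain-Python sketch below applies the same idea to hypothetical rows, accumulating all counts in a single pass.

```python
def null_counts(rows, columns):
    """Count None values per column in a single pass over the rows."""
    counts = dict.fromkeys(columns, 0)
    for row in rows:
        for c in columns:
            # True -> 1, False -> 0, like cast("int") on the isNull() flag
            counts[c] += int(row.get(c) is None)
    return counts


rows = [
    {"user_id": 1, "product_id": 10, "rating": 4.0},
    {"user_id": 2, "product_id": None, "rating": None},
    {"user_id": 3, "product_id": 12, "rating": None},
]
print(null_counts(rows, ["user_id", "product_id", "rating"]))
```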

## 6. Save Cleaned Data (Optional)

In [None]:
# Save as Parquet for efficient storage and faster loading in Part B
# (Spark writes a directory of part files at this path, not a single file)
df_clean.write.mode("overwrite").parquet("transactions_clean.parquet")
print("✓ Cleaned data saved to transactions_clean.parquet")
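
Parquet stores data column by column and embeds the schema. Part B can therefore read it back with `spark.read.parquet("transactions_clean.parquet")` without re-inferring types, and a query that selects only `rating` needs to read only that column. The toy plain-Python sketch below illustrates the row-to-column layout change on hypothetical rows. It does not model Parquet's actual file format, encoding, or compression.

```python
def to_columnar(rows, columns):
    """Transpose a list of row dicts into one list per column."""
    return {c: [row[c] for row in rows] for c in columns}


rows = [
    {"user_id": 1, "product_id": 10, "rating": 4.0},
    {"user_id": 2, "product_id": 11, "rating": 3.5},
    {"user_id": 1, "product_id": 12, "rating": 5.0},
]
table = to_columnar(rows, ["user_id", "product_id", "rating"])

# Column pruning: averaging ratings touches only the "rating" list
ratings = table["rating"]
avg_rating = sum(ratings) / len(ratings)
print(avg_rating)
```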

## Summary

**Part A Completed:**
- ✓ Justified Big Data approach
- ✓ Set up Apache Spark platform
- ✓ Acquired and ingested data
- ✓ Performed distributed cleaning and transformation
- ✓ Verified data quality and saved the cleaned data as Parquet

**Next**: Proceed to Part B for model development.