# Mission 9: Big Data Processing on AWS EMR

## PySpark Processing Pipeline - PRODUCTION

This notebook runs the complete pipeline:
1. **Loading**: Images from S3 (Training split only, ~67K of the dataset's ~90K images)
2. **Feature extraction**: MobileNetV2 with broadcast weights
3. **PCA**: Dimensionality reduction (1280 → 50)
4. **Export**: CSV to S3

### GDPR Compliance
- Region: **eu-west-1** (Ireland) - European servers
- Data stored and processed in the EU

---
## 1. Spark Configuration for EMR

In [None]:
# Point PySpark at the EMR cluster's YARN resource manager. The submit args are
# only read when a new SparkContext is launched, so this has no effect if a
# session already exists in this kernel (getOrCreate below would reuse it).
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master yarn pyspark-shell'

from pyspark.sql import SparkSession

# Session options:
#  - writeLegacyFormat: write Parquet in the legacy (Hive/Impala-compatible) layout
#  - arrow.pyspark.enabled: Arrow-accelerated toPandas() / createDataFrame(pandas_df)
session_builder = (
    SparkSession.builder
    .appName("Mission9_Fruits_PROD")
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .enableHiveSupport()
)
spark = session_builder.getOrCreate()

sc = spark.sparkContext
for caption, value in (
    ("Spark version", spark.version),
    ("Master", sc.master),
    ("Application ID", sc.applicationId),
):
    print(f"{caption}: {value}")

---
## 1.1 Dependencies Fix (urllib3/OpenSSL)

> **Important**: EMR uses Python 3.7 with OpenSSL 1.0.2k. 
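> urllib3 v2.0 requires OpenSSL 1.1.1+. This cell installs `urllib3<2.0` (and `boto3`) into `/tmp/pip_packages` and puts that directory first on `sys.path` so the compatible versions are imported.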

In [None]:
# Work around EMR's old OpenSSL (1.0.2k): urllib3 2.x requires OpenSSL 1.1.1+,
# so install urllib3<2.0 plus boto3 into a private, writable directory and make
# that directory take precedence over the system site-packages.
import importlib
import os
import subprocess
import sys

install_dir = '/tmp/pip_packages'
os.makedirs(install_dir, exist_ok=True)

# Put the private directory FIRST on sys.path, before anything is (re)imported,
# so the freshly installed copies shadow the system ones.
if install_dir not in sys.path:
    sys.path.insert(0, install_dir)

# Forget any already-imported urllib3 / boto* modules; otherwise the imports
# below would just return the cached (system) module objects.
mods_to_remove = [m for m in sys.modules if m.startswith(('urllib3', 'boto'))]
for m in mods_to_remove:
    del sys.modules[m]

result = subprocess.run(
    [sys.executable, '-m', 'pip', 'install', '--target', install_dir, '--upgrade', '--force-reinstall',
     'urllib3<2.0', 'boto3'],
    capture_output=True, text=True
)
print("Install completed" if result.returncode == 0 else f"Error: {result.stderr[-300:]}")

# pip just wrote new packages into a directory that is already on sys.path.
# The import system caches directory listings, so invalidate those caches or
# the new packages may not be found (notably when this cell is re-run).
importlib.invalidate_caches()

# Verify which copies were actually imported (should point into install_dir).
import urllib3
import boto3
print(f"urllib3: {urllib3.__version__} from {urllib3.__file__}")
print(f"boto3: {boto3.__version__} from {boto3.__file__}")

---
## 2. S3 Configuration - Full Training Set

In [None]:
import boto3

# Auto-discover the project bucket: a bucket whose name contains this marker.
BUCKET_MARKER = 'mission9-data'
matching_buckets = [
    b['Name'] for b in boto3.client('s3').list_buckets()['Buckets']
    if BUCKET_MARKER in b['Name']
]
if not matching_buckets:
    # Fail with an explicit message rather than an IndexError on [0].
    raise RuntimeError(
        f"No S3 bucket whose name contains {BUCKET_MARKER!r} is visible to these credentials"
    )
if len(matching_buckets) > 1:
    # Ambiguous discovery: keep the first match (as before) but say so.
    print(f"Warning: {len(matching_buckets)} buckets match {BUCKET_MARKER!r}; using {matching_buckets[0]}")
BUCKET_NAME = matching_buckets[0]

# Define S3 paths (Training only - ~67K images)
BUCKET = f's3://{BUCKET_NAME}'
PATH_Data = BUCKET + '/fruits-360_dataset/fruits-360/Training'
PATH_Result = BUCKET + '/Results'
PATH_PCA = BUCKET + '/Results_PCA'
PATH_CSV = BUCKET + '/Results_CSV'

print(f'BUCKET: {BUCKET}')
print(f'Dataset: Training only')

---
## 3. Import Libraries

In [None]:
# Data processing: pandas/numpy for batch handling, PIL + io to decode raw image bytes
import pandas as pd
from PIL import Image
import numpy as np
import io

# TensorFlow / MobileNetV2: the CNN, its matching input preprocessing, and the
# helpers used to turn a PIL image into an array and truncate the model
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# PySpark SQL helpers/types and Spark ML (StandardScaler + PCA on ML Vectors)
# NOTE(review): Spark 3.x warns that PandasUDFType will be deprecated in favour of
# Python type hints on pandas_udf; it still works here.
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT

# Only let ERROR-level messages from TensorFlow's Python logger reach the output
tf.get_logger().setLevel('ERROR')
print(f"TensorFlow: {tf.__version__}")

---
## 4. Load Images from S3

In [None]:
print(f"Loading images from: {PATH_Data}")

# Read every *.jpg under PATH_Data, descending into all sub-folders, as raw
# bytes using Spark's binaryFile source.
image_reader = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
)
images = image_reader.load(PATH_Data)

# The class label is the image's parent folder name, i.e. the second-to-last
# '/'-separated segment of its path (.../Training/<label>/<file>.jpg).
path_parts = split(images['path'], '/')
images = images.withColumn('label', element_at(path_parts, -2))
# Cached so the counts below and the later featurization reuse the same data.
images.cache()

total_images = images.count()
labels = images.select('label').distinct().count()

print(f"Images loaded: {total_images:,}")
print(f"Labels: {labels}")
print(f"Partitions: {images.rdd.getNumPartitions()}")
images.select('path', 'label').show(5, truncate=60)

---
## 5. MobileNetV2 Model + Broadcast

In [None]:
print("Loading MobileNetV2...")

# Driver-side model: ImageNet-pretrained MobileNetV2 including its classifier.
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

# Feature extractor: the same graph truncated just before the final layer,
# i.e. it outputs model.layers[-2].
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

# Inference only: mark every layer as non-trainable.
for frozen_layer in new_model.layers:
    frozen_layer.trainable = False

# Ship the weight arrays to the executors once; workers rebuild the graph and
# load these arrays (see model_fn).
extractor_weights = new_model.get_weights()
broadcast_weights = sc.broadcast(extractor_weights)

print(f"Model loaded - Output: {new_model.output_shape}")
print(f"Weights broadcasted ({len(extractor_weights)} arrays)")

---
## 6. Featurization Functions (Pandas UDF)

In [None]:
# Recreate model on workers with broadcasted weights
def model_fn():
    model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

def preprocess(content):
    """Decode raw image bytes into a MobileNetV2-ready array.

    Args:
        content: encoded image bytes (the ``content`` column of the binaryFile source).

    Returns:
        A (224, 224, 3) array passed through MobileNetV2's ``preprocess_input``.
    """
    # Force 3 channels: grayscale, palette, RGBA or CMYK images would otherwise
    # give an array whose shape breaks np.stack and the model's
    # (224, 224, 3) input. For images already in RGB mode this is a no-op copy.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """Run the feature extractor over one batch of image bytes.

    Args:
        model: Keras model, as returned by ``model_fn``.
        content_series: pandas Series of encoded image bytes.

    Returns:
        pandas Series holding one flattened prediction array per input image,
        in input order. An empty batch yields an empty Series, because
        ``np.stack`` cannot stack zero arrays.
    """
    if content_series.empty:
        return pd.Series([], dtype=object)
    input_data = np.stack(content_series.map(preprocess))
    preds = model.predict(input_data, verbose=0)
    return pd.Series([p.flatten() for p in preds])

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """Scalar-iterator pandas UDF: batches of image bytes -> batches of features.

    Building the Keras model is expensive, so it is done once per iterator
    (i.e. per UDF invocation) and reused for every batch that iterator yields,
    instead of once per batch.
    """
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)

print("Featurization functions defined")

---
## 7. Feature Extraction (Distributed)

In [None]:
import time

# Rows per Arrow batch handed to featurize_udf. Each batch is decoded into one
# stacked array of shape (rows, 224, 224, 3); assuming float32 (img_to_array's
# default) that is ~0.6 MB per image, ~600 MB for a full 1024-row batch.
# Lower this if Python workers run out of memory.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
NUM_PARTITIONS = 48

print(f"Extracting features... (partitions: {NUM_PARTITIONS})")
# perf_counter is monotonic, so the measured duration of this long job cannot
# be skewed by wall-clock adjustments, unlike time.time().
start_time = time.perf_counter()

# Spread the images over NUM_PARTITIONS tasks and run the CNN on each batch.
features_df = images.repartition(NUM_PARTITIONS).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features"),
)

# Writing is the action that actually triggers the distributed extraction.
features_df.write.mode("overwrite").parquet(PATH_Result)

elapsed = time.perf_counter() - start_time
print(f"Done in {elapsed/60:.1f} min -> {PATH_Result}")

---
## 8. Dimensionality Reduction with PCA (Spark ML)

In [None]:
# Read back the persisted features, so the steps below do not re-run the CNN.
features_df = spark.read.parquet(PATH_Result)
print(f"Features loaded: {features_df.count()} images")


@udf(VectorUDT())
def array_to_vector(arr):
    """Wrap an array<float> value as a Spark ML DenseVector (the input type StandardScaler/PCA expect)."""
    return Vectors.dense(arr)


# Append the vector column after the existing ones (same result as withColumn).
features_vec_df = features_df.select("*", array_to_vector(col("features")).alias("features_vec"))

In [None]:
# Center every feature dimension (withMean) and scale it to unit variance
# (withStd), so PCA is not dominated by the highest-variance dimensions.
print("Standardization...")
scaler = StandardScaler(
    inputCol="features_vec",
    outputCol="features_scaled",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(features_vec_df)
features_scaled_df = scaler_model.transform(features_vec_df)
print("Standardization done")

In [None]:
# Project the standardized features onto their top N_COMPONENTS principal components.
N_COMPONENTS = 50

print(f"PCA in progress ({N_COMPONENTS} components)...")
pca = PCA(inputCol="features_scaled", outputCol="pca_features", k=N_COMPONENTS)
pca_model = pca.fit(features_scaled_df)
features_pca_df = pca_model.transform(features_scaled_df)

# Per-component explained-variance ratios; the last cumulative value is the
# share of total variance kept by the N_COMPONENTS components.
explained_variance = pca_model.explainedVariance.toArray()
cumulative_variance = explained_variance.cumsum()
print("PCA done")
print(f"Total explained variance: {cumulative_variance[-1]*100:.2f}%")

---
## 9. Export Results to S3

In [None]:
@udf(ArrayType(FloatType()))
def vector_to_array(v):
    """Convert a Spark ML Vector into a plain Python list for an array<float> column."""
    dense_values = v.toArray()
    return dense_values.tolist()


# Keep only the identifiers and the reduced features for export.
pca_array_col = vector_to_array("pca_features").alias("pca_features")
output_df = features_pca_df.select("path", "label", pca_array_col)

output_df.write.mode("overwrite").parquet(PATH_PCA)
print(f"Parquet saved: {PATH_PCA}")

In [None]:
# Export to CSV with one column per PCA component (f_0, f_1, ..., f_{N_COMPONENTS-1}).
print("CSV export...")
feature_cols = [f"f_{i}" for i in range(N_COMPONENTS)]

# A single select instead of N_COMPONENTS chained withColumn calls: each
# withColumn adds a projection, and calling it in a loop builds a large plan.
csv_df = output_df.select(
    "label",
    *[col("pca_features")[i].alias(name) for i, name in enumerate(feature_cols)],
)

# Write a single CSV file. repartition(1) rather than coalesce(1): a drastic
# coalesce would also run the whole upstream lineage (Parquet read, scaling,
# PCA, vector_to_array UDF) in one task; repartition keeps that work parallel
# and only shuffles the small final rows to one writer.
csv_df.repartition(1).write.mode("overwrite").option("header", "true").csv(PATH_CSV)
print(f"CSV saved: {PATH_CSV}")

---
## 10. Validation and Summary

In [None]:
# Final validation: exported row count and a small sample of the output.
final_count = output_df.count()
sample = output_df.limit(5).toPandas()

# Sanity check: every loaded image should have produced one exported row.
if final_count != total_images:
    print(f"WARNING: {final_count:,} rows exported but {total_images:,} images were loaded")

print("=" * 60)
print("SUMMARY - MISSION 9")
print("=" * 60)
print(f"Images processed:      {final_count:,}")
print(f"Classes (labels):      {labels}")
print(f"Original dimension:    1280 (MobileNetV2)")
print(f"PCA dimension:         {N_COMPONENTS}")
print(f"Explained variance:    {cumulative_variance[-1]*100:.2f}%")
print("-" * 60)
print(f"S3 Bucket:             {BUCKET}")
print(f"Parquet:               {PATH_PCA}")
print(f"CSV:                   {PATH_CSV}")
print("=" * 60)
print("\nSample data (5 rows):")

# Rich (HTML) rendering when IPython is available, plain text otherwise.
# A `'display' in dir()` test does not work here: IPython provides `display`
# as a builtin, and dir() only lists the cell's global names.
try:
    from IPython.display import display as _show_sample
except ImportError:
    _show_sample = print
_show_sample(sample)

In [None]:
# Clean shutdown: SparkSession.stop() also stops the underlying SparkContext,
# presumably releasing this application's YARN executors (confirm on the cluster).
spark.stop()
print("Spark session stopped")