# CELLULE 1 : Validation Infrastructure

In [None]:
# P11-Pipeline-Fruits-Demo

In [None]:
%spark.pyspark
# Cell 1: print basic facts about the Spark session and run a tiny
# distributed job to check the cluster executes work.
ctx = spark.sparkContext
print("=== VALIDATION INFRASTRUCTURE EMR ===")

# NOTE(review): defaultParallelism is Spark's default partition count; it is
# not necessarily the physical core count shown under the "Cores total" label.
session_facts = [
    ("Spark Version", spark.version),
    ("Master", ctx.master),
    ("Cores total", ctx.defaultParallelism),
    ("Application ID", ctx.applicationId),
]
for label, value in session_facts:
    print(f"✅ {label}: {value}")
print("✅ Cluster EMR - État: OPÉRATIONNEL")

# Distribute 100 integers and count them back through an action.
test_rdd = ctx.parallelize(range(100))
distributed_count = test_rdd.count()
print(f"✅ Test distribué: {distributed_count} éléments sur cluster")

# CELLULE 2 : Installation Environnement

In [None]:
%sh
echo "🔄 Installation stack ML directement sur cluster EMR..."
# NOTE(review): a %sh paragraph presumably runs only on the host of the Zeppelin
# interpreter (typically the EMR master node); Spark executors on other nodes
# would not get these packages. If executors need them, install them on every
# node (e.g. an EMR bootstrap action) — confirm.
# Report success only when pip actually succeeded; otherwise fail the paragraph.
if pip install tensorflow==2.13.0 pillow==10.0.0 numpy pandas; then
    echo "✅ Environnement ML prêt pour pipeline"
else
    echo "❌ Échec de l'installation de l'environnement ML" >&2
    exit 1
fi

# CELLULE 3 : PREPROCESSING RÉEL

In [None]:
%spark.pyspark
print("=== PREPROCESSING IMAGES RÉELLES ===")

# List the real Fruits-360 test images on S3 and derive each image's label
# from its parent directory name (.../Test/<fruit_class>/<file>.jpg).
from pyspark.sql.types import StringType
# `col` is imported here so this paragraph runs on its own; previously it was
# only imported by a later paragraph, which raised NameError when run in order.
from pyspark.sql.functions import col, regexp_extract, split

IMAGES_GLOB = "s3://fruits-p11-production/data/fruits-360/Test/*/*.jpg"

# `spark.read.text` returns the *contents* of the JPEG files split into lines,
# not their paths. The "binaryFile" source (Spark >= 3.0) yields one row per
# file with a `path` column. Only `path` is selected, so the image bytes
# presumably need not be read — TODO confirm in the physical plan.
df_paths = spark.read.format("binaryFile").load(IMAGES_GLOB).select("path")
df_preprocessed = (
    df_paths.select(
        col("path").alias("image_path"),
        regexp_extract(col("path"), r"([^/]+)/[^/]+\.jpg$", 1).alias("fruit_class"),
    )
    .filter(col("fruit_class") != "")
    .cache()  # reused by the count and the per-class aggregation below
)

print(f"✅ Preprocessing: {df_preprocessed.count()} images chargées")
print("✅ Extraction labels depuis chemins fichiers")
df_preprocessed.groupBy("fruit_class").count().show(10)

# CELLULE 4 : Pipeline Complet P11 (données simulées)

In [None]:
%spark.pyspark
# Full P11 pipeline on SIMULATED data: N_IMAGES fake images with random
# N_FEATURES-dimensional "MobileNetV2" feature vectors, reduced with PCA to the
# smallest candidate dimension reaching VARIANCE_TARGET explained variance,
# then exported as Parquet.
import os  # noqa: F401 — unused here; kept in the shared notebook namespace
import time
from itertools import accumulate

from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.sql.functions import col, desc, rand, when

N_IMAGES = 1000                          # number of simulated images
N_FEATURES = 1280                        # size of the simulated feature vector
K_CANDIDATES = [100, 200, 300, 500, 800]  # PCA sizes tried, smallest first
VARIANCE_TARGET = 0.95                   # cumulative explained variance to reach
OUTPUT_PATH = "outputs/pca_features.parquet"

print("=== PIPELINE BIG DATA P11 - FRUITS RECOGNITION ===")
start_time = time.time()

# 1. Simulated Fruits-360 dataset: 10 classes assigned round-robin by id.
print(f"📂 Chargement dataset Fruits-360 ({N_IMAGES} images simulées)...")
df_images = spark.range(N_IMAGES).select(
    col("id").alias("image_id"),
    (col("id") % 10).alias("class_id"),
).withColumn(
    "fruit_label",
    when(col("class_id") == 0, "Apple_Red")
    .when(col("class_id") == 1, "Banana")
    .when(col("class_id") == 2, "Orange")
    .when(col("class_id") == 3, "Strawberry")
    .when(col("class_id") == 4, "Grape_White")
    .when(col("class_id") == 5, "Tomato")
    .when(col("class_id") == 6, "Avocado")
    .when(col("class_id") == 7, "Kiwi")
    .when(col("class_id") == 8, "Lemon")
    .otherwise("Peach"),
)

# 2. Simulated MobileNetV2 features: one rand() column (uniform in [0, 1))
#    per feature dimension.
print(f"🤖 Extraction features MobileNetV2 ({N_FEATURES}D)...")
feature_cols = [f"mobilenet_f_{i}" for i in range(N_FEATURES)]
features_cols = [rand().alias(name) for name in feature_cols]
df_features = df_images.select("image_id", "fruit_label", *features_cols)

# 3. Assemble the feature columns into one vector column for Spark ML.
print("🔧 Conversion format Spark ML...")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vector")
# Cached: scanned by both PCA fits and the final transform, which would
# otherwise recompute all N_FEATURES rand() columns each time.
df_vector = assembler.transform(df_features).cache()

# 4. Choose k. Principal components are ordered by decreasing explained
#    variance and do not depend on k, so a single fit at the largest candidate
#    gives the cumulative explained variance for every smaller candidate
#    (instead of one full fit per candidate).
print(f"📊 Recherche k optimal pour {VARIANCE_TARGET:.0%} variance...")
k_max = max(K_CANDIDATES)
search_model = PCA(k=k_max, inputCol="features_vector", outputCol="pca_test").fit(df_vector)
cumulative_variance = list(accumulate(search_model.explainedVariance.toArray()))
k_optimal = next(
    (k for k in sorted(K_CANDIDATES) if cumulative_variance[k - 1] >= VARIANCE_TARGET),
    k_max,  # no candidate reaches the target: fall back to the largest k
)
optimal_variance = cumulative_variance[k_optimal - 1]

# 5. Final PCA with the selected k.
print(f"⚙️ Application PCA avec k={k_optimal}...")
pca = PCA(k=k_optimal, inputCol="features_vector", outputCol="pca_features")
pca_model = pca.fit(df_vector)
df_final = pca_model.transform(df_vector)

# Final metrics.
variance_explained = pca_model.explainedVariance.toArray()
total_variance = sum(variance_explained)
elapsed = time.time() - start_time
# NOTE(review): defaultParallelism is Spark's default partition count, not
# necessarily the number of physical cores.
cores_used = spark.sparkContext.defaultParallelism

# Each count is a Spark job: compute once and reuse.
n_images = df_final.count()
n_classes = df_final.select("fruit_label").distinct().count()

# Compact results.
print(f"\n{'=' * 50}")
print("🎯 PIPELINE P11 - RÉSULTATS")
print(f"{'=' * 50}")
print(f"📊 Images: {n_images} | Classes: {n_classes}")
print(f"🤖 Dimensions: {N_FEATURES}D → {k_optimal}D")
print(f"📈 Variance: {total_variance:.1%} {'✅' if total_variance >= VARIANCE_TARGET else '⚠️'}")
print(f"⚡ Performance: {elapsed:.2f}s | {cores_used} cores")
print(f"🚀 Vitesse: {n_images / elapsed:.1f} images/sec")

# Per-class distribution.
print("\n📊 Distribution par classe:")
df_final.groupBy("fruit_label").count().orderBy(desc("count")).show(10, False)

# Parquet export (CSV export planned later). Spark's writer creates the target
# directory itself, so no local os.makedirs is needed.
# NOTE(review): a relative path resolves against the Hadoop default filesystem
# (presumably HDFS on EMR), not the driver's working directory — consider an
# explicit s3:// URI so results outlive the cluster.
df_final.select("image_id", "fruit_label", "pca_features") \
    .write.mode("overwrite").parquet(OUTPUT_PATH)

print(f"✅ Export Parquet terminé dans {OUTPUT_PATH}")