# P11 - Big Data Processing in the Cloud: Fruit Classification

**Project**: OpenClassrooms AI Engineer P11  
**Environment**: AWS EMR Notebook  
**Dataset**: Fruits-360 (Kaggle)  

## PySpark Pipeline

1. Load the images from S3
2. Extract features with TensorFlow MobileNetV2
3. Broadcast the model weights (distributed optimization)
4. Reduce dimensionality with PCA
5. Save the results to S3

---

## ⚠️ EMR Configuration

- ✅ SparkSession already available (variable `spark`)
- ✅ S3 access preconfigured
- ✅ Multi-worker cluster for distributed processing

**S3 bucket**: `oc-p11-fruits-david-scanu`

---
## 1. Configuration and Setup

### 1.1 Silencing warnings

In [None]:
import os
import warnings
import logging

# Silence TensorFlow's C++ logs (must be set before TensorFlow is imported)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Silence Python warnings
warnings.filterwarnings('ignore')

# Configure PySpark logging
logging.getLogger('py4j').setLevel(logging.ERROR)
logging.getLogger('pyspark').setLevel(logging.ERROR)

print("✅ Warning configuration applied")

### 1.2 Installing TensorFlow (if needed)

On EMR Notebooks, TensorFlow is not always preinstalled.
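
As a side note, a package's availability can be checked without importing it, which avoids paying TensorFlow's import cost just to test for its presence. The sketch below uses the standard library's `importlib.util.find_spec`; the helper name `is_installed` is hypothetical and not part of the notebook.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located, without importing it."""
    return importlib.util.find_spec(module_name) is not None


# "json" ships with Python; the second name is deliberately bogus.
print(is_installed("json"))
print(is_installed("surely_not_a_real_module_xyz"))
```

In the notebook, `is_installed("tensorflow")` could gate the installation step in the same way as the `try`/`except ImportError` used in the next cell.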

In [None]:
# Check whether TensorFlow is installed
# Note: `!pip` installs only on the node running this kernel; the workers
# also need TensorFlow for the Pandas UDF in section 3.
try:
    import tensorflow as tf
    print(f"✅ TensorFlow already installed: {tf.__version__}")
except ImportError:
    print("⏳ Installing TensorFlow...")
    !pip install tensorflow==2.16.1 -q
    print("✅ TensorFlow installed")
    print("⚠️  IMPORTANT: restart the kernel (Kernel → Restart Kernel)")

### 1.3 Library imports

In [None]:
# PySpark imports
from pyspark.sql.functions import col, pandas_udf, element_at, split, udf
from pyspark.sql.types import ArrayType, FloatType, StringType
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT

# TensorFlow imports
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# Other imports
from PIL import Image
import pandas as pd
import numpy as np
import io
import time

print("✅ Imports successful")

### 1.4 S3 path configuration

In [None]:
# ============================================================
# S3 CONFIGURATION
# ============================================================

BUCKET_NAME = "oc-p11-fruits-david-scanu"

# S3 paths
S3_INPUT_PATH = f"s3://{BUCKET_NAME}/data/raw/Training/"
S3_FEATURES_OUTPUT = f"s3://{BUCKET_NAME}/data/features/"
S3_PCA_OUTPUT = f"s3://{BUCKET_NAME}/data/pca/"

print(f"📦 S3 bucket: {BUCKET_NAME}")
print(f"📥 Input: {S3_INPUT_PATH}")
print(f"📤 Features output: {S3_FEATURES_OUTPUT}")
print(f"📤 PCA output: {S3_PCA_OUTPUT}")

### 1.5 Spark configuration

On EMR Notebooks, the `spark` variable is already available.

In [None]:
# Set the log level
spark.sparkContext.setLogLevel("WARN")

# Get the SparkContext, needed later for the broadcast
sc = spark.sparkContext

print("✅ EMR SparkSession configured")
print(f"   Spark version: {spark.version}")
print(f"   Master: {sc.master}")
print(f"   App name: {sc.appName}")

---
## 2. Loading the Data from S3

### 2.1 Loading modes

| Mode | Number of images | Use |
|------|------------------|-----|
| **MINI** | 100-500 | Quick tests |
| **APPLES** | ~6,400 | Validation |
| **FULL** | ~67,000 | Full production run |
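
The three modes differ only in the glob pattern they read (and, for MINI, a row limit). A minimal sketch of that mapping, assuming the same folder layout as the notebook; `MODE_PATTERNS` and `resolve_image_path` are hypothetical names introduced here for illustration:

```python
# Base path copied from the notebook's S3 configuration.
S3_INPUT_PATH = "s3://oc-p11-fruits-david-scanu/data/raw/Training/"

# Hypothetical lookup table: loading mode -> glob pattern under the base path.
MODE_PATTERNS = {
    "mini": "Apple*/*.jpg",    # same folders as "apples", truncated later with limit()
    "apples": "Apple*/*.jpg",
    "full": "*/*.jpg",
}


def resolve_image_path(mode: str, base_path: str = S3_INPUT_PATH) -> str:
    """Return the glob pattern to load for a given mode."""
    try:
        return base_path + MODE_PATTERNS[mode]
    except KeyError:
        raise ValueError(f"Unknown mode: {mode}") from None


print(resolve_image_path("full"))
```

A lookup table keeps the mode names and patterns in one place, so adding a new subset only requires one new entry.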

In [None]:
# ============================================================
# LOADING MODE CONFIGURATION
# ============================================================

# Uncomment ONLY ONE option:

# MODE 1: MINI TEST (100 images) - RECOMMENDED to get started
TEST_MODE = "mini"
MAX_IMAGES = 100

# MODE 2: APPLES SUBSET (~6,400 images)
# TEST_MODE = "apples"

# MODE 3: FULL DATASET (~67,000 images)
# TEST_MODE = "full"

# ============================================================
# IMAGE LOADING
# ============================================================

if TEST_MODE == "mini":
    print(f"🔍 Mode: MINI TEST ({MAX_IMAGES} images)")
    image_path = f"{S3_INPUT_PATH}Apple*/*.jpg"
    df_images = spark.read.format("binaryFile").load(image_path).limit(MAX_IMAGES)
    
elif TEST_MODE == "apples":
    print("🔍 Mode: APPLES SUBSET (~6,400 images)")
    image_path = f"{S3_INPUT_PATH}Apple*/*.jpg"
    df_images = spark.read.format("binaryFile").load(image_path)
    
elif TEST_MODE == "full":
    print("🔍 Mode: FULL DATASET (~67,000 images)")
    image_path = f"{S3_INPUT_PATH}*/*.jpg"
    df_images = spark.read.format("binaryFile").load(image_path)

else:
    raise ValueError(f"Unknown mode: {TEST_MODE}")

# Show the result
num_images = df_images.count()
print(f"✅ {num_images} images loaded from S3")
print("\n👀 Preview:")
df_images.show(5, truncate=60)

### 2.2 Extracting the labels

In [None]:
# Extract the label from the path (the name of the parent folder)
# s3://bucket/data/raw/Training/Apple Braeburn/image.jpg -> Apple Braeburn

df_with_labels = df_images.withColumn(
    "label",
    element_at(split(col("path"), "/"), -2)
)

print("✅ Labels extracted")
print("\n📊 Class distribution:")
df_with_labels.groupBy("label").count().orderBy("label").show(10, truncate=False)
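
The path-splitting logic can be checked locally without Spark. The sketch below is a plain-Python equivalent of taking the second-to-last path segment; `label_from_path` is a hypothetical helper, and the example path is the one from the comment in the cell above.

```python
def label_from_path(path: str) -> str:
    """Return the name of the folder that directly contains the file."""
    return path.split("/")[-2]


example = "s3://bucket/data/raw/Training/Apple Braeburn/image.jpg"
print(label_from_path(example))  # Apple Braeburn
```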

---
## 3. Feature Extraction with MobileNetV2

### 3.1 Loading the model

In [None]:
# Load MobileNetV2 without the classification layer
model = MobileNetV2(
    weights='imagenet',
    include_top=False,
    pooling='avg'
)

print("✅ MobileNetV2 model loaded")
print(f"   Input shape: {model.input_shape}")
print(f"   Output shape: {model.output_shape}")
print(f"   Feature dimension: {model.output_shape[1]}")

### 3.2 Broadcasting the model weights

**Critical optimization**: the broadcast ships the weights to each worker once, which avoids repeated downloads.
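
The size of the broadcast payload is simply the number of weight values times the bytes per value (4 for `float32`). The sketch below reproduces that arithmetic from tensor shapes alone; the shapes are made up for illustration and are not MobileNetV2's real layer shapes, and `weights_size_mb` is a hypothetical helper.

```python
from math import prod

BYTES_PER_FLOAT32 = 4


def weights_size_mb(shapes) -> float:
    """Total size in MB of float32 tensors with the given shapes."""
    total_bytes = sum(prod(shape) * BYTES_PER_FLOAT32 for shape in shapes)
    return total_bytes / 1024 / 1024


# Illustrative shapes only (a small conv kernel, a bias, a 1280-wide vector).
example_shapes = [(3, 3, 3, 32), (32,), (1280,)]
print(f"{weights_size_mb(example_shapes):.4f} MB")
```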

In [None]:
# Extract the weights
model_weights = model.get_weights()

print(f"📦 Number of weight tensors: {len(model_weights)}")
print(f"📦 Size in memory: {sum(w.nbytes for w in model_weights) / 1024 / 1024:.2f} MB")

# Broadcast the weights
broadcast_weights = sc.broadcast(model_weights)

print("✅ Weights broadcast to the workers")

### 3.3 Defining the Pandas UDF

The Pandas UDF applies TensorFlow to batches of rows, distributed across the cluster.
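
Because a Pandas UDF is called once per batch, any setup done inside it is repeated for every batch. One common complementary pattern, which this notebook does not use, is to build the model once per Python worker process and reuse it. The sketch below shows only the caching mechanism: `get_model` and `fake_build_model` are hypothetical, and the stand-in factory counts its calls instead of creating a real MobileNetV2.

```python
from typing import Any, Callable, Optional

_MODEL_CACHE: Optional[Any] = None  # one cached model per Python process


def get_model(build_model: Callable[[], Any]) -> Any:
    """Build the model on the first call, then return the cached instance."""
    global _MODEL_CACHE
    if _MODEL_CACHE is None:
        _MODEL_CACHE = build_model()
    return _MODEL_CACHE


# Stand-in factory (assumption): a real one would create the network and
# load the broadcast weights. This one only records how often it runs.
build_calls = []


def fake_build_model() -> dict:
    build_calls.append(1)
    return {"name": "fake-model"}


for _ in range(3):  # simulate three batches handled by the same process
    model = get_model(fake_build_model)

print(len(build_calls))  # 1
```

Whether this helps in practice depends on how long Spark keeps Python worker processes alive, so it is worth measuring before adopting.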

In [None]:
# Output schema: array of 1280 floats
features_schema = ArrayType(FloatType())

@pandas_udf(features_schema)
def extract_features_udf(content_series: pd.Series) -> pd.Series:
    """
    Extract features with MobileNetV2.
    Runs on each Spark worker, once per batch of rows.
    """
    # Rebuild the model architecture on the worker (no weight download)
    local_model = MobileNetV2(
        weights=None,
        include_top=False,
        pooling='avg'
    )
    
    # Load the broadcast weights
    local_model.set_weights(broadcast_weights.value)
    
    def process_image(content):
        try:
            # Decode the image bytes
            img = Image.open(io.BytesIO(content))
            
            # Convert to RGB
            if img.mode != 'RGB':
                img = img.convert('RGB')
            
            # Resize to 224x224
            img = img.resize((224, 224))
            
            # Convert to an array, add a batch axis, and preprocess
            img_array = img_to_array(img)
            img_array = np.expand_dims(img_array, axis=0)
            img_array = preprocess_input(img_array)
            
            # Extract the features
            features = local_model.predict(img_array, verbose=0)
            
            return features[0].tolist()
            
        except Exception as e:
            # Failed images yield None and are filtered out afterwards
            print(f"Error: {e}")
            return None
    
    return content_series.apply(process_image)

print("✅ Pandas UDF defined")
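
For MobileNetV2, `preprocess_input` rescales pixel values from [0, 255] to [-1, 1]. The NumPy sketch below reproduces that arithmetic on a stacked batch of fake images, which makes the expected input range easy to verify without TensorFlow. The function name `scale_to_unit_range` is hypothetical, and the snippet is an illustration of the scaling, not a replacement for the Keras function.

```python
import numpy as np


def scale_to_unit_range(pixels: np.ndarray) -> np.ndarray:
    """Map pixel values in [0, 255] to float32 values in [-1, 1]."""
    return pixels.astype(np.float32) / 127.5 - 1.0


# Fake batch of two 224x224 RGB images: one all black, one all white.
batch = np.stack([
    np.zeros((224, 224, 3), dtype=np.uint8),
    np.full((224, 224, 3), 255, dtype=np.uint8),
])

scaled = scale_to_unit_range(batch)
print(scaled.shape, scaled.min(), scaled.max())
```

Stacking decoded images into one array like this is also how a whole batch could be passed to a single `predict` call, whereas the UDF above predicts one image at a time.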

### 3.4 Extracting the features

In [None]:
print("⏳ Extracting features...")
start_time = time.time()

# Apply the extraction
df_features = df_with_labels.withColumn(
    "features",
    extract_features_udf(col("content"))
)

# Filter out images that failed
df_features = df_features.filter(col("features").isNotNull())

# Cache for reuse; count() triggers the actual computation
df_features.cache()
count = df_features.count()

elapsed_time = time.time() - start_time

print(f"✅ Features extracted for {count} images")
print(f"   Execution time: {elapsed_time:.2f} seconds")
print(f"   Throughput: {count / elapsed_time:.2f} images/second")
df_features.select("label", "features").show(5, truncate=60)

---
## 4. Dimensionality Reduction with PCA

### 4.1 Preparing the data

In [None]:
# Convert array → dense vector for PCA
array_to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())

df_for_pca = df_features.withColumn(
    "features_vector",
    array_to_vector(col("features"))
)

df_for_pca.cache()
count = df_for_pca.count()

print(f"✅ {count} vectors prepared for PCA")

### 4.2 Applying PCA

Reduction from 1280 dimensions → 200 dimensions
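
To make the reduction concrete, here is a minimal NumPy sketch of PCA via SVD on small synthetic data (50 samples, 8 features, reduced to 3). It centers the data before projecting, and implementation details of Spark's `PCA` may differ, so treat it as an illustration of the technique rather than a description of what Spark does internally. `pca_fit_transform` is a hypothetical helper.

```python
import numpy as np


def pca_fit_transform(X: np.ndarray, k: int):
    """Project X onto its top-k principal components using SVD."""
    X_centered = X - X.mean(axis=0)
    # Rows of Vt are the principal directions, ordered by singular value.
    _, S, Vt = np.linalg.svd(X_centered, full_matrices=False)
    variance = S ** 2
    explained_ratio = variance[:k] / variance.sum()
    return X_centered @ Vt[:k].T, explained_ratio


rng = np.random.default_rng(0)
X = rng.normal(size=(50, 8))  # synthetic data, not real image features
Z, ratio = pca_fit_transform(X, k=3)

print(Z.shape)  # (50, 3)
print(ratio)    # share of total variance kept by each component
```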

In [None]:
K_COMPONENTS = 200

print(f"⏳ Applying PCA (1280 → {K_COMPONENTS} dimensions)...")
start_time = time.time()

# Create and fit the PCA model
pca = PCA(
    k=K_COMPONENTS,
    inputCol="features_vector",
    outputCol="pca_features"
)

pca_model = pca.fit(df_for_pca)

# Apply the transformation
df_pca = pca_model.transform(df_for_pca)

df_pca.cache()
count = df_pca.count()

elapsed_time = time.time() - start_time

print("✅ PCA applied successfully!")
print(f"   Dimensions: 1280 → {K_COMPONENTS}")
print(f"   Images: {count}")
print(f"   Time: {elapsed_time:.2f} seconds")

df_pca.select("label", "pca_features").show(5, truncate=60)

### 4.3 Analyzing the explained variance

In [None]:
# Explained variance (proportion per component), as a NumPy array
explained_variance = pca_model.explainedVariance.toArray()

print("📊 Explained variance:")
print(f"   Total: {explained_variance.sum():.4f}")
print("   Top 10 components:")
for i, var in enumerate(explained_variance[:10]):
    print(f"   PC{i+1}: {var:.6f}")

# Cumulative variance
cumsum_variance = np.cumsum(explained_variance)
print(f"\n   Cumulative variance (first 50 components): {cumsum_variance[49]:.4f}")
print(f"   Cumulative variance (all {K_COMPONENTS} components): {cumsum_variance[-1]:.4f}")

---
## 5. Saving the Results to S3

### 5.1 Saving as Parquet

In [None]:
# Select the relevant columns
df_final = df_pca.select("path", "label", "pca_features")

# Save as Parquet on S3
pca_output_path = S3_PCA_OUTPUT + "pca_results"

print("⏳ Saving to S3...")
df_final.write.mode("overwrite").parquet(pca_output_path)

print(f"✅ PCA results saved: {pca_output_path}")

### 5.2 Saving as CSV (optional)

In [None]:
# Convert vector → string for CSV (CSV cannot store vector columns)
def vector_to_string(v):
    if v is None:
        return None
    return ",".join([str(float(x)) for x in v.toArray()])

vector_to_string_udf = udf(vector_to_string, StringType())

df_final_csv = df_final.withColumn(
    "pca_features_string",
    vector_to_string_udf(col("pca_features"))
).select("path", "label", "pca_features_string")

# Save as CSV
csv_output_path = S3_PCA_OUTPUT + "pca_results_csv"

print("⏳ Saving CSV to S3...")
df_final_csv.write.mode("overwrite").option("header", "true").csv(csv_output_path)

print(f"✅ CSV results saved: {csv_output_path}")
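
A consumer of the CSV needs to turn the comma-joined string back into numbers. The sketch below shows the round trip in plain Python on a short list; `string_to_vector` is a hypothetical helper, and `vector_to_string` here takes a plain list rather than a Spark vector so the snippet runs without Spark.

```python
def vector_to_string(values) -> str:
    """Join numeric values into a single comma-separated string."""
    return ",".join(str(float(x)) for x in values)


def string_to_vector(text: str) -> list:
    """Parse a comma-separated string back into a list of floats."""
    return [float(x) for x in text.split(",")]


original = [0.5, -1.25, 3.0]
encoded = vector_to_string(original)

print(encoded)                    # 0.5,-1.25,3.0
print(string_to_vector(encoded))  # [0.5, -1.25, 3.0]
```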

---
## 6. Verification and Cleanup

### 6.1 Checking the files on S3

In [None]:
# List the files that were created
print("📁 Parquet files:")
!aws s3 ls s3://{BUCKET_NAME}/data/pca/pca_results/ --human-readable

print("\n📁 CSV files:")
!aws s3 ls s3://{BUCKET_NAME}/data/pca/pca_results_csv/ --human-readable

### 6.2 Releasing resources

In [None]:
# Unpersist the cached DataFrames
df_features.unpersist()
df_for_pca.unpersist()
df_pca.unpersist()

# Release the broadcast copies cached on the executors
broadcast_weights.unpersist()

print("✅ Resources released")

---
## 📊 Execution Summary

**Full pipeline executed:**

1. ✅ Images loaded from S3
2. ✅ Labels extracted
3. ✅ Features extracted with MobileNetV2 (weights broadcast)
4. ✅ PCA reduction (1280 → 200 dimensions)
5. ✅ Results saved to S3 (Parquet + CSV)

**Next steps:**

- Run on the full dataset (~67,000 images)
- Analyze the results
- Tune the number of PCA components if needed
- Shut down the EMR cluster to avoid unnecessary costs
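
For the step on tuning the number of PCA components, one simple approach is to pick the smallest number of components whose cumulative explained variance reaches a target. The sketch below assumes a list of per-component variance ratios such as the one printed in section 4.3; `components_for_variance` is a hypothetical helper, and the ratios are made-up values, not results from this pipeline.

```python
from itertools import accumulate


def components_for_variance(explained_variance, target: float) -> int:
    """Smallest number of leading components whose cumulative ratio >= target."""
    for k, cumulative in enumerate(accumulate(explained_variance), start=1):
        if cumulative >= target:
            return k
    raise ValueError(f"Target {target} is not reached with the given components")


# Made-up ratios for illustration only.
ratios = [0.5, 0.25, 0.125, 0.0625]
print(components_for_variance(ratios, 0.85))  # 3
```

If the target is never reached, the helper raises, which signals that the PCA should be refit with a larger `k` before choosing.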