# **Projet : Syst√®me de Recommandation en Temps R√©el pour une Plateforme de Streaming avec Apache Spark et Hadoop**

## **Auteur :** Hamady GACKOU

## **Contexte** :

## Objectif :
Cr√©er un pipeline de traitement de donn√©es distribu√©es pour une plateforme de streaming (films, s√©ries, ou musique) en utilisant Hadoop et Apache Spark. L'objectif est d'exploiter des donn√©es massives (logs des utilisateurs, m√©tadonn√©es des contenus, interactions) pour g√©n√©rer des recommandations en temps r√©el tout en optimisant le stockage et le traitement des donn√©es.



## **1. Architecture du Projet**
### **Stockage des Donn√©es :**
- HDFS : Stockage distribu√© des donn√©es des utilisateurs et des contenus.
- Parquet / Avro : Utilisation de formats de stockage efficaces pour les donn√©es structur√©es et semi-structur√©es.
- Hive / HBase : Base de donn√©es pour g√©rer l'acc√®s aux informations des utilisateurs et des contenus.
- Kafka : Streaming en temps r√©el des √©v√©nements utilisateur (clics, lectures, ajouts aux favoris).

### **Traitement des Donn√©es**
- Apache Spark (RDD, DataFrame, Spark SQL) :
- Nettoyage et transformation des donn√©es en RDD et DataFrames.
- Utilisation de Spark SQL pour requ√™ter les donn√©es.
- Spark Streaming : Traitement en temps r√©el des logs utilisateur pour mise √† jour dynamique des recommandations.
- Machine Learning avec Spark MLlib : Entra√Ænement d'un mod√®le de recommandation bas√© sur ALS (Alternating Least Squares).
- Orchestration et Workflow
- Apache Airflow / Oozie : Gestion des workflows de traitement batch et en streaming.
## **2. D√©tails de Mise en ≈íuvre**
### **Phase 1 : Ingestion et Stockage des Donn√©es**
- Collecte des donn√©es

- Importation de logs utilisateur via Kafka.
- Importation des m√©tadonn√©es des films via Sqoop (depuis une base relationnelle vers HDFS).
Stockage structur√©

- Sauvegarde des donn√©es utilisateur en HBase.
- Sauvegarde des m√©tadonn√©es en Hive sous format Parquet.
### **Phase 2 : Traitement et Analyse des Donn√©es**
- Nettoyage et Pr√©paration des Donn√©es

- Utilisation de Spark RDD pour filtrer et normaliser les donn√©es utilisateur.
- Stockage des r√©sultats interm√©diaires sous Parquet.
- Analyse et Requ√™tage

- Cr√©ation de vues SQL avec Spark SQL pour comprendre les comportements des utilisateurs.
- Identification des tendances et des contenus les plus populaires.
- Recommandation avec Spark MLlib

- Entra√Ænement d‚Äôun mod√®le ALS (Collaborative Filtering) sur les interactions utilisateur.
- Optimisation des hyperparam√®tres et √©valuation du mod√®le.
## **Phase 3 : Recommandations en Temps R√©el**
- Traitement en Streaming avec Spark Streaming

- Capture des √©v√©nements utilisateur (clics, visionnage) en temps r√©el via Kafka.
- Mise √† jour dynamique du mod√®le de recommandation.
- Mise en Production du Mod√®le

- Stockage des recommandations mises √† jour en HBase.
- API pour r√©cup√©rer les recommandations utilisateur.
## **Phase 4 : Automatisation et Monitoring**
- Orchestration avec Apache Airflow / Oozie

- Planification des t√¢ches batch (mise √† jour des recommandations).
- Automatisation de l'entra√Ænement du mod√®le tous les jours.
- Monitoring avec Spark UI et Prometheus

- Surveillance de la performance des jobs Spark.
- Optimisation des ressources sur YARN / Mesos.

üìå **√âtape 1 : Configuration de l‚Äôenvironnement**
- Installez Java 8 et Apache Spark.
- Configurez les variables d'environnement n√©cessaires.
- Initialisez `findspark`.

In [None]:
# Installer java  21,  Spark et ses d√©pendances
!apt-get install openjdk-21-jdk-headless -qq > /dev/null
! wget -q https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
! tar xf /content/spark-3.5.4-bin-hadoop3.tgz

# Configuration des paths

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-21-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

# Installer  findSpark
!pip install -q findspark

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RecommenderSystem") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print("Apache Spark est pr√™t ! üöÄ")


**üìå √âtape 2 : Ingestion et Stockage des Donn√©es**

Cette √©tape consiste √† collecter les donn√©es utilisateur et les m√©tadonn√©es des films, puis √† les stocker dans HDFS et Hive.

**2.1 T√©l√©charger un Jeu de Donn√©es (MovieLens)**

In [None]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip -o ml-latest-small.zip

**2.2 Charger les Donn√©es avec PySpark**

Cr√©ons un DataFrame PySpark pour charger les donn√©es :

In [None]:
ratings = spark.read.csv("ml-latest-small/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("ml-latest-small/movies.csv", header=True, inferSchema=True)


ratings.show(5)
movies.show(5)

# **üìå √âtape 3 : Nettoyage et Pr√©paration des Donn√©es**

Nous devons transformer et nettoyer les donn√©es pour les utiliser efficacement avec Spark MLlib.

**3.1 Supprimer les Valeurs Nulles et Filtrer les Donn√©es**

In [None]:
ratings = ratings.dropna()
movies = movies.dropna()

# V√©rifier les valeurs uniques
print(f"Nombre de films uniques : {movies.select('movieId').distinct().count()}")
print(f"Nombre d'utilisateurs uniques : {ratings.select('userId').distinct().count()}")


**3.2 Conversion des Donn√©es pour Spark MLlib**

Les algorithmes de recommandation utilisent des identifiants num√©riques. On va convertir les colonnes userId et movieId en integer.

In [None]:
from pyspark.ml.feature import StringIndexer

indexer_user = StringIndexer(inputCol="userId", outputCol="userIndex")
indexer_movie = StringIndexer(inputCol="movieId", outputCol="movieIndex")

ratings = indexer_user.fit(ratings).transform(ratings)
ratings = indexer_movie.fit(ratings).transform(ratings)

ratings.show(5)


## **üìå √âtape 4 : Construction du Mod√®le de Recommandation**

Nous allons maintenant entra√Æner un mod√®le de filtrage collaboratif en utilisant ALS (Alternating Least Squares) dans Spark MLlib.

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Configuration du mod√®le ALS
als = ALS(
    userCol="userIndex",
    itemCol="movieIndex",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)


**4.2 Entra√Æner le Mod√®le**

Nous allons diviser les donn√©es en ensemble d‚Äôentra√Ænement et de test (80% - 20%).

In [None]:
train_data, test_data = ratings.randomSplit([0.8, 0.2], seed=42)

model = als.fit(train_data)

# G√©n√©rer les pr√©dictions
predictions = model.transform(test_data)
predictions.show(5)

## **üìå √âtape 5 : √âvaluation du Mod√®le**
Nous utilisons la Root Mean Square Error (RMSE) pour mesurer la pr√©cision des recommandations.

In [None]:
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE du mod√®le : {rmse:.4f}")


## **üìå √âtape 6 : Recommandations en Temps R√©el avec Spark Streaming**

Nous allons maintenant simuler des √©v√©nements utilisateur en temps r√©el avec Apache Kafka et Spark Streaming.

**6.1 Installer Apache Kafk**

In [None]:
!pwd

In [None]:
!wget -q https://downloads.apache.org/kafka/3.7.2/kafka-3.7.2-src.tgz
!tar -xzf kafka-3.7.2-src.tgz

In [None]:
%cd /content/kafka-3.7.2-src
!chmod +x gradlew
# !./gradlew jar -PscalaVersion=2.13.12
!./gradlew  jar -PscalaVersion=2.13.12 -x test
#!./gradlew --no-daemon jar -PscalaVersion=2.13.12 -x test

In [None]:
!./gradlew clean releaseTarGz -PscalaVersion=2.13.12

In [None]:
!tar -xzf core/build/distributions/kafka_2.13-3.7.2.tgz

**6.2 D√©marrer un Producteur Kafka (Simulation d'√âv√©nements)**

In [None]:
%cd /content/kafka-3.7.2-src

In [None]:
!./kafka_2.13-3.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.7.2/config/zookeeper.properties
!./kafka_2.13-3.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.7.2/config/server.properties


***Cr√©er un topic Kafka nomm√© user-events :***

In [None]:
!./kafka_2.13-3.7.2/bin/kafka-topics.sh --create --topic user-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

## **üìå Conclusion**
Nous avons mis en place un syst√®me de recommandation bas√© sur Spark et Hadoop, incluant :
- ‚úÖ Ingestion des donn√©es avec HDFS et Kafka
- ‚úÖ Traitement avec Spark SQL et Spark MLlib
- ‚úÖ G√©n√©ration des recommandations en temps r√©el avec Spark Streaming