# 🎯 ETL Spark - Analyse des Influenceurs YouTube
## Problématique : Identifier et collaborer avec les influenceurs clés

**Objectifs :**
- Cartographie des influenceurs par niche et localisation
- Analyse concurrentielle des partenariats
- Analyse des tendances marché et formats publicitaires
- Sauvegarde structurée dans HDFS avec séparation métadonnées/données

## 1. Configuration de l'environnement Spark
Initialisation de Spark Session et configuration HDFS

In [2]:
# Import des bibliothèques nécessaires
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Configuration Spark avec HDFS
spark = SparkSession.builder \
    .appName("YouTubeInfluencersETL") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

# Configuration des chemins HDFS
HDFS_BASE_PATH = "/user/influencers_analysis"
DATA_PATH = f"{HDFS_BASE_PATH}/data"
METADATA_PATH = f"{HDFS_BASE_PATH}/metadata"

print("✅ Spark Session initialisée")
print(f"🔗 Connexion HDFS configurée: {spark.conf.get('spark.hadoop.fs.defaultFS')}")
print(f"📁 Chemin de stockage: {HDFS_BASE_PATH}")

ModuleNotFoundError: No module named 'pyspark'

## 2. Chargement et lecture du fichier CSV
Lecture des données d'influenceurs YouTube

In [None]:
# Chemin du fichier CSV local
csv_file_path = "/workspace/TP-groupe/old/youtube_all_channels_20250828_003821.csv"

# Définition du schéma pour optimiser la lecture
schema = StructType([
    StructField("channel_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("custom_url", StringType(), True),
    StructField("published_at", StringType(), True),
    StructField("country", StringType(), True),
    StructField("default_language", StringType(), True),
    StructField("subscriber_count", LongType(), True),
    StructField("view_count", LongType(), True),
    StructField("video_count", LongType(), True),
    StructField("keywords", StringType(), True),
    StructField("topic_categories", StringType(), True),
    StructField("topic_ids", StringType(), True),
    StructField("privacy_status", StringType(), True),
    StructField("is_linked", BooleanType(), True),
    StructField("made_for_kids", BooleanType(), True),
    StructField("subscriber_tier", StringType(), True)
])

# Chargement du DataFrame
df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .csv(csv_file_path)

print(f"📊 Données chargées: {df_raw.count()} lignes, {len(df_raw.columns)} colonnes")
print("\n🔍 Aperçu de la structure:")
df_raw.printSchema()

In [None]:
# Affichage des premières lignes
print("📋 Aperçu des données:")
df_raw.select("title", "country", "subscriber_count", "subscriber_tier", "topic_categories").show(5, truncate=False)

# Statistiques de base
print("\n📈 Statistiques descriptives:")
df_raw.describe("subscriber_count", "view_count", "video_count").show()

## 3. Nettoyage et validation des données
Nettoyage des données manquantes et validation des métriques

In [None]:
# Analyse des valeurs manquantes
print("🔍 Analyse des valeurs manquantes:")
missing_analysis = df_raw.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_raw.columns
])
missing_analysis.show()

# Nettoyage des données
df_clean = df_raw \
    .filter(col("subscriber_count").isNotNull() & (col("subscriber_count") >= 0)) \
    .filter(col("view_count").isNotNull() & (col("view_count") >= 0)) \
    .filter(col("video_count").isNotNull() & (col("video_count") > 0)) \
    .fillna({
        "country": "Unknown",
        "default_language": "Unknown", 
        "keywords": "",
        "topic_categories": "",
        "privacy_status": "public"
    })

print(f"🧹 Après nettoyage: {df_clean.count()} lignes conservées")
print(f"📉 Lignes supprimées: {df_raw.count() - df_clean.count()}")

In [None]:
# Standardisation et validation des données
df_standardized = df_clean \
    .withColumn("published_at", to_date(col("published_at"))) \
    .withColumn("country", upper(trim(col("country")))) \
    .withColumn("default_language", upper(trim(col("default_language")))) \
    .withColumn("topic_categories_list", split(col("topic_categories"), "\\|")) \
    .withColumn("keywords_list", split(col("keywords"), ",")) \
    .withColumn("etl_timestamp", current_timestamp()) \
    .withColumn("data_quality_score", 
                when(col("country") != "UNKNOWN", 1).otherwise(0) +
                when(col("topic_categories") != "", 1).otherwise(0) +
                when(col("keywords") != "", 1).otherwise(0))

print("✅ Données standardisées et validées")
df_standardized.select("channel_id", "country", "data_quality_score").show(5)

## 4. Transformation des données pour l'analyse des influenceurs
Création des métriques d'engagement et scores d'influence

In [None]:
# Calcul des métriques d'engagement
df_metrics = df_standardized \
    .withColumn("avg_views_per_video", 
                when(col("video_count") > 0, col("view_count") / col("video_count")).otherwise(0)) \
    .withColumn("engagement_rate", 
                when(col("subscriber_count") > 0, 
                     (col("view_count") / col("video_count")) / col("subscriber_count") * 100).otherwise(0)) \
    .withColumn("views_per_subscriber", 
                when(col("subscriber_count") > 0, col("view_count") / col("subscriber_count")).otherwise(0)) \
    .withColumn("content_consistency", 
                when(col("video_count") >= 100, "High")
                .when(col("video_count") >= 50, "Medium")
                .otherwise("Low"))

# Score d'influence composite
df_influence = df_metrics \
    .withColumn("subscriber_score", 
                when(col("subscriber_count") >= 10000000, 5)
                .when(col("subscriber_count") >= 1000000, 4)
                .when(col("subscriber_count") >= 100000, 3)
                .when(col("subscriber_count") >= 10000, 2)
                .otherwise(1)) \
    .withColumn("engagement_score",
                when(col("engagement_rate") >= 10, 5)
                .when(col("engagement_rate") >= 5, 4)
                .when(col("engagement_rate") >= 2, 3)
                .when(col("engagement_rate") >= 1, 2)
                .otherwise(1)) \
    .withColumn("influence_score", 
                (col("subscriber_score") * 0.4 + 
                 col("engagement_score") * 0.4 + 
                 col("data_quality_score") * 0.2))

print("📊 Métriques d'engagement calculées")
df_influence.select("title", "subscriber_count", "engagement_rate", "influence_score").show(5)

In [None]:
# Catégorisation par niche
def extract_main_niche(categories_list):
    """Extraire la niche principale depuis les catégories"""
    if not categories_list or len(categories_list) == 0:
        return "General"
    
    # Mapping des catégories vers des niches
    niche_mapping = {
        "Music": ["Music", "Pop music", "Hip hop music", "Rock music", "Electronic music"],
        "Gaming": ["Video game culture", "Action game", "Sports game", "Racing video game"],
        "Lifestyle": ["Lifestyle (sociology)", "Entertainment"],
        "Education": ["Educational", "Technology", "Science"],
        "Sports": ["Sport", "Basketball", "Football"],
        "Film": ["Film", "Television program"]
    }
    
    for niche, keywords in niche_mapping.items():
        for category in categories_list:
            if any(keyword.lower() in category.lower() for keyword in keywords):
                return niche
    
    return "Other"

# UDF pour l'extraction de niche
extract_niche_udf = udf(extract_main_niche, StringType())

df_niches = df_influence \
    .withColumn("main_niche", extract_niche_udf(col("topic_categories_list"))) \
    .withColumn("is_micro_influencer", 
                when((col("subscriber_count").between(10000, 100000)) & 
                     (col("engagement_rate") >= 2), True).otherwise(False)) \
    .withColumn("is_macro_influencer", 
                when(col("subscriber_count") >= 1000000, True).otherwise(False))

print("🎯 Catégorisation par niche terminée")
df_niches.groupBy("main_niche").count().orderBy(desc("count")).show()

## 5. Analyse de la cartographie des influenceurs
Identification des top influenceurs par niche et localisation

In [None]:
# Top influenceurs par niche
print("🏆 TOP 10 INFLUENCEURS PAR NICHE")
print("=" * 50)

niches = df_niches.select("main_niche").distinct().collect()

for niche_row in niches:
    niche = niche_row['main_niche']
    top_in_niche = df_niches \
        .filter(col("main_niche") == niche) \
        .orderBy(desc("influence_score")) \
        .select("title", "country", "subscriber_count", "engagement_rate", "influence_score") \
        .limit(3)
    
    print(f"\n📂 Niche: {niche}")
    top_in_niche.show(truncate=False)

In [None]:
# Analyse géographique des influenceurs
print("🌍 RÉPARTITION GÉOGRAPHIQUE DES INFLUENCEURS")

geo_analysis = df_niches \
    .groupBy("country", "main_niche") \
    .agg(
        count("*").alias("influencer_count"),
        avg("subscriber_count").alias("avg_subscribers"),
        avg("engagement_rate").alias("avg_engagement"),
        avg("influence_score").alias("avg_influence_score")
    ) \
    .orderBy(desc("influencer_count"))

geo_analysis.show(20)

# Métriques d'engagement par pays
print("\n📊 MÉTRIQUES D'ENGAGEMENT PAR PAYS (Top 10)")
country_engagement = df_niches \
    .groupBy("country") \
    .agg(
        count("*").alias("total_influencers"),
        avg("engagement_rate").alias("avg_engagement_rate"),
        sum("subscriber_count").alias("total_reach"),
        countDistinct("main_niche").alias("niche_diversity")
    ) \
    .orderBy(desc("total_influencers"))

country_engagement.show(10)

## 6. Analyse concurrentielle et des partenariats
Détection des collaborations et formats de partenariat

In [None]:
# Analyse des mots-clés pour détecter les partenariats
brand_keywords = ["brand", "sponsor", "partnership", "collaboration", "ad", "promo", 
                  "review", "unboxing", "giveaway", "discount"]

df_partnerships = df_niches \
    .withColumn("keywords_lower", lower(col("keywords"))) \
    .withColumn("has_brand_keywords", 
                when(col("keywords_lower").rlike("|".join(brand_keywords)), True).otherwise(False)) \
    .withColumn("potential_partnerships", 
                when(col("has_brand_keywords") & (col("subscriber_count") >= 50000), "High")
                .when(col("has_brand_keywords"), "Medium")
                .otherwise("Low"))

print("🤝 ANALYSE DES PARTENARIATS POTENTIELS")
partnership_analysis = df_partnerships \
    .groupBy("main_niche", "potential_partnerships") \
    .agg(
        count("*").alias("channel_count"),
        avg("subscriber_count").alias("avg_reach"),
        avg("engagement_rate").alias("avg_engagement")
    ) \
    .orderBy("main_niche", desc("channel_count"))

partnership_analysis.show(20)

In [None]:
# Formats de contenu les plus performants
print("📱 ANALYSE DES FORMATS DE CONTENU PAR PERFORMANCE")

# Simulation de détection de format basée sur les métriques
df_content_formats = df_partnerships \
    .withColumn("avg_video_views", col("view_count") / col("video_count")) \
    .withColumn("content_format_preference",
                when(col("avg_video_views") > 1000000, "Viral Content")
                .when(col("engagement_rate") > 5, "High Engagement")
                .when(col("video_count") > 500, "Consistent Creator")
                .otherwise("Standard Content")) \
    .withColumn("monetization_potential",
                when((col("subscriber_count") >= 100000) & (col("engagement_rate") >= 2), "High")
                .when(col("subscriber_count") >= 10000, "Medium")
                .otherwise("Low"))

format_performance = df_content_formats \
    .groupBy("main_niche", "content_format_preference") \
    .agg(
        count("*").alias("creator_count"),
        avg("subscriber_count").alias("avg_subscribers"),
        avg("engagement_rate").alias("avg_engagement"),
        avg("influence_score").alias("avg_influence")
    ) \
    .orderBy("main_niche", desc("avg_influence"))

format_performance.show(30, truncate=False)

## 7. Analyse des tendances et engagement
Sujets tendance et formats publicitaires performants

In [None]:
# Analyse des sujets tendance par audience
print("📈 SUJETS TENDANCE PAR TAILLE D'AUDIENCE")

# Extraction des topics populaires
df_topics = df_content_formats \
    .filter(col("topic_categories") != "") \
    .withColumn("topic", explode(col("topic_categories_list"))) \
    .filter(col("topic") != "")

topic_trends = df_topics \
    .groupBy("topic", "subscriber_tier") \
    .agg(
        count("*").alias("channel_count"),
        avg("engagement_rate").alias("avg_engagement"),
        avg("subscriber_count").alias("avg_subscribers")
    ) \
    .filter(col("channel_count") >= 3) \
    .orderBy(desc("avg_engagement"))

topic_trends.show(20, truncate=False)

In [None]:
# Performance par tier d'influenceur
print("🎯 PERFORMANCE PAR TIER D'INFLUENCEUR")

tier_performance = df_content_formats \
    .groupBy("subscriber_tier", "main_niche") \
    .agg(
        count("*").alias("influencer_count"),
        avg("engagement_rate").alias("avg_engagement"),
        avg("views_per_subscriber").alias("avg_views_per_sub"),
        avg("influence_score").alias("avg_influence_score")
    ) \
    .orderBy("subscriber_tier", desc("avg_engagement"))

tier_performance.show(30)

# Recommandations pour formats publicitaires
print("\n💡 RECOMMANDATIONS FORMATS PUBLICITAIRES")
ad_format_reco = df_content_formats \
    .filter(col("engagement_rate") >= 3) \
    .groupBy("main_niche") \
    .agg(
        count("*").alias("high_engagement_creators"),
        avg("subscriber_count").alias("avg_reach"),
        avg("engagement_rate").alias("avg_engagement"),
        collect_list("title").alias("top_creators")
    ) \
    .orderBy(desc("high_engagement_creators"))

ad_format_reco.select("main_niche", "high_engagement_creators", "avg_reach", "avg_engagement").show(truncate=False)

## 8. Création des tables de métadonnées
Séparation des métadonnées des données principales

In [None]:
# Création de la table de métadonnées
metadata_df = df_content_formats \
    .select(
        col("channel_id").alias("entity_id"),
        lit("youtube_channel").alias("entity_type"),
        col("etl_timestamp").alias("created_at"),
        lit("youtube_api_v3").alias("data_source"),
        col("data_quality_score"),
        lit("2025-08-28").alias("extraction_date"),
        lit("active").alias("status"),
        struct(
            col("privacy_status"),
            col("is_linked"),
            col("made_for_kids")
        ).alias("source_metadata"),
        struct(
            col("subscriber_score"),
            col("engagement_score"),
            col("influence_score")
        ).alias("computed_metrics")
    ) \
    .withColumn("metadata_id", monotonically_increasing_id())

print("📋 Table de métadonnées créée")
metadata_df.show(5)

# Création de la table principale (données business)
main_data_df = df_content_formats \
    .select(
        col("channel_id"),
        col("title"),
        col("country"),
        col("main_niche"),
        col("subscriber_count"),
        col("view_count"),
        col("video_count"),
        col("engagement_rate"),
        col("avg_views_per_video"),
        col("content_format_preference"),
        col("monetization_potential"),
        col("potential_partnerships"),
        col("influence_score"),
        col("subscriber_tier")
    )

print("💼 Table de données principales créée")
main_data_df.show(5)

In [None]:
# Table de référence pour les niches et catégories
niche_reference_df = df_topics \
    .groupBy("main_niche", "topic") \
    .agg(
        count("*").alias("channel_count"),
        avg("engagement_rate").alias("avg_engagement")
    ) \
    .withColumn("niche_id", monotonically_increasing_id()) \
    .withColumn("created_at", current_timestamp())

print("🏷️ Table de référence des niches créée")
niche_reference_df.show(10)

# Table d'analyse géographique
geo_reference_df = df_content_formats \
    .groupBy("country", "main_niche") \
    .agg(
        count("*").alias("influencer_count"),
        avg("subscriber_count").alias("avg_subscribers"),
        avg("engagement_rate").alias("avg_engagement"),
        sum("subscriber_count").alias("total_market_reach")
    ) \
    .withColumn("geo_id", monotonically_increasing_id()) \
    .withColumn("analysis_date", current_date())

print("🌍 Table de référence géographique créée")
geo_reference_df.show(10)

## 9. Sauvegarde dans HDFS avec indexation
Persistance structurée avec partitionnement et index

In [None]:
# Configuration des chemins de sauvegarde
DATA_PATHS = {
    "main_data": f"{DATA_PATH}/influencers_main",
    "metadata": f"{METADATA_PATH}/channel_metadata", 
    "niche_reference": f"{DATA_PATH}/niche_reference",
    "geo_reference": f"{DATA_PATH}/geo_reference"
}

print("💾 SAUVEGARDE DES DONNÉES DANS HDFS")
print("=" * 50)

# 1. Sauvegarde des données principales (partitionnées par pays et niche)
print("📊 Sauvegarde des données principales...")
main_data_df \
    .coalesce(10) \
    .write \
    .mode("overwrite") \
    .partitionBy("country", "main_niche") \
    .parquet(DATA_PATHS["main_data"])

print(f"✅ Données principales sauvegardées: {DATA_PATHS['main_data']}")

# 2. Sauvegarde des métadonnées (partitionnées par date)
print("📋 Sauvegarde des métadonnées...")
metadata_df \
    .coalesce(5) \
    .write \
    .mode("overwrite") \
    .partitionBy("extraction_date") \
    .parquet(DATA_PATHS["metadata"])

print(f"✅ Métadonnées sauvegardées: {DATA_PATHS['metadata']}")

In [None]:
# 3. Sauvegarde des tables de référence
print("🏷️ Sauvegarde des références de niches...")
niche_reference_df \
    .coalesce(2) \
    .write \
    .mode("overwrite") \
    .parquet(DATA_PATHS["niche_reference"])

print(f"✅ Références niches sauvegardées: {DATA_PATHS['niche_reference']}")

print("🌍 Sauvegarde des références géographiques...")
geo_reference_df \
    .coalesce(3) \
    .write \
    .mode("overwrite") \
    .partitionBy("country") \
    .parquet(DATA_PATHS["geo_reference"])

print(f"✅ Références géographiques sauvegardées: {DATA_PATHS['geo_reference']}")

# Création des vues temporaires pour indexation
print("\n🔍 Création des index (vues temporaires)...")
main_data_df.createOrReplaceTempView("influencers_main_indexed")
metadata_df.createOrReplaceTempView("metadata_indexed")
niche_reference_df.createOrReplaceTempView("niche_reference_indexed")
geo_reference_df.createOrReplaceTempView("geo_reference_indexed")

print("✅ Index créés pour requêtes optimisées")

## 10. Validation de la persistance des données
Vérification de l'intégrité et test des performances

In [None]:
# Validation de l'intégrité des données
print("🔍 VALIDATION DE L'INTÉGRITÉ DES DONNÉES")
print("=" * 50)

# 1. Lecture et vérification des données principales
main_data_read = spark.read.parquet(DATA_PATHS["main_data"])
print(f"📊 Données principales: {main_data_read.count()} lignes")
print(f"📁 Partitions: {len(main_data_read.select('country', 'main_niche').distinct().collect())}")

# 2. Vérification des métadonnées
metadata_read = spark.read.parquet(DATA_PATHS["metadata"])
print(f"📋 Métadonnées: {metadata_read.count()} lignes")

# 3. Test de cohérence entre les tables
coherence_check = spark.sql("""
    SELECT 
        m.entity_id,
        m.data_quality_score,
        i.influence_score,
        CASE 
            WHEN m.entity_id = i.channel_id THEN 'OK'
            ELSE 'ERROR'
        END as coherence_status
    FROM metadata_indexed m
    LEFT JOIN influencers_main_indexed i ON m.entity_id = i.channel_id
    LIMIT 10
""")

print("\n🔗 Test de cohérence métadonnées <-> données principales:")
coherence_check.show()

In [None]:
# Test des performances des requêtes indexées
print("⚡ TEST DES PERFORMANCES DES REQUÊTES")
print("=" * 40)

# Requête 1: Top influenceurs par niche (optimisée par index)
start_time = datetime.now()
top_gaming = spark.sql("""
    SELECT title, country, subscriber_count, engagement_rate, influence_score
    FROM influencers_main_indexed 
    WHERE main_niche = 'Gaming' 
    ORDER BY influence_score DESC 
    LIMIT 5
""")
top_gaming.show()
query1_time = (datetime.now() - start_time).total_seconds()
print(f"⏱️ Requête 1 exécutée en {query1_time:.2f} secondes")

# Requête 2: Analyse géographique (utilise les partitions)
start_time = datetime.now()
geo_analysis = spark.sql("""
    SELECT country, 
           COUNT(*) as influencer_count,
           AVG(subscriber_count) as avg_subscribers,
           AVG(engagement_rate) as avg_engagement
    FROM influencers_main_indexed 
    WHERE country IN ('US', 'FR', 'GB')
    GROUP BY country
    ORDER BY avg_engagement DESC
""")
geo_analysis.show()
query2_time = (datetime.now() - start_time).total_seconds()
print(f"⏱️ Requête 2 exécutée en {query2_time:.2f} secondes")

In [None]:
# Résumé final et recommandations
print("🎯 RÉSUMÉ DE L'ANALYSE D'INFLUENCEURS")
print("=" * 50)

# Statistiques finales
final_stats = spark.sql("""
    SELECT 
        COUNT(*) as total_influencers,
        COUNT(DISTINCT country) as countries_covered,
        COUNT(DISTINCT main_niche) as niches_covered,
        AVG(subscriber_count) as avg_subscribers,
        AVG(engagement_rate) as avg_engagement,
        AVG(influence_score) as avg_influence_score
    FROM influencers_main_indexed
""")

print("📊 STATISTIQUES FINALES:")
final_stats.show()

# Recommandations par objectif marketing
print("\n💡 RECOMMANDATIONS MARKETING:")

# Top niches pour partenariats
top_niches = spark.sql("""
    SELECT main_niche,
           COUNT(*) as influencer_count,
           AVG(engagement_rate) as avg_engagement,
           SUM(subscriber_count) as total_reach
    FROM influencers_main_indexed
    WHERE monetization_potential = 'High'
    GROUP BY main_niche
    ORDER BY avg_engagement DESC
    LIMIT 5
""")

print("🎯 Top 5 niches pour partenariats:")
top_niches.show(truncate=False)

print("\n✅ ETL TERMINÉ AVEC SUCCÈS!")
print(f"📁 Données sauvegardées dans: {HDFS_BASE_PATH}")
print(f"🔍 Index créés pour requêtes optimisées")
print(f"📋 Métadonnées séparées des données principales")

## 🎯 Réponses aux questions marketing

### Cartographie des influenceurs ✅
- **Créateurs les plus influents**: Identifiés par niche et pays via `influence_score`
- **Audience et engagement**: Métriques calculées (`engagement_rate`, `views_per_subscriber`)
- **Formats de partenariat**: Analysés via `potential_partnerships` et `monetization_potential`

### Analyse concurrentielle ✅  
- **Détection de partenariats**: Analyse des mots-clés brand/sponsor
- **Stratégies performantes**: Formats de contenu par performance

### Tendances marché ✅
- **Sujets d'intérêt**: Analyse des `topic_categories` par engagement
- **Formats publicitaires**: Performance par tier d'influenceur et niche

### Architecture de données ✅
- **Séparation métadonnées/données**: Tables distinctes avec références
- **Partitionnement**: Par pays/niche pour requêtes optimisées  
- **Indexation**: Vues temporaires pour performances
- **Persistance HDFS**: Format Parquet avec compression