<a href="https://colab.research.google.com/github/Matis24/Brexit-speech-analysis/blob/main/BIG_DATA_PROJECT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**PROJET DONNEES MASSIVES**

In [1]:
!pip install pyspark



In [2]:
# Importer les bibliothèques nécessaires
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

In [3]:
# Créer une session Spark
spark = SparkSession.builder \
    .appName("LDA Topic Modeling on Title and Description") \
    .getOrCreate()

# Définition du schéma des données
schema = StructType([
    StructField("Title", StringType(), True),
    StructField("Author", StringType(), True),
    StructField("Source", StringType(), True),
    StructField("Date", DateType(), True),
    StructField("URL", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Content", StringType(), True)
])

# Charger le fichier CSV avec le schéma défini
df = spark.read.csv("/content/articles_proche_orient.csv", header=True, schema=schema)

# fichier contenant la catégory de l'article (1 : l'article traite des conflits au moyen orient, 0 : sinon)
df_article_category = spark.read.csv("/content/article_category.csv", sep = ";", header=True, inferSchema=True)

Nous disposons ici de deux DataFrames : `df`, qui contient les articles, et `df_article_category`,
qui renferme la catégorie de chaque article (indiquant si l'article traite du conflit au Moyen-Orient ou non).
Pour obtenir un seul DataFrame, nous pouvons exploiter le concept de **broadcast** en Spark.
Le broadcast permet d'optimiser les jointures entre une grande table (ici les articles) et une petite table
(ici les catégories). Plutôt que de déplacer de grandes quantités de données à travers le cluster, Spark diffuse
la petite table à chaque nœud. Ainsi, chaque nœud reçoit une copie complète de `df_article_category` et peut
effectuer la jointure localement avec sa partition de `df`.
Cela réduit les échanges de données entre nœuds et améliore significativement les performances,
en particulier lorsque la petite table peut être conservée en mémoire sur chaque nœud.


In [4]:
# Noeud 1 (Partition 1 du DataFrame d'articles)  <--- Reçoit df_article_category complet
# Noeud 2 (Partition 2 du DataFrame d'articles)  <--- Reçoit df_article_category complet
# Noeud 3 (Partition 3 du DataFrame d'articles)  <--- Reçoit df_article_category complet
# ...

from pyspark.sql.functions import broadcast

# Appliquer une jointure avec broadcast
df = df.join(broadcast(df_article_category), df['URL'] == df_article_category['URL'], how='left')


Le DataFrame résultant de la jointure sera utilisé dans la suite du projet. Pour éviter de recalculer cette étape coûteuse, notamment la jointure, à chaque utilisation, nous le mettons en cache.

In [5]:
# Mettre en cache le DataFrame après la jointure
df.cache()

DataFrame[Title: string, Author: string, Source: string, Date: date, URL: string, Description: string, Content: string, URL: string, Category: int]

Avant d'aller plus loin, vérifions combien de noeuds nous disposons.

In [6]:
# Récupérer le nombre de cœurs disponibles dans Spark
num_cores = spark.sparkContext.defaultParallelism
print(f"Nombre de cœurs disponibles : {num_cores}")

Nombre de cœurs disponibles : 2


Il y'a 2 cœurs, ce qui permet d'exécuter des tâches en parallèle et d'améliorer les performances en répartissant le travail sur ces deux cœurs. Vérifions maintenant le nombre de partitions du jeu de données.

In [7]:
# Vérifier le nombre de partitions
num_partitions = df.rdd.getNumPartitions()
print(f"Nombre de partitions après la jointure et le cache : {num_partitions}")

Nombre de partitions après la jointure et le cache : 1


Il n'y a actuellement qu'une seule partition, ce qui signifie que toutes les données seront traitées sur un seul nœud de manière séquentielle. Cela peut potentiellement surcharger le nœud et ralentir le traitement global. Pour exploiter pleinement le parallélisme de Spark, nous pourrions ajuster manuellement le nombre de partitions. Mais pourquoi Spark a-t-il décidé de garder une seule partition ?

In [8]:
# Taille idéale d'une partition en bytes (128 Mo)
partition_size = 128 * 1024 * 1024  # 128 Mo

# Estimation de la taille des données en bytes
data_size = df.rdd.map(lambda row: len(str(row))).sum()

# Calcul du nombre optimal de partitions
num_partitions_optimal = max(1, int(data_size / partition_size))

print(f"Taille estimée des données : {data_size} bytes")
print(f"Nombre de partitions optimal basé sur la taille des données : {num_partitions_optimal}")


Taille estimée des données : 81789 bytes
Nombre de partitions optimal basé sur la taille des données : 1


Spark a gardé le nombre de partitions à 1 car la taille des données est jugée trop faible pour justifier une division en plusieurs partitions. Dans ces cas-là, Spark adopte une approche intelligente : au lieu de créer des partitions supplémentaires qui engendreraient un surcoût inutile de gestion et d'échanges de données entre les partitions, il préfère optimiser le traitement en regroupant tout dans une seule partition. Cela permet d'éviter les ralentissements liés aux shuffles et d'améliorer l'efficacité sur de petits volumes de données.

Qui dit Spark et données massives, dit traitement distribué. Pour la suite du projet, nous allons donc fixer le nombre de partitions à 2 pour avoir un semblant de parallélisme, même si dans ce cas précis, ce n'est pas strictement nécessaire en raison de la petite taille des données. (mais supposons que nous avons une grande quantité de données)...

In [9]:
# Ajuster manuellement le nombre de partitions à 3 pour profiter du parallélisme
df = df.repartition(2)

# Vérifier le nombre de partitions après répartition
num_partitions_after = df.rdd.getNumPartitions()
print(f"Nombre de partitions après répartition : {num_partitions_after}")

Nombre de partitions après répartition : 2


# **Exploration du DataFrame**

In [10]:
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- URL: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Content: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Category: integer (nullable = true)



In [11]:
df.show()

+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------+
|               Title|              Author|             Source|      Date|                 URL|         Description|             Content|                 URL|Category|
+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------+
|Middle East tensi...|                NULL| Al Jazeera English|2024-10-09|https://www.aljaz...|Fears of a wideni...|Fears of a wideni...|https://www.aljaz...|       1|
|<ul><li>A Hezboll...|                NULL|               NULL|      NULL|                NULL|                NULL|                NULL|                NULL|    NULL|
|Could the Middle ...|                NULL| Al Jazeera English|2024-10-09|https://www.aljaz...|More than one yea...|More than one yea...|https://www.aljaz...|  

In [12]:
# Supprimer la colonne 'URL' en double après la jointure
df = df.drop(df_article_category['URL'])

# Supprimer les lignes contenant des valeurs NULL
df = df.na.drop()

In [13]:
# Compter le nombre d'articles par source et par autheur
df.groupBy('Source', 'Author')\
    .count()\
    .orderBy('count', ascending = False)\
    .show()

+-------------------+--------------------+-----+
|             Source|              Author|count|
+-------------------+--------------------+-----+
|   Business Insider|        Jake Epstein|    4|
|Yahoo Entertainment|             Reuters|    3|
|   Business Insider|    Thibault Spirlet|    2|
|Yahoo Entertainment|editorial-team@si...|    2|
|           ABC News| Shannon K. Kingston|    2|
|   Business Insider|      Hannah Abraham|    2|
|Yahoo Entertainment|The Associated Press|    2|
|   Business Insider|        Taylor Rains|    1|
|Yahoo Entertainment|Samia Nakhoul, Ah...|    1|
|           ABC News|         Jon Haworth|    1|
| Al Jazeera English|Erin Hale, Alasta...|    1|
|                NPR|       Gus Contreras|    1|
|Yahoo Entertainment|       Matthew Petti|    1|
|                NPR|         Emma Bowman|    1|
|           ABC News|            ABC News|    1|
|           The Hill|Assaf Zoran, opin...|    1|
|                NPR|        Luke Garrett|    1|
|                NPR

Bien que nous ayons réparti les données en 2 partitions pour tirer parti du parallélisme, des opérations comme `groupBy` nécessitent souvent une réorganisation des données entre les partitions, provoquant un shuffle qui peut ralentir le traitement.

Pour éviter ces shuffles et optimiser les performances, nous pourrons réorganisé les données par `Source` et `Author` avant l'agrégation. Cela permet de regrouper efficacement les données et de minimiser les échanges entre les partitions, améliorant ainsi l'efficacité du `groupBy`. Comparons ces deux approches pour voir l'impact sur les performances.


In [14]:
import time

# Mesurer le temps d'exécution du groupBy avec la répartition générale
start_time_global = time.time()

df.groupBy('Source', 'Author')\
    .count()\
    .orderBy('count', ascending=False)\
    .show()

end_time_global = time.time()
execution_time_global = end_time_global - start_time_global


# Deuxième approche : créer un nouveau DataFrame avec répartition par 'Source' et 'Author'
df_specific = df.repartition(2, 'Source', 'Author')

# Mesurer le temps d'exécution du groupBy avec la répartition spécifique
start_time_specific = time.time()

df_specific.groupBy('Source', 'Author')\
    .count()\
    .orderBy('count', ascending=False)\
    .show()

end_time_specific = time.time()
execution_time_specific = end_time_specific - start_time_specific


# Comparaison des deux approches
print("\nComparaison des temps d'exécution :")
print(f"Répartition générale : {execution_time_global:.4f} secondes")
print(f"Répartition par 'Source' et 'Author' : {execution_time_specific:.4f} secondes")


+-------------------+--------------------+-----+
|             Source|              Author|count|
+-------------------+--------------------+-----+
|   Business Insider|        Jake Epstein|    4|
|Yahoo Entertainment|             Reuters|    3|
|   Business Insider|    Thibault Spirlet|    2|
|Yahoo Entertainment|editorial-team@si...|    2|
|           ABC News| Shannon K. Kingston|    2|
|   Business Insider|      Hannah Abraham|    2|
|Yahoo Entertainment|The Associated Press|    2|
|   Business Insider|        Taylor Rains|    1|
|Yahoo Entertainment|Samia Nakhoul, Ah...|    1|
|           ABC News|         Jon Haworth|    1|
| Al Jazeera English|Erin Hale, Alasta...|    1|
|                NPR|       Gus Contreras|    1|
|Yahoo Entertainment|       Matthew Petti|    1|
|                NPR|         Emma Bowman|    1|
|           ABC News|            ABC News|    1|
|           The Hill|Assaf Zoran, opin...|    1|
|                NPR|        Luke Garrett|    1|
|                NPR

La répartition par `Source` et `Author` a réduit le temps d'exécution.

Dans le premier cas, avec une répartition générale, les données sont réparties en 2 partitions sans tenir compte des colonnes `Source` et `Author`.
Lors du `groupBy('Source', 'Author')`, Spark doit alors réorganiser les données entre les partitions, car les lignes avec les mêmes valeurs de `Source` et `Author` peuvent être dispersées sur plusieurs partitions.
Cela déclenche un **shuffle** : les données doivent être déplacées d'une partition à l'autre pour regrouper les valeurs similaires, ce qui ralentit le traitement.

Dans le second cas, avant d'effectuer le `groupBy`, nous avons explicitement réparti les données par `Source` et `Author`. Cela signifie que toutes les lignes ayant les mêmes valeurs pour ces colonnes sont regroupées dans les mêmes partitions dès le départ.
Ainsi, lorsque Spark effectue le `groupBy`, il n'a plus besoin de réorganiser les données : le **shuffle** est évité, car les données sont déjà bien organisées pour l'agrégation, ce qui accélère le traitement. Bien sûr, ici la différence de temps ne nous change pas la vie, mais dans un cas hypothétique ou nous avions de milliards d'articles, l'optimisation prendrait tout son sens.


In [15]:
# Filtrer les articles publiés en octobre 2024
from pyspark.sql.functions import month, year, col
df_october_2024 = df.filter((year(col("Date")) == 2024) & (month(col("Date")) == 10))
df_october_2024.show()

+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------+
|               Title|              Author|             Source|      Date|                 URL|         Description|             Content|Category|
+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------+
|Israel is weighin...|      Mikhaila Friel|   Business Insider|2024-10-03|https://www.busin...|Experts predict I...|Iran attacked Isr...|       1|
|Motorola's new Mo...|vishnu.skar@gmail...|    Android Central|2024-10-03|https://www.andro...|Motorola unveils ...|What you need to ...|       1|
|A year into Israe...|Kevin Shalvey, Da...|           ABC News|2024-10-07|https://abcnews.g...|The terror attack...|LONDON -- The Ham...|       1|
|Soldier who attem...|         Jon Haworth|           ABC News|2024-10-12|https://abcnews.g...|A United States A...|A 

In [16]:
print(f"Nombre d'articles publiés en octobre 2024 : {df_october_2024.count()}")

Nombre d'articles publiés en octobre 2024 : 49


In [17]:
from pyspark.sql.functions import length

# Calculer la longueur de chaque article (colonne "Content")
df = df.withColumn("Content_Length", length(col("Content")))
df.show()

+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------+--------------+
|               Title|              Author|             Source|      Date|                 URL|         Description|             Content|Category|Content_Length|
+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------+--------------+
|Israel is weighin...|      Mikhaila Friel|   Business Insider|2024-10-03|https://www.busin...|Experts predict I...|Iran attacked Isr...|       1|           127|
|How serious shoul...|bradypsnyder@gmai...|    Android Central|2024-09-16|https://www.andro...|Motorola has had ...|What you need to ...|       1|            21|
|The US is sending...|    Thibault Spirlet|   Business Insider|2024-09-24|https://www.busin...|It comes after Le...|Smoke rose from I...|       1|           112|
|Motorola's new Mo...|vishnu

In [18]:
from pyspark.sql.functions import avg
average_content_length = df.agg(avg("Content_Length")).collect()[0][0]
print(f"Longueur moyenne des articles : {average_content_length:.2f} caractères")

Longueur moyenne des articles : 161.45 caractères


# **Exploration RDD**

Les RDDs (Resilient Distributed Datasets) sont l'API fondamentale de Spark pour manipuler des données distribuées de manière parallèle et tolérante aux pannes. Ils permettent des transformations immuables comme map, filter, et reduce, tout en distribuant automatiquement les données à travers les nœuds d'un cluster.

In [19]:
rdd_df = df.rdd
rdd_df.take(2)

[Row(Title="Israel is weighing up how to strike back against Iran. Here's how it could play out.", Author='Mikhaila Friel', Source='Business Insider', Date=datetime.date(2024, 10, 3), URL='https://www.businessinsider.com/israel-potential-counterattack-iran-options-assasinations-oil-nuclear-military-bases-2024-10', Description="Experts predict Israel's counterattack on Iran could hit oil production or nuclear facilities.", Content='Iran attacked Israel with nearly 200 missiles on Tuesday. Israel is now weighing up its response.JACK GUEZ/AFP via Getty Images', Category=1, Content_Length=127),
 Row(Title="How serious should we be taking Motorola's five-year OS update promise?", Author='bradypsnyder@gmail.com (Brady Snyder)', Source='Android Central', Date=datetime.date(2024, 9, 16), URL='https://www.androidcentral.com/phones/how-serious-should-we-be-taking-motorolas-five-year-os-update-promise', Description="Motorola has had issues with providing timely Android version updates in the past

In [20]:
# Compter le nombre d'articles par source et par autheur
rdd_df.map(lambda row: ((row.Source, row.Author), 1)) \
                       .reduceByKey(lambda a, b: a + b) \
                       .sortBy(lambda x: x[1], ascending=False).take(4)

[(('Business Insider', 'Jake Epstein'), 4),
 (('Yahoo Entertainment', 'Reuters'), 3),
 (('Business Insider', 'Thibault Spirlet'), 2),
 (('ABC News', 'Shannon K. Kingston'), 2)]

In [21]:
# filtrer les articles publiés en octobre 2024
rdd_october_2024 = rdd_df.filter(lambda row: row.Date.year == 2024 and row.Date.month == 10)
rdd_october_2024.take(2)


[Row(Title="Israel is weighing up how to strike back against Iran. Here's how it could play out.", Author='Mikhaila Friel', Source='Business Insider', Date=datetime.date(2024, 10, 3), URL='https://www.businessinsider.com/israel-potential-counterattack-iran-options-assasinations-oil-nuclear-military-bases-2024-10', Description="Experts predict Israel's counterattack on Iran could hit oil production or nuclear facilities.", Content='Iran attacked Israel with nearly 200 missiles on Tuesday. Israel is now weighing up its response.JACK GUEZ/AFP via Getty Images', Category=1, Content_Length=127),
 Row(Title="Motorola's new Moto G phone now has 'military-grade' toughness and an IP68 rating", Author='vishnu.skar@gmail.com (Vishnu Sarangapurkar)', Source='Android Central', Date=datetime.date(2024, 10, 3), URL='https://www.androidcentral.com/phones/motorola-announces-moto-g75-for-europe-and-other-markets', Description='Motorola unveils its new budget Android handset for European and Middle Easte

In [22]:
print(f"Nombre d'articles publiés en octobre 2024 : {rdd_october_2024.count()}")

Nombre d'articles publiés en octobre 2024 : 49


In [23]:
# Calculer la longueur de chaque article
rdd_with_lengths = rdd_df.map(lambda row: (row, len(row.Content)))

# Afficher les résultats avec les longueurs
rdd_with_lengths.take(2)


[(Row(Title="Israel is weighing up how to strike back against Iran. Here's how it could play out.", Author='Mikhaila Friel', Source='Business Insider', Date=datetime.date(2024, 10, 3), URL='https://www.businessinsider.com/israel-potential-counterattack-iran-options-assasinations-oil-nuclear-military-bases-2024-10', Description="Experts predict Israel's counterattack on Iran could hit oil production or nuclear facilities.", Content='Iran attacked Israel with nearly 200 missiles on Tuesday. Israel is now weighing up its response.JACK GUEZ/AFP via Getty Images', Category=1, Content_Length=127),
  127),
 (Row(Title="How serious should we be taking Motorola's five-year OS update promise?", Author='bradypsnyder@gmail.com (Brady Snyder)', Source='Android Central', Date=datetime.date(2024, 9, 16), URL='https://www.androidcentral.com/phones/how-serious-should-we-be-taking-motorolas-five-year-os-update-promise', Description="Motorola has had issues with providing timely Android version updates i

In [24]:
# Calculer la longueur moyenne des articles
lengths_rdd = rdd_df.map(lambda row: len(row.Content))
total_length = lengths_rdd.reduce(lambda a, b: a + b)
count = lengths_rdd.count()
average_length = total_length / count

# Afficher la longueur moyenne
print(f"Longueur moyenne des articles : {average_length:.2f} caractères")


Longueur moyenne des articles : 161.45 caractères


Bien que les RDDs offrent une flexibilité accrue avec des transformations de bas niveau comme `map`, `filter`, et `reduceByKey`, ils ne profitent pas des optimisations automatiques de Spark, comme celles des DataFrames via Catalyst et Tungsten. Cette absence d'optimisation les rend souvent moins performants, surtout pour des tâches complexes ou des volumes importants.

À l'inverse, les DataFrames sont optimisés automatiquement, ce qui accélère considérablement les transformations tout en offrant une syntaxe proche de SQL. Voyons une comparaison du temps d'exécution entre RDD et DataFrame pour le calcul de la longueur moyenne des articles.

**Comparaison performance rdd vs dataframe**

In [25]:
import time

# Calculer la longueur moyenne des articles avec DataFrame
start_time_df = time.time()
df = df.withColumn("Content_Length", length(col("Content")))
average_content_length_df = df.agg(avg("Content_Length").alias("Avg_Content_Length")).collect()[0]["Avg_Content_Length"]
end_time_df = time.time()

# Calculer la longueur moyenne des articles avec RDD
start_time_rdd = time.time()
rdd = df.rdd
lengths_rdd = rdd.map(lambda row: len(row.Content) if row.Content else 0)
total_length_rdd = lengths_rdd.reduce(lambda a, b: a + b)
count_rdd = lengths_rdd.count()
average_content_length_rdd = total_length_rdd / count_rdd
end_time_rdd = time.time()

# Calculer les temps d'exécution
execution_time_df = end_time_df - start_time_df
execution_time_rdd = end_time_rdd - start_time_rdd

# Affichage formaté des résultats
print("Comparaison entre DataFrame et RDD pour le calcul de la longueur moyenne des articles")
print("---------------------------------------------------------------------")
print(f"1. Avec DataFrame :")
print(f"   - Taille moyenne des articles : {average_content_length_df:.2f} caractères")
print(f"   - Temps d'exécution : {execution_time_df} ")
print("---------------------------------------------------------------------")
print(f"2. Avec RDD :")
print(f"   - Taille moyenne des articles : {average_content_length_rdd:.2f} caractères")
print(f"   - Temps d'exécution : {execution_time_rdd} ")
print("---------------------------------------------------------------------")


Comparaison entre DataFrame et RDD pour le calcul de la longueur moyenne des articles
---------------------------------------------------------------------
1. Avec DataFrame :
   - Taille moyenne des articles : 161.45 caractères
   - Temps d'exécution : 0.6586177349090576 
---------------------------------------------------------------------
2. Avec RDD :
   - Taille moyenne des articles : 161.45 caractères
   - Temps d'exécution : 1.161259412765503 
---------------------------------------------------------------------


On voit que la différence est importante pour une tâche aussi simple que le calcul de la longueur moyenne, avec un petit volume de données. Cette différence va s'accentuer rapidement lorsque les tâches deviendront plus complexes et que la taille des données augmentera. Dans la suite, on va utiliser le dataframe.

# **Spark SQL**

In [26]:
# Créer une table temporaire à partir du DataFrame
df.createOrReplaceTempView("articles")

In [27]:
# Exécuter une requête SQL pour compter le nombre d'article par auteur
resultat_auteurs = spark.sql("""
    SELECT Author, Source, COUNT(*) AS Nombre_Articles
    FROM articles
    GROUP BY Author, Source
    ORDER BY Nombre_Articles DESC
""")
resultat_auteurs.show()

+--------------------+-------------------+---------------+
|              Author|             Source|Nombre_Articles|
+--------------------+-------------------+---------------+
|        Jake Epstein|   Business Insider|              4|
|             Reuters|Yahoo Entertainment|              3|
|      Hannah Abraham|   Business Insider|              2|
|    Thibault Spirlet|   Business Insider|              2|
|editorial-team@si...|Yahoo Entertainment|              2|
| Shannon K. Kingston|           ABC News|              2|
|The Associated Press|Yahoo Entertainment|              2|
|Kevin Shalvey, Da...|           ABC News|              1|
|    Al Jazeera Staff| Al Jazeera English|              1|
|Jake Epstein,Chri...|   Business Insider|              1|
|       Gus Contreras|                NPR|              1|
|Alice Tecotzky,Er...|   Business Insider|              1|
|        Inside Story| Al Jazeera English|              1|
|       Matthew Petti|Yahoo Entertainment|              

In [28]:
# Exécuter une autre requête SQL pour obtenir les articles publiés en octobre 2024
resultat_octobre = spark.sql("""
    SELECT *
    FROM articles
    WHERE Date >= '2024-10-01' AND Date < '2024-11-01'
""")
resultat_octobre.show()

+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------+--------------+
|               Title|              Author|             Source|      Date|                 URL|         Description|             Content|Category|Content_Length|
+--------------------+--------------------+-------------------+----------+--------------------+--------------------+--------------------+--------+--------------+
|Israel is weighin...|      Mikhaila Friel|   Business Insider|2024-10-03|https://www.busin...|Experts predict I...|Iran attacked Isr...|       1|           127|
|Motorola's new Mo...|vishnu.skar@gmail...|    Android Central|2024-10-03|https://www.andro...|Motorola unveils ...|What you need to ...|       1|            21|
|A year into Israe...|Kevin Shalvey, Da...|           ABC News|2024-10-07|https://abcnews.g...|The terror attack...|LONDON -- The Ham...|       1|            54|
|Soldier who attem...|      

In [29]:
# obtenir la longueur des articles
resultat_long_articles = spark.sql("""
    SELECT Title, LENGTH(Content) AS Longueur_Content
    FROM articles
    ORDER BY Longueur_Content DESC
""")
resultat_long_articles.show()

+--------------------+----------------+
|               Title|Longueur_Content|
+--------------------+----------------+
|How Israel’s brut...|             215|
|Biden and Netanya...|             214|
| 331 Days of Failure|             214|
|US has yet to lay...|             214|
|Why Netanyahu Won...|             214|
|Middle East lates...|             214|
|Everything We Kno...|             214|
|China 'firmly sup...|             214|
|What Americans th...|             214|
|What Israel’s Ass...|             214|
|What’s Next in th...|             214|
|Opinion - Israel ...|             214|
|One boy's story s...|             214|
|Pig Butchering Sc...|             214|
|Iranian Ballistic...|             214|
|Biden Can End His...|             214|
|The Middle East i...|             214|
|US Navy warships ...|             214|
|Iran launches mis...|             214|
|Three Top Dividen...|             214|
+--------------------+----------------+
only showing top 20 rows



# **Prétraitement**

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

In [31]:
# Prétraitement des données
df_clean = df.dropDuplicates().na.drop()

# Transformation des colonnes : convertir en minuscules et supprimer la ponctuation
df_clean = df_clean.withColumn("Title", regexp_replace(lower(col("Title")), "[^a-zA-Z\\s]", "")) \
                   .withColumn("Description", regexp_replace(lower(col("Description")), "[^a-zA-Z\\s]", "")) \
                   .withColumn("Content", regexp_replace(lower(col("Content")), "[^a-zA-Z\\s]", ""))

# Tokénisation
tokenizer = Tokenizer(inputCol="Description", outputCol="tokens")

# Suppression des mots vides
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Création de vecteurs de caractéristiques avec CountVectorizer
count_vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="features")

# Création du pipeline (sans IDF)
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer])

# Ajustement du pipeline au DataFrame nettoyé
pipeline_model = pipeline.fit(df_clean)

# Transformation des données pour obtenir le DataFrame final
df_final = pipeline_model.transform(df_clean)

# Affichage des résultats
df_final.select("Date", "Title", "Description", "Content", "filtered_tokens", "features").show(truncate=True)


+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      Date|               Title|         Description|             Content|     filtered_tokens|            features|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2024-10-09|biden and netanya...|white house says ...|washington dc uni...|[white, house, sa...|(613,[6,14,37,93,...|
|2024-09-26|israels air war m...|the israeli air f...|waves of israeli ...|[israeli, air, fo...|(613,[3,4,8,12,33...|
|2024-09-30|did the abraham a...|many conservative...|the abraham accor...|[many, conservati...|(613,[0,2,5,57,87...|
|2024-09-24|a deployed us nav...|the replenishment...|    an official said|[replenishment, o...|(613,[45,58,82,11...|
|2024-09-30|how qatar became ...|the history behin...|ismail haniyeh po...|[history, behind,...|(613,[6,13,90,115...|
|2024-10-04|middle east lates...|an israeli airstr...|an

# **Machine learning**

On applique LDA avec différentes valeurs de k.

In [32]:
from pyspark.ml.clustering import LDA

# Liste des valeurs de k à tester
k_values = [2, 3, 4, 5, 6, 7, 8, 10]

# Dictionnaire pour stocker les modèles et les résultats de perplexité
lda_models = {}
perplexity_results = {}

# Boucle pour entraîner le modèle LDA pour chaque valeur de k et calculer la perplexité
for k in k_values:
    print(f"Training LDA k={k}")

    # Définir le modèle LDA
    lda = LDA(k=k, seed=1, optimizer="em", featuresCol="features")

    # Ajuster le modèle sur le dataset
    lda_model = lda.fit(df_final)

    # Calculer la perplexité
    perplexity = lda_model.logPerplexity(df_final)

    # Stocker le modèle et le résultat de perplexité
    lda_models[k] = lda_model  # Stocker le modèle LDA pour k
    perplexity_results[k] = perplexity
    print(f"Perplexity pour k={k}: {perplexity}")

# Afficher les résultats de perplexité pour chaque k
for k, perplexity in perplexity_results.items():
    print(f"k={k}, Perplexity={perplexity}")

Training LDA k=2
Perplexity pour k=2: 7.506096242179689
Training LDA k=3
Perplexity pour k=3: 8.977341355598556
Training LDA k=4
Perplexity pour k=4: 11.147061929403117
Training LDA k=5
Perplexity pour k=5: 14.233701299034397
Training LDA k=6
Perplexity pour k=6: 18.21728632080531
Training LDA k=7
Perplexity pour k=7: 23.1470135492663
Training LDA k=8
Perplexity pour k=8: 29.021525030985728
Training LDA k=10
Perplexity pour k=10: 43.6975437023373
k=2, Perplexity=7.506096242179689
k=3, Perplexity=8.977341355598556
k=4, Perplexity=11.147061929403117
k=5, Perplexity=14.233701299034397
k=6, Perplexity=18.21728632080531
k=7, Perplexity=23.1470135492663
k=8, Perplexity=29.021525030985728
k=10, Perplexity=43.6975437023373


k = 2, minimise la perplexité. 2 topics offrent le meilleur compromis pour modéliser efficacement la structure thématique des données sans complexité excessive.

In [33]:
# ---- Affichage des topics pour le modèle avec k=2 ----

# Récupérer le modèle entraîné avec k=2
best_lda_model = lda_models[2]  # Modèle pour k=2

# Récupérer les topics pour k=2 avec les 10 mots les plus importants par topic
topics = best_lda_model.describeTopics(maxTermsPerTopic=10)
vocabArray = pipeline_model.stages[2].vocabulary  # Récupérer le vocabulaire

# Affichage des topics pour k=2
print(f"\nTopics for the best LDA Model with k=2:")
for topic in topics.collect():
    print(f"Topic ID: {topic[0]}")
    print("Words:", [vocabArray[i] for i in topic[1]])  # Traduire les indices en mots
    print()


Topics for the best LDA Model with k=2:
Topic ID: 0
Words: ['east', 'middle', 'israel', '', 'us', 'war', 'lebanon', 'hezbollah', 'israeli', 'attack']

Topic ID: 1
Words: ['israel', 'middle', 'iran', 'east', 'hezbollah', '', 'us', 'israeli', 'lebanon', 'said']



In [34]:
df_final.select("filtered_tokens").show(truncate=False)


+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|filtered_tokens                                                                                                                                                             |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[white, house, says, two, leaders, discussed, confrontation, iran, minute, conversation]                                                                                    |
|[israeli, air, force, pounding, enormous, missile, arsenal, hezbollah, built, past, years]                                                                                  |
|[many, conservatives, saw, abraham, accords, way, get, us, forces, middle, east, architect, agreement, pushing, regime]     