# Introduction : Préparation des données

In [49]:
import json
import re
from pyspark.sql import SparkSession


def get_spark_session_and_df():
    spark = SparkSession.builder.getOrCreate()

    with open("books.json") as f:
        books = json.load(f)

    for book in books:
        book["text"] = book["text"].lower()
        book["text"] = re.sub(r"[^\w\s]", "", book["text"])

    df = spark.createDataFrame(books)
    return spark, df


spark, df = get_spark_session_and_df()
print(
"Voici les données de travail avec lequels nous allons réaliser ce TP.\n",
"Les données ont été processées pour être en minuscule et sans ponctuation.\n",
)
print(df.show())

Voici les données de travail avec lequels nous allons réaliser ce TP.
 Les données ont été processées pour être en minuscule et sans ponctuation.

+--------------------+--------------------+
|            filename|                text|
+--------------------+--------------------+
| milton-paradise.txt| paradise lost by...|
|edgeworth-parents...| the parent  s as...|
|     austen-emma.txt| emma by jane aus...|
| chesterton-ball.txt| the ball and the...|
|       bible-kjv.txt| the king james b...|
|chesterton-thursd...| the man who was ...|
|     blake-poems.txt| poems by william...|
|shakespeare-caesa...| the tragedie of ...|
|  whitman-leaves.txt| leaves of grass ...|
|melville-moby_dic...| moby dick by her...|
+--------------------+--------------------+

None


In [50]:
# Réinitialisation de la session spark entre chaque run
spark, df = get_spark_session_and_df()
spark.stop()

# Exercice 3 : Algorithme naif

In [51]:

spark, df = get_spark_session_and_df()

all_text = df.select("text").rdd.flatMap(lambda x: x).collect()

even_count = 0
odd_count = 0
for text in all_text:
    text = text.split()
    for word in text:
        if len(word) % 2 == 0:
            even_count += 1
        else:
            odd_count += 1

print(f"Nombre de mots avec un nombre nombre pair de caractères : {even_count}")
print(f"Nombre de mots avec un nombre nombre impair de caractères : {odd_count}")


Nombre de mots avec un nombre nombre pair de caractères : 511
Nombre de mots avec un nombre nombre impair de caractères : 635


# Exercice 4 : Algorithme "MapReduce"

In [None]:

spark, df = get_spark_session_and_df()


print(df.glom().collect())

text_rdd = df.select("text").rdd.flatMap(lambda x: x)
# On crée un RDD de tuples (parité, 1) pour chaque mot
word_parity_rdd = text_rdd.flatMap(
    lambda text: [
        ("even" if len(word) % 2 == 0 else "odd", 1)
        for word in text.split()
    ]
)
print(word_parity_rdd.glom().collect())
parity_counts = word_parity_rdd.reduceByKey(lambda x, y: x + y)
print(parity_counts.glom().collect())
print(parity_counts.toDebugString().decode('utf-8'))

parity_counts = parity_counts.collect()

print(f"Nombre de mots avec un nombre pair de caractères : {parity_counts[0][1]}")
print(f"Nombre de mots avec un nombre impair de caractères : {parity_counts[1][1]}")



[[], [('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('odd', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('even', 1), ('odd', 1), ('even', 1), ('odd', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('odd', 1), ('even', 1), ('odd', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('odd', 1), ('even', 1), ('odd', 1), ('even', 1), ('even', 1), ('even', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('even', 1), ('odd', 1), ('odd', 1), ('even', 1), ('odd', 1), ('even', 1), ('even', 1), ('even', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('odd', 1), ('even', 1), ('odd'

# Exercice 5 : Décompte de la fréquence de chaque mot

In [53]:
import pyspark.sql.functions as F

spark, df = get_spark_session_and_df()

df = df.withColumn("words", F.explode(F.split(F.col("text"), " ")))

word_counts = df.groupBy("words").count()
print(word_counts.show(truncate=False))

+------------+-----+
|words       |count|
+------------+-----+
|us          |3    |
|sion        |1    |
|who         |3    |
|by          |12   |
|above       |3    |
|aonian      |1    |
|more        |2    |
|man         |3    |
|heavenly    |1    |
|thy         |7    |
|milton      |1    |
|lost        |3    |
|muse        |1    |
|brook       |1    |
|shepherd    |1    |
|woe         |1    |
|how         |1    |
|top         |1    |
|thee        |2    |
|disobedience|1    |
+------------+-----+
only showing top 20 rows

None
