# Itération 2 : Premières manipulation de données avec Spark #

In [1]:
# Appel des bilbiothèques pour cette itération
from pyspark import SparkContext 
from datetime import date
import math


In [2]:
# Initialisation du contexte Spark
sc = SparkContext("local", "test")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/18 15:55:09 WARN Utils: Your hostname, promo-ds4-gra9-2, resolves to a loopback address: 127.0.1.1; using 51.91.85.211 instead (on interface ens3)
25/11/18 15:55:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/18 15:55:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Découverte de Spark avec les listes ##

### Création de liste aléatoire ###

In [3]:
l = [1,2,3,4,5] * 1_000_000
rdd = sc.parallelize(l,10) # on va créer un RDD avec 10 partitions
#rdd.glom().collect()# permet de voir le contenu des partitions

countRDD=rdd.count()
quadratsRDD=rdd.map(lambda x: x**2).take(10)
reduce_RDD=rdd.reduce(lambda x, y: x + y)

print(f"Nombre d'éléments dans le RDD : {countRDD}")
print(f"Les 10 premiers éléments mis au carré : {quadratsRDD}")
print(f"Somme des éléments du RDD : {reduce_RDD}")


25/11/18 15:55:18 WARN TaskSetManager: Stage 0 contains a task of very large size (1000 KiB). The maximum recommended task size is 1000 KiB.
25/11/18 15:55:22 WARN TaskSetManager: Stage 2 contains a task of very large size (1000 KiB). The maximum recommended task size is 1000 KiB.

Nombre d'éléments dans le RDD : 5000000
Les 10 premiers éléments mis au carré : [1, 4, 9, 16, 25, 1, 4, 9, 16, 25]
Somme des éléments du RDD : 15000000


                                                                                

### Jeu de données des arbres de Paris ##

In [4]:
rddAr = sc.textFile("Module_BigData/arbresParis.csv")
header = rddAr.first()
rows = rddAr.filter(lambda x: x != header).map(lambda x: x.split(";")).cache() # On enlève l'en-tête et on split les lignes
cols = header.split(";")

print(header)## Affichage de l'en-tête du fichier
print(rows.first()) ## Affichage de la première ligne du RDD
list(enumerate(cols)) ## Affichage des colonnes avec leur index

Geo point;idbase;domanialite;arrondissement;complement adresse;numero;adresse;circonference en cm;hauteur en m;stade développement;pépinière;genre;espèce;varieteoucultivar;date de plantation;libellé Français;ID Base;ID arbre;Site;Adresse;Complément d'adresse;Arrondissement;Domanialité;Dénomination usuelle;Dénomination botanique;Autorité taxonomique;Année de plantation;Qualification remarquable;Résumé;Descriptif;Numéro de délibération;Date de la délibération;Label national;Panonceau;Photo 1;Copyright 1


[Stage 4:>                                                          (0 + 1) / 1]

['48.81838918662161, 2.4379185082282153', '2002385.0', 'Jardin', 'BOIS DE VINCENNES', '12-15', '', 'PENTE DE GRAVELLE - AVENUE DE GRAVELLE / ROUTE NOUVELLE', '200.0', '23.0', 'M', 'Inconnue', 'Quercus', 'ilex', '', '1860-01-01T01:02:21+00:53', 'Chêne', '2002385.0', '2002400.0', 'Bois de Vincennes. Plateau de Gravelle', 'Avenue de Gravelle / Route Nouvelle', '12-15', '12', 'Bois de Vincennes', 'Chêne vert', 'Quercus ilex', 'L.', '1860', 'Paysager', 'Cet arbre est classé remarquable pour son ampleur et son empreinte dans le paysage.', 'Le chêne vert est une essence originaire du pourtour méditerranéen appartenant à la famille des Fagacées. Bien adapté au climat urbain parisien, il figure de plus en plus dans les alignements des avenues ainsi que dans les parcs. Le vent assure la pollinisation de ses fleurs discrètes qui apparaissent en avril-mai et produisent de nombreux glands à l’automne.', '', '', '', 'https://capgeo.sig.paris.fr/PdfEtImages/ArbresRemarquables/PDF/2002385.pdf', 'https

                                                                                

[(0, 'Geo point'),
 (1, 'idbase'),
 (2, 'domanialite'),
 (3, 'arrondissement'),
 (4, 'complement adresse'),
 (5, 'numero'),
 (6, 'adresse'),
 (7, 'circonference en cm'),
 (8, 'hauteur en m'),
 (9, 'stade développement'),
 (10, 'pépinière'),
 (11, 'genre'),
 (12, 'espèce'),
 (13, 'varieteoucultivar'),
 (14, 'date de plantation'),
 (15, 'libellé Français'),
 (16, 'ID Base'),
 (17, 'ID arbre'),
 (18, 'Site'),
 (19, 'Adresse'),
 (20, "Complément d'adresse"),
 (21, 'Arrondissement'),
 (22, 'Domanialité'),
 (23, 'Dénomination usuelle'),
 (24, 'Dénomination botanique'),
 (25, 'Autorité taxonomique'),
 (26, 'Année de plantation'),
 (27, 'Qualification remarquable'),
 (28, 'Résumé'),
 (29, 'Descriptif'),
 (30, 'Numéro de délibération'),
 (31, 'Date de la délibération'),
 (32, 'Label national'),
 (33, 'Panonceau'),
 (34, 'Photo 1'),
 (35, 'Copyright 1')]

#### Quel est l'arbre le plus vieux ? ####

In [5]:
## extraire l'année de plantation d'un arbre
colYear=[col for col in enumerate(cols) if "année" in col[1].lower()]
print(colYear[0])  # index de la colonne année de plantation
colYear=colYear[0][0]

def extract_year(fields):
    if len(fields) <= colYear:
        return None
    year = fields[colYear].strip()
    if not year.isdigit():
        return None
    return int(year)

years = rows.map(extract_year).filter(lambda y: y is not None)
oldest_year = years.min()
oldest_year
print(f"L'année de plantation la plus ancienne est : {oldest_year}")

(26, 'Année de plantation')
L'année de plantation la plus ancienne est : 1602


In [6]:
## âge de l'arbre le plus vieux
current_year = date.today().year
age_oldest = current_year - oldest_year
print(f"L'arbre le plus vieux a {age_oldest} ans.")

L'arbre le plus vieux a 423 ans.


In [7]:
## filtrer les arbres qui ont l'année de plantation égal à oldest_year
oldest_trees = rows.filter(
    lambda f: len(f) > 26 and f[26].strip().isdigit() and int(f[26].strip()) == oldest_year
).collect()

print(f"Les arbres les plus vieux plantés en {oldest_year} sont :\n {oldest_trees}")

Les arbres les plus vieux plantés en 1602 sont :
 [['48.852191438077774, 2.3472861856377225', '124358.0', 'Jardin', 'PARIS 5E ARRDT', '', '', 'SQUARE RENE VIVIANI MONTEBELLO / 2 RUE DU FOUARRE', '365.0', '15.0', 'M', 'Inconnue', 'Robinia', 'pseudoacacia', '', '1602-01-01T01:02:21+00:53', 'Robinier', '124358.0', '124358.0', 'Square René Viviani', '2 Rue du Fouarre', '', '5', 'Jardin', 'Robinier faux-acacia', 'Robinia pseudoacacia', 'L.', '1602', 'Historique', 'Cet arbre est classé remarquable pour son caractère historique.', "Originaire des États-Unis, le Robinier faux-acacia appartient à la famille des Fabacées. Introduit et planté en 1601 par Jean Robin, auquel il doit son nom, cet arbre est le plus vieux de Paris. L'arbre originel, âgé de plus de 400 ans, est devenu plusieurs arbres soudés au sein d’un tronc vestige et dont les racines aériennes se développent à l’intérieur du tronc d'origine. Les jeunes arbres issus de sa souche et de ses racines ont été conservés afin d’assurer son

In [8]:
for f in oldest_trees:
    print("Libellé français :", f[15])
    print("Site             :", f[18])
    print("Adresse          :", f[19])
    print("Arrondissement   :", f[21])
    print("Année plantation :", f[26])
    print("Hauteur (m)      :", f[8])
    print("Circonf (cm)     :", f[7])
    print("-" * 60)


Libellé français : Robinier
Site             : Square René Viviani
Adresse          : 2 Rue du Fouarre
Arrondissement   : 5
Année plantation : 1602
Hauteur (m)      : 15.0
Circonf (cm)     : 365.0
------------------------------------------------------------


#### Quel est le volume du plus grand arbre ? #### 
 Pour caluler le volume 

`Volume = π * (diamètre/2)² * hauteur`

Donc :

diamètre = circonférence / π

volume = (circonférence / (2π))² * π * hauteur
simplifiable → (circonférence**2 * hauteur) / (4π).


In [9]:
## extraire les numéros de colonne des hauteurs et de la circonférence des arbres
colCir=[col for col in enumerate(cols) if "circ" in col[1].lower()]
print(colCir[0])  # index de la colonne circonférence
colCir=colCir[0][0]

colHaut=[col for col in enumerate(cols) if "hauteur" in col[1].lower()]
print(colHaut[0])  # index de la colonne hauteur
colHaut=colHaut[0][0]


(7, 'circonference en cm')
(8, 'hauteur en m')


In [10]:
def extract_volume(fields):
    # on vérifie qu'on a assez de colonnes
    if len(fields) <= max(colHaut, colCir):
        return None
    
    circ_str = fields[colCir].replace(",", ".").strip()   # circonférence en cm
    height_str = fields[colHaut].replace(",", ".").strip()  # hauteur en m
    
    # on vérifie que ce sont bien des nombres
    if not circ_str.replace(".", "").isdigit():
        return None
    if not height_str.replace(".", "").isdigit():
        return None
    
    circ_cm = float(circ_str)
    height_m = float(height_str)
    
    # conversion de la circonférence en mètres
    circ_m = circ_cm / 100.0
    
    # volume approximatif en m³ (modèle cylindre)
    volume_m3 = (circ_m**2 * height_m) / (4 * math.pi)
    return (volume_m3,fields)

volume_and_fields = rows.map(extract_volume).filter(lambda x: x is not None)
max_volume,tree_vol = volume_and_fields.max()  
print("le volume maximum est de {:.2f} m³ pour l'arbre :\n{}".format(max_volume, tree_vol))

le volume maximum est de 130.73 m³ pour l'arbre :
['48.880337104666346, 2.381291342815737', '102141.0', 'Jardin', 'PARIS 19E ARRDT', '19-08', '', 'PARC DES BUTTES CHAUMONT / 7 RUE BOTZARIS', '740.0', '30.0', 'M', 'Inconnue', 'Platanus', 'x hispanica', '', '1700-01-01T01:02:21+00:53', 'Platane', '102141.0', '102141.0', 'Parc des Buttes Chaumont', '', '19-08', '19', 'Jardin', 'Platane commun', 'Platanus', 'Mill. ex Münchh.', 'Inconnue', 'Paysager', 'Cet arbre est classé remarquable pour son ampleur et son empreinte dans le paysage.', 'Essence typique de la famille des Platanacées, le platane commun est un hybride entre les platanes d’Orient et d’Occident. Il se caractérise par une écorce marbrée et des feuilles palmées semblables à celles de l’érable plane. Il a été planté en 1862.', '', '', '', 'https://capgeo.sig.paris.fr/PdfEtImages/ArbresRemarquables/PDF/102141.pdf', 'https://capgeo.sig.paris.fr/PdfEtImages/ArbresRemarquables/Photos2023/1/102141.jpg', '']


In [11]:
f = tree_vol
print("Volume approx.   :", max_volume)
print("Libellé français :", f[15])
print("Site             :", f[18])
print("Adresse          :", f[19])
print("Arrondissement   :", f[21])
print("Année plantation :", f[26])
print("Hauteur (m)      :", f[8])
print("Circonf (cm)     :", f[7])


Volume approx.   : 130.72987025568284
Libellé français : Platane
Site             : Parc des Buttes Chaumont
Adresse          : 
Arrondissement   : 19
Année plantation : Inconnue
Hauteur (m)      : 30.0
Circonf (cm)     : 740.0


## Quel est la hauteur moyenne des arbres ? ##

In [12]:
def extract_height(fields):
    if len(fields) < colHaut:
        return None
    h = fields[colHaut].replace(",", ".").strip()
    if not h.replace(".", "").isdigit():
        return None
    return float(h)

heights = rows.map(extract_height).filter(lambda h: h is not None)
average_height = heights.mean()

print(f"La hauteur moyenne des arbres est de {average_height:.2f} m.")

La hauteur moyenne des arbres est de 20.32 m.


## Dans quel arrondissement y a-t-il le plus d’arbres ? ##

In [13]:
colArr=[col for col in enumerate(cols) if "rond" in col[1]]
print(f"il y a deux colonnes arrondissement ddans le jeu de données : {colArr} ")  # index de la colonne arrondissement


il y a deux colonnes arrondissement ddans le jeu de données : [(3, 'arrondissement'), (21, 'Arrondissement')] 


In [14]:
sample = rows.take(5)
for f in sample:
    print("col 3 :", f[3], "   |   col 21 :", f[21])

## on va plutôt prendre l'index de la colonne arrondissement 21 
colArr=21

col 3 : BOIS DE VINCENNES    |   col 21 : 12
col 3 : BOIS DE BOULOGNE    |   col 21 : 16
col 3 : PARIS 17E ARRDT    |   col 21 : 17
col 3 : PARIS 4E ARRDT    |   col 21 : 4
col 3 : BOIS DE VINCENNES    |   col 21 : 12


In [15]:
def extract_arr(fields):
    # sécuriser la ligne
    if len(fields) <= colArr:
        return None
    
    arr = fields[colArr].strip()
    if not arr:
        return None
    return arr

# RDD des arrondissements (codes 11,12. etc.)
arr_rdd = rows.map(extract_arr).filter(lambda a: a is not None)

# Comptage par arrondissement
arr_counts = arr_rdd.map(lambda a: (a, 1)) \
                    .reduceByKey(lambda x, y: x + y)

# arrondissement avec le plus d'arbres
top_arr = arr_counts.takeOrdered(1, key=lambda kv: -kv[1])[0]
top_arr
print(f"L'arrondissement avec le plus d'arbres est le {top_arr[0]}ème  avec {top_arr[1]} arbres.")

[Stage 10:>                                                         (0 + 1) / 1]

L'arrondissement avec le plus d'arbres est le 16ème  avec 52 arbres.


                                                                                

In [16]:
## pour info la liste des arrondissements présents dans le jeu de données
print(arr_rdd.distinct().collect())


[Stage 12:>                                                         (0 + 1) / 1]

['12', '16', '17', '4', '8', '14', '5', '7', '13', '20', '19', '3', '9', '15', '18', '11', '1er', '10']


                                                                                

In [17]:
arbres_16 = rows.filter(
    lambda f: len(f) > colArr and f[colArr].strip() in ("16")).cache()

arbres_16.count()
colEspece = 15 #index de la colonne espèce (libellé français)

def extract_espece(fields):
    if len(fields) <= colEspece:
        return None
    e = fields[colEspece].strip()
    if e == "":
        return None
    return e.upper()

especes_counts = (
    arbres_16.map(extract_espece)
             .filter(lambda e: e is not None)
             .map(lambda e: (e, 1))
             .reduceByKey(lambda a, b: a + b)
             .collect()
)

# tri décroissant des espèces présentes dans le 16ème arrondissement
especes_counts_sorted = sorted(especes_counts, key=lambda kv: -kv[1])
especes_counts_sorted
print("Répartition des espèces dans le 16e arrondissement :\n")
for espece, nb in especes_counts_sorted:
    print(f"{espece:20s} : {nb} arbres")



[Stage 15:>                                                         (0 + 1) / 1]

Répartition des espèces dans le 16e arrondissement :

PLATANE              : 8 arbres
MAGNOLIA             : 5 arbres
SEQUOIA              : 4 arbres
CHÊNE                : 3 arbres
CYPRÈS CHAUVE        : 3 arbres
PTEROCARYA           : 3 arbres
PIN                  : 2 arbres
HÊTRE                : 2 arbres
ARBRE AUX QUARANTE ÉCUS : 2 arbres
CÈDRE                : 2 arbres
CATALPA              : 2 arbres
ORME DE SIBÉRIE      : 2 arbres
NOISETIER DE BYZANCE : 2 arbres
ARAUCARIA            : 1 arbres
ERABLE               : 1 arbres
ARBRE AUX MOUCHOIRS  : 1 arbres
PISTACHIER           : 1 arbres
TULIPIER             : 1 arbres
BOULEAU              : 1 arbres
CHICOT DU CANADA     : 1 arbres
PLAQUEMINIER         : 1 arbres
IF                   : 1 arbres
SAULE                : 1 arbres
SAVONNIER            : 1 arbres
MICOCOULIER          : 1 arbres


                                                                                

## Combien d’arbres au Cimetière du Père Lachaise ? ##

On va regarder dans la colonne site => 18 

In [18]:
def is_lachaise(fields):
    # Convertir toutes les colonnes en une seule string
    line = " ".join(fields).lower()
    return "lachaise" in line

arbres_lachaise = rows.filter(is_lachaise).cache()

print(f"le nombre d'arbes remarquables au cimetière du Père Lachaise est de : {arbres_lachaise.count()}")





                                                                                

le nombre d'arbes remarquables au cimetière du Père Lachaise est de : 9


In [19]:
def find_lachaise_columns(fields):
    cols_used = []
    for idx, val in enumerate(fields):
        if "lachaise" in val.lower():
            cols_used.append(idx)
    return cols_used

cols_detected = arbres_lachaise.map(find_lachaise_columns).take(10)
cols_detected


                                                                                

[[6, 18, 19],
 [6, 18, 19],
 [6, 18, 19],
 [6, 18, 19],
 [6, 18, 19],
 [6, 18, 19],
 [6, 18, 19],
 [6, 18, 19],
 [6, 18, 19]]

In [20]:
colEspece = 15 #index de la colonne espèce (libellé français)

especes_lachaise = (
    arbres_lachaise
        .map(lambda f: f[colEspece].strip())
        .filter(lambda e: e not in ("", None))
        .map(lambda e: (e.upper(), 1))
        .reduceByKey(lambda x,y: x+y)
        .collect()
)

especes_counts_sorted=sorted(especes_lachaise, key=lambda kv: -kv[1])

print("Répartition des espèces dans le cimetière du Père Lachaise :\n")
for espece, nb in especes_counts_sorted:
    print(f"{espece:20s} : {nb} arbres")

[Stage 20:>                                                         (0 + 1) / 1]

Répartition des espèces dans le cimetière du Père Lachaise :

ARBRE AUX QUARANTE ÉCUS : 2 arbres
PLATANE              : 1 arbres
ERABLE               : 1 arbres
PLAQUEMINIER         : 1 arbres
ARBRE À GUTTA-PERCHA : 1 arbres
CHICOT DU CANADA     : 1 arbres
MARRONNIER           : 1 arbres
CÈDRE                : 1 arbres


                                                                                

## Manipulation de texte avec Spark ##

Cas du livre du projet GutenBerg "Beautiful Stories" de Shakespear


In [21]:
from pyspark.sql import SparkSession
import re


book = sc.textFile("Module_BigData/BeautifulStories.txt")
book.take(5)

                                                                                

['The Project Gutenberg eBook of Beautiful Stories from Shakespeare',
 '    ',
 'This ebook is for the use of anyone anywhere in the United States and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever. You may copy it, give it away or re-use it under the terms']

In [32]:
import re

def clean_and_split(line):
    # tout en minuscules + on garde que les lettres
    line = line.lower()
    line = re.sub(r"[^a-z]", " ", line)
    return line.split()

words = book.flatMap(clean_and_split)
words.take(5)

['the', 'project', 'gutenberg', 'ebook', 'of']

In [33]:
## retirer certain mots (stop words)
stopwords= {
    "the", "and", "a", "to", "of", "in", "i", "is", "that", "it", "you",
    "he", "was", "for", "on", "are", "as", "with", "his", "they", "at",
    "be", "this", "from", "or", "by", "an", "not", "but", "all", "she",
    "there", "we", "when", "your", "my", "so", "if", "what", "about",
    "who", "me", "no", "one", "like", "just", "him", "out", "up",
    "now", "more", "would", "said","them","their","had","were","which",
    "been","will","would","here","when","where","why","how","any","much","her","his"
}

bc_stopwords=sc.broadcast(stopwords) # diffusion des stopwords à tous les nœuds

filtered_words=words.filter(lambda w: w not in bc_stopwords.value and w != "" and len(w)>2)

In [34]:
## compter les occurrences des mots
word_counts=(filtered_words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y))
top_30 = word_counts.sortBy(lambda kv: kv[1], ascending=False).take(30)

for w, c in top_30:
    print(f"{w:15s} {c}")

print(f"Il y a au total {word_counts.count()} mots différents après filtrage des stopwords.")
# on peut voir que le sujet traite principalement des histoires d'amour et de relations entre les personnages
# notamment avec un king et un duke.

have            203
king            200
then            196
love            166
duke            127
man             114
could           107
told            104
gutenberg       97
came            97
father          90
went            89
project         88
did             85
before          83
wife            82
into            80
than            79
good            77
very            75
timon           75
day             74
asked           74
claudio         74
two             73
only            73
macbeth         72
othello         72
come            71
thought         71
Il y a au total 5500 mots différents après filtrage des stopwords.


### Séparer les histoires ##

Dans le texte, chaque histoire commence par un tire en majuscules comme apr exemple:
ROMEO ET JULIETTE
KING LEAR 
On peut ainsi extraire automatiquement les histoires en détectant les titres

In [None]:
indexed = book.zipWithIndex().map(lambda x: (x[1], x[0]))
# (index, ligne)

indexed.take(5)

def is_title(line: str) -> bool:
    s = line.strip()
    return len(s) > 3 and s.isupper()

titles_idx = (
    indexed
    .filter(lambda t: is_title(t[1]))
    .collect()
)

# titles_idx : liste de (index, "ROMEO AND JULIET")
titles_idx = sorted(titles_idx, key=lambda x: x[0])

for idx, title in titles_idx:
    print(idx, repr(title))


23 '*** START OF THE PROJECT GUTENBERG EBOOK BEAUTIFUL STORIES FROM SHAKESPEARE ***'
24 'BEAUTIFUL STORIES FROM SHAKESPEARE'
42 'PREFACE'
105 'E. T. R.'
110 'A BRIEF LIFE OF SHAKESPEARE.'
233 'CONTENTS'
235 '                                                     PAGE'
236 '     PREFACE . . . . . . . . . . . . . . . . . . . . . 3'
237 '     A BRIEF LIFE OF SHAKESPEARE . . . . . . . . . . . 7'
238 "     A MIDSUMMER NIGHT'S DREAM . . . . . . . . . . .  19"
239 '     THE TEMPEST . . . . . . . . . . . . . . . . . .  33'
240 '     AS YOU LIKE IT  . . . . . . . . . . . . . . . .  44'
241 "     THE WINTER'S TALE . . . . . . . . . . . . . . .  54"
242 '     KING LEAR . . . . . . . . . . . . . . . . . . .  67'
243 '     TWELFTH NIGHT . . . . . . . . . . . . . . . . .  74'
244 '     MUCH ADO ABOUT NOTHING  . . . . . . . . . . . .  86'
245 '     ROMEO AND JULIET  . . . . . . . . . . . . . . . 105'
246 '     PERICLES  . . . . . . . . . . . . . . . . . . . 119'
247 '     HAMLET  . . . . . . . . . . . 

In [40]:
story_titles = {
    "A MIDSUMMER NIGHT'S DREAM",
    "THE TEMPEST",
    "AS YOU LIKE IT",
    "THE WINTER'S TALE",
    "KING LEAR",
    "TWELFTH NIGHT",
    "MUCH ADO ABOUT NOTHING",
    "ROMEO AND JULIET",
    "PERICLES",
    "HAMLET",
    "CYMBELINE",
    "MACBETH",
    "THE COMEDY OF ERRORS",
    "THE MERCHANT OF VENICE",
    "TIMON OF ATHENS",
    "OTHELLO",
    "MEASURE FOR MEASURE",
    "TWO GENTLEMEN OF VERONA",
    "ALL'S WELL THAT ENDS WELL",
}


In [None]:
## on retrouve les indices des lignes pour titres des histoires
titles_idx = (
    indexed
    .filter(lambda t: t[1].strip() in story_titles)
    .collect()
)

titles_idx = sorted(titles_idx, key=lambda x: x[0])

for idx, title in titles_idx:
    print(idx, repr(title))


363 "A MIDSUMMER NIGHT'S DREAM"
588 'THE TEMPEST'
786 'AS YOU LIKE IT'
952 "THE WINTER'S TALE"
1172 'KING LEAR'
1297 'TWELFTH NIGHT'
1536 'MUCH ADO ABOUT NOTHING'
1957 'ROMEO AND JULIET'
2206 'PERICLES'
2392 'HAMLET'
2616 'CYMBELINE'
2849 'MACBETH'
3116 'THE COMEDY OF ERRORS'
3391 'THE MERCHANT OF VENICE'
3578 'TIMON OF ATHENS'
3925 'OTHELLO'
4492 'MEASURE FOR MEASURE'
4778 'TWO GENTLEMEN OF VERONA'
5130 "ALL'S WELL THAT ENDS WELL"


In [42]:
story_ranges = []
for i, (idx, title) in enumerate(titles_idx):
    start = idx
    end = titles_idx[i+1][0] if i+1 < len(titles_idx) else None
    story_ranges.append((title.strip(), start, end))

for r in story_ranges:
    print(r)


("A MIDSUMMER NIGHT'S DREAM", 363, 588)
('THE TEMPEST', 588, 786)
('AS YOU LIKE IT', 786, 952)
("THE WINTER'S TALE", 952, 1172)
('KING LEAR', 1172, 1297)
('TWELFTH NIGHT', 1297, 1536)
('MUCH ADO ABOUT NOTHING', 1536, 1957)
('ROMEO AND JULIET', 1957, 2206)
('PERICLES', 2206, 2392)
('HAMLET', 2392, 2616)
('CYMBELINE', 2616, 2849)
('MACBETH', 2849, 3116)
('THE COMEDY OF ERRORS', 3116, 3391)
('THE MERCHANT OF VENICE', 3391, 3578)
('TIMON OF ATHENS', 3578, 3925)
('OTHELLO', 3925, 4492)
('MEASURE FOR MEASURE', 4492, 4778)
('TWO GENTLEMEN OF VERONA', 4778, 5130)
("ALL'S WELL THAT ENDS WELL", 5130, None)


In [43]:
import os, re

output_dir = "/home/mvana/Module_BigData/Stories_Spark"
os.makedirs(output_dir, exist_ok=True)

for title, start, end in story_ranges:
    s = start
    e = end

    story_rdd = (
        indexed
        .filter(lambda t, s=s, e=e: t[0] >= s and (e is None or t[0] < e))
        .map(lambda t: t[1])
    )

    safe_title = re.sub(r"[^A-Za-z0-9]+", "_", title)
    out_path = f"{output_dir}/{safe_title}"

    print(f"Sauvegarde : {title} -> {out_path}")
    story_rdd.coalesce(1).saveAsTextFile(out_path)
# Le code ci-dessus permet d'extraire chaque histoire du livre et de les sauvegarder dans des fichiers séparés.

Sauvegarde : A MIDSUMMER NIGHT'S DREAM -> /home/mvana/Module_BigData/Stories_Spark/A_MIDSUMMER_NIGHT_S_DREAM


                                                                                

Sauvegarde : THE TEMPEST -> /home/mvana/Module_BigData/Stories_Spark/THE_TEMPEST
Sauvegarde : AS YOU LIKE IT -> /home/mvana/Module_BigData/Stories_Spark/AS_YOU_LIKE_IT
Sauvegarde : THE WINTER'S TALE -> /home/mvana/Module_BigData/Stories_Spark/THE_WINTER_S_TALE
Sauvegarde : KING LEAR -> /home/mvana/Module_BigData/Stories_Spark/KING_LEAR
Sauvegarde : TWELFTH NIGHT -> /home/mvana/Module_BigData/Stories_Spark/TWELFTH_NIGHT
Sauvegarde : MUCH ADO ABOUT NOTHING -> /home/mvana/Module_BigData/Stories_Spark/MUCH_ADO_ABOUT_NOTHING
Sauvegarde : ROMEO AND JULIET -> /home/mvana/Module_BigData/Stories_Spark/ROMEO_AND_JULIET
Sauvegarde : PERICLES -> /home/mvana/Module_BigData/Stories_Spark/PERICLES
Sauvegarde : HAMLET -> /home/mvana/Module_BigData/Stories_Spark/HAMLET
Sauvegarde : CYMBELINE -> /home/mvana/Module_BigData/Stories_Spark/CYMBELINE
Sauvegarde : MACBETH -> /home/mvana/Module_BigData/Stories_Spark/MACBETH
Sauvegarde : THE COMEDY OF ERRORS -> /home/mvana/Module_BigData/Stories_Spark/THE_COMED