BDLE 2024

date du document  :  06/12/2024

# TP 8 et 9 : traitement sur des partitions (séance 2)

 tri, regroupement, fenetre, jointure



**Auteur:**

**ZHOU runlin 28717281**

**ZHANG zhile 21201131**



# Indications, méthode

L'objectif est de comprendre la notion de traitement sur des données partitionnées.

*   Savoir *décomposer* un traitement complexe (une requête SQL) en une suite d'étapes.

*   Savoir *séparer* les étapes de **traitement dans une partition** des étapes de **repartitionnement des données**. Dans une étape de traitement, on peut si nécesaire calculer un atribut qui servira à un repartitionnement ultérieur.


Indications:

*   Les données sont gérées par spark et sont supposées être très volumineuses. Elle ne doivent jamais être "remontées" entièrement dans l'application. Ne jamais invoquer un collect() sur la totalité des données.

*   On peut remonter dans l'application le résultat d'une requête si on sait que sa taille est petite.

*   On peut faire "descendre" vers spark des données auxilliaires provenant de l'application (supposées de petite taille). Ces données auxilliaires pourront ensuite être lues lors d'un prochain traitement dans une partition.


Implémentation :

*   Savoir définir une fonction UDF qui implémente le traitement dans une partition et qui est invoquée avec mapPartition ou mapPartitionWithIndex.

*   La fonction repartition permet de repartitionner les données

*   La fonction broadcast permet de diffuser des données auxilliaires




# Préparation

Installer pyspark et findspark :


In [1]:
!pip install -q pyspark
!pip install -q findspark
print("installé")

installé


Démarrer la session spark

In [2]:
import os
import glob

pyspark_dir = glob.glob('/usr/local/lib/python*/dist-packages/pyspark')[0]
print("pyspark directory is", pyspark_dir)
os.environ["SPARK_HOME"] = pyspark_dir
os.environ["JAVA_HOME"] = "/usr"

pyspark directory is /usr/local/lib/python3.10/dist-packages/pyspark


In [3]:
# Principaux import
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# pour les dataframe et udf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# pour le chronomètre
import time

# pour les itérateurs
from typing import Iterator


# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","3G").\
  set("spark.sql.catalogImplementation","in-memory")

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")

  # désactiver adaptive query
  spark.conf.set("spark.sql.adaptive.enable", "false")

  print("session démarrée, son id est ", sc.applicationId)
  return spark

# spark.stop()
spark = demarrer_spark()

session démarrée, son id est  local-1733519842531


Redéfinir la fonction **display** pour afficher le résultat des requêtes dans un tableau

In [4]:
import pandas as pd
from google.colab import data_table

# alternatives to Databricks display function.

def display(df, n=100):
  return data_table.DataTable(df.limit(n).toPandas(), include_index=False, num_rows_per_page=10)


Définir le tag **%%sql** pour pouvoir écrire plus simplement des requêtes en SQL dans une cellule

In [5]:
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

def removeComments(query):
  result = ""
  for line in query.split('\n'):
    if not(line.strip().startswith("--")):
      result += line + "\n"
  return result

@register_line_cell_magic
def sql(line, cell=None):
    "To run a sql query. Use:  %%sql"
    val = cell if cell is not None else line
    tabRequetes = removeComments(val).split(";")
    derniere = None
    est_requete = False
    for r in tabRequetes:
        r = r.strip()
        if len(r) > 2:
          derniere = spark.sql(r)
          est_requete = ( r.lower().startswith('select') or r.lower().startswith('with') )
    if(est_requete):
      return display(derniere)
    else:
      return print('ok')

Utiliaires : Chronomètres

In [6]:
#------------------------------
# Chronometre : chronoPersist2
#------------------------------
import time

# Ce chronometre garantit que chaque tuple du dataframe est lu entièrement.
# En effet il est nécessaire de lire le détail de chaque tuple avant de les 'copier' en mémoire.
def chronoPersist(df):
    df.unpersist()
    t1 = time.perf_counter()
    count = df.persist().count()
    t2 = time.perf_counter()
    df.unpersist()
    print('durée: {:.1f} s'.format(t2 - t1), 'pour lire', count , 'elements')

def chronoPersist2(df):
  dest = df.selectExpr("*", "1")
  t1 = time.perf_counter()
  count = dest.persist().count()
  t2 = time.perf_counter()
  dest.unpersist()
  print('durée: {:.1f} s'.format(t2 - t1), 'pour lire', count , 'elements')

def chronoCount(df):
  t1 = time.perf_counter()
  count = df.count()
  t2 = time.perf_counter()
  print('durée: {:.1f} s'.format(t2 - t1), 'pour dénombrer', count , 'elements')

print("fonctions définies")

fonctions définies


# Accès aux données

In [7]:
import os
local_dir = "/local/data"
os.makedirs(local_dir, exist_ok=True)
os.listdir(local_dir)

[]

URL pour l'accès aux datasets

In [8]:
# URL du dossier PUBLIC_DATASET contenant des fichiers de données pour les TP
# ---------------------------------------------------------------------------
# en cas de problème avec le téléchargement des datasets, aller directement sur l'URL ci-dessous
PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/LqD9N23kxrfHopr"
PUBLIC_DATASET=PUBLIC_DATASET_URL + "/download?path="

print("URL du dossier contenant les datasets ", PUBLIC_DATASET_URL)

URL du dossier contenant les datasets  https://nuage.lip6.fr/s/LqD9N23kxrfHopr


In [9]:
import os
from urllib import request
import zipfile

# download dataset if not already donwloaded
def download_file(web_dir, local_dir, file):
  local_file = local_dir + "/" + file
  web_file = web_dir + "/" + file
  if(os.path.isfile(local_file)):
    print(file, "is already stored")
  else:
    print("downloading from URL: ", web_file , "save in : " + local_file)
    request.urlretrieve(web_file , local_file)

def unzip_file(local_dir, file):
  with zipfile.ZipFile(local_dir + "/" + file, 'r') as zip_ref:
    zip_ref.extractall(local_dir)
  # os.remove(local_dir + "/" + file)


web_dir = PUBLIC_DATASET + "/movielens"

download_file(web_dir, local_dir, "notes1M.zip")
unzip_file(local_dir, "notes1M.zip")

download_file(web_dir, local_dir, "ratings3M.zip")
unzip_file(local_dir, "ratings3M.zip")

download_file(web_dir, local_dir, "films.json")

web_dir = PUBLIC_DATASET + "/movielens/ml-latest"

download_file(web_dir, local_dir, "movies.csv")


# Liste des fichiers
os.listdir(local_dir)

downloading from URL:  https://nuage.lip6.fr/s/LqD9N23kxrfHopr/download?path=/movielens/notes1M.zip save in : /local/data/notes1M.zip
downloading from URL:  https://nuage.lip6.fr/s/LqD9N23kxrfHopr/download?path=/movielens/ratings3M.zip save in : /local/data/ratings3M.zip
downloading from URL:  https://nuage.lip6.fr/s/LqD9N23kxrfHopr/download?path=/movielens/films.json save in : /local/data/films.json
downloading from URL:  https://nuage.lip6.fr/s/LqD9N23kxrfHopr/download?path=/movielens/ml-latest/movies.csv save in : /local/data/movies.csv


['notes1M.json',
 'movies.csv',
 'films.json',
 'notes1M.zip',
 'ratings3M.csv',
 'ratings3M.zip']

### Dataframe les Films

début du fichier film.json

In [10]:
! head -n 2 /local/data/films.json

{"nF":8754,"titre":"Prime of Miss Jean Brodie, The (1969)","g":["Drama"]}
{"nF":111486,"titre":"Lesson of the Evil (Aku no kyôten) (2012)","g":["Thriller"]}


définir le dataframe des films

In [11]:
schema_film = "nF long, titre String, g Array<String>"
# schema_film = StructType([StructField('nF',LongType()),
#                           StructField('titre',StringType()),
#                           StructField('g',ArrayType(StringType()))])

films = spark.read.json(local_dir + "/" + "films.json", schema = schema_film).selectExpr("nF", "titre", "g as genres")
# print('schema:', films.schema)

print('Nombre total de films:', films.count())

# extrait
films_extrait = films.where("nF < 100").repartition(3).persist()

print("Nombre films dans l'extrait:", films_extrait.count())

print("Schéma des données:")
films_extrait.printSchema()

# print(films_extrait.storageLevel)

display(films_extrait)

Nombre total de films: 9125
Nombre films dans l'extrait: 92
Schéma des données:
root
 |-- nF: long (nullable = true)
 |-- titre: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)



Unnamed: 0,nF,titre,genres
0,94,Beautiful Girls (1996),"[Comedy, Drama, Romance]"
1,45,To Die For (1995),"[Comedy, Drama, Thriller]"
2,58,"Postman, The (Postino, Il) (1994)","[Comedy, Drama, Romance]"
3,54,"Big Green, The (1995)","[Children, Comedy]"
4,9,Sudden Death (1995),[Action]
...,...,...,...
87,38,It Takes Two (1995),"[Children, Comedy]"
88,15,Cutthroat Island (1995),"[Action, Adventure, Romance]"
89,85,Angels and Insects (1995),"[Drama, Romance]"
90,64,Two if by Sea (1996),"[Comedy, Romance]"


### Dataframe Les Notes

In [12]:
notes_schema = "nF long, nU long, note double, annee long"
# notes_schema = StructType([StructField('nF',LongType()),
#                            StructField('nU',LongType()),
#                            StructField('note',DoubleType()),
#                            StructField('annee',LongType())])

notes = spark.read.json(local_dir + "/" + "notes1M.json", schema = notes_schema).selectExpr("nF", "nU", "note", "annee")
print('Nombre total de notes:',  notes.count())

#extrait
notes_extrait = notes.where("nU < 1000").join(films_extrait, "nF").select(notes["nF"], "nU", "note", "annee").repartition(3).persist()
print("Nombre de notes dans l'extrait:", notes_extrait.count())
display(notes_extrait)

Nombre total de notes: 1301573
Nombre de notes dans l'extrait: 185


Unnamed: 0,nF,nU,note,annee
0,21,638,4.0,1996
1,15,875,4.0,1996
2,76,160,4.0,2000
3,19,151,2.0,2004
4,6,24,4.0,2001
...,...,...,...,...
95,39,306,3.0,1996
96,1,667,4.0,1999
97,1,44,5.0,1996
98,5,196,4.0,1996


# Exercice 1 : Traitement sur des partitions

Dans les question suivantes, on demande d'evaluer les requetes **sans** utiliser les fonctions prédéfinies SQL (select, explode, where, join, groupBy, orderBy, distinct, ...)


#### Rappel sur les itérateurs

Un exemple de données

In [13]:
# names = ['Alice', 'Bob', 'Carol', 'Dave', 'Eva', 'Fabio' , 'Greg', 'Hans']
names = ['Alice', 'Bob', 'Carol', 'Dave', 'Eva']

Manipuler un itérateur.

Exemple a) Pour chaque prénom, afficher "Hello prénom".

In [14]:
print('iteration "for" sur un itérateur : ')
input_iterator = iter(names)

for n in input_iterator:
  print('Hello '+ n)


print()
print('iteration "while" sur un iterateur : ')
input_iterator = iter(names)
n = next(input_iterator, None)
while n is not None:
  print('Hello '+ n)
  n = next(input_iterator, None)

iteration "for" sur un itérateur : 
Hello Alice
Hello Bob
Hello Carol
Hello Dave
Hello Eva

iteration "while" sur un iterateur : 
Hello Alice
Hello Bob
Hello Carol
Hello Dave
Hello Eva


Exemple b) Afficher seulement les prénoms se terminant par la lettre 'e'.

In [15]:
input_iterator = iter(names)
for x in input_iterator:
  if x[-1] == 'e':
    print('Hello '+ x)

Hello Alice
Hello Dave


Fonction manipulant un itérateur en **entrée** et en **sortie**

In [16]:
def hello_many(name_iterator: Iterator) -> Iterator:
  for name in name_iterator:
    yield "Hello " + name
    time.sleep(0.5)  # Pause de 1/2 seconde

input_iterator = iter(names)

output_iterator = hello_many(input_iterator)

for x in output_iterator:
  print(x)
print("done")

Hello Alice
Hello Bob
Hello Carol
Hello Dave
Hello Eva
done


Exemple d'itérateur sur des films.

On demande de calculer pour chaque film son nombre de genres. Le schéma du résultat est (titre, nb)

In [17]:
liste_locale_films = films_extrait.limit(5).collect()
# print(liste_locale_films)
input_iterator = iter(liste_locale_films)

def nb_genres_films(film_iterator: Iterator) -> Iterator:
  for f in film_iterator:
    yield (f.titre, len(f.genres))

output_iterator = nb_genres_films(input_iterator)

for x in output_iterator:
  print(x)
print("done")

('Beautiful Girls (1996)', 3)
('To Die For (1995)', 3)
('Postman, The (Postino, Il) (1994)', 3)
('Big Green, The (1995)', 2)
('Sudden Death (1995)', 1)
done


Itérateur sur des films. Maintenant, les films ne sont plus des données locales mais sont *dans Spark*.

In [18]:
result = films_extrait.limit(5).rdd.mapPartitions(nb_genres_films).toDF(['titre', 'nb'])
display(result)

Unnamed: 0,titre,nb
0,Beautiful Girls (1996),3
1,To Die For (1995),3
2,"Postman, The (Postino, Il) (1994)",3
3,"Big Green, The (1995)",2
4,Sudden Death (1995),1


### Fonction part_size_simple invoquée par mapPartitions

La fonction part_size calcule la taille d'une partition (ensemble de tuples). Sa signature est prédéfinie pour pemettre son invocation par le moteur de requêtes de Spark.
*   Elle prend en paramètre un itérateur sur une partition.
*   Elle retourne un itérateur sur une partition qui est un singleton (nbtuples).


Invocation avec **mapPartitions**

In [19]:
def part_size_simple(iterateur: Iterator) -> Iterator:
  size=0
  for s in iterateur:
    size += 1
  yield (size,)

# test
film_size = films_extrait.limit(10).rdd.mapPartitions(part_size_simple).toDF(['size'])
display(film_size)

Unnamed: 0,size
0,10


On peut constater que part_size_simple est invoquée sur **chaque** partition

In [20]:
print("Nombre de partitions: ", films_extrait.rdd.getNumPartitions())

film_size = films_extrait.rdd.mapPartitions(part_size_simple).toDF(['size'])
display(film_size)

Nombre de partitions:  3


Unnamed: 0,size
0,31
1,31
2,30


### Fonction part_size invoquée avec mapPartitionsWithIndex

La fonction part_size calcule la taille de chaque partition en préciant aussi le **numéro de partition** concerné.

Elle prend en entrée 2 paramètres:
*    un numéro de partition
*    un itérateur sur les tuples d'une partition

Elle retourne un itérateur sur des couples (numéro de partition, taille)

Invocation avec **mapPartitionsWithIndex**

In [21]:
# La fonction part_size prend 2 paramètres : un itérateur sur une partition.
# Elle retourne un itérateur sur une partition qui est un singleton (nbtuples).

def part_size(partID, iterateur: Iterator) -> Iterator:
  size=0
  for s in iterateur:
    size += 1
  yield (partID, size)

# Invocation
film_size = films_extrait.rdd.mapPartitionsWithIndex(part_size).toDF(['partID', 'size'])
display(film_size)

Unnamed: 0,partID,size
0,0,31
1,1,31
2,2,30


#### Fonction showPartitionSize

La fonction *showPartitionSize*  affiche le nombre d'éléments dans chaque partition.

In [22]:
def showPartitionSize(df):
  if df.isEmpty():
    print("empty dataframe")
  else:
    #invoquer la fonction partSize sur chaque partition
    t = df.selectExpr("1").rdd.mapPartitionsWithIndex(part_size).toDF(['partID', 'size'])

    # Rmq : selectExpr("1") sert à simplifier la partition pour ne garder qu'une seule colonne contentant des "1",
    # ce qui est suffisant pour compter le nombre de tuples dans la partition.

    #affichage
    # t.show()
    return display(t)

print('showPartitionSize définie')

showPartitionSize définie


Afficher la taille de chaque partition

In [23]:
showPartitionSize(films_extrait)

Unnamed: 0,partID,size
0,0,31
1,1,31
2,2,30


#### Fonction showPartitions

La fonction *showPartitions*  affiche les _n_ premiers éléments de chaque partition


In [24]:
def showPartitions(df, N=5 , display_per_partition=True, partID_field = "partID"):
  if df.isEmpty():
     print("empty dataframe")
  else:
    nb_partitions = df.rdd.getNumPartitions()

    # la fonction topN est invoquée sur une partition.
    # topN retourne un itérateur sur une partition qui contient les N premiers éléments de cette partition
    def topN(partID, iterateur):
      c=1
      for t in iterateur:
        if c > N:
          break
        tuple_avec_numero_partition = (partID, *t)
        yield tuple_avec_numero_partition
        c+=1

    #-- fin de la fonction topN ---

    # nom de l'attribut représentant le numéro de partition
    if partID_field in df.schema.fieldNames():
      i = 1
      while partID_field + str(i) in df.schema.fieldNames():
        i+=1
      partID_field = partID_field + str(i)
    nom_attributs = [partID_field] + df.schema.fieldNames()
    premiers_tuples = df.rdd.mapPartitionsWithIndex(topN).toDF(nom_attributs)
    premiers_tuples.persist()

    if(display_per_partition):
      # afficher séparément le contenu des N premiers tuples de chaque partition
      for partID in range(nb_partitions):
        print("partition", partID)
        # afficher la partition partID
        premiers_tuples.where(col(partID_field)==partID).drop(partID_field).show(N, False)
    else:
      return display(premiers_tuples)

print('showPartitions définie')


showPartitions définie


Afficher séparément les premiers éléments de chaque partition

In [25]:
showPartitions(films_extrait,10)

partition 0
+---+-------------------------------------------+----------------------------------------+
|nF |titre                                      |genres                                  |
+---+-------------------------------------------+----------------------------------------+
|94 |Beautiful Girls (1996)                     |[Comedy, Drama, Romance]                |
|45 |To Die For (1995)                          |[Comedy, Drama, Thriller]               |
|58 |Postman, The (Postino, Il) (1994)          |[Comedy, Drama, Romance]                |
|54 |Big Green, The (1995)                      |[Children, Comedy]                      |
|9  |Sudden Death (1995)                        |[Action]                                |
|4  |Waiting to Exhale (1995)                   |[Comedy, Drama, Romance]                |
|80 |White Balloon, The (Badkonake sefid) (1995)|[Children, Drama]                       |
|87 |Dunston Checks In (1996)                   |[Children, Comedy]           

On peut aussi afficher le contenu de toutes les partitions dans un seul tableau en ajoutant un pseudo-attribut représentant le numéro de partition.

In [26]:
showPartitions(films_extrait, 5, display_per_partition=False)

Unnamed: 0,partID,nF,titre,genres
0,0,94,Beautiful Girls (1996),"[Comedy, Drama, Romance]"
1,0,45,To Die For (1995),"[Comedy, Drama, Thriller]"
2,0,58,"Postman, The (Postino, Il) (1994)","[Comedy, Drama, Romance]"
3,0,54,"Big Green, The (1995)","[Children, Comedy]"
4,0,9,Sudden Death (1995),[Action]
5,1,14,Nixon (1995),[Drama]
6,1,71,Fair Game (1995),[Action]
7,1,49,When Night Is Falling (1995),"[Drama, Romance]"
8,1,27,Now and Then (1995),"[Children, Drama]"
9,1,59,"Confessional, The (Confessionnal, Le) (1995)","[Drama, Mystery]"


## Question 1 : Regrouper les films par genre et les compter

En vous inspirant de la fonction *topN*,  définir des fonctions pour compter le nombre de films par genre. Le schéma du résultat est (genre, n).

In [27]:
def count_movies_by_genre(genre_iterator):
    genre_counts = {}
    for movie in genre_iterator:
        for genre in movie.genres:
            genre_counts[genre] = genre_counts.get(genre, 0) + 1
    for genre, count in genre_counts.items():
        yield (genre, count)

f1 = films_extrait.rdd.mapPartitions(count_movies_by_genre).toDF(['genre', 'n'])
showPartitions(f1, 5, display_per_partition=False)

Unnamed: 0,partID,genre,n
0,0,Comedy,13
1,0,Drama,19
2,0,Romance,10
3,0,Thriller,5
4,0,Children,6
5,1,Drama,17
6,1,Action,8
7,1,Romance,7
8,1,Children,4
9,1,Mystery,2


In [29]:
def total_genre(iterateur):
    genre_count = {}
    for genre, n in iterateur:
        if genre not in genre_count:
            genre_count[genre] = 0
        genre_count[genre] += n

    for genre, total in genre_count.items():
        yield (genre, total)

f2 = f1.repartition(3, 'genre')
# showPartitions(f2, 5, display_per_partition=False)
f3 = f2.rdd.mapPartitions(total_genre).toDF(['genre', 'n'])
showPartitions(f3, 5, display_per_partition=False)

Unnamed: 0,partID,genre,n
0,0,Comedy,31
1,0,Children,12
2,0,Action,17
3,0,Crime,14
4,0,Adventure,12
5,1,Romance,23
6,1,Thriller,20
7,1,War,2
8,1,Sci-Fi,5
9,1,Fantasy,5


## Question 2 : Numérotation

Définir une fonction qui attribue un numéro de 1 à $n$ aux tuples de notes_extrait. Le résultat a les attributs de notes + un attribut *num*.

Indications :
*    les valeurs de l'attribut *num* sont **consécutives**.
*    les numéros dans la $1^{ère}$ partition vont de 1 à $n_0$, avec $n_i$ étant le nombre de tuples dans la $i^{ème}$ partition.
*    les numéros dans la $2^{ème}$ partition vont de $(n_0+1)$ à $(n_0+n_1)$, et ainsi de suite pour les partitions suivantes.
*   il est possible d'utiliser `d_spark = spark.sparkContext.broadcast(d)` avec `d` étant un objet python de l'application (exple une liste). La variable `d_spark` est une copie de `d` et peut être lue dans toute fonction exécutée par spark. L'instruction `d_spark.value` permet de lire la copie de `d`.


In [49]:
# rearch the number of the lines in each partition
partition_sizes = notes_extrait.rdd.mapPartitions(part_size_simple).toDF(['size']).collect()

# regroup the counts by previous lines' count
# the first partition has 0 previous line
cumulative_sizes = [0]
for i in range(1, len(partition_sizes)):
    cumulative_sizes.append(partition_sizes[i]['size'] + cumulative_sizes[i - 1])

# previous methode : Partition numbering manually
# offsets = {}
# for i in range(len(partition_sizes)):
#     offsets[i] = cumulative_sizes[i]

# apply the spark broadcast
offsets = spark.sparkContext.broadcast(cumulative_sizes)

def add_partition_offset(partID, note_iterator: Iterator) -> Iterator:
    offset = offsets.value[partID]
    i = 1
    for note in note_iterator:
        yield (offset + i, note.nF, note.nU, note.note, note.annee)
        i += 1

f1 = notes_extrait.rdd.mapPartitionsWithIndex(add_partition_offset).toDF(['global_num', 'nF', 'nU', 'note', 'annee'])
showPartitions(f1, 15, display_per_partition=False)

Unnamed: 0,partID,global_num,nF,nU,note,annee
0,0,1,21,638,4.0,1996
1,0,2,15,875,4.0,1996
2,0,3,76,160,4.0,2000
3,0,4,19,151,2.0,2004
4,0,5,6,24,4.0,2001
5,0,6,45,898,4.0,1999
6,0,7,6,619,2.0,1996
7,0,8,1,392,3.5,2006
8,0,9,1,606,4.0,2002
9,0,10,6,483,4.0,2007


vérification que la numérotation est correcte

In [48]:
# Vérification
n = f1.count()
print('nb tuples:', n)
num_max = f1.groupBy().max('global_num').collect()[0][0]
print("valeur max de l'indice global_num:", num_max)

assert(n == num_max)

nb tuples: 185
valeur max de l'indice global_num: 185


## Question 3 : Tri des films par titre

3.1) En vous inspirant de la fonction *topN*  définir une fonction qui trie les données de *film_extrait* par titre. Le résultat doit avoir le même schéma que film.

Indications :
*  a) On veut segmenter le domaine de l'attribut $titre$ en $n$ intervalles, avec $n$ étant le nombre de partitions. Les intervalles sont numérotés de $0$ à $(n-1)$. Pour cela définir une fonction `num_intervalle(titre: string)-> Int`.
Il est possible de considérer la première lettre du titre pour déterminer le numéro d'intervalle
*  b) Associer un numéro d'intervalle $i \in [0,n[$ à chaque tuple.
*  c) Repartitionner les données en fonction du numéro d'intervalle : les tuples associée au $i^{ème}$ intervalle vont dans la partition $i$.
*  d) Trier les données dans chaque partition.



Exemple de méthode **non scalable** à éviter
*   Remonter les données dans l'application avec un collect
*   Trier les données dans l'application
*   Re-descendre les données dans spark avec un createDataFrame

In [50]:
def partitioner(titre):
    first_letter = titre[0].upper()
    if 'A' <= first_letter <= 'E':
        return 0
    elif 'F' <= first_letter <= 'J':
        return 1
    elif 'K' <= first_letter <= 'O':
        return 2
    elif 'P' <= first_letter <= 'T':
        return 3
    else:
        return 4
film1 = films_extrait.rdd.map(lambda x: (partitioner(x.titre), x.nF, x.titre, x.genres)).toDF(['p', 'nF', 'titre', 'genres'])

repartitionner les données par **intervalle** en fonction de l'attribut p

In [51]:
film2 = film1.repartitionByRange(5, col('p'))
showPartitions(film2,20, display_per_partition=False)

Unnamed: 0,partID,p,nF,titre,genres
0,0,0,94,Beautiful Girls (1996),"[Comedy, Drama, Romance]"
1,0,0,54,"Big Green, The (1995)","[Children, Comedy]"
2,0,0,87,Dunston Checks In (1996),"[Children, Comedy]"
3,0,0,88,Black Sheep (1996),[Comedy]
4,0,0,12,Dracula: Dead and Loving It (1995),"[Comedy, Horror]"
...,...,...,...,...,...
77,4,4,80,"White Balloon, The (Badkonake sefid) (1995)","[Children, Drama]"
78,4,4,86,White Squall (1996),"[Action, Adventure, Drama]"
79,4,4,93,Vampire in Brooklyn (1995),"[Comedy, Horror, Romance]"
80,4,4,49,When Night Is Falling (1995),"[Drama, Romance]"


In [52]:
def sort_partition(iterator):
    sorted_data = sorted(iterator, key=lambda x: x["titre"].lower())
    return sorted_data
film_tri_global = film2.rdd.mapPartitions(sort_partition).toDF(['p', 'nF', 'titre', 'genres'])
showPartitions(film_tri_global,15, display_per_partition=False)

Unnamed: 0,partID,p,nF,titre,genres
0,0,0,19,Ace Ventura: When Nature Calls (1995),[Comedy]
1,0,0,37,Across the Sea of Time (1995),"[Documentary, IMAX]"
2,0,0,11,"American President, The (1995)","[Comedy, Drama, Romance]"
3,0,0,85,Angels and Insects (1995),"[Drama, Romance]"
4,0,0,82,Antonia's Line (Antonia) (1995),"[Comedy, Drama]"
...,...,...,...,...,...
61,4,4,93,Vampire in Brooklyn (1995),"[Comedy, Horror, Romance]"
62,4,4,4,Waiting to Exhale (1995),"[Comedy, Drama, Romance]"
63,4,4,49,When Night Is Falling (1995),"[Drama, Romance]"
64,4,4,80,"White Balloon, The (Badkonake sefid) (1995)","[Children, Drama]"


3.2) Le résultat obtenu est-il partitionné de la même façon que celui de la requête équivalente suivante ?
*   resultat1 = film_extrait.orderBy('titre')


In [53]:
resultat1 = films_extrait.orderBy('titre')
showPartitions(resultat1,15, display_per_partition=False)

Unnamed: 0,partID,nF,titre,genres
0,0,19,Ace Ventura: When Nature Calls (1995),[Comedy]
1,0,37,Across the Sea of Time (1995),"[Documentary, IMAX]"
2,0,11,"American President, The (1995)","[Comedy, Drama, Romance]"
3,0,85,Angels and Insects (1995),"[Drama, Romance]"
4,0,82,Antonia's Line (Antonia) (1995),"[Comedy, Drama]"
5,0,23,Assassins (1995),"[Action, Crime, Thriller]"
6,0,34,Babe (1995),"[Children, Drama]"
7,0,13,Balto (1995),"[Adventure, Animation, Children]"
8,0,94,Beautiful Girls (1996),"[Comedy, Drama, Romance]"
9,0,74,Bed of Roses (1996),"[Drama, Romance]"


3.3) Modifier la solution précédente de telles sorte que
*  a) Les intervalles du domaine de l'attribut $titre$ soient équilibrés : chaque intervalle doit être associé à $m$ films avec $m = (nb\_films/n)  \pm n$. Cela nécessite de trier les données avant de les repartitionner.
*  b) Repartitionner les données en fonction du numéro d'intervalle : les tuples associée au $i^{ème}$ intervalle vont dans la partition $i$.
*  c) Dans chaque intervalle, fusionner les données sachant qu'elles ont déjà été triées à l'étape a).


In [55]:
def calculate_letter_distribution(data):
    letter_count = {}
    for record in data:
        letter = record["titre"][0].lower() # compter le nombre de ligne par le premier lettre du titre
        letter_count[letter] = letter_count.get(letter, 0) + 1

    for letter, count in letter_count.items():
        yield (letter, count)

def get_boundaries(m, n, distribution):
    boundaries = []
    partition_id = 0
    curr_count = 0
    curr_partition = []
    for letter, count in distribution:
        curr_partition.append(letter)
        curr_count += count
        if curr_count >= m:
            boundaries.append((curr_partition, partition_id))
            curr_partition = []
            curr_count = 0
            partition_id += 1
    # ajouter les dernier partition dans la list
    if curr_partition:
        boundaries.append(curr_partition, partition_id)

    return boundaries # format : List[Tuple(list_des_lettres, num_partition)]


# step a): calculer la distribution des titres
# obtenir le distribution du titire de chaque partition
letter_distribution = films_extrait.rdd.mapPartitions(calculate_letter_distribution).toDF(['letter', 'count']).collect()

# distribution global :
global_distribution = {}
for letter, count in letter_distribution:
    global_distribution[letter] = global_distribution.get(letter, 0) + count
sorted_global_distribution = sorted(global_distribution.items(), key=lambda x: x[0])

nb_partitions = films_extrait.rdd.getNumPartitions()
total_count = 0
for letter, count in sorted_global_distribution:
    total_count += count
m = total_count // nb_partitions       # taille de la patition m = (nbfilms / n)

partition_boundaries = get_boundaries(m, nb_partitions, sorted_global_distribution)
for partition, partition_id in partition_boundaries:
    print(f"Partition {partition_id}: {', '.join(partition)}")

Partition 0: a, b, c, d, e
Partition 1: f, g, h, i, j, k, l, m
Partition 2: n, o, p, r, s, t, u, v, w


In [56]:
def new_partitioner(titre):
    first_letter = titre[0].lower()
    for partition, partition_id in partition_boundaries:
        if first_letter in partition:
            return partition_id
    return -1

film3 = films_extrait.rdd.map(lambda x: (new_partitioner(x.titre), x.nF, x.titre, x.genres)).toDF(['p', 'nF', 'titre', 'genres'])
films = film3.repartitionByRange(4, col('p'))
showPartitions(films,20, display_per_partition=False)

Unnamed: 0,partID,p,nF,titre,genres
0,0,0,94,Beautiful Girls (1996),"[Comedy, Drama, Romance]"
1,0,0,54,"Big Green, The (1995)","[Children, Comedy]"
2,0,0,87,Dunston Checks In (1996),"[Children, Comedy]"
3,0,0,88,Black Sheep (1996),[Comedy]
4,0,0,12,Dracula: Dead and Loving It (1995),"[Comedy, Horror]"
5,0,0,39,Clueless (1995),"[Comedy, Romance]"
6,0,0,35,Carrington (1995),"[Drama, Romance]"
7,0,0,13,Balto (1995),"[Adventure, Animation, Children]"
8,0,0,22,Copycat (1995),"[Crime, Drama, Horror, Mystery, Thriller]"
9,0,0,36,Dead Man Walking (1995),"[Crime, Drama]"


## Question 4 : Regrouper et trier

Calculer le nombre de notes par année dans l'ordre croissant des années.

## Question 5 : top fréquence

Définir une fonction qui affiche les 5 mots les plus fréquents dans les titres

**Indications**

*   Découper le titre en liste de mots puis compter la fréquence des mots dans chaque partition. On obtient des couples (mot, fréquence)
*   Repartitionner les couples (mot, fréquence) en fonction d'un mot
*   Additionner les fréquences pour chaque mot et les trier pour ne garder que les 5 plus fréquents.
*   Transférer vers l'application les 5 mots plus fréquents de chaque partition puis fusionner les listes pour connaitre les 5 mots globalement les plus fréquents.

Exemple de méthode non scalable à éviter car elle génère **beaucoup de transferts**  :
*   Découper le titre en liste de mots pour obtenir des couples (film,mot)
*   Repartitionner les données en fonction d'un mot du titre : attention, les données repartionnées sont très volumineuses.
*   Compter la frequence des mots dans chaque partition



In [57]:
import re

def split_title(titre):
    titre = re.sub(r'[(),.:;!?0-9]', ' ', titre.lower()) # Supprimer les ponctuations et les chiffres
    titre = re.sub(r'\s+', ' ', titre) # Suppression des espaces supplémentaires
    return titre.split()
# Découper le titre en liste de mots
films_split_titre = films_extrait.rdd.map(lambda x: (x.nF, split_title(x.titre))).toDF(['nF', 'word_list'])
showPartitions(films_split_titre, 15, display_per_partition=False)

Unnamed: 0,partID,nF,word_list
0,0,94,"[beautiful, girls]"
1,0,45,"[to, die, for]"
2,0,58,"[postman, the, postino, il]"
3,0,54,"[big, green, the]"
4,0,9,"[sudden, death]"
5,0,4,"[waiting, to, exhale]"
6,0,80,"[white, balloon, the, badkonake, sefid]"
7,0,87,"[dunston, checks, in]"
8,0,88,"[black, sheep]"
9,0,20,"[money, train]"


In [58]:
def count_words(data):
    word_count = {}
    for _, word_list in data:
        for word in word_list:
            if word not in word_count:
                word_count[word] = 0
            word_count[word] += 1
    for word, count in word_count.items():
        yield (word, count)

# compter la fréquence des mots dans chaque partition
films_word_freq = films_split_titre.rdd.mapPartitions(count_words).toDF(['word', 'freq'])
# Repartitionner les couples (mot, fréquence) en fonction d'un mot
films_regroup = films_word_freq.repartition(3, 'word')

def count_words_total(iterator):
    word_count = {}
    for word, count in iterator:
        if word not in word_count:
            word_count[word] = 0
        word_count[word] += count

    # trier pour ne garder que les 5 plus fréquents.
    word_count = sorted(word_count.items(), key=lambda x: x[1], reverse=True)[:5]
    for word, count in word_count:
        yield (word, count)
# Additionner les fréquences pour chaque mot
films_freq_total = films_regroup.rdd.mapPartitions(count_words_total).toDF(['word', 'freq'])
print(type(films_freq_total))
showPartitions(films_freq_total, 15, display_per_partition=False)


<class 'pyspark.sql.dataframe.DataFrame'>


Unnamed: 0,partID,word,freq
0,0,the,18
1,0,eye,2
2,0,k,2
3,0,postman,1
4,0,sudden,1
5,1,and,6
6,1,to,5
7,1,of,5
8,1,when,4
9,1,dead,4


In [59]:
# transférer vers l'application les 5 mots plus fréquents de chaque partition
def get_words(iterator):
    words = []
    for word, count in iterator:
        words.append((word, count))
    return words

# fusionner les listes pour connaitre les 5 mots globalement
res = films_freq_total.rdd.mapPartitions(get_words).collect()
res = sorted(res, key=lambda x: x[1], reverse=True)[:5]

print("top5 des mots les plus fréquents :")
for word, count in res:
    print(f"{word}: {count}")

top5 des mots les plus fréquents :
the: 18
in: 7
a: 7
and: 6
to: 5


## Question 6 : Jointure
Définir les fonctions pour calculer la jointure entre les extraits de films et de notes.
Indication ne pas utiliser de jointure sur le nF.

Comparer le résultat avec la requête équivalente :
resultat = notes_extrait.join(films_extrait, "nF")


**Indications**

 * Attribuer à chaque film et chaque note un numéro de partition en fonction du numéro de film $nF$. Par exemple, le numéro de partition $numP$ peut être :
 $$ numP = nF \%  p $$
 avec $p$ étant le nombre de partitions
 * Repartitionner les données en fonction du numéro de partition $numP$

 * Former des paires de partitions de film et de note telles qu'elles ont le même numéro de partition. Puis calculer la jointure.


Remarque : Un solution correcte mais pas la plus rapide (car elle fait une jointure par boucles imbriquée qui est souvent mois rapide qu'une jointure par hachage ou par tri fusion) est de calculer tous les couples (film,note) qui ont le même numéro de partition (au lieu de former des paires de partition film/note). Il suffit ensuite calculer la jointure en filtrant les couples qui satisfont la condition de jointure.

Ne pas faire de jointure sur le $nF$, qui est la solution de référence avec laquelle il faut vous comparer.

In [None]:
reference = notes_extrait.join(films_extrait, "nF")
display(reference)

## Question 7: Jointure en cas de données déséquilibrées

Proposer un scénario de jointure avec des données très déséquilibrées :
un tier des notes concernent le même film.

# Exercice 2: Plan d'une requête

In [None]:
# Extraction du plan logique
# reference.explain(extended=False)

Visualiser le plan d'une requête

In [None]:
import io
import sys

def plan_requete(requete: DataFrame, simple=True):
  original_out = sys.stdout # sortie originale

  output = io.StringIO()  # Créer un buffer pour capturer la sortie
  sys.stdout = output  # Rediriger stdout vers le buffer
  requete.explain(extended=False)
  explain_output = output.getvalue()   # Récupérer le contenu capturé

  sys.stdout = original_out #restaurer la sortie originale
  return explain_output

In [None]:
requete1 = notes_extrait.join(films_extrait, "nF")
plan1 = plan_requete(requete1)
print(plan1)

In [None]:
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Project [nF#180L, nU#181L, note#182, annee#183L, titre#2749, genres#6]
#    +- SortMergeJoin [nF#180L], [nF#2748L], Inner
#       :- Sort [nF#180L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(nF#180L, 4), ENSURE_REQUIREMENTS, [plan_id=1272]
#       :     +- Filter isnotnull(nF#180L)
#       :        +- InMemoryTableScan [nF#180L, nU#181L, note#182, annee#183L], [isnotnull(nF#180L)]
#       :              +- InMemoryRelation [nF#180L, nU#181L, note#182, annee#183L], StorageLevel(disk, memory, deserialized, 1 replicas)
#       :                    +- AdaptiveSparkPlan isFinalPlan=true
#                               +- == Final Plan ==
#                                  ShuffleQueryStage 3
#                                  +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=351]
#                                     +- *(5) Project [nF#180L, nU#181L, note#182, annee#183L]
#                                        +- *(5) SortMergeJoin [nF#180L], [nF#0L], Inner
#                                           :- *(3) Sort [nF#180L ASC NULLS FIRST], false, 0
#                                           :  +- AQEShuffleRead coalesced
#                                           :     +- ShuffleQueryStage 0
#                                           :        +- Exchange hashpartitioning(nF#180L, 4), ENSURE_REQUIREMENTS, [plan_id=228]
#                                           :           +- *(1) Filter ((isnotnull(nU#181L) AND (nU#181L < 1000)) AND isnotnull(nF#180L))
#                                           :              +- FileScan json [nF#180L,nU#181L,note#182,annee#183L] Batched: false, DataFilters: [isnotnull(nU#181L), (nU#181L < 1000), isnotnull(nF#180L)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/notes1M.json], PartitionFilters: [], PushedFilters: [IsNotNull(nU), LessThan(nU,1000), IsNotNull(nF)], ReadSchema: struct<nF:bigint,nU:bigint,note:double,annee:bigint>
#                                           +- *(4) Sort [nF#0L ASC NULLS FIRST], false, 0
#                                              +- AQEShuffleRead coalesced
#                                                 +- ShuffleQueryStage 2
#                                                    +- Exchange hashpartitioning(nF#0L, 4), ENSURE_REQUIREMENTS, [plan_id=279]
#                                                       +- *(2) Filter isnotnull(nF#0L)
#                                                          +- TableCacheQueryStage 1
#                                                             +- InMemoryTableScan [nF#0L], [isnotnull(nF#0L)]
#                                                                   +- InMemoryRelation [nF#0L, titre#1, genres#6], StorageLevel(disk, memory, deserialized, 1 replicas)
#                                                                         +- AdaptiveSparkPlan isFinalPlan=true
#                                           +- == Final Plan ==
#                                              ShuffleQueryStage 0
#                                              +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=73]
#                                                 +- *(1) Project [nF#0L, titre#1, g#2 AS genres#6]
#                                                    +- *(1) Filter (isnotnull(nF#0L) AND (nF#0L < 100))
#                                                       +- FileScan json [nF#0L,titre#1,g#2] Batched: false, DataFilters: [isnotnull(nF#0L), (nF#0L < 100)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/films.json], PartitionFilters: [], PushedFilters: [IsNotNull(nF), LessThan(nF,100)], ReadSchema: struct<nF:bigint,titre:string,g:array<string>>
#                                           +- == Initial Plan ==
#                                              Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=40]
#                                              +- Project [nF#0L, titre#1, g#2 AS genres#6]
#                                                 +- Filter (isnotnull(nF#0L) AND (nF#0L < 100))
#                                                    +- FileScan json [nF#0L,titre#1,g#2] Batched: false, DataFilters: [isnotnull(nF#0L), (nF#0L < 100)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/films.json], PartitionFilters: [], PushedFilters: [IsNotNull(nF), LessThan(nF,100)], ReadSchema: struct<nF:bigint,titre:string,g:array<string>>
#                               +- == Initial Plan ==
#                                  Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=184]
#                                  +- Project [nF#180L, nU#181L, note#182, annee#183L]
#                                     +- SortMergeJoin [nF#180L], [nF#0L], Inner
#                                        :- Sort [nF#180L ASC NULLS FIRST], false, 0
#                                        :  +- Exchange hashpartitioning(nF#180L, 4), ENSURE_REQUIREMENTS, [plan_id=178]
#                                        :     +- Filter ((isnotnull(nU#181L) AND (nU#181L < 1000)) AND isnotnull(nF#180L))
#                                        :        +- FileScan json [nF#180L,nU#181L,note#182,annee#183L] Batched: false, DataFilters: [isnotnull(nU#181L), (nU#181L < 1000), isnotnull(nF#180L)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/notes1M.json], PartitionFilters: [], PushedFilters: [IsNotNull(nU), LessThan(nU,1000), IsNotNull(nF)], ReadSchema: struct<nF:bigint,nU:bigint,note:double,annee:bigint>
#                                        +- Sort [nF#0L ASC NULLS FIRST], false, 0
#                                           +- Exchange hashpartitioning(nF#0L, 4), ENSURE_REQUIREMENTS, [plan_id=179]
#                                              +- Filter isnotnull(nF#0L)
#                                                 +- InMemoryTableScan [nF#0L], [isnotnull(nF#0L)]
#                                                       +- InMemoryRelation [nF#0L, titre#1, genres#6], StorageLevel(disk, memory, deserialized, 1 replicas)
#                                                             +- AdaptiveSparkPlan isFinalPlan=true
#                               +- == Final Plan ==
#                                  ShuffleQueryStage 0
#                                  +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=73]
#                                     +- *(1) Project [nF#0L, titre#1, g#2 AS genres#6]
#                                        +- *(1) Filter (isnotnull(nF#0L) AND (nF#0L < 100))
#                                           +- FileScan json [nF#0L,titre#1,g#2] Batched: false, DataFilters: [isnotnull(nF#0L), (nF#0L < 100)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/films.json], PartitionFilters: [], PushedFilters: [IsNotNull(nF), LessThan(nF,100)], ReadSchema: struct<nF:bigint,titre:string,g:array<string>>
#                               +- == Initial Plan ==
#                                  Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=40]
#                                  +- Project [nF#0L, titre#1, g#2 AS genres#6]
#                                     +- Filter (isnotnull(nF#0L) AND (nF#0L < 100))
#                                        +- FileScan json [nF#0L,titre#1,g#2] Batched: false, DataFilters: [isnotnull(nF#0L), (nF#0L < 100)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/films.json], PartitionFilters: [], PushedFilters: [IsNotNull(nF), LessThan(nF,100)], ReadSchema: struct<nF:bigint,titre:string,g:array<string>>
#       +- Sort [nF#2748L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(nF#2748L, 4), ENSURE_REQUIREMENTS, [plan_id=1273]
#             +- Filter isnotnull(nF#2748L)
#                +- InMemoryTableScan [nF#2748L, titre#2749, genres#6], [isnotnull(nF#2748L)]
#                      +- InMemoryRelation [nF#2748L, titre#2749, genres#6], StorageLevel(disk, memory, deserialized, 1 replicas)
#                            +- AdaptiveSparkPlan isFinalPlan=true
#                               +- == Final Plan ==
#                                  ShuffleQueryStage 0
#                                  +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=73]
#                                     +- *(1) Project [nF#0L, titre#1, g#2 AS genres#6]
#                                        +- *(1) Filter (isnotnull(nF#0L) AND (nF#0L < 100))
#                                           +- FileScan json [nF#0L,titre#1,g#2] Batched: false, DataFilters: [isnotnull(nF#0L), (nF#0L < 100)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/films.json], PartitionFilters: [], PushedFilters: [IsNotNull(nF), LessThan(nF,100)], ReadSchema: struct<nF:bigint,titre:string,g:array<string>>
#                               +- == Initial Plan ==
#                                  Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=40]
#                                  +- Project [nF#0L, titre#1, g#2 AS genres#6]
#                                     +- Filter (isnotnull(nF#0L) AND (nF#0L < 100))
#                                        +- FileScan json [nF#0L,titre#1,g#2] Batched: false, DataFilters: [isnotnull(nF#0L), (nF#0L < 100)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/local/data/films.json], PartitionFilters: [], PushedFilters: [IsNotNull(nF), LessThan(nF,100)], ReadSchema: struct<nF:bigint,titre:string,g:array<string>>



## Question 1 : Parser le plan

A partir du plan au format 'texte', construire l'arbre des opérations.
Elaguer l'arbre : supprimer les noeuds siutés sous une opération `InMemoryTableScan`.
Repérer les données échangées.