# Spark

# RDD

## TP2_Exercice1_RDD

- Introduction à pyspark
- Introduction du RDD

In [8]:
# Introduction sur les RDD
# import des librairies bumpy et py spark
import numpy as np
from pyspark import SparkContext
# add est la fonction addition
from operator import add

In [27]:


# Initialisation de Spark Context 4 partitions
# Dans ce notebook, on utilisera SparkContext
# On verra que ce n'est pas la seule façon de faire.
sc = SparkContext(master="local[4]")

# Initialisation d'un vecteur de 20 nbre aléatoires (entiers compris entre 0 et 9 inclus)
# utilisation d'une fonction numpy
A = np.random.randint(0,10,20)

# Creation d'un RDD parallelisé à partir du vecteur précédent  
# Le RDD est distribué sur les 4 partitions
RDD_A = sc.parallelize(A)

# Affichage fonction .collect()
# .collect renvoie une liste de tous les éléments du RDD (qui peut être afficher)
print("Liste des élements du RDD : ",RDD_A.collect())

# Comptons le nombre d'éléments
print("Nbre d'éléments de RDD_A : ",RDD_A.count())
# On retouve 20

# print(RDD_A) va juste indiquer le type du RDD
print("Type de RDD : ",RDD_A)

# Affichage des partitions (grâce à la fonction glom())
# .glom() va fusionner en considérant les élément de chaque partition
# .collect crée une liste 
# On remarque les 4 partitions
print("Liste des élements du RDD par partitions : ", RDD_A.glom().collect())

# on n'oublie d'arrêter le spark context
sc.stop()

Liste des élements du RDD :  [4, 4, 4, 1, 2, 9, 7, 3, 4, 9, 7, 5, 8, 9, 6, 0, 1, 5, 5, 2]
Nbre d'éléments de RDD_A :  20
Type de RDD :  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262
Liste des élements du RDD par partitions :  [[4, 4, 4, 1, 2], [9, 7, 3, 4, 9], [7, 5, 8, 9, 6], [0, 1, 5, 5, 2]]


In [6]:
# On recommence la même chose avec 2 partitions
# On remarque qu'il n'y pplus 
# Initialisation de Spark Context 2 partitions
sc = SparkContext(master="local[2]")

# Initialisation d'un vecteur de 20 nbre aléatoires (entiers)
A = np.random.randint(0,10,20)

# Creation d'un RDD parallelise a partir du vecteur précédent  
RDD_A = sc.parallelize(A)

# Affichage
print("Liste des élements du RDD : ",RDD_A.collect())

# print(RDD_A) va juste indiquer le type du RDD
print("Type de RDD : ",RDD_A)

# Affichage des partitions
# Ici, on remarque les deux partitions
print("Liste des élements du RDD par partitions : ",RDD_A.glom().collect())

# on n'oublie d'arrêter le spark context
sc.stop()

Liste des élements du RDD :  [7, 2, 4, 3, 1, 9, 9, 8, 9, 6, 9, 9, 6, 4, 4, 5, 0, 3, 5, 8]
Type de RDD :  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262
Liste des élements du RDD par partitions :  [[7, 2, 4, 3, 1, 9, 9, 8, 9, 6], [9, 9, 6, 4, 4, 5, 0, 3, 5, 8]]


L'intégralité des fonctions applicables aux RDD se retrouve à la page :

https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
    
On peut y rescencer une centaine de fonctions.

In [22]:
# Initialisation de Spark Context 4 partitions
sc = SparkContext(master="local[4]")

# Initialisation d'un vecteur de 20 nbre aléatoires (entierer)
A = np.random.randint(0,10,20)

# Creation d'un RDD parallelise a partir du vecteur précédent  
RDD_A = sc.parallelize(A)

# Affichage
print("Liste des élements du RDD : ",RDD_A.collect())

# Création d'un vecteur avec des valeurs uniques
RDD_A_Distinct = RDD_A.distinct()

print("")
# Affichage
# Les éléments de RDD_A_Distinct ne sont pas triés
print("Liste des élemnts (unique) de RDD_A : ",RDD_A_Distinct.collect())


print("")
# Somme des valeur de RDD_A
# Nous allons présenter 4 méthodes 
print("Somme des élements de RDD_A")
# Utilisation de la fonction sum()
print("Utilisation de sum() : ",RDD_A.sum())

# utilisation de la fonction reduce avec appliquant x + y
print("Utilisation de reduce(lambda x,y:x+y) : ",RDD_A.reduce(lambda x,y:x+y))

# utilisation de fold
# fold s'applique d'abord sur chaque partition puis effectue une réduction de l'ensemble des partitions réduites
# 0 est un valeur neutre
print("Utilisation de fold(0,lambda x,y:x+y) : ",RDD_A.fold(0,lambda x,y:x+y))

# utilisation de fold et de l'opérateur add
print("Utilisation de fold(0,add) : ",RDD_A.fold(0,add))

# On remarque que les résultats sont identiques

print("")

# filtrer en ne retenant que les nbres pairs
# Filter renvoie un nouveau RDD contenant uniquement les éléments qui satisfont à un prédicat.
# ici le prédicat est x%2==0 
print("Affichage des nbres pairs : ", RDD_A.filter(lambda x:x%2==0 ).collect())

print("")
# Affichge de quelques fonctions statistiques
# affichage du min, max, écart type, des stats
print(" min : ",RDD_A.min()," ; max : ", RDD_A.max()," ; stdev :", RDD_A.stdev())
print(" utilisation de stats() : ",RDD_A.stats())

print("")
# faire un map en mettant au carre
# a chaque élement du rdd, j'applique la fonction définie
print("Mise au carré du RDD ")
RDD_B = RDD_A.map(lambda x:x*x )

# afficher
print("Utilisation de map(lambda x:x*x ) : ",RDD_B.collect())

# définir une fonction
def square_x(x):
   return x*x

# faire un map en appliquant une fonction
RDD_B2 = RDD_A.map(square_x)

#Affichage
print("définition de square_x Utilisation de map(square_x ) : ", RDD_B2.collect())

print("")
# La fonction map es ttrès très souvent utilisée
# faire un map en mettant au carre
# a chaque élement du rdd, j'applique la fonction définie
RDD_BB = RDD_A.map(lambda x:(x,1) )

#Affichage
print("Ici, on crée (x,1) : ", RDD_BB.collect())


# on n'oublie d'arrêter le spark context
sc.stop()



Liste des élements du RDD :  [4, 7, 3, 8, 9, 0, 0, 5, 1, 4, 0, 4, 4, 7, 7, 6, 2, 2, 0, 0]

Liste des élemnts (unique) de RDD_A :  [4, 8, 0, 9, 5, 1, 6, 2, 7, 3]

Somme des élements de RDD_A
Utilisation de sum() :  73
Utilisation de reduce(lambda x,y:x+y) :  73
Utilisation de fold(0,lambda x,y:x+y) :  73
Utilisation de fold(0,add) :  73

Affichege des nbre pairs :  [4, 8, 0, 0, 4, 0, 4, 4, 6, 2, 2, 0, 0]

 min :  0  ; max :  9  ; stdev : 2.903015673398957
 utilisation de stats() :  (count: 20, mean: 3.65, stdev: 2.903015673398957, max: 9.0, min: 0.0)

Mise au carré du RDD 
Utilisation de map(lambda x:x*x ) :  [16, 49, 9, 64, 81, 0, 0, 25, 1, 16, 0, 16, 16, 49, 49, 36, 4, 4, 0, 0]
définition de square_x Utilisation de map(square_x ) :  [16, 49, 9, 64, 81, 0, 0, 25, 1, 16, 0, 16, 16, 49, 49, 36, 4, 4, 0, 0]

Ici, on crée (x,1) :  [(4, 1), (7, 1), (3, 1), (8, 1), (9, 1), (0, 1), (0, 1), (5, 1), (1, 1), (4, 1), (0, 1), (4, 1), (4, 1), (7, 1), (7, 1), (6, 1), (2, 1), (2, 1), (0, 1), (0, 1)]


In [26]:
# Initialisation de Spark Context 4 partitions
sc = SparkContext(master="local[4]")

# On va maintenant combiner 2 RDD

# créer 2 vecteurs aléatoires puis les RDD associés
C = np.random.randint(0,10,2)
RDD_C = sc.parallelize(C)
D = A = np.random.randint(0,10,3)
RDD_D = sc.parallelize(D)
print("RDD_C : ",RDD_C.collect())
print("RDD_D : ",RDD_D.collect())

# Fusionner les 2 RDD et afficher
print("Fusion à l'aide de + : ",(RDD_C+RDD_D).collect())

# Former tous les couples possible (c_i, d_j)
print("Création de tous les couples possibles (c_i, d_j) : ", RDD_C.cartesian(RDD_D).collect())
# Former tous les couples possible (c_i, c_j)
print("Création de tous les couples possibles (c_i, c_j) : ", RDD_C.cartesian(RDD_C).collect())
# on n'oublie d'arrêter le spark context
sc.stop()

RDD_C :  [3, 5]
RDD_D :  [5, 5, 1]
Fusion à l'aide de + :  [3, 5, 5, 5, 1]
Création de tous les couples possibles (c_i, d_j) :  [(3, 5), (3, 5), (3, 1), (5, 5), (5, 5), (5, 1)]
Création de tous les couples possibles (c_i, c_j) :  [(3, 3), (3, 5), (5, 3), (5, 5)]


In [25]:
# En cas d'erreur de codage, il faut arrêter sc avant de relancer
# Dans ce cas la, excécuter cette ligne, puis revenez à votre cellule
sc.stop()