In [1]:
#commencer ici avec jupyter
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Get (or reuse) a local Spark session running on all available cores
# ("local[*]") and expose its SparkContext as `sc` for the RDD examples below.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
#parallelize crée un RDD à partir d'une liste Python pour exécuter des calculs en parallèle.
data = sc.parallelize([1,2,3,4,5])
print (data)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


In [3]:
data = sc.parallelize([(1,2),(3,4),(3,6),(3,4)])
#collect récupère tous les éléments d'un RDD et retourne une liste 
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [4]:
type(data)

pyspark.rdd.RDD

In [5]:
data.count()

4

In [6]:
num = sc.parallelize([5,5,4,3,2,9,2],5)
#retourne le nombre de partitions
num.getNumPartitions()



5

In [7]:
#map applique une fonction à chaque élément d'un RDD et retourne un nouveau RDD transformé
num.map(lambda a : a*2).collect()

[10, 10, 8, 6, 4, 18, 4]

In [8]:
# Raise every element to the third power (a*a*a -- a cube, not a*a).
num.map(lambda a : pow(a,3)).collect()

[125, 125, 64, 27, 8, 729, 8]

In [9]:
names = sc.parallelize([ "Bills", "Mark","Brain","Mick"])

In [10]:
names.map(lambda a : "Mr. "+ a).collect()

['Mr. Bills', 'Mr. Mark', 'Mr. Brain', 'Mr. Mick']

In [12]:
rdd = sc.parallelize([ 2, 3,4])
rdd.collect()

[2, 3, 4]

In [13]:
rdd.map(lambda x: range(1, x)).collect()

[range(1, 2), range(1, 3), range(1, 4)]

In [14]:
# flatMap works like map, but it also flattens the results: each returned
# range is expanded, so the output is one flat list of numbers.
rdd.flatMap(lambda x: range(1, x)).collect()

[1, 1, 2, 1, 2, 3]

In [15]:
a = sc.parallelize([1,2,3])

In [16]:
a.map(lambda x: (x,x*10,57)).collect()

[(1, 10, 57), (2, 20, 57), (3, 30, 57)]

In [17]:
a.flatMap(lambda x: (x,x*10,57)).collect()

[1, 10, 57, 2, 20, 57, 3, 30, 57]

In [18]:
#remarquer la différence avec map
a.map(lambda x: (x,x*10,57)).collect()

[(1, 10, 57), (2, 20, 57), (3, 30, 57)]

In [19]:
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Utilisation de map pour multiplier chaque élément par 2
result_map = rdd.map(lambda x: x * 2).collect()

print("Résultat avec map:", result_map)

Résultat avec map: [2, 4, 6, 8, 10]


In [20]:
# mapPartitions calls its function once per partition, handing it an
# iterator over that partition's elements, instead of once per element as
# map does. This pays off when the transformation has an expensive setup
# step (e.g. opening a database connection) that a whole partition can share.
def multiply_by_two(iterator):
    """Lazily double every element yielded by ``iterator``.

    Returns an iterator, which is what mapPartitions expects back.
    """
    return map(lambda value: value * 2, iterator)
#mapPartitions applique une fonction à chaque partition du RDD au lieu de chaque élément
result_mapPartitions = rdd.mapPartitions(multiply_by_two).collect()

print("Résultat avec mapPartitions:", result_mapPartitions)

Résultat avec mapPartitions: [2, 4, 6, 8, 10]


In [21]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [22]:
#filter sélectionne les éléments qui respectent la condition donnée
num.filter(lambda x : x%3 == 1).collect()

[4]

In [23]:
names.collect()

['Bills', 'Mark', 'Brain', 'Mick']

In [24]:
names.filter(lambda x : "a" in x).collect()

['Mark', 'Brain']

In [25]:
# Ascending sort. The key is only the first letter (x[0]), so names that
# share a first letter are not ordered among themselves by the rest of the name.
names.sortBy(lambda x: x[0], ascending=True).collect()

['Bills', 'Brain', 'Mark', 'Mick']

In [26]:
# Descending sort, again keyed on the first letter only (x[0]).
names.sortBy(lambda x: x[0], ascending=False).collect()

['Mark', 'Mick', 'Bills', 'Brain']

In [27]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [28]:
num2 = sc.parallelize([1,1,7,9,4,10,15])
num2.collect()

[1, 1, 7, 9, 4, 10, 15]

In [29]:
#fait l'union de deux RDD
num3=num2.union(num)
num3.collect()

[1, 1, 7, 9, 4, 10, 15, 5, 5, 4, 3, 2, 9, 2]

In [30]:
num3.sortBy(lambda x:x,0).collect()

[15, 10, 9, 9, 7, 5, 5, 4, 4, 3, 2, 2, 1, 1]

In [31]:
# distinct removes duplicate elements from an RDD. The order of the
# result is not guaranteed, as the output below shows.
num3.distinct().collect()

[7, 1, 15, 9, 2, 10, 3, 4, 5]

In [32]:
x = sc.parallelize([1,2,3], 2)
y = sc.parallelize([3,4], 1)

In [33]:
z = x.union(y)
z.collect()

[1, 2, 3, 3, 4]

In [34]:
parallel = sc.parallelize(range(1,20))
parallel.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [35]:
# sample: return a random sample subset RDD of the input RDD.
# API: sample(withReplacement, fraction, seed=None)
# Note: this is not guaranteed to return exactly `fraction` of the total
# count of the given RDD. Without a seed the sample changes on every run,
# so a fixed seed is passed to keep the notebook reproducible.
parallel.sample(True, .2, seed=42).collect()

[1, 8, 16]

In [36]:
parallel.sample(False,.2,seed=19).collect()

[6, 12, 14, 17]

In [37]:
parallel.sample(False,.2).collect()

[1, 5, 12, 13, 14]

In [38]:
parallel.sample(True,.2).collect()

[6]

In [39]:
parallel.sample(False,.2,seed=23).collect()

[4, 5, 17]

### Transformations larges

In [40]:
names.collect()

['Bills', 'Mark', 'Brain', 'Mick']

In [41]:
#groupBy regroupe les éléments d'un RDD selon une clé spécifiée
names_gr = names.groupBy(lambda x : x[0]).collect()

In [42]:
names_gr

[('B', <pyspark.resultiterable.ResultIterable at 0x7f98e8f42c80>),
 ('M', <pyspark.resultiterable.ResultIterable at 0x7f98e8d54190>)]

In [43]:
for (k,v) in names_gr:
    print(k,list(v)) 

B ['Bills', 'Brain']
M ['Mark', 'Mick']


In [44]:
aa = sc.parallelize([1, 1, 2, 3, 5, 8])


In [45]:
result = aa.groupBy(lambda x: x % 3).collect()

In [46]:
result

[(2, <pyspark.resultiterable.ResultIterable at 0x7f98e8d55150>),
 (0, <pyspark.resultiterable.ResultIterable at 0x7f98e8d27250>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7f98e8d26770>)]

In [47]:
for (k,v) in result:
    print(k, list(v) )

2 [2, 5, 8]
0 [3]
1 [1, 1]


In [48]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [49]:
num2.collect()

[1, 1, 7, 9, 4, 10, 15]

In [50]:
#fait l'intersection de 2 RDD
num.intersection(num2).collect()

[9, 4]

In [51]:
num2.intersection(num).collect()

[9, 4]

In [52]:
# subtract returns the elements of the first RDD that do not appear in the
# second one (a one-sided difference: num minus num2, not a symmetric
# difference). Duplicates from the first RDD are kept, as the output shows.
num.subtract(num2).collect()

[2, 2, 3, 5, 5]

In [53]:
num2.subtract(num).collect() # equivalent de num2 privé de num

[7, 1, 1, 15, 10]

In [54]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [55]:
num.distinct().collect()

[5, 2, 3, 4, 9]

## Transformation sur les paires (Key, Value)

In [56]:
data. collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [57]:
# countByValue counts the occurrences of each distinct element of an RDD.
# It works on any RDD, not only key-value pairs: here each whole (k, v)
# tuple is the value being counted.
data.countByValue()

defaultdict(int, {(1, 2): 1, (3, 4): 2, (3, 6): 1})

In [58]:
data.count()

4

In [59]:
dataStr = sc.parallelize([(3,'mike'),(2,'john'),(3,'rambo'),(4,'bill'),(1,'mike')])
dataStr.collect()

[(3, 'mike'), (2, 'john'), (3, 'rambo'), (4, 'bill'), (1, 'mike')]

### dataStr.count()

In [60]:
dataStr.countByValue()

defaultdict(int,
            {(3, 'mike'): 1,
             (2, 'john'): 1,
             (3, 'rambo'): 1,
             (4, 'bill'): 1,
             (1, 'mike'): 1})

In [61]:
# top(n) returns the n LARGEST elements of the RDD, in descending order.
# It does not return the first n elements; that is take(n).
data.top(4)

[(3, 6), (3, 4), (3, 4), (1, 2)]

In [62]:
data1= sc.parallelize([(1, 2), (3, 6), (3, 1), (2, 4)])

In [63]:
#sortByKey trie la RDD par la clé 
data1.sortByKey().collect()

[(1, 2), (2, 4), (3, 6), (3, 1)]

In [64]:
# lookup : Return all value associated with the given key.
data1.lookup(3)

[6, 1]

In [65]:
# keys() keeps the key (first element) of each pair; collect() returns them as a list.
data.keys().collect()

[1, 3, 3, 3]

In [66]:
data.map(lambda x:x[0]).collect()

[1, 3, 3, 3]

In [67]:
# values() keeps the value (second element) of each pair; collect() returns them as a list.
data.values().collect()

[2, 4, 6, 4]

In [68]:
data.map(lambda x:x[1]).collect()

[2, 4, 6, 4]

In [69]:
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [70]:
#fait un map sur les valeurs
data.mapValues(lambda a : a*a).collect()

[(1, 4), (3, 16), (3, 36), (3, 16)]

In [71]:
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [72]:
#reduceByKey regroupe les éléments d'un RDD par clé et applique une fonction de réduction sur les valeurs d'une meme clé
data.reduceByKey(lambda x, y : x+y).collect()

[(1, 2), (3, 14)]

In [73]:
data.reduceByKey(max).collect()

[(1, 2), (3, 6)]

In [74]:
# groupByKey: gathers all the values that share the same key into a single
# (key, iterable-of-values) pair; the iterables are materialised with list() below.
result = data.groupByKey().collect()

In [75]:
result

[(1, <pyspark.resultiterable.ResultIterable at 0x7f98ea2b4d30>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7f98e8d7dea0>)]

In [76]:
for (k,v) in result:
    print(k, list(v))

1 [2]
3 [4, 6, 4]


In [77]:
aa = data.groupByKey().mapValues(sum)

In [78]:
aa.collect()

[(1, 2), (3, 14)]

In [79]:
bb = data.groupByKey().mapValues(max)

In [80]:
bb.collect()

[(1, 2), (3, 6)]

In [81]:
# reduceByKey
# Functionality: reduceByKey combines values with the same key using a specified associative and commutative reduce function.
# Efficiency: It performs better than groupByKey because it reduces data on the map side before shuffling it across the network, minimizing data transfer.
# Use Case: Ideal for scenarios where you need to aggregate data, such as summing values, counting occurrences, or finding maximum/minimum values.
# val rdd = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1)))
# val reducedRDD = rdd.reduceByKey(_ + _)
# // Result: [("a", 2), ("b", 2)]

# groupByKey followed by mapValues
# Functionality: groupByKey groups all values with the same key into a single sequence, and mapValues applies a function to each value in the sequence.
# Efficiency: Less efficient than reduceByKey because it shuffles all the data across the network, which can lead to higher memory usage and slower performance.
# Use Case: Useful when you need to perform operations that require access to all values for a key, such as calculating statistics or applying complex transformations.
# val rdd = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1)))
# val groupedRDD = rdd.groupByKey()
# val mappedRDD = groupedRDD.mapValues(values => values.sum)
# // Result: [("a", 2), ("b", 2)]

# Key Differences
# Performance: reduceByKey is generally more efficient due to pre-aggregation before the shuffle.
# Memory Usage: groupByKey can lead to higher memory usage as it collects all values for a key before applying transformations.
# Use Cases: Choose reduceByKey for simple aggregations and groupByKey followed by mapValues for more complex operations that require access to all values for a key.

In [82]:
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [83]:
#flatMapValues applique flatMap sur les valeurs
data.flatMapValues(lambda x: range(1, x)).collect()

[(1, 1),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 5),
 (3, 1),
 (3, 2),
 (3, 3)]

In [84]:
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [85]:
data2 = sc.parallelize([(3,9)])
data2.collect()

[(3, 9)]

In [86]:
# subtractByKey keeps the pairs of the first RDD whose key does not appear
# in the second RDD. Only keys are compared: (3, 9) removes every key-3 pair.
data.subtractByKey(data2).collect()

[(1, 2)]

In [87]:
data2.subtractByKey(data).collect()

[]

## 5. Les actions

## Create RDD and their Basic Actions

In [88]:
names = sc.parallelize(['Adam','Cray','Shaun','Brain','Mark','Christ','Shail','Satya','Mark','Norby','Frans','Mark','Bill'])

In [89]:
type(names)

pyspark.rdd.RDD

In [90]:
names.collect()

['Adam',
 'Cray',
 'Shaun',
 'Brain',
 'Mark',
 'Christ',
 'Shail',
 'Satya',
 'Mark',
 'Norby',
 'Frans',
 'Mark',
 'Bill']

In [91]:
type(a)

pyspark.rdd.RDD

In [92]:
#countByValue compte le nombre d'occurrences de chaque valeur dans un RDD
names.countByValue()

defaultdict(int,
            {'Adam': 1,
             'Cray': 1,
             'Shaun': 1,
             'Brain': 1,
             'Mark': 3,
             'Christ': 1,
             'Shail': 1,
             'Satya': 1,
             'Norby': 1,
             'Frans': 1,
             'Bill': 1})

In [93]:

def f(x):
    """Print one element; used as the per-element action passed to foreach."""
    print(x)

# foreach runs f on every element for its side effect and returns None, so
# `a` ends up as None (checked by the next cell). The print calls run inside
# Spark's worker processes, so their output does not appear in this cell
# (presumably it goes to the terminal/logs of the process running the workers).
a = sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

In [94]:
type(a)

NoneType

In [95]:
a=sc.parallelize([(1,2),(2,4)])

In [96]:
a.countByValue()

defaultdict(int, {(1, 2): 1, (2, 4): 1})

In [97]:
names.collect()

['Adam',
 'Cray',
 'Shaun',
 'Brain',
 'Mark',
 'Christ',
 'Shail',
 'Satya',
 'Mark',
 'Norby',
 'Frans',
 'Mark',
 'Bill']

In [98]:
#take renvoie les n premiers éléments d'un RDD
names.take(5)

['Adam', 'Cray', 'Shaun', 'Brain', 'Mark']

In [99]:
#textFile charge un fichier texte chaque ligne étant un élément du RDD
employees = sc.textFile("employee.txt")

In [100]:
type(employees)

pyspark.rdd.RDD

In [101]:
employees.collect()

['Adam',
 'Cray',
 'Shaun',
 'Brain',
 'Mark',
 'Christ',
 'Shail',
 'Satya',
 'Mark',
 'Norby',
 'Frans',
 'Mark',
 'Bill']

In [102]:
employees.first()

'Adam'

In [103]:
employees.count()

13

In [104]:
employees.top(5)

['Shaun', 'Shail', 'Satya', 'Norby', 'Mark']

In [105]:
employees.top(19)

['Shaun',
 'Shail',
 'Satya',
 'Norby',
 'Mark',
 'Mark',
 'Mark',
 'Frans',
 'Cray',
 'Christ',
 'Brain',
 'Bill',
 'Adam']

In [106]:
employees.distinct().count()

11

## Autres exemples

In [107]:
num = sc.parallelize([5,5,4,3,2,9,2],9)
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [108]:
num.reduce(max)

9

In [109]:
from operator import add
num.reduce(add)

30

In [110]:
num.take(4)

[5, 5, 4, 3]

In [111]:
num.countByValue()

defaultdict(int, {5: 2, 4: 1, 3: 1, 2: 2, 9: 1})

In [112]:
type(num)

pyspark.rdd.RDD

In [113]:
# GLOM: turns each partition into one list, giving an RDD of LISTS (one per partition)

In [114]:
#glom transforme chaque partition d'un RDD en une liste, regroupant les éléments d'une partition dans un seul élément RDD
num.glom().collect()

[[], [5], [5], [4], [], [3], [2], [9], [2]]

In [115]:
type(num.glom())

pyspark.rdd.PipelinedRDD

In [116]:
num.max()

9

In [117]:
num.min()

2

In [118]:
num.mean()

4.285714285714286

In [119]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [120]:
num.reduce(lambda a,b: a+b)

30

In [121]:
num.reduce(lambda a,b: a*b)

10800

In [122]:
num.reduce(lambda x,y: x if x > y else y)

9

In [123]:
def myfun(a,b):
    """Combine two values as ``a*2 + b*2``.

    Warning: this function is not associative, e.g.
    myfun(myfun(1, 2), 3) == 18 but myfun(1, myfun(2, 3)) == 22.
    RDD.reduce expects an associative (and commutative) function, so the
    result of num.reduce(myfun) depends on how the RDD is partitioned.
    """
    left = a * 2
    right = b * 2
    return left + right

In [124]:
num.reduce(myfun)

872

In [125]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [126]:
#takeOrdered renvoie les n premiers éléments d'un RDD triés
num.takeOrdered(3)

[2, 2, 3]

In [127]:
# fold: the initial value for the accumulated result of each partition for the op operator,
# and also the initial value for the combine results from different partitions

In [128]:
num = sc.parallelize([5,5,4,3,2,9,2],2)
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [129]:
num.glom().collect()

[[5, 5, 4], [3, 2, 9, 2]]

In [130]:
num.reduce(lambda a,b: a+b)

30

In [131]:
num.reduce(lambda a,b: a*b)

10800

In [132]:
# fold combines the elements with the operator, starting from the zero value
# in EVERY partition and once more when merging the partition results.
# With the 2 partitions shown by glom above, 2 is added three times:
# 30 + 3*2 = 36. The zero value should therefore be the identity of the
# operator (0 for +, 1 for *) if the plain aggregate is wanted.
num.fold(2,lambda a,b:a+b)

36

In [133]:
num.fold(2,lambda a,b : a*b )

86400

In [134]:
from operator import add
b=sc.parallelize([1, 2, 3, 4, 5])
b.fold(1, add)


18

In [135]:
from operator import add,mul
num3 = sc.parallelize([5,5,4,3,2,9,2]).fold(10,mul)
num3

10800000

In [136]:
b = sc.parallelize(range(1,10))

In [137]:
b.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

## Application: 
Donner l'implémentation de Word Count avec Spark

In [140]:
import re

texte = sc.parallelize(["bonjour Hadoop. Bonjour Spark!"])
def clean_and_split(text):
    """Normalise a line of text and split it into lowercase words.

    Punctuation is replaced by a space rather than deleted, so words that are
    separated only by punctuation (e.g. "Hadoop,Spark") still become two words
    instead of being glued into one.

    :param text: one line of input text.
    :returns: list of lowercase words (empty list for blank or all-punctuation input).
    """
    text = re.sub(r'[^\w\s]', ' ', text)
    return text.lower().split()
word_count = (texte.flatMap(clean_and_split)
          .map(lambda x: (x, 1))
          .reduceByKey(lambda a, b: a + b)
          .collect())
print(word_count)


[('hadoop', 1), ('bonjour', 2), ('spark', 1)]
