# **Comprendre MapReduce**

## **Map**

```map``` prend en argument une fonction et une collection et retourne une collection. La fonction étant appliquée sur chaque élément de la collection.

**Exo1:** Utiliser la fonction ```map``` pour multiplier par 2 les éléments d'une liste. Faire une version en définissant une fonction et une seconde version en utilisant une fonction anonyme ```lambda```

In [1]:
a = [3, 1, 2]

#avec une fonction définie via def
def fois_2(x):
    return 2 * x

b = map(fois_2, a)
print(list(b))

#avec une fonction anonyme
c = map(lambda x: 2*x, a)
print(list(c))

[6, 2, 4]
[6, 2, 4]


**Exo2:** Créer une liste de 1000 entiers aléatoires. À l'aide de la fonction `map` retourner une collection qui contient `True` si le nombre était pair et `False` sinon 

In [2]:
import numpy as np
l = np.random.randint(10**6, size=1000)
res = list(map(lambda a: a%2==0, l))
#res

#### **Pour faire simple, la seule différence entre le ```map``` de python et celui de spark c'est que le ```map``` de ce dernier découpe le calul sur plusieurs machines pour paralléliser le traitement.**

## **Reduce**

La fonction ```reduce``` prend en entrée une collection et retourne une réduction de celle ci en lui appliquant une fonction d'agrégation itérative, c'est-à-dire, une fonction qui lit les valeurs de la liste de gauche à droite et ne renvoie qu'une seule valeur agrégée.  
La fonction d'agrégation doit donc prendre 2 arguments et ne renvoyer qu'une seule valeur.

Par exemple, ```reduce``` permet de calculer la somme des éléments d'une liste, ce qu'on va faire (enfin vous allez faire) de suite.

**Exo3:** calculer la somme de la liste ```a``` :
>1. en utilisant une boucle ```for```
>2. en utilisant la fonction ```reduce``` du module ```functools```

In [3]:
a = [1, 2, 3, 4, 5]

In [4]:
som = 0
for val in a :
    som += val
som

15

In [5]:
from functools import reduce
som2 = reduce(lambda x,y : x+y, a, 0)
som2

15

**Exo4:** générer une liste de 20 entiers entre 0 et 1000 et calculer le maximum de cette liste à l'aide de la fonction ```reduce```

In [6]:
b = np.random.randint(1000, size=20)
print(b)
maxi = reduce(lambda x,y : x if x>y else y, b)
maxi

[754 591 613 230  56 304 565  55  49 914 866  71 310 347 420 114 812 216
 316 832]


914

**Exo5:** importer la fonction ```accumulate``` du module ```itertools```, comprendre comment elle marche, la tester et la comparer avec ```reduce``` sur les 2 exemples précédents (somme et maximum d'une liste). Quelles sont les différences ?

In [7]:
from itertools import accumulate
print("Pour la somme :", reduce(lambda x,y : x+y, a, 0))
print(list(accumulate(a, lambda x,y : x+y)))
print("\nPour le maximum :", reduce(lambda x,y : x if x>y else y, b))
print(list(accumulate(b, lambda x,y : x if x>y else y)))

Pour la somme : 15
[1, 3, 6, 10, 15]

Pour le maximum : 914
[754, 754, 754, 754, 754, 754, 754, 754, 754, 914, 914, 914, 914, 914, 914, 914, 914, 914, 914, 914]


#### **Comme pour la fonction ```map``` la seule différence entre le ```reduce``` de python et celui de spark c'est que celui de spark découpe en plusieurs morceaux et parallélise le traitement.**

## **Petit bonus : Filter**


**Exo6**: créer une liste avec les valeurs suivantes : [-1, 3, 2, -1, 6, 8] puis utiliser la fonction ```filter``` pour récupérer uniquement les valeurs positives.

In [8]:
a = [-1, 3, 2, -1, 6, 8]
list(filter(lambda x: x>0, a))

[3, 2, 6, 8]

## **Map et Reduce**

On considère le mega big dataset suivant : 

In [9]:
a = ["ceci n'est pas du big data", 'Bonjour voilà du texte', "il est en retard tous les jours", "une bonne auberge", "elle est en Grèce"]

**Exo7:** utiliser la fonction ```map``` pour séparer chaque phrase en en mots

In [10]:
list(map(lambda s: s.split(), a))

[['ceci', "n'est", 'pas', 'du', 'big', 'data'],
 ['Bonjour', 'voilà', 'du', 'texte'],
 ['il', 'est', 'en', 'retard', 'tous', 'les', 'jours'],
 ['une', 'bonne', 'auberge'],
 ['elle', 'est', 'en', 'Grèce']]

**Exo8:** à l'aide de ```map``` et/ou ```reduce``` renvoyer le nombre total de mots dans ```a```

In [11]:
mapper = map(lambda phrase: len(phrase.split()), a)
reduce(lambda x,y : x+y, mapper)

24

## **WordCount : le "hello world" du MapReduce**

**Exo9:** on va illustrer le fonctionnement de MapReduce pour compter le nombre d'occurence de chaque mot dans la variable ```text``` définie ci-dessous. Il faut donc éxecuter les 4 étapes suivantes (le preprocessing étant un petit supplément) :
>1. SPLIT: découpage du texte en 4 sous-parties
>2. petite étape de peprocessing avec :
>>- suppression de la ponctuation,
>>- passage en minuscules
>>- suppression des mots de 1, 2 ou 3 lettres
>3. MAP: avec la fonction ```map``` renvoyer une liste de (clé, valeur) : ici clé=mot et valeur=occurence=1
>4. SHUFFLE (& SORT): regroupement des résultats : on doit avoir pour chaque clé le couple (clé, [val, val, val,...]) : ici (mot, [1,1,1,...])
>5. REDUCE: sommer les occurences uniques de chaque mot pour obtenir le nombre d'occurrences totales

In [12]:
text = "Si vous voulez mon avis concernant la morosité conjoncturelle, \
je n'exclus pas de réorganiser la simultanéité des hypothèses réalisables, \
avec toute la prudence requise. Eu égard à la fragilité actuelle, il ne faut \
pas s'interdire de se remémorer précisément les organisations matricielles \
opportunes, avec beaucoup de recul. Afin de circonvenir à cette inflexion de \
l'époque actuelle, je recommande d'essayer la somme des stratégies envisageables, \
même si ce n'est pas facile. Vu la dualité de la situation conjoncturelle, il ne \
faut pas négliger de gérer certaines synergies optimales, parce que nous le valons \
bien. Nonobstant la dualité de la situation observée, je n'exclus pas d'inventorier \
la somme des modalités réalisables, parce qu'il est temps d'agir. Si vous voulez mon \
avis concernant la baisse de confiance présente, je n'exclus pas d'inventorier la \
globalité des améliorations pertinentes, très attentivement."

In [13]:
#SPLIT
mots = text.split()
cuts = np.linspace(0, len(mots), 5, dtype=int)
grp_mots = [mots[cuts[k]:cuts[k+1]] for k in range(4)]
text_div = [" ".join(grp_mots[k]) for k in range(4)]
text_div

["Si vous voulez mon avis concernant la morosité conjoncturelle, je n'exclus pas de réorganiser la simultanéité des hypothèses réalisables, avec toute la prudence requise. Eu égard à la fragilité actuelle, il ne faut pas",
 "s'interdire de se remémorer précisément les organisations matricielles opportunes, avec beaucoup de recul. Afin de circonvenir à cette inflexion de l'époque actuelle, je recommande d'essayer la somme des stratégies envisageables, même si ce n'est",
 "pas facile. Vu la dualité de la situation conjoncturelle, il ne faut pas négliger de gérer certaines synergies optimales, parce que nous le valons bien. Nonobstant la dualité de la situation observée, je n'exclus",
 "pas d'inventorier la somme des modalités réalisables, parce qu'il est temps d'agir. Si vous voulez mon avis concernant la baisse de confiance présente, je n'exclus pas d'inventorier la globalité des améliorations pertinentes, très attentivement."]

In [14]:
#preprocessing du text, on définit la fonction et l'applique directement à nos 4 sous-ensemble
import string
def preprocess(s):
    temp = s.lower()
    temp = temp.replace("'"," ")
    temp = temp.translate(str.maketrans("","", string.punctuation))

    mots = temp.split()
    gros_mots = list(filter(lambda x: len(x)>3, mots))
    return " ".join(gros_mots)

In [15]:
#MAP
def mapper(txt):
    temp = preprocess(txt)
    return [(mot,1) for mot in temp.split()]

mapped = list(map(mapper, text_div)) #le "map" utilisé ici correspond en fait à la parallélisation du traitements:
                                     #il faut le voir comme le fait d'effectuer en parallèle les opérations sur chaque sous-texte
mapped

[[('vous', 1),
  ('voulez', 1),
  ('avis', 1),
  ('concernant', 1),
  ('morosité', 1),
  ('conjoncturelle', 1),
  ('exclus', 1),
  ('réorganiser', 1),
  ('simultanéité', 1),
  ('hypothèses', 1),
  ('réalisables', 1),
  ('avec', 1),
  ('toute', 1),
  ('prudence', 1),
  ('requise', 1),
  ('égard', 1),
  ('fragilité', 1),
  ('actuelle', 1),
  ('faut', 1)],
 [('interdire', 1),
  ('remémorer', 1),
  ('précisément', 1),
  ('organisations', 1),
  ('matricielles', 1),
  ('opportunes', 1),
  ('avec', 1),
  ('beaucoup', 1),
  ('recul', 1),
  ('afin', 1),
  ('circonvenir', 1),
  ('cette', 1),
  ('inflexion', 1),
  ('époque', 1),
  ('actuelle', 1),
  ('recommande', 1),
  ('essayer', 1),
  ('somme', 1),
  ('stratégies', 1),
  ('envisageables', 1),
  ('même', 1)],
 [('facile', 1),
  ('dualité', 1),
  ('situation', 1),
  ('conjoncturelle', 1),
  ('faut', 1),
  ('négliger', 1),
  ('gérer', 1),
  ('certaines', 1),
  ('synergies', 1),
  ('optimales', 1),
  ('parce', 1),
  ('nous', 1),
  ('valons', 1),
 

In [16]:
#SHUFFLE AND SORT: le but est de transformer la liste de listes de (mot,1) en une liste de (mot, [1,1,1,...,1]). Méthode "naïve".
liste = [couple for div in mapped for couple in div] #on regroupe tous les couples (clé, valeur) dans une seule liste
liste.sort() #on les trie

cur_wd = None
wd_occur = list()
shuffled_sorted = list()

for wd, cnt in liste:
    if wd == cur_wd:
        wd_occur.append(cnt)
    else:
        if cur_wd:
            shuffled_sorted.append((cur_wd,wd_occur))
        cur_wd, wd_occur = wd, [cnt]

if cur_wd:
    shuffled_sorted.append((cur_wd,wd_occur))
    
shuffled_sorted

[('actuelle', [1, 1]),
 ('afin', [1]),
 ('agir', [1]),
 ('améliorations', [1]),
 ('attentivement', [1]),
 ('avec', [1, 1]),
 ('avis', [1, 1]),
 ('baisse', [1]),
 ('beaucoup', [1]),
 ('bien', [1]),
 ('certaines', [1]),
 ('cette', [1]),
 ('circonvenir', [1]),
 ('concernant', [1, 1]),
 ('confiance', [1]),
 ('conjoncturelle', [1, 1]),
 ('dualité', [1, 1]),
 ('envisageables', [1]),
 ('essayer', [1]),
 ('exclus', [1, 1, 1]),
 ('facile', [1]),
 ('faut', [1, 1]),
 ('fragilité', [1]),
 ('globalité', [1]),
 ('gérer', [1]),
 ('hypothèses', [1]),
 ('inflexion', [1]),
 ('interdire', [1]),
 ('inventorier', [1, 1]),
 ('matricielles', [1]),
 ('modalités', [1]),
 ('morosité', [1]),
 ('même', [1]),
 ('nonobstant', [1]),
 ('nous', [1]),
 ('négliger', [1]),
 ('observée', [1]),
 ('opportunes', [1]),
 ('optimales', [1]),
 ('organisations', [1]),
 ('parce', [1, 1]),
 ('pertinentes', [1]),
 ('prudence', [1]),
 ('précisément', [1]),
 ('présente', [1]),
 ('recommande', [1]),
 ('recul', [1]),
 ('remémorer', [1])

In [17]:
#SHUFFLE & SORT : autre méthode plus "élégante" en utilisant un dictionnaire
from collections import defaultdict
dico = defaultdict(list)

liste = [couple for div in mapped for couple in div] #on regroupe tous les couples (clé, valeur) dans une seule liste

list(map(lambda tup : dico[tup[0]].append(tup[1]), liste))
list(dico.items())

[('vous', [1, 1]),
 ('voulez', [1, 1]),
 ('avis', [1, 1]),
 ('concernant', [1, 1]),
 ('morosité', [1]),
 ('conjoncturelle', [1, 1]),
 ('exclus', [1, 1, 1]),
 ('réorganiser', [1]),
 ('simultanéité', [1]),
 ('hypothèses', [1]),
 ('réalisables', [1, 1]),
 ('avec', [1, 1]),
 ('toute', [1]),
 ('prudence', [1]),
 ('requise', [1]),
 ('égard', [1]),
 ('fragilité', [1]),
 ('actuelle', [1, 1]),
 ('faut', [1, 1]),
 ('interdire', [1]),
 ('remémorer', [1]),
 ('précisément', [1]),
 ('organisations', [1]),
 ('matricielles', [1]),
 ('opportunes', [1]),
 ('beaucoup', [1]),
 ('recul', [1]),
 ('afin', [1]),
 ('circonvenir', [1]),
 ('cette', [1]),
 ('inflexion', [1]),
 ('époque', [1]),
 ('recommande', [1]),
 ('essayer', [1]),
 ('somme', [1, 1]),
 ('stratégies', [1]),
 ('envisageables', [1]),
 ('même', [1]),
 ('facile', [1]),
 ('dualité', [1, 1]),
 ('situation', [1, 1]),
 ('négliger', [1]),
 ('gérer', [1]),
 ('certaines', [1]),
 ('synergies', [1]),
 ('optimales', [1]),
 ('parce', [1, 1]),
 ('nous', [1]),
 

In [18]:
#REDUCE : ici c'est très simple puisqu'il s'agit de seulement sommer les valeurs de la liste
#on aurait pu intégrer cette étape facilement dans le SHUFFLE & SORT mais ici on cherche à dissocier les différentes étapes pour bien comprendre
def reducer(key, values):
    sum_val = reduce(lambda x,y : x+y, values) #on peut utiliser directement sum(), je force juste un peu l'utilisation du reduce ici pour rester dans l'esprit
    return (key, sum_val)

reduced = list(map(lambda tup: reducer(tup[0], tup[1]), shuffled_sorted)) #le "map" utilisé ici correspond en fait à la parallélisation du traitement sur chaque clé=mot
reduced

[('actuelle', 2),
 ('afin', 1),
 ('agir', 1),
 ('améliorations', 1),
 ('attentivement', 1),
 ('avec', 2),
 ('avis', 2),
 ('baisse', 1),
 ('beaucoup', 1),
 ('bien', 1),
 ('certaines', 1),
 ('cette', 1),
 ('circonvenir', 1),
 ('concernant', 2),
 ('confiance', 1),
 ('conjoncturelle', 2),
 ('dualité', 2),
 ('envisageables', 1),
 ('essayer', 1),
 ('exclus', 3),
 ('facile', 1),
 ('faut', 2),
 ('fragilité', 1),
 ('globalité', 1),
 ('gérer', 1),
 ('hypothèses', 1),
 ('inflexion', 1),
 ('interdire', 1),
 ('inventorier', 2),
 ('matricielles', 1),
 ('modalités', 1),
 ('morosité', 1),
 ('même', 1),
 ('nonobstant', 1),
 ('nous', 1),
 ('négliger', 1),
 ('observée', 1),
 ('opportunes', 1),
 ('optimales', 1),
 ('organisations', 1),
 ('parce', 2),
 ('pertinentes', 1),
 ('prudence', 1),
 ('précisément', 1),
 ('présente', 1),
 ('recommande', 1),
 ('recul', 1),
 ('remémorer', 1),
 ('requise', 1),
 ('réalisables', 2),
 ('réorganiser', 1),
 ('simultanéité', 1),
 ('situation', 2),
 ('somme', 2),
 ('stratégie

## **Pour : finir moyenne de liste et temps d'éxecution**

**Exo10:** à l'aide des fonctions ```map``` et ```reduce```, on va calculer la moyenne des éléments d'une liste. Et sans utiliser ```len()```...
>1. créer une liste d'entiers aléatoires de taille 10^7
>2. utiliser ```map``` et ```reduce``` pour caluler la moyenne sur cette liste
>3. utiliser ```%%time``` pour mesurer le temps d'éxecution de ce calcul
>4. découper la liste en 5 sous-listes de tailles égales
>5. importer le module Pool de la libraire multiprocessing et l'utiliser pour paralléliser les calculs sur chaque sous-liste. L'idée est de définir par exemple une fonction MapReduce_average qui reprend votre méthode utilisée pour le calcul de la moyenne et utiliser pool.map(MapReduce_average, liste_splitée)
>6. regarder avec %%time les différences de temps d'éxecution des 2 méthodes

In [19]:
a = np.random.randint(1000, size=10**7)

In [32]:
%%time
np.mean(a)

CPU times: user 15.5 ms, sys: 0 ns, total: 15.5 ms
Wall time: 14.9 ms


499.6591571

In [20]:
%%time
a_mapped = map(lambda val : (val,1), a)
a_reduce = reduce(lambda x,y : (x[0]+y[0],x[1]+y[1]), a_mapped)
a_reduce[0]/a_reduce[1]

CPU times: user 4.37 s, sys: 6.64 ms, total: 4.37 s
Wall time: 4.42 s


499.6591571

In [21]:
step = int(2*10**6)
a_split = [a[k*step:(k+1)*step] for k in range(5)]
a_split

[array([ 16, 379, 665, ..., 126, 222, 481]),
 array([306, 375, 726, ..., 380, 568, 471]),
 array([822, 374, 728, ...,  67, 384, 320]),
 array([436, 972, 507, ..., 490, 784,  40]),
 array([949, 414, 887, ..., 464, 924, 121])]

In [26]:
from multiprocessing import Pool
pool = Pool(5)

In [27]:
def mr_avg(liste):
    mapped = map(lambda i : (i,1), liste)
    return reduce(lambda x,y : (x[0]+y[0],x[1]+y[1]), mapped)

In [28]:
%%time
mapped = pool.map(mr_avg,a_split)
reduced = reduce(lambda x,y : (x[0]+y[0],x[1]+y[1]), mapped)
reduced[0]/reduced[1]

CPU times: user 42.8 ms, sys: 118 ms, total: 160 ms
Wall time: 2.14 s


499.6591571

In [29]:
pool.close()
pool.terminate()

## **Supplément**

**Exo11:** Calculer en utilisant le paradigme MapReduce le produit d'une matrice M avec un vecteur V.  
Ça peut paraître inutile mais cette opération est derrière l'algorithme du PageRank de Google, c'est d'ailleurs en partie pour ce calcul que MapReduce a été conçu. Dans ce cas, la dimension du problème est le nombre de pages web indexées, soit clairement un problème de Big Data.  
Par ailleurs, on l'a vu dans la régression linéaire notamment mais pas uniquement, ce produit $matrice*vecteur$ est omniprésent dans les problèmes d'optimisation aussi.
>*Quelques indications:*
>0. commencer par vous remettre dans le bain en revoyant comment on calcul un produit matriciel et plus exactement un produit $matrice*vecteur$
>1. générer une matrice aléatoire $M$ de taille (5,6) par exemple et un vecteur de taille 6
>2. transformer votre matrice $M$ en une liste de triplet $(i,j,m_{ij})$
>3. étape map : on peut choisir comme clé le numéro de ligne et l'étape map consistera donc à passer du triplet $(i,j,m_{ij})$ au couple $(i,m_{ij}*v_j)$
>4. étape shuffle&sort : il faut regrouper les couples (clé, valeur) en (clé, liste_de_valeurs), la clé étant ici l'indice de ligne $i$
>5. étape réduce : agréger les résultats en les sommant

*Pour aller plus loin:* cette solution marche convenablement lorsque chaque noeud a la capacité de stocker $V$ localement mais que faire si $V$ est trop grand ? Vous pouvez mettre en pratique votre solution avec les mêmes $M$ et $V$ que précédemment

In [33]:
#génération matrice
M = np.random.randint(10, size=(5,6))
V = np.random.randint(10, size=6)
print(M,V, sep='\n')

[[5 6 2 5 1 4]
 [8 5 8 7 7 2]
 [5 6 2 9 8 1]
 [9 9 3 0 5 2]
 [4 3 8 8 1 1]]
[6 4 7 6 2 6]


In [34]:
#transformation en liste de triplets
M_liste = [(i,j,M[i,j]) for i in range(M.shape[0]) for j in range(M.shape[1])]
M_liste

[(0, 0, 5),
 (0, 1, 6),
 (0, 2, 2),
 (0, 3, 5),
 (0, 4, 1),
 (0, 5, 4),
 (1, 0, 8),
 (1, 1, 5),
 (1, 2, 8),
 (1, 3, 7),
 (1, 4, 7),
 (1, 5, 2),
 (2, 0, 5),
 (2, 1, 6),
 (2, 2, 2),
 (2, 3, 9),
 (2, 4, 8),
 (2, 5, 1),
 (3, 0, 9),
 (3, 1, 9),
 (3, 2, 3),
 (3, 3, 0),
 (3, 4, 5),
 (3, 5, 2),
 (4, 0, 4),
 (4, 1, 3),
 (4, 2, 8),
 (4, 3, 8),
 (4, 4, 1),
 (4, 5, 1)]

In [35]:
#MAP
def prod1(triplet):
    i,j,Mij = triplet
    return i,Mij*V[j]

mapped = list(map(prod1, M_liste))
mapped

[(0, 30),
 (0, 24),
 (0, 14),
 (0, 30),
 (0, 2),
 (0, 24),
 (1, 48),
 (1, 20),
 (1, 56),
 (1, 42),
 (1, 14),
 (1, 12),
 (2, 30),
 (2, 24),
 (2, 14),
 (2, 54),
 (2, 16),
 (2, 6),
 (3, 54),
 (3, 36),
 (3, 21),
 (3, 0),
 (3, 10),
 (3, 12),
 (4, 24),
 (4, 12),
 (4, 56),
 (4, 48),
 (4, 2),
 (4, 6)]

In [36]:
#SHUFFLE AND SORT
from collections import defaultdict
dico = defaultdict(list)
list(map(lambda tup : dico[tup[0]].append(tup[1]), mapped))
shuffled = list(dico.items())
shuffled

[(0, [30, 24, 14, 30, 2, 24]),
 (1, [48, 20, 56, 42, 14, 12]),
 (2, [30, 24, 14, 54, 16, 6]),
 (3, [54, 36, 21, 0, 10, 12]),
 (4, [24, 12, 56, 48, 2, 6])]

In [37]:
#REDUCE
def reducer(key, values):
    return (key, sum(values))

reduced = list(map(lambda tup: reducer(tup[0], tup[1]), shuffled))
reduced

[(0, 124), (1, 192), (2, 144), (3, 133), (4, 148)]

In [38]:
#si on veut représenter le vecteur sous forme de liste:
[x[1] for x in reduced]

[124, 192, 144, 133, 148]

*Pour aller plus loin:* cas où $V$ ne peut pas être stocké entièrement dans les noeuds

La solution est assez triviale en théorie: puisque $V$ ne rentre pas dans un noeud, il faut le découper de sorte que chaque noeud s'occupe d'une partie du calcul.  
**Attention** derrière la simplicité de cette solution, il y a toutefois une petite subtilité. En effet, puisqu'on découpe $V$, il faut aussi découper $M$ de manière cohérente.

Dans notre exemple $M$ est de taille (5,6) et $V$ et de taille 6 c'est-à-dire en fait de taille (6,1).  
On découpe $V$ horizontalement en trois vecteurs de taille (2,1).  
Il faut donc pour pouvoir faire le produit découper $M$ "verticalement" en trois matrices de tailles (5,2).
Ensuite on aura plus qu'à regrouper les résultats.

In [39]:
print(M)
print(V)

[[5 6 2 5 1 4]
 [8 5 8 7 7 2]
 [5 6 2 9 8 1]
 [9 9 3 0 5 2]
 [4 3 8 8 1 1]]
[6 4 7 6 2 6]


In [40]:
M_splitted = [M[:,0:2], M[:,2:4],  M[:,4:6]]
V_splitted = [V[0:2], V[2:4], V[4:6]]

#on peut recréer un couple sous-matrice/sous-vecteur qui correspond à l'information stockée dans chaque noeud du cluster
MV = [(M_splitted[k], V_splitted[k]) for k in range(3)]
MV

[(array([[5, 6],
         [8, 5],
         [5, 6],
         [9, 9],
         [4, 3]]),
  array([6, 4])),
 (array([[2, 5],
         [8, 7],
         [2, 9],
         [3, 0],
         [8, 8]]),
  array([7, 6])),
 (array([[1, 4],
         [7, 2],
         [8, 1],
         [5, 2],
         [1, 1]]),
  array([2, 6]))]

In [41]:
#transformation en liste de triplets
def triplets(block):
    M = block[0]
    V = block[1]
    return [(i,j,M[i,j]) for i in range(M.shape[0]) for j in range(M.shape[1])], list(V)

MV_triplets = list(map(triplets, MV))
MV_triplets

[([(0, 0, 5),
   (0, 1, 6),
   (1, 0, 8),
   (1, 1, 5),
   (2, 0, 5),
   (2, 1, 6),
   (3, 0, 9),
   (3, 1, 9),
   (4, 0, 4),
   (4, 1, 3)],
  [6, 4]),
 ([(0, 0, 2),
   (0, 1, 5),
   (1, 0, 8),
   (1, 1, 7),
   (2, 0, 2),
   (2, 1, 9),
   (3, 0, 3),
   (3, 1, 0),
   (4, 0, 8),
   (4, 1, 8)],
  [7, 6]),
 ([(0, 0, 1),
   (0, 1, 4),
   (1, 0, 7),
   (1, 1, 2),
   (2, 0, 8),
   (2, 1, 1),
   (3, 0, 5),
   (3, 1, 2),
   (4, 0, 1),
   (4, 1, 1)],
  [2, 6])]

In [42]:
#MAP
def mapper(block):
    M = block[0]
    V = block[1]
    return [(triplet[0], triplet[2]*V[triplet[1]]) for triplet in M]
    
mapped = list(map(mapper, MV_triplets))
mapped

[[(0, 30),
  (0, 24),
  (1, 48),
  (1, 20),
  (2, 30),
  (2, 24),
  (3, 54),
  (3, 36),
  (4, 24),
  (4, 12)],
 [(0, 14),
  (0, 30),
  (1, 56),
  (1, 42),
  (2, 14),
  (2, 54),
  (3, 21),
  (3, 0),
  (4, 56),
  (4, 48)],
 [(0, 2),
  (0, 24),
  (1, 14),
  (1, 12),
  (2, 16),
  (2, 6),
  (3, 10),
  (3, 12),
  (4, 2),
  (4, 6)]]

In [43]:
# SHUFFLE AND SORT
def shfld(block):
    dico = defaultdict(list)
    list(map(lambda tup : dico[tup[0]].append(tup[1]), block))
    return list(dico.items())

shuffled = list(map(shfld, mapped))
shuffled

[[(0, [30, 24]), (1, [48, 20]), (2, [30, 24]), (3, [54, 36]), (4, [24, 12])],
 [(0, [14, 30]), (1, [56, 42]), (2, [14, 54]), (3, [21, 0]), (4, [56, 48])],
 [(0, [2, 24]), (1, [14, 12]), (2, [16, 6]), (3, [10, 12]), (4, [2, 6])]]

In [44]:
#REDUCE
def reducer(block):
    return list(map(lambda tup: (tup[0], sum(tup[1])), block))

reduced = list(map(reducer, shuffled))
reduced

[[(0, 54), (1, 68), (2, 54), (3, 90), (4, 36)],
 [(0, 44), (1, 98), (2, 68), (3, 21), (4, 104)],
 [(0, 26), (1, 26), (2, 22), (3, 22), (4, 8)]]

In [45]:
#On réitère SHUFFLE AND SORT + REDUCE sur la liste qui regroupe les résultats de chaque noeud
liste = [item for sublist in reduced for item in sublist]
reducer(shfld(liste))

[(0, 124), (1, 192), (2, 144), (3, 133), (4, 148)]

In [46]:
#pour avoir le vecteur final
[c[1] for c in reducer(shfld(liste))]

[124, 192, 144, 133, 148]