<h1> Data 831 (TP 1) - CAULLIREAU Dorian </h1>

Le TP se décompose en trois parties : (1) une partie introductive qui va nous faire travailler sur Map-Filter-Reduce avec Python, (2) une partie qui se propose de passer à l'échelle avec du multithreading et (3) une partie qui sera utilisée par la suite dans le TP 3.

Les parties 1 et 2 sont là pour se préparer à ce qu'il sera fait dans la partie 3.

## Partie 1 : MapReduce 

Hadoop est écrit en Java mais nous ferons cela en Python pour nous faciliter la vie.
Les questions ci-dessous sont pour nous familiariser avec Map-Filter-Reduce.


La fonction map ne nécessite pas de faire une importation car elle est native dans Python tout comme filter. La fonction reduce se trouve dans functools.

La fonction map est assez simple à comprendre, il s'agit d'appliquer un comportement à un ensemble de données. Deux éléments à prendre en compte : 

(1) Le comportement est représenté par une fonction ou une fonction anonyme

(2) L'ensemble des données doit être _iterable_. Il doit donc être possible d'accéder à chacun des éléments de façon séparée.

Voici un petit exemple : 

In [3]:
#creation d'une fonction pour obtenir le carré d'un nombre
##bien faire attention que puisque nous devons l'utiliser 
#dans le map, il faut qu'il y ait un return
def squared(n):
    return n*n
    
#l'utilisation de list ici autour du map permet d'obtenir
#les données sous une forme utilisable, essayez sans et vous
#verrez
res = list(map(squared, [1, 2, 3, 4]))

In [None]:
#on affiche le résultat, pas de surprise sur le résultat
print(res)

[1, 4, 9, 16]


Nous ne sommes pas obligés d'utiliser une fonction externe pour la fonction du map. Nous pouvons aussi utiliser une fonction anonyme pour faire la même chose

In [None]:
list(map(lambda x : x * x, [1, 2, 3, 4]))

[1, 4, 9, 16]

C'est même plus simple de la sorte, par contre il est préférable de conserver une telle approche pour de petites fonctions afin d'éviter un code lourd et difficile à vérifier

### Question
Histoire de commencer simplement, proposez une fonction map qui permet de transformer en majuscules les données (nous supposons que les données sont effectivement des chaines de caractères)

In [None]:
#Votre code ici

list(map(lambda x : x.upper(), ["hip", "hip", "hip", "hourra"]))

['HIP', 'HIP', 'HIP', 'HOURRA']

### Question

Un des exemples les plus classiques lorsque nous commençons Map-Reduce est de compter la fréquence de chaque mot dans un texte. Nous allons le travailler en deux temps. Commencez par définir une fonction va renvoyer pour une liste, le nombre 1 correspondant à la fréquence. Il s'agira ensuite d'appliquer une fonction reduce pour comptabiliser.

Par exemple pour : ["a","b","c","d"]

cela donnera [("a",1), ("b", 1), ("c", 1), ("d", 1)]


In [None]:
list(map(lambda x : (x,1), ["a","b","c","d"]))

[('a', 1), ('b', 1), ('c', 1), ('d', 1)]

### Question

Dans le précédent exemple, nous avons simplifié le travail puisque nous avons formé déjà une liste des chaines de caractères. Proposez cette fois une fonction map pour transformer le texte (il sera court) en une liste de mots. A nouveau pour nous simplifier la vie, nous ferons un texte avec seulement des espaces.

In [2]:
list(map(lambda x : (x,1), "Le petit enfant dans la forêt".split(" ")))

[('Le', 1), ('petit', 1), ('enfant', 1), ('dans', 1), ('la', 1), ('forêt', 1)]

### Question

Et pourquoi ne pas chainer les map ? Essayez avec une fonction square et ensuite une fonction puissance 3.

In [4]:
# Votre code ici
list(map(lambda x : x * x, map(squared, [1, 2, 3, 4])))

[1, 16, 81, 256]

La construction n'est pas map().map() comme nous aurions pu avoir envie 

Maintenant voyons la fonction filter qui ne sera pas plus compliquée. Comme son nom l'indique, il s'agit de sélectionner les données qui seront conservées

Voici un premier exemple : 

In [None]:
def positive(nb):
    if nb >= 0:
        return True
    else:
        return False
    
res = list(filter(positive, [1, 2, -5, -3, 4]))
print(res)

[1, 2, 4]


### Question

Ecrivez une fonction filter qui ne va retenir d'un texte que les mots faisant plus de 5 caractères de longueur

In [5]:
list(filter(lambda x :len(x) >= 5, "Les petits enfants dans la forêt".split(" ")))

['petits', 'enfants', 'forêt']

Nous voyons que les deux fonction map et filter seront utilisés de concert pour sélectionner les données à traiter. Nous pouvons bien évidemment appliquer cette approche pour des textes plus longs comme celui d'_Alice in Wonderland_. 



In [19]:
file = open("data/alice.txt","r")
data = file.read().replace('\n',' ').split(" ")
file.close()

#Pour l'affichage, on préfère afficher que quelques mots/expressions (donc long)
sizeword = 15
print(list(filter(lambda x :len(x) > sizeword, data)))
 

['waistcoat-pocket,', 'seen--everything', 'rabbit-hole--and', 'Jack-in-the-box,', 'bread-and-butter,', 'treacle-well--eh,', 'things--everything', 'bitter--and--and', 'WASHING--extra."\'', '"Uglification,"\'', 'means--to--make--anything--', 'adventures--beginning', 'bread-and-butter', 'bread-and-butter.', 'bread-and-butter', 'bread-and-butter,', '`important--unimportant--', "unimportant--important--'", 'farm-yard--while']


Voyons donc maintenant la dernière fonction reduce. Contrairement aux deux autres, celle-ci ne se trouve pas être native dans Python et il faut donc faire un import.

L'aspect déroutant de cette fonction reduce est de prendre les deux premiers éléments de la liste et de générer un résultat qui sera ensuite cumuler avec la valeur suivante. Il est donc important de bien choisir sa fonction de calcul et surtout la valeur retour de la fonction comme nous le verrons dans un exemple


In [None]:
from functools import reduce
product = reduce((lambda x, y: x * y), [1, 2, 3, 4, 5])
print(product)

120


In [None]:
# Voici un code qui ne fonctionne pas 
reduce(print, ["a", "b", "c", "d", "e"])

a b
None c
None d
None e


Comme on peut le voir, les deux premiers éléments ont été appariés puis ensuite les autres avec la valeur cumulative qui est ici None.

On voit tout de suite que sur une fonction reduce il est nécessaire de disposer d'une valeur numérique

### Question

Tentez de calculer le nombre d'occurrences de chacune des lettres qui sont passées en paramètres.

Il y a réellement plusieurs écueils pour réussir, en particulier pour la définition des données

In [None]:
array = ["a","a","a","a","a","a"]
find_value = "a"

def compteur_occurence(x,y):
  print(x,y)
  return (find_value, x[1]+y[1])

reduce(compteur_occurence, map(lambda x: (x,1), array))


('a', 1) ('a', 1)
('a', 2) ('a', 1)
('a', 3) ('a', 1)
('a', 4) ('a', 1)
('a', 5) ('a', 1)


('a', 6)

Oui, effectivement le principal problème est que l'approche avec reduce impose que nous n'avons pas des lettres différentes sinon le tout va être aggloméré en un tout sans pour autant distinguer les différentes lettres, il faut donc appeler la fonction reduce avec une seule lettre à la fois.

### Question

Ecrivez les fonctions nécessaires pour classer les données et faire des blocs en fonction de la lettre

In [20]:
string = sorted("dabca")
print("Resultat : " , list(map(lambda x : (x,1), string)))

Resultat :  [('a', 1), ('a', 1), ('b', 1), ('c', 1), ('d', 1)]


Afin de poursuivre sur cet exercice, effectuez le début du tutoriel : 
https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

Comme vous pourrez vous en rendre compte, il est nécessaire d'adapter un peu pour répondre à nos besoins

## Partie 2 : Scaling

Nous allons prendre le tutoriel : 
https://towardsdatascience.com/a-beginners-introduction-into-mapreduce-2c912bb5e6ac

Il faudra faire attention, il y a quelques erreurs mais c'est assez facile à corriger

In [None]:
def find_longest_string(list_of_strings):
    longest_string = None
    longest_string_len = 0     
    
    for s in list_of_strings:
        if len(s) > longest_string_len:
            longest_string_len = len(s)
            longest_string = s    
    return longest_string



In [None]:
list_of_strings = ['abc', 'python', 'dima']
%time max_length = print(find_longest_string(list_of_strings))

python
CPU times: user 943 µs, sys: 847 µs, total: 1.79 ms
Wall time: 3.62 ms


In [None]:
large_list_of_strings = list_of_strings*1000
%time print(find_longest_string(large_list_of_strings))

python
CPU times: user 2.79 ms, sys: 0 ns, total: 2.79 ms
Wall time: 2.84 ms


In [None]:
large_list_of_strings = list_of_strings*100000000
%time max_length = max(large_list_of_strings, key=len)

CPU times: user 13.8 s, sys: 71.3 ms, total: 13.9 s
Wall time: 13.9 s


In [None]:
%%time

#step 1:
list_of_string_lens = [len(s) for s in list_of_strings]
list_of_string_lens = zip(list_of_strings, list_of_string_lens)

#step 2:
max_len = max(list_of_string_lens, key=lambda t: t[1])
print(max_len)

('python', 6)
CPU times: user 1.03 ms, sys: 3 µs, total: 1.04 ms
Wall time: 1.07 ms


In [21]:
mapper = len

def reducer(p, c):
    if p[1] > c[1]:
        return p
    return c

In [None]:
%%time

#step 1
mapped = map(mapper, list_of_strings)
mapped = zip(list_of_strings, mapped)#step 2:
reduced = reduce(reducer, mapped)

print(reduced)

('python', 6)
CPU times: user 1.13 ms, sys: 12 µs, total: 1.14 ms
Wall time: 1.43 ms


In [None]:
def chunkify(a, n):
    k, m = divmod(len(a), n)
    return [a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]

In [None]:
data_chunks = chunkify(list_of_strings*100,30)

data_chunks = filter(lambda x: len(x)>0, data_chunks)
#step 1:
reduced_all = []
for chunk in data_chunks:
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    
    reduced_chunk = reduce(reducer, mapped_chunk)
    reduced_all.append(reduced_chunk)
    
#step 2:
reduced = reduce(reducer, reduced_all)

print(reduced)

('python', 6)


In [None]:
def chunks_mapper(chunk):
    mapped_chunk = map(mapper, chunk) 
    mapped_chunk = zip(chunk, mapped_chunk)
    return reduce(reducer, mapped_chunk)

In [None]:
%%time

data_chunks = chunkify(list_of_strings,30)
data_chunks = filter(lambda x: len(x)>0, data_chunks)

#step 1:
mapped = map(chunks_mapper, data_chunks)

#step 2:
reduced = reduce(reducer, mapped)

print(reduced)

('python', 6)
CPU times: user 1.26 ms, sys: 22 µs, total: 1.28 ms
Wall time: 3.12 ms


In [None]:
from multiprocessing import Pool

pool = Pool(8)
data_chunks = chunkify(list_of_strings*1000,8)
data_chunks = filter(lambda x: len(x)>0, data_chunks)

#step 1:
mapped = pool.map(chunks_mapper, data_chunks)

#step 2:
reduced = reduce(reducer, mapped)

print(reduced)

[('python', 6), ('python', 6), ('python', 6), ('python', 6), ('python', 6), ('python', 6), ('python', 6), ('python', 6)]
('python', 6)


In [None]:
from sklearn.datasets import fetch_20newsgroups

news = fetch_20newsgroups(subset='train')

data = news.data*10

In [None]:
import re 
import nltk
from nltk.corpus import stopwords
from collections import Counter 

nltk.download('stopwords')

ENGLISH_STOP_WORDS = set(stopwords.words('english'))

def clean_word(word):
    return re.sub(r'[^\w\s]','',word).lower()
    
def word_not_in_stopwords(word):
    return word not in ENGLISH_STOP_WORDS and word and word.isalpha()
    
def find_top_words(data):
    cnt = Counter()
    for text in data:
        tokens_in_text = text.split()
        tokens_in_text = map(clean_word, tokens_in_text)
        tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
        cnt.update(tokens_in_text)
        
    return cnt.most_common(10)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [None]:
%time find_top_words(data)

CPU times: user 1min 1s, sys: 463 ms, total: 1min 2s
Wall time: 1min 2s


[('subject', 122520),
 ('lines', 118240),
 ('organization', 111850),
 ('would', 88690),
 ('one', 86470),
 ('writes', 78360),
 ('article', 67540),
 ('people', 58320),
 ('dont', 58130),
 ('like', 57570)]

In [None]:
def mapper(text):
    tokens_in_text = text.split()
    tokens_in_text = map(clean_word, tokens_in_text)
    tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
    return Counter(tokens_in_text)

def reducer(cnt1, cnt2):
    cnt1.update(cnt2)
    return cnt1
    
def chunk_mapper(chunk):
    mapped = map(mapper, chunk)
    reduced = reduce(reducer, mapped)
    return reduced

In [None]:
%%time

data_chunks = chunkify(data, 36)
data_chunks = filter(lambda x: len(x)>0, data_chunks)

#step 1:
mapped = pool.map(chunk_mapper, data_chunks)

#step 2:
reduced = reduce(reducer, mapped)

print(reduced.most_common(10))

Et puis pour finir : https://pymotw.com/2/multiprocessing/mapreduce.html

# Partie 3 : Map Suffle Reduce au pays des échecs

Jusqu’à présent, vous êtes habitués à travailler en mono-processus. Vous créez votre programme qui ingère les données de façon séquentielle et vous produisez le résultat à la fin. Ceci fonctionne très bien tant que le volume ou la vélocité reste peu importante. Mais quand nous parlons de faibles volumes, nous faisons allusion à quelques centaines de Mo, voire 1 ou 2 Go. Au-dessus, cela devient compliqué. Une base de données de 2 Go ne tient plus en mémoire sauf à utiliser Pandas par exemple en Python. 

Il faut donc paralléliser pour s’en sortir. C’est ici qu’interviennent des solutions comme Hadoop et MapReduce. Même s’il est possible de faire Hadoop et MapReduce en local sur votre machine (par l’intermédiaire d’une machine virtuelle, pesant au bas mot 5 Go), cela ne présente pas d’intérêt. En effet, il faudrait pouvoir lancer de nombreux (dizaines, centaines) nœuds pour noter un réel gain.

Mais ce n’est pas pour autant que nous ne pouvons pas avoir un premier feeling de l’apport de la parallélisation. 

En règle générale, nous présentons le cas d’école classique pour MapReduce à savoir compter le nombre de mots dans un texte mais il se trouve que vous l’avez déjà traité. Nous allons donc proposer quelque chose de nouveau dans le domaine des échecs.



Récupérer l’archive zip qui se trouve sur le Moodle du module Data 831. Elle contient un ensemble de parties d’échecs. C’est cette base qui servira pour l’exercice de MapReduce.

### Question 

Chaque position aux échecs peut être repérée de façon unique par l’intermédiaire de sa description au format FEN ou EPD. La description EPD permet de s’abstraire du nombre de coups joués. C’est donc cette description que nous choisirons. 
A partir du module python-chess (https://python-chess.readthedocs.io/en/latest/), obtenez pour une position sa description EPD. 


In [5]:
import chess.pgn

pgn = open("data.pgn")

first_game = chess.pgn.read_game(pgn)
second_game = chess.pgn.read_game(pgn)

board = first_game.board()

print(board.epd())


rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq -


### Question

Complétez le code précédent pour que l’ensemble des positions pour une partie soit connu.

In [6]:
saved_data = []

pgn = open("data/Lautier.pgn")
board = first_game.board()

for move in first_game.mainline_moves():
    saved_data.append(board.epd())
    board.push(move)

print("Nb de coup jouée : " + str(len(saved_data)))

Nb de coup jouée : 107


### Question

A nouveau, complétez le code pour que nous prenions en compte l’ensemble des parties qui se trouve dans l’archive.

In [7]:
#Votre code ici

pgn = open("data/Lautier.pgn")
game = chess.pgn.read_game(pgn)

games = []

while(game):
    board = game.board()

    data = []
    for move in game.mainline_moves():
        data.append(board.epd())
        board.push(move)
    games.append(data)


    # print("Nombre de coups pour cette partie : ", len(data))    
    
    game = chess.pgn.read_game(pgn) 

print("Nombre de partie " + str(len(games)))

Nombre de partie 1809


Pour la suite, nous pouvons faire une fonction pour récupérer l'ensemble des EPDS

In [8]:
def get_epds():

    pgn = open("data/Lautier.pgn")
    game = chess.pgn.read_game(pgn)
    epds = []

    while(game):
        epd = []
        board = game.board()
        epd.append(board.epd())
        for move in game.mainline_moves():
            board.push(move)
            epd.append(board.epd())

        epds.extend(epd)
        game = chess.pgn.read_game(pgn)

    return epds

### Question

Si vous avez compris le principe derrière Map, il faut que votre fonction Map renvoie une donnée à savoir l’élément et un compteur qui est toujours à 1. Faites de même ici avec la position au format EPD.

In [9]:
def myMap(epd):
    return (epd,1)

### Question

Il faut écrire la fonction Reduce qui somme par positions. Elle doit accepter de recevoir des entrées et compter le nombre d’occurrences. 
La sortie de la fonction Reduce sera la position et le nombre d’occurrences.

In [10]:

def myReducer(tab_epds):

    return_array = []

    for x, y in tab_epds:
        return_array.append((x, sum(y[1])))
        
    return return_array

### Question

Nous parlons souvent de MapReduce sans mentionner l’étape intermédiaire de Shuffle qui groupe les éléments selon la clé (à savoir ici la position EPD). Ecrire une fonction Shuffle qui va prendre en entrée ce qui provient de Map et qui va grouper les éléments par la clé.

In [11]:
def myShuffle(tab_epds):

    return_array = {}

    for x, y in tab_epds:
        if (x in return_array.keys()):
            return_array[x][1].append(y)
        else:
            return_array[x] = (x, [y])
            
    return return_array.items()

### Question

Créez la chaine de traitement MapShuffleReduce pour obtenir un résultat. Notez le temps nécessaire pour la réalisation de l’ensemble du traitement.

On récupère l'ensemble des epds pour l'ensemble des parties

In [19]:
array_epds = get_epds()

myMapped = list(map(myMap, array_epds))

myShffled = myShuffle(myMapped)

myReduced = myReducer(myShffled)

In [23]:
myFilter = filter(lambda x: x[1]>10, myReduced)

mySorted = sorted(myFilter, key=lambda x: x[1])

for x, y in mySorted:
    print(x, y)

rnbqk2r/1p2bppp/p2p1n2/4p3/4P3/1NN5/PPP1BPPP/R1BQ1RK1 b kq - 11
rnbqk1nr/ppppppbp/6p1/8/2PP4/8/PP2PPPP/RNBQKBNR w KQkq - 11
rnbqkbnr/pp1p1ppp/4p3/2p5/4P3/2N2N2/PPPP1PPP/R1BQKB1R b KQkq - 11
rnbqkb1r/ppp1pppp/3p1n2/8/2PP4/8/PP2PPPP/RNBQKBNR w KQkq - 11
rn1qkb1r/p1pp1ppp/bp2pn2/8/2PP4/5NP1/PP1NPP1P/R1BQKB1R b KQkq - 11
rn1qkb1r/pbp2ppp/1p2p3/3n4/3P4/P1N2N2/1PQ1PPPP/R1B1KB1R b KQkq - 11
rnbqkb1r/pppppp1p/5np1/8/2P5/2N5/PP1PPPPP/R1BQKBNR w KQkq - 11
r1bqkbnr/pp1p1ppp/2n1p3/2p5/3PP3/2N2N2/PPP2PPP/R1BQKB1R b KQkq - 11
r1bqkbnr/pp1p1ppp/2n1p3/8/3pP3/2N2N2/PPP2PPP/R1BQKB1R w KQkq - 11
rnbqk2r/pp1p1ppp/4pn2/8/1bPp4/2N1P3/PP2NPPP/R1BQKB1R w KQkq - 11
rnbqk2r/pp1p1ppp/4pn2/8/1bPP4/2N5/PP2NPPP/R1BQKB1R b KQkq - 11
r1bqkb1r/pp1ppppp/2n2n2/2p5/2PP4/2N2N2/PP2PPPP/R1BQKB1R b KQkq - 11
r1bqkb1r/pp1ppppp/2n2n2/8/2Pp4/2N2N2/PP2PPPP/R1BQKB1R w KQkq - 11
rnbqkbnr/ppp2ppp/3p4/4p3/2P5/2N5/PP1PPPPP/R1BQKBNR w KQkq - 11
r1bqk2r/pppn1ppp/4pn2/3P2B1/1b1P4/2N2N2/PP2PPPP/R2QKB1R b KQkq - 11
rnbqkb1r/ppp2ppp/4pn2/8

### Question

Il est clair que nous sommes encore sur du séquentiel et bien loin d’une approche Hadoop avec distribution du traitement entre plusieurs nœuds. Réfléchissez comment vous pouvez faire en sorte d’avoir une parallélisation des traitements. Est-ce au niveau du Map ? Au niveau du Reduce ? Qu’en est-il du Shuffle ? Donnez une explication qui servira de support pour la question suivante.

Nous pouvons instaurer un traitement en parallèle, avec plusieurs Mapper et plusieurs Reducer, nous pouvons avoir un traitement plus rapide. 

### Question

Mettre en place la proposition de la question 9. Regardez en quoi cela change quelque chose si vous augmentez le nombre de Map, le nombre de Reduce. Conservez les temps obtenus en fonction des différents cas envisagés

Présentation des solutions