<a href="https://colab.research.google.com/github/Bessel7/data-analyst/blob/main/TP2_Programmation_sous_MapReduce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction to distributed programming with <i>MapReduce</i>
The main goal of this notebook is to learn how to program distributed processing with <b>MapReduce</b>.
As a reminder, under Hadoop the application programmer does not have to implement the replication mechanisms, nor keep track of the units of work and reassign them. These tasks are the responsibility of the framework.

The application programmer is nevertheless responsible for reformulating the algorithms to be run on the distributed platform in terms of the <i>MapReduce</i> execution model, a very popular model found in several distributed frameworks.

<i>MapReduce</i> breaks down all the operations to be performed into two types of <b>elementary</b> and <b>uniform</b> tasks, <i>Map</i> and <i>Reduce</i>. Each piece of data first goes through a <i>Map</i> task, which transforms it, and then possibly through a <i>Reduce</i> task.

No particular execution order is expected among the <i>Map</i> tasks or among the <i>Reduce</i> tasks. One or more <i>Map</i> and/or <i>Reduce</i> tasks can easily be assigned to each compute node. A compute node may be an individual computer or one core of a multi-core CPU. In the latter case, the computer's RAM and mass storage are shared among the cores.

<i>MapReduce</i> works through the following steps:
<ol>
<li>The dataset to process is split into fragments (<i>chunks</i>).</li>
<li>Each <i>Map</i> task is assigned to a compute node, which receives one or more fragments that the <i>Map</i> task transforms into a sequence of [key, value] pairs.</li>
<li>Each <i>Reduce</i> task is associated with one or more keys and is assigned to a compute node.</li>
<li>The [key, value] pairs produced by the <i>Map</i> tasks are grouped by key and stored on the compute nodes that will run the corresponding <i>Reduce</i> tasks (shuffle step).</li>
<li>For each key associated with it, each <i>Reduce</i> task combines the values of the [key, value] pairs that have this key; the results are stored and constitute the output of the processing.</li>
</ol>

The programmer writes the <i>Map</i> and <i>Reduce</i> functions and the framework takes care of the rest, as illustrated below for a distributed count of the frequency of each word in a corpus.

<img width="70%" src="https://www.nayaa.fr/bigdata/mr-execution-ex.png">
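
The word-count flow pictured above can be imitated in plain Python, without Hadoop. This is only an illustrative single-process sketch: the function names (`map_words`, `shuffle`, `reduce_counts`, `word_count`) and the two-fragment corpus are hypothetical, and on a real cluster the map and reduce calls would run on different nodes.

```python
from collections import defaultdict

def map_words(fragment):
    """Map task: turn one text fragment into a list of (word, 1) pairs."""
    return [(word.lower(), 1) for word in fragment.split()]

def shuffle(pairs):
    """Shuffle step: group the values of all pairs by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_counts(key, values):
    """Reduce task: combine all the values of one key into a single result."""
    return key, sum(values)

def word_count(fragments):
    # Each fragment is mapped independently of the others.
    pairs = [pair for fragment in fragments for pair in map_words(fragment)]
    groups = shuffle(pairs)
    return dict(reduce_counts(key, values) for key, values in groups.items())

# Hypothetical corpus split into two fragments, for illustration only.
counts = word_count(["the cat sat", "the dog sat down"])
print(counts)
```

Only `map_words` and `reduce_counts` correspond to code the programmer writes; the grouping done by `shuffle` is what the framework provides.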

## Installing the Java Development Kit (JDK)
Hadoop is written in Java and therefore requires a Java runtime environment.

Installing the JDK

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Setting the <code>JAVA_HOME</code> environment variable to point to the Java installation directory

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
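
Before going further, it can be useful to confirm that the variable points to a real JDK. The check below is an illustrative sketch, not part of the original notebook: the helper name `java_home_is_valid` is hypothetical, and it assumes the usual JDK layout in which the launcher lives at `bin/java` under `JAVA_HOME`.

```python
import os
from pathlib import Path

def java_home_is_valid(env=os.environ):
    """Return True if JAVA_HOME is set and contains an executable bin/java."""
    java_home = env.get("JAVA_HOME")
    if not java_home:
        return False
    java_binary = Path(java_home) / "bin" / "java"
    return java_binary.is_file() and os.access(java_binary, os.X_OK)

print("JAVA_HOME usable:", java_home_is_valid())
```

If this prints `False`, the Hadoop launcher script is unlikely to find Java, so it is worth re-checking the installation path before running a job.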

## Installing the Hadoop framework

Downloading from the Apache Software Foundation archives

In [None]:
!wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz

--2023-01-17 14:00:19--  https://archive.apache.org/dist/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 500749234 (478M) [application/x-gzip]
Saving to: ‘hadoop-3.3.0.tar.gz.1’


2023-01-17 14:00:38 (26.0 MB/s) - ‘hadoop-3.3.0.tar.gz.1’ saved [500749234/500749234]



Extracting the archive

In [None]:
!tar -xzvf hadoop-3.3.0.tar.gz

Output truncated: only the last 5000 lines are shown.
hadoop-3.3.0/share/doc/hadoop/hadoop-project-dist/hadoop-common/build/source/hadoop-common-project/hadoop-common/target/api/org/apache/hadoop/fs/FSDataOutputStream.html
hadoop-3.3.0/share/doc/hadoop/hadoop-project-dist/hadoop-common/build/source/hadoop-common-project/hadoop-common/target/api/org/apache/hadoop/fs/TrashPolicyDefault.Emptier.html
hadoop-3.3.0/share/doc/hadoop/hadoop-project-dist/hadoop-common/build/source/hadoop-common-project/hadoop-common/target/api/org/apache/hadoop/fs/HarFileSystem.html
hadoop-3.3.0/share/doc/hadoop/hadoop-project-dist/hadoop-common/build/source/hadoop-common-project/hadoop-common/target/api/org/apache/hadoop/fs/PathExistsException.html
hadoop-3.3.0/share/doc/hadoop/hadoop-project-dist/hadoop-common/build/source/hadoop-common-project/hadoop-common/target/api/org/apache/hadoop/fs/XAttrSetFlag.html
hadoop-3.3.0/share/doc/hadoop/hadoop-project-dist/hadoop-common/b

Copying the extracted directory to <code>/usr/local</code>

In [None]:
!cp -r hadoop-3.3.0/ /usr/local/

## Programming distributed tasks with <i>MapReduce</i>

Creating a <code>myinput</code> directory to hold the dataset used in this exercise, and a second one, <code>myoutput</code>, for the results of the distributed processing

In [None]:
!mkdir -p ~/myinput
!mkdir -p ~/myoutput

Downloading the dataset into the file <u>purchases.txt</u>

In [None]:
!curl -L -o 'purchases.txt' 'https://drive.google.com/u/0/uc?id=1NS-PSXW8bSNpzFH4XRbtmMnMGhXBdYy6&export=download&confirm=t'

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  201M  100  201M    0     0   142M      0  0:00:01  0:00:01 --:--:--  204M


Moving the file <u>purchases.txt</u> into the <code>myinput</code> directory

In [None]:
!mv purchases.txt ~/myinput/purchases.txt

Checking that the file was moved correctly

In [None]:
!ls ~/myinput

purchases.txt


Displaying the first lines of the file. The records have the following format:
<table border='1'><tr>
<td>Date</td><td>Time</td><td>Store</td><td>Item</td><td>Amount</td><td>Payment_method</td>
</tr></table>
The tab character <b>\t</b> is used as the column separator ✅

In [None]:
!head -10  ~/myinput/purchases.txt

2012-01-01	09:00	San Jose	Men's Clothing	214.05	Amex
2012-01-01	09:00	Fort Worth	Women's Clothing	153.57	Visa
2012-01-01	09:00	San Diego	Music	66.08	Cash
2012-01-01	09:00	Pittsburgh	Pet Supplies	493.51	Discover
2012-01-01	09:00	Omaha	Children's Clothing	235.63	MasterCard
2012-01-01	09:00	Stockton	Men's Clothing	247.18	MasterCard
2012-01-01	09:00	Austin	Cameras	379.6	Visa
2012-01-01	09:00	New York	Consumer Electronics	296.8	Cash
2012-01-01	09:00	Corpus Christi	Toys	25.38	Discover
2012-01-01	09:00	Fort Worth	Toys	213.88	Visa
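
The six-field layout can be captured in a small parsing helper. This is a sketch for illustration only: the names `Purchase` and `parse_purchase` are hypothetical and are not used by the mappers later in the notebook, which split the line directly.

```python
from typing import NamedTuple, Optional

class Purchase(NamedTuple):
    date: str
    time: str
    store: str
    item: str
    amount: float
    payment: str

def parse_purchase(line: str) -> Optional[Purchase]:
    """Parse one tab-separated record; return None if it is malformed."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 6:
        return None
    date, time, store, item, amount, payment = fields
    try:
        return Purchase(date, time, store, item, float(amount), payment)
    except ValueError:
        # The amount field was not a number.
        return None

# First record of the sample shown above.
record = parse_purchase("2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex")
print(record)
```

Returning `None` for malformed lines mirrors the `len(data) == 6` guard used in the mappers: bad records are skipped rather than stopping the job.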


## Activity 1

The task is to use <i>MapReduce</i> with Python to run a distributed computation. The goal is to compute the total purchases per store from the data in <code>purchases.txt</code>.

You must implement the <i>Map</i> and <i>Reduce</i> functions of the computation.

Code for the <code>map</code> phase. <h2>This part is yours to write &#8987;</h2>

In [None]:
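#!/usr/bin/env python3
import sys

# Mapper: emit one (store, amount) pair per purchase record read from stdin.
for line in sys.stdin:
  data = line.strip().split("\t")
  # Ignore malformed records that do not have exactly six fields.
  if len(data) == 6:
    date, time, store, item, amount, payment = data
    # print() separates its arguments with a space, so the line reads "store \t amount".
    print(store, "\t", amount)

Saving the <code>map</code> phase code to the file "/content/mapper.py"

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[11] must be the execution count of the mapper cell above; adjust it if cells were re-run.
with open('/content/mapper.py', 'w') as f:
  f.write(In[11])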

Granting read, write and execute permissions on <code>mapper.py</code>

In [None]:
!chmod u+rwx /content/mapper.py
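
Saving the script and setting its permissions can also be done from Python, without relying on a cell index in `In`. The sketch below is one possible alternative, not the notebook's method: `save_script`, `MAPPER_SOURCE` and the temporary target directory are hypothetical names chosen for the example.

```python
import stat
import tempfile
from pathlib import Path

def save_script(path, source):
    """Write `source` to `path` and add read/write/execute permission for the owner."""
    script = Path(path)
    script.write_text(source)
    # Equivalent of `chmod u+rwx`: add the owner bits to the current mode.
    script.chmod(script.stat().st_mode | stat.S_IRWXU)
    return script

# Hypothetical mapper source and a temporary directory, for illustration only.
MAPPER_SOURCE = "import sys\nfor line in sys.stdin:\n  print(line.strip())\n"
target = save_script(Path(tempfile.mkdtemp()) / "mapper.py", MAPPER_SOURCE)
print(target, oct(target.stat().st_mode & 0o777))
```

Keeping the source in a named string makes the saved file independent of the order in which cells were executed.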

Testing the <code>map</code> phase on a few records

In [None]:
!head -10 ~/myinput/purchases.txt | python3 /content/mapper.py

San Jose 	 214.05
Fort Worth 	 153.57
San Diego 	 66.08
Pittsburgh 	 493.51
Omaha 	 235.63
Stockton 	 247.18
Austin 	 379.6
New York 	 296.8
Corpus Christi 	 25.38
Fort Worth 	 213.88


Code for the <code>reduce</code> phase. <h2>This one is yours to write too &#128521;</h2>

In [None]:
#!/usr/bin/env python3
import sys

# Reducer: input arrives sorted by key, so all the lines of a store are consecutive.
salesTotal = 0
oldKey = None

for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) != 2:
    # Skip malformed lines.
    continue

  thisKey, thisSale = data
  # A new key means the previous store is complete: emit its total and reset.
  if oldKey and oldKey != thisKey:
    print(oldKey, "\t", salesTotal)
    salesTotal = 0

  oldKey = thisKey
  salesTotal += float(thisSale)

# Emit the total of the last store.
if oldKey is not None:
  print(oldKey, "\t", salesTotal)

Saving the <code>reduce</code> phase code to the file "/content/reducer.py"

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[15] must be the execution count of the reducer cell above; adjust it if cells were re-run.
with open('/content/reducer.py', 'w') as f:
  f.write(In[15])

Granting read, write and execute permissions on <code>reducer.py</code>

In [None]:
!chmod u+rwx /content/reducer.py

Testing the whole processing chain locally on a few records

In [None]:
!head -50 ~/myinput/purchases.txt | python3 /content/mapper.py | sort | python3 /content/reducer.py

Anchorage  	 327.6
Aurora  	 117.81
Austin  	 1176.98
Boston  	 418.94
Buffalo  	 483.82
Chandler  	 758.17
Chicago  	 31.08
Corpus Christi  	 25.38
Fort Wayne  	 370.55
Fort Worth  	 367.45
Fremont  	 222.61
Fresno  	 466.64
Greensboro  	 290.82
Honolulu  	 345.18
Houston  	 309.16
Indianapolis  	 135.96
Las Vegas  	 146.65
Lincoln  	 136.9
Madison  	 16.78
Minneapolis  	 182.05
Newark  	 39.75
New York  	 296.8
Norfolk  	 189.01
Omaha  	 491.31
Philadelphia  	 351.31
Pittsburgh  	 968.77
Portland  	 108.69
Reno  	 168.70999999999998
Riverside  	 268.29
San Bernardino  	 170.2
San Diego  	 66.08
San Francisco  	 260.65
San Jose  	 429.87
Spokane  	 291.5
Stockton  	 247.18
Tulsa  	 205.06
Virginia Beach  	 376.11
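
The `sort` between the mapper and the reducer matters because the reducer only compares each key with the previous one. The same `mapper | sort | reducer` chain can be imitated in memory with `itertools.groupby`, which likewise merges only adjacent equal keys. This is an illustrative sketch: `total_by_store` is a hypothetical helper, and the three sample lines are taken from the records shown earlier.

```python
from itertools import groupby
from operator import itemgetter

def total_by_store(lines):
    """Emulate `mapper | sort | reducer` in memory for tab-separated purchase lines."""
    # Map: keep (store, amount) from each well-formed six-field record.
    pairs = []
    for line in lines:
        fields = line.strip().split("\t")
        if len(fields) == 6:
            pairs.append((fields[2], float(fields[4])))
    # Sort: bring identical keys next to each other.
    pairs.sort(key=itemgetter(0))
    # Reduce: sum the amounts of each run of identical keys.
    return {store: sum(amount for _, amount in group)
            for store, group in groupby(pairs, key=itemgetter(0))}

sample = [
    "2012-01-01\t09:00\tFort Worth\tWomen's Clothing\t153.57\tVisa",
    "2012-01-01\t09:00\tSan Diego\tMusic\t66.08\tCash",
    "2012-01-01\t09:00\tFort Worth\tToys\t213.88\tVisa",
]
totals = total_by_store(sample)
print(totals)
```

Without the sort, the two Fort Worth records would not be adjacent and would be emitted as two separate totals.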


Running the full job. The result is written to the "~/myoutput" directory.

In [None]:
!rm -r ~/myoutput
!/usr/local/hadoop-3.3.0/bin/hadoop jar /usr/local/hadoop-3.3.0/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -input ~/myinput -output ~/myoutput -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-01-17 14:01:17,373 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob5218509498225147991.jar tmpDir=null
2023-01-17 14:01:18,373 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-01-17 14:01:18,505 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-17 14:01:18,505 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-01-17 14:01:18,527 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-01-17 14:01:18,716 INFO mapred.FileInputFormat: Total input files to process : 1
2023-01-17 14:01:18,740 INFO mapreduce.JobSubmitter: number of splits:7
2023-01-17 14:01:18,996 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local534642282_0001
2023-01-17 14:01:18,996 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-17 14:01:19,393 INFO mapred.Loca
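
The first warning in the log recommends the generic `-files` option in place of `-file`. The sketch below builds such a command line as a Python list; it is an assumption-laden illustration that was not run in this notebook. `streaming_command` is a hypothetical helper, the paths are those used in the cells above, and generic options are placed before the streaming-specific ones.

```python
import os
import shlex

# Paths taken from the cells above.
HADOOP_HOME = "/usr/local/hadoop-3.3.0"
STREAMING_JAR = HADOOP_HOME + "/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar"

def streaming_command(input_dir, output_dir, mapper_path, reducer_path):
    """Build the argument list of a streaming job that ships both scripts with -files."""
    return [
        HADOOP_HOME + "/bin/hadoop", "jar", STREAMING_JAR,
        # Generic options such as -files go before the streaming-specific options.
        "-files", f"{mapper_path},{reducer_path}",
        "-input", input_dir,
        "-output", output_dir,
        "-mapper", "python " + os.path.basename(mapper_path),
        "-reducer", "python " + os.path.basename(reducer_path),
    ]

command = streaming_command(
    os.path.expanduser("~/myinput"), os.path.expanduser("~/myoutput"),
    "/content/mapper.py", "/content/reducer.py",
)
print(shlex.join(command))
```

The printed line can be pasted after `!` in a cell, or the list can be passed to `subprocess.run(command, check=True)`.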

Listing the contents of the "~/myoutput" directory.

In [None]:
!ls ~/myoutput

part-00000  _SUCCESS


Displaying part of the result stored in the output file. It contains the total purchases of each store.

In [None]:
!tail -n 30 ~/myoutput/part-00000

Pittsburgh  	 10090124.820000002
Plano  	 10046103.609999977
Portland  	 10007635.769999985
Raleigh  	 10061442.540000036
Reno  	 10079955.160000028
Richmond  	 9992941.589999935
Riverside  	 10006695.420000065
Rochester  	 10067606.92000001
Sacramento  	 10123468.179999929
Saint Paul  	 10057233.569999969
San Antonio  	 10014441.700000057
San Bernardino  	 9965152.03999996
San Diego  	 9966038.389999935
San Francisco  	 9995570.540000021
San Jose  	 9936721.410000049
Santa Ana  	 10050309.929999996
Scottsdale  	 10037929.849999992
Seattle  	 9936267.37000001
Spokane  	 10083362.979999928
St. Louis  	 10002105.14000001
St. Petersburg  	 9986495.539999947
Stockton  	 10006412.639999853
Tampa  	 10106428.550000058
Toledo  	 10020768.880000055
Tucson  	 9998252.469999956
Tulsa  	 10064955.899999967
Virginia Beach  	 10086553.500000007
Washington  	 10139363.38999994
Wichita  	 10083643.209999999
Winston–Salem  	 10044011.830000004
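
Totals such as 10090124.820000002 carry binary floating-point rounding noise, because most decimal amounts have no exact `float` representation. When exact cents matter, one option is to accumulate with `decimal.Decimal`. The sketch below only illustrates the difference; `total_float`, `total_decimal` and the two amounts are hypothetical and do not come from the dataset.

```python
from decimal import Decimal

def total_float(amounts):
    """Sum amount strings using binary floating point, as the reducer does."""
    total = 0.0
    for text in amounts:
        total += float(text)
    return total

def total_decimal(amounts):
    """Sum amount strings exactly using decimal arithmetic."""
    total = Decimal("0")
    for text in amounts:
        total += Decimal(text)
    return total

amounts = ["0.1", "0.2"]  # hypothetical amounts
print(total_float(amounts))    # 0.30000000000000004
print(total_decimal(amounts))  # 0.3
```

For display purposes alone, rounding the float total to two decimals in the reducer's final `print` would also hide the noise.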


## Activity 2
We keep working with the same input file (purchases.txt), but to obtain different results. <u>The goal is therefore to write your own mappers and reducers</u>.
<ol>
<li>Give the number of payments per payment method.</li>
<li>What is the revenue for each day of the week?</li>
<li>What is the list of stores?</li>
<li>What are the total number of sales and the total value of sales across all stores?</li>
</ol>

### **Solution**

**1 - Give the number of payments per payment method.**

In [None]:
#!/usr/bin/env python3
import sys

# Mapper: emit (payment method, 1) for each purchase record.
for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) == 6:
    date, time, store, item, amount, payment = data
    print(payment, "\t", 1)

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[97] must be the execution count of the mapper cell above; adjust it if cells were re-run.
with open('/content/mapper.py', 'w') as f:
  f.write(In[97])

In [None]:
!chmod u+rwx /content/mapper.py

In [None]:
!head -10 ~/myinput/purchases.txt | python3 /content/mapper.py

Amex 	 1
Visa 	 1
Cash 	 1
Discover 	 1
MasterCard 	 1
MasterCard 	 1
Visa 	 1
Cash 	 1
Discover 	 1
Visa 	 1


In [None]:
#!/usr/bin/env python3
import sys

# Reducer: add up the counts of each payment method; input is sorted by key.
current_count = 0
oldKey = None

for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) != 2:
    continue

  thisKey, count = data

  try:
    count = int(count)
  except ValueError:
    # count was not a number, so silently discard this line
    continue

  # A new key means the previous payment method is complete: emit its count and reset.
  if oldKey and oldKey != thisKey:
    print(oldKey, "\t", current_count)
    current_count = 0

  oldKey = thisKey
  current_count += count

# Emit the count of the last payment method.
if oldKey is not None:
  print(oldKey, "\t", current_count)

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[109] must be the execution count of the reducer cell above; adjust it if cells were re-run.
with open('/content/reducer.py', 'w') as f:
  f.write(In[109])

In [None]:
!chmod u+rwx /content/reducer.py

In [None]:
!head -50 ~/myinput/purchases.txt | python3 /content/mapper.py | sort | python3 /content/reducer.py

Amex  	 10
Cash  	 10
Discover  	 8
MasterCard  	 9
Visa  	 13


In [None]:
!rm -r ~/myoutput
!/usr/local/hadoop-3.3.0/bin/hadoop jar /usr/local/hadoop-3.3.0/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -input ~/myinput -output ~/myoutput -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-01-17 15:02:03,250 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob1446424535333531386.jar tmpDir=null
2023-01-17 15:02:03,956 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-01-17 15:02:04,052 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-17 15:02:04,052 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-01-17 15:02:04,071 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-01-17 15:02:04,256 INFO mapred.FileInputFormat: Total input files to process : 1
2023-01-17 15:02:04,283 INFO mapreduce.JobSubmitter: number of splits:7
2023-01-17 15:02:04,502 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1574291105_0001
2023-01-17 15:02:04,502 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-17 15:02:04,946 INFO mapred.Loc

In [None]:
!ls ~/myoutput

part-00000  _SUCCESS


In [None]:
!tail -n 30 ~/myoutput/part-00000

Amex  	 826535
Cash  	 828770
Discover  	 827426
MasterCard  	 828524
Visa  	 827221
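
A quick sanity check on such a result is that the counts add up to the number of records read. The same computation can be cross-checked on a small sample with `collections.Counter`. This is an illustrative sketch: `payments_count` is a hypothetical helper and the six lines are taken from the sample records shown earlier.

```python
from collections import Counter

def payments_count(lines):
    """Count purchases per payment method in tab-separated purchase lines."""
    counts = Counter()
    for line in lines:
        fields = line.strip().split("\t")
        if len(fields) == 6:
            counts[fields[5]] += 1
    return counts

sample = [
    "2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex",
    "2012-01-01\t09:00\tFort Worth\tWomen's Clothing\t153.57\tVisa",
    "2012-01-01\t09:00\tSan Diego\tMusic\t66.08\tCash",
    "2012-01-01\t09:00\tAustin\tCameras\t379.6\tVisa",
    "2012-01-01\t09:00\tNew York\tConsumer Electronics\t296.8\tCash",
    "2012-01-01\t09:00\tFort Worth\tToys\t213.88\tVisa",
]
counts = payments_count(sample)
print(dict(counts))
print("records counted:", sum(counts.values()))
```

A `Counter` holds every key in memory at once, which is fine for five payment methods; the streaming reducer instead keeps only the current key and its running count.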


### **Solution**

**2 - What is the revenue for each day of the week?**

In [None]:
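#!/usr/bin/env python3
import sys
from datetime import datetime

# Mapper: emit (weekday name, amount) for each purchase record.
for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) == 6:
    date, time, store, item, amount, payment = data
    # The date field is in ISO format (YYYY-MM-DD); %A gives the full weekday name.
    day = datetime.fromisoformat(date)
    print(day.strftime("%A"), "\t", float(amount))

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[116] must be the execution count of the mapper cell above; adjust it if cells were re-run.
with open('/content/mapper.py', 'w') as f:
  f.write(In[116])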

In [None]:
!chmod u+rwx /content/mapper.py

In [None]:
!head -10 ~/myinput/purchases.txt | python3 /content/mapper.py

Sunday 	 214.05
Sunday 	 153.57
Sunday 	 66.08
Sunday 	 493.51
Sunday 	 235.63
Sunday 	 247.18
Sunday 	 379.6
Sunday 	 296.8
Sunday 	 25.38
Sunday 	 213.88
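
The name returned by `strftime("%A")` depends on the locale of the process, so the keys could differ between machines configured differently. A locale-independent alternative is to index a fixed list with `date.weekday()`. This is a sketch of that idea; `DAY_NAMES` and `weekday_name` are hypothetical names not used elsewhere in the notebook.

```python
from datetime import date

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]

def weekday_name(iso_date):
    """Return the English weekday name for a YYYY-MM-DD date string."""
    # date.weekday() numbers the days from Monday = 0 to Sunday = 6.
    return DAY_NAMES[date.fromisoformat(iso_date).weekday()]

print(weekday_name("2012-01-01"))  # Sunday
```

All ten sample records are dated 2012-01-01, which is why every line of the mapper output above starts with the same day.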


In [None]:
#!/usr/bin/env python3
import sys

# Reducer: add up the amounts of each weekday; input is sorted by key.
current_turnover = 0
oldKey = None

for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) != 2:
    continue

  thisKey, amount = data

  # A new key means the previous weekday is complete: emit its revenue and reset.
  if oldKey and oldKey != thisKey:
    print(oldKey, "\t", current_turnover)
    current_turnover = 0

  oldKey = thisKey
  current_turnover += float(amount)

# Emit the revenue of the last weekday.
if oldKey is not None:
  print(oldKey, "\t", current_turnover)

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[126] must be the execution count of the reducer cell above; adjust it if cells were re-run.
with open('/content/reducer.py', 'w') as f:
  f.write(In[126])

In [None]:
!chmod u+rwx /content/reducer.py

In [None]:
!head -50 ~/myinput/purchases.txt | python3 /content/mapper.py | sort | python3 /content/reducer.py

Sunday  	 11259.819999999998


In [None]:
!rm -r ~/myoutput
!/usr/local/hadoop-3.3.0/bin/hadoop jar /usr/local/hadoop-3.3.0/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -input ~/myinput -output ~/myoutput -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-01-17 15:17:36,548 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob6140463737155378217.jar tmpDir=null
2023-01-17 15:17:37,300 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-01-17 15:17:37,439 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-17 15:17:37,439 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-01-17 15:17:37,463 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-01-17 15:17:37,687 INFO mapred.FileInputFormat: Total input files to process : 1
2023-01-17 15:17:37,707 INFO mapreduce.JobSubmitter: number of splits:7
2023-01-17 15:17:37,932 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1950166958_0001
2023-01-17 15:17:37,932 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-17 15:17:38,306 INFO mapred.Loc

In [None]:
!ls ~/myoutput

part-00000  _SUCCESS


In [None]:
!tail -n 30 ~/myoutput/part-00000

Friday  	 147414929.49999785
Monday  	 150364112.0699967
Saturday  	 147410177.56999636
Sunday  	 150296795.46999726
Thursday  	 147353780.56999773
Tuesday  	 147246658.13999906
Wednesday  	 144371499.93999988


### **Solution**

**3 - What is the list of stores?**

In [None]:
#!/usr/bin/env python3
import sys

# Mapper: emit the store name of each purchase record (key only, no value).
for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) == 6:
    date, time, store, item, amount, payment = data
    print(store)

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[132] must be the execution count of the mapper cell above; adjust it if cells were re-run.
with open('/content/mapper.py', 'w') as f:
  f.write(In[132])

In [None]:
!chmod u+rwx /content/mapper.py

In [None]:
!head -30 ~/myinput/purchases.txt | python3 /content/mapper.py

San Jose
Fort Worth
San Diego
Pittsburgh
Omaha
Stockton
Austin
New York
Corpus Christi
Fort Worth
Las Vegas
Newark
Austin
Greensboro
San Francisco
Lincoln
Buffalo
San Jose
Boston
Houston
Las Vegas
Virginia Beach
Riverside
Tulsa
Reno
Chicago
Fort Wayne
San Bernardino
Madison
Austin
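
The mapper output still contains duplicates (Austin and Fort Worth appear several times). Once the stream is sorted, removing them amounts to keeping one representative per run of identical lines, which is what the reducer has to do. A minimal in-memory sketch of that step, using `itertools.groupby`; `distinct_sorted` and the short sample list are hypothetical.

```python
from itertools import groupby

def distinct_sorted(keys):
    """Return each key once, assuming `keys` is already sorted."""
    return [key for key, _ in groupby(keys)]

# A few store names as emitted by the mapper, duplicates included.
mapper_output = ["San Jose", "Fort Worth", "San Diego", "Fort Worth", "Austin", "Austin"]
stores = distinct_sorted(sorted(mapper_output))
print(stores)  # ['Austin', 'Fort Worth', 'San Diego', 'San Jose']
```

Like the streaming reducer, this keeps only one key in memory at a time, so it works on inputs far larger than the list of distinct stores.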


In [None]:
#!/usr/bin/env python3
import sys

# Reducer: input is sorted, so repeated store names are consecutive; print each name once.
oldKey = None

for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) != 1:
    continue

  # Take the store name itself rather than the one-element list returned by split().
  thisKey = data[0]
  if oldKey and oldKey != thisKey:
    print(oldKey)

  oldKey = thisKey

# Print the last store name.
if oldKey is not None:
  print(oldKey)

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[135] must be the execution count of the reducer cell above; adjust it if cells were re-run.
with open('/content/reducer.py', 'w') as f:
  f.write(In[135])

In [None]:
!chmod u+rwx /content/reducer.py

In [None]:
!head -50 ~/myinput/purchases.txt | python3 /content/mapper.py | sort | python3 /content/reducer.py

Anchorage
Aurora
Austin
Boston
Buffalo
Chandler
Chicago
Corpus Christi
Fort Wayne
Fort Worth
Fremont
Fresno
Greensboro
Honolulu
Houston
Indianapolis
Las Vegas
Lincoln
Madison
Minneapolis
Newark
New York
Norfolk
Omaha
Philadelphia
Pittsburgh
Portland
Reno
Riverside
San Bernardino
San Diego
San Francisco
San Jose
Spokane
Stockton
Tulsa
Virginia Beach


In [None]:
!rm -r ~/myoutput
!/usr/local/hadoop-3.3.0/bin/hadoop jar /usr/local/hadoop-3.3.0/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -input ~/myinput -output ~/myoutput -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-01-17 15:22:14,115 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob4966580498203347478.jar tmpDir=null
2023-01-17 15:22:14,958 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-01-17 15:22:15,084 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-17 15:22:15,084 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-01-17 15:22:15,113 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-01-17 15:22:15,324 INFO mapred.FileInputFormat: Total input files to process : 1
2023-01-17 15:22:15,348 INFO mapreduce.JobSubmitter: number of splits:7
2023-01-17 15:22:15,605 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local884149691_0001
2023-01-17 15:22:15,605 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-17 15:22:16,042 INFO mapred.Loca

In [None]:
!ls ~/myoutput

part-00000  _SUCCESS


In [None]:
!tail -n 30 ~/myoutput/part-00000

Pittsburgh	
Plano	
Portland	
Raleigh	
Reno	
Richmond	
Riverside	
Rochester	
Sacramento	
Saint Paul	
San Antonio	
San Bernardino	
San Diego	
San Francisco	
San Jose	
Santa Ana	
Scottsdale	
Seattle	
Spokane	
St. Louis	
St. Petersburg	
Stockton	
Tampa	
Toledo	
Tucson	
Tulsa	
Virginia Beach	
Washington	
Wichita	
Winston–Salem	


### **Solution**

**4 - What are the total number of sales and the total value of sales across all stores?**

In [None]:
#!/usr/bin/env python3
import sys

# Mapper: emit (item, amount); the reducer only uses the amount, so the key is not significant.
for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) == 6:
    date, time, store, item, amount, payment = data
    print(item, "\t", amount)

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[155] must be the execution count of the mapper cell above; adjust it if cells were re-run.
with open('/content/mapper.py', 'w') as f:
  f.write(In[155])

In [None]:
!chmod u+rwx /content/mapper.py

In [None]:
!head -10 ~/myinput/purchases.txt | python3 /content/mapper.py

Men's Clothing 	 214.05
Women's Clothing 	 153.57
Music 	 66.08
Pet Supplies 	 493.51
Children's Clothing 	 235.63
Men's Clothing 	 247.18
Cameras 	 379.6
Consumer Electronics 	 296.8
Toys 	 25.38
Toys 	 213.88


In [None]:
#!/usr/bin/env python3
import sys

# Reducer: count the sales and add up their amounts, ignoring the keys.
# The result is a global total only if the job runs with a single reducer.
total_Sale = 0
count = 0

for line in sys.stdin:
  data = line.strip().split("\t")
  if len(data) != 2:
    continue

  count += 1
  total_Sale += float(data[1])

if count != 0:
  print(f"Nombre total de ventes : {count} \n Valeur totale des ventes: {total_Sale}")

In [None]:
# The source of every executed cell is kept in the list variable "In".
# In[161] must be the execution count of the reducer cell above; adjust it if cells were re-run.
with open('/content/reducer.py', 'w') as f:
  f.write(In[161])

In [None]:
!chmod u+rwx /content/reducer.py

In [None]:
!head -50 ~/myinput/purchases.txt | python3 /content/mapper.py | sort | python3 /content/reducer.py

Nombre total de ventes : 50 
 Valeur totale des ventes: 11259.819999999998


In [None]:
!rm -r ~/myoutput
!/usr/local/hadoop-3.3.0/bin/hadoop jar /usr/local/hadoop-3.3.0/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -input ~/myinput -output ~/myoutput -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-01-17 15:50:19,940 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob5581750343310435674.jar tmpDir=null
2023-01-17 15:50:20,699 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-01-17 15:50:20,806 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-17 15:50:20,806 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-01-17 15:50:20,831 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-01-17 15:50:21,088 INFO mapred.FileInputFormat: Total input files to process : 1
2023-01-17 15:50:21,110 INFO mapreduce.JobSubmitter: number of splits:7
2023-01-17 15:50:21,346 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local204468955_0001
2023-01-17 15:50:21,346 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-17 15:50:21,686 INFO mapred.Loca

In [None]:
!ls ~/myoutput

part-00000  _SUCCESS


In [None]:
!tail -n 30 ~/myoutput/part-00000

Nombre total de ventes : 4138476 	
 Valeur totale des ventes: 1034457953.2599641	
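
The job above produced a single `part-00000` file, so one reducer saw every record. If a job were configured with several reducers, each would write its own partial count and total, and these would still have to be combined. The sketch below illustrates that merge; `aggregate` and `merge` are hypothetical helpers, and the amounts are those of the ten sample records shown earlier, split arbitrarily into two partitions.

```python
def aggregate(amounts):
    """Return (number of sales, total value) for an iterable of amounts."""
    count, total = 0, 0.0
    for amount in amounts:
        count += 1
        total += float(amount)
    return count, total

def merge(left, right):
    """Combine two partial (count, total) results into one."""
    return left[0] + right[0], left[1] + right[1]

# Two hypothetical partitions of the ten sample amounts.
part_a = aggregate(["214.05", "153.57", "66.08", "493.51", "235.63"])
part_b = aggregate(["247.18", "379.6", "296.8", "25.38", "213.88"])
count, total = merge(part_a, part_b)
print(count, round(total, 2))
```

Because both the count and the sum are associative, the partial results can be merged in any order.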

