Préambule : ce projet est associé au cours Hadoop et Map_Reduce et se compose de deux projets pour implémenter Map Reduce et pour utiliser via Python et la librairie hdfs3 l'utilisation d'hadoop. Ce Notebook est autoportant avec des chemins en relatif avec l'arborescence des fichiers et dossiers associés.


# Projet Map Reduce

## Mapper et Reducer

### Mapper

Le mapper va charger uniquement :
* le premier champs et extraire le mois (deux premiers caractères)
* le second champs pour ensuite filter les départs
* le troisième champs pour connaitre la ville 
* le 11ème champs pour éviter le double comptage
* le 13ème champs pour le maximum de seats

Les clés, valeurs exposés correspondent au mois et nombre de seats en filtrant sur la ville Sydney dans le cas d'un départ de cette ville et dans le cas où le champs stop est à 0.

### Reducer

Le réducer utilise un dictionnaire permettant de stocker par mois les résultats en les sommant. Un test est réalisé pour s'assurer que le nombre de seats est un entier. On trie le dictionnaire de résultats pour l'affficher par mois croissant.

## Test en local du mapper et reducer

In [1]:
!cat Reservation/data.csv | python Reservation/Reservation_mapper.py | python Reservation/Reservation_reduce.py

Month : 01	 Number of passengers from Sydney : 6190421
Month : 02	 Number of passengers from Sydney : 4766537
Month : 03	 Number of passengers from Sydney : 6261788
Month : 04	 Number of passengers from Sydney : 4960807
Month : 05	 Number of passengers from Sydney : 4928827
Month : 06	 Number of passengers from Sydney : 5935277
Month : 07	 Number of passengers from Sydney : 5213361
Month : 08	 Number of passengers from Sydney : 5112502
Month : 09	 Number of passengers from Sydney : 6590224
Month : 10	 Number of passengers from Sydney : 5187097
Month : 11	 Number of passengers from Sydney : 5051415
Month : 12	 Number of passengers from Sydney : 7250241


## Execution HDFS

### Creation répertoire HDFS

In [2]:
!hadoop fs -mkdir /Reservation

### Copie du fichier data.csv sur HDFS

In [3]:
!hadoop fs -put Reservation/data.csv /Reservation/ 

In [4]:
!hadoop fs -ls /Reservation/

Found 1 items
-rw-r--r--   1 cloudera supergroup    6126033 2019-12-16 01:10 /Reservation/data.csv


### Lancement MAP Reduce

In [5]:
!sudo hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.13.0.jar \
-mapper "python $(pwd)/Reservation/Reservation_mapper.py" \
-reducer "python $(pwd)/Reservation/Reservation_reduce.py" \
-input /Reservation/data.csv \
-output /Reservation/out

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.13.0.jar] /tmp/streamjob1549823992262428099.jar tmpDir=null
19/12/16 01:10:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/12/16 01:10:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/12/16 01:10:44 INFO mapred.FileInputFormat: Total input paths to process : 1
19/12/16 01:10:45 INFO mapreduce.JobSubmitter: number of splits:2
19/12/16 01:10:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576155683072_0023
19/12/16 01:10:45 INFO impl.YarnClientImpl: Submitted application application_1576155683072_0023
19/12/16 01:10:45 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1576155683072_0023/
19/12/16 01:10:45 INFO mapreduce.Job: Running job: job_1576155683072_0023
19/12/16 01:10:55 INFO mapreduce.Job: Job job_1576155683072_0023 running in uber mode : false
19/12/16 01:10:55 INFO mapreduce.Job:  map 0% reduce 

### Recupération du fichier de résultats en local ou visualisation

In [6]:
!hadoop fs -get /Reservation/out/part-00000

get: `part-00000': File exists


In [7]:
!hadoop fs -cat /Reservation/out/part-00000

Month : 01	 Number of passengers from Sydney : 6190421
Month : 02	 Number of passengers from Sydney : 4766537
Month : 03	 Number of passengers from Sydney : 6261788
Month : 04	 Number of passengers from Sydney : 4960807
Month : 05	 Number of passengers from Sydney : 4928827
Month : 06	 Number of passengers from Sydney : 5935277
Month : 07	 Number of passengers from Sydney : 5213361
Month : 08	 Number of passengers from Sydney : 5112502
Month : 09	 Number of passengers from Sydney : 6590224
Month : 10	 Number of passengers from Sydney : 5187097
Month : 11	 Number of passengers from Sydney : 5051415
Month : 12	 Number of passengers from Sydney : 7250241


### Supression de l'ensemble du répertoire et de son contenu

In [8]:
!hadoop fs -rm -r /Reservation/

Deleted /Reservation


# HDFS

Installation de la librairie hdfs3

In [9]:
conda install -c conda-forge hdfs3

Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.


## Connexion à HDFS via hdfs3

In [10]:
from hdfs3 import HDFileSystem

In [11]:
hdfs = HDFileSystem(host="localhost", port=8020)

## Création d'un répertoire sur HDFS

In [27]:
hdfs.mkdir("/HDFS_Data/")

In [28]:
hdfs.ls("/")

['//HDFS_Data',
 '//Repertoire1',
 '//benchmarks',
 '//enterprise-deployment.json',
 '//hbase',
 '//solr',
 '//test',
 '//tmp',
 '//user',
 '//var']

## Mettre les fichiers de données sur HDFS

In [29]:
hdfs.put("HDFS_Data/data1.txt", "/HDFS_Data/data1.txt")
hdfs.put("HDFS_Data/data2.txt", "/HDFS_Data/data2.txt")
hdfs.put("HDFS_Data/data3.txt", "/HDFS_Data/data3.txt")
hdfs.put("HDFS_Data/data4.txt", "/HDFS_Data/data4.txt")

In [30]:
hdfs.ls("/HDFS_Data/")

['/HDFS_Data/data1.txt',
 '/HDFS_Data/data2.txt',
 '/HDFS_Data/data3.txt',
 '/HDFS_Data/data4.txt']

## Comptabiliser le nombre de mots

In [31]:
def count_words(file):
    word_counts = {}
    for line in file:
        for word in line.strip().split():
            if (word in word_counts) :
                word_counts[word] += 1
            else:
                word_counts[word] = 1
    return word_counts

Juste pour vérifier le fonctionnement de la fonction prédéfinie

In [32]:
print(count_words(['Anthony Anthony Toto', 'Toto Titi']))

{'Anthony': 2, 'Toto': 2, 'Titi': 1}


On parcourt l'ensemble des fichiers hdfs, on les ouvre, on ajoute le contenu dans une liste puis on appelle la fonction pour déterminer le nombre de mots.

In [38]:
import os
listFile = []
for file in hdfs.glob(os.path.join('/HDFS_Data', '*.txt')):
    with hdfs.open(file) as f:
        print(f.info())
        content = f.read()
        content = (str)(content)
        listFile.append(content)

resultWord = count_words(listFile)

{'kind': 'file', 'name': '/HDFS_Data/data1.txt', 'last_mod': 1576487616, 'size': 67742, 'replication': 3, 'block_size': 67108864, 'owner': 'cloudera', 'group': 'supergroup', 'permissions': 511, 'last_access': 1576487616, 'encryption_info': None}
{'kind': 'file', 'name': '/HDFS_Data/data2.txt', 'last_mod': 1576487616, 'size': 68130, 'replication': 3, 'block_size': 67108864, 'owner': 'cloudera', 'group': 'supergroup', 'permissions': 511, 'last_access': 1576487616, 'encryption_info': None}
{'kind': 'file', 'name': '/HDFS_Data/data3.txt', 'last_mod': 1576487616, 'size': 67933, 'replication': 3, 'block_size': 67108864, 'owner': 'cloudera', 'group': 'supergroup', 'permissions': 511, 'last_access': 1576487616, 'encryption_info': None}
{'kind': 'file', 'name': '/HDFS_Data/data4.txt', 'last_mod': 1576487616, 'size': 67911, 'replication': 3, 'block_size': 67108864, 'owner': 'cloudera', 'group': 'supergroup', 'permissions': 511, 'last_access': 1576487616, 'encryption_info': None}


In [39]:
for key in sorted(resultWord) :
    print("%s %s"% (key, resultWord[key]))

' 4
Aenean 159
Aliquam 172
Aliquam. 1
Class 35
Cras 168
Curabitur 159
Curae; 31
Curae;\r\n\r\nMaecenas 1
Curae;\r\n\r\nSed 1
Donec 284
Duis 127
Etiam 151
Fusce 134
In 182
Integer 165
Interdum 28
Lorem 25
Maecenas 144
Mauris 146
Morbi 162
Nam 156
Nulla 181
Nullam 175
Nunc 141
Orci 19
Pellentesque 172
Phasellus 180
Praesent 143
Proin 149
Quisque 149
Sed 305
Suspendisse 175
Ut 155
Vestibulum 172
Vivamus 155
a 400
a, 55
a. 28
a.\r\n\r\nIn 1
a.\r\n\r\nOrci 1
ac 504
ac, 43
ac. 31
ac.\r\n\r\nSuspendisse 1
ac.\r\n\r\nVestibulum 1
accumsan 155
accumsan, 16
accumsan. 17
accumsan.\r\n\r\nIn 1
ad 38
adipiscing 30
aliquam 138
aliquam, 7
aliquam. 23
aliquam.\r\n\r\nAliquam 1
aliquet 149
aliquet, 12
aliquet. 34
aliquet.\r\n\r\nSuspendisse 1
amet 402
amet, 86
amet. 25
amet.\r\n\r\nUt 1
ante 218
ante, 34
ante. 48
ante.\r\n\r\nAliquam 1
ante.\r\n\r\nCurabitur 1
ante.\r\n\r\nEtiam 1
ante.\r\n\r\nFusce 1
ante.\r\n\r\nNullam 1
ante.\r\n\r\nPellentesque 1
ante.\r\n\r\nPraesent 1
ante.\r\n\r\nSuspendisse 2
a

In [40]:
print("Le nombre de mots qui se repete : " + str(len(resultWord)))

Le nombre de mots qui se repete : 881


In [41]:
print("Le top 10 des mots les plus repetes\n")
top10 = sorted(resultWord.items(), key=lambda k_v: k_v[1], reverse=True)[:10]
for (mot,count) in top10 :
    print("Le mot " + str(mot) + " est repete " + str(count))

Le top 10 des mots les plus repetes

Le mot et est repete 544
Le mot sit est repete 514
Le mot ac est repete 504
Le mot in est repete 477
Le mot sed est repete 452
Le mot id est repete 425
Le mot eget est repete 419
Le mot ut est repete 415
Le mot quis est repete 415
Le mot vel est repete 413


## Suppression du répertoire HDFS_Data

In [22]:
!hadoop fs -rm -r /HDFS_Data/

Deleted /HDFS_Data


In [23]:
!hadoop fs -ls

Found 7 items
drwxr-xr-x   - cloudera cloudera          0 2019-12-12 13:39 Magasin
drwxr-xr-x   - cloudera cloudera          0 2019-12-13 02:02 SalaireMinMax
drwxr-xr-x   - cloudera cloudera          0 2019-12-12 14:46 WordCount
drwxr-xr-x   - cloudera cloudera          0 2019-12-12 13:17 countword
-rw-r--r--   1 cloudera cloudera      53655 2019-12-12 05:43 enterprise-deployment.json
drwxr-xr-x   - cloudera cloudera          0 2019-12-12 13:00 temperature
drwxr-xr-x   - cloudera cloudera          0 2019-12-12 06:03 test
