# Hadoop  

Una volta installato Hadoop e configurato su *single-node* in modalità [*pseudo-distributed*](https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-common/SingleCluster.html) (ogni daemon esegue su un processo Java separato), tramite il seguente comando avviene l'esecuzione dei vari servizi, visualizzabili poi tramite *jps*

In [22]:
! /usr/local/cellar/hadoop/3.3.1/libexec/sbin/start-all.sh

Starting namenodes on [localhost]
localhost: /Users/gabrielesavoia/.bashrc: line 1: pyenv: command not found
Starting datanodes
localhost: /Users/gabrielesavoia/.bashrc: line 1: pyenv: command not found
Starting secondary namenodes [MacBook-Air-di-Gabriele.local]
MacBook-Air-di-Gabriele.local: /Users/gabrielesavoia/.bashrc: line 1: pyenv: command not found
2021-11-29 19:21:23,917 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting resourcemanager
Starting nodemanagers
localhost: /Users/gabrielesavoia/.bashrc: line 1: pyenv: command not found


## Elenco dei servizi attivi 
Alcuni si riferiscono all'**HDFS** (NameNode, SecondaryNameNode e DataNode), mentre altri a **YARN** (ResourceManager e NodeManager).

In [25]:
! jps

27028 SecondaryNameNode
28039 Jps
27224 ResourceManager
26889 DataNode
27326 NodeManager
26783 NameNode


# HDFS  

Trasferisco il file di nome 'documents.txt' nel HDFS in locazione '/'.

In [144]:
! hdfs dfs -put ./data/documents.txt /

2021-11-30 10:18:22,203 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [145]:
! hdfs dfs -ls /

2021-11-30 10:18:29,585 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r--   1 gabrielesavoia supergroup        299 2021-11-30 10:18 /documents.txt
drwxr-xr-x   - gabrielesavoia supergroup          0 2021-11-30 10:16 /output
drwx------   - gabrielesavoia supergroup          0 2021-11-29 23:00 /tmp
drwxr-xr-x   - gabrielesavoia supergroup          0 2021-11-29 23:00 /user


**Interfaccia web** per il monitoraggio dell'HDFS :  

<img src='./img/note_HDFS.png' width="900" height="900">  

**DataNode** monitoraggio :  

<img src='./img/note_HDFS_DataNode_usage.png' width="900" height="900"> 

# MapReduce  

Come esempio dimostrativo, è stata implementata una versione basilare di MapReduce per la creazione di un **InvertedIndex** (ad ogni parola è associata la corrispondente posting list con i documenti in cui questa si presenta).  
In particolare è stata utilizzata la libreria [**MRJob**](https://mrjob.readthedocs.io/en/latest/) mediante la quale è possibile scrivere applicazioni MapReduce in Python per poi eseguerle sia in locale che in un cluster Hadoop. Nonostante Hadoop sia scritto principalmente di Java, mette a disposizione un particolare modulo definito [**HadoopStreaming**](https://mrjob.readthedocs.io/en/latest/guides/concepts.html#hadoop-streaming-and-mrjob) mediante il quale è possibile interagire con MapReduce anche da altri linguaggi in quanto la comunicazione avviene tramite *stdin* e *stdout.*  

Di seguito è riportato il codice relativo al **job MapReduce** *InvertedIndexMR*, nel quale sono definite principalmente due funzioni :  
* **map**: si occupa di leggere in input riga per riga del file 'documents.txt'. Questo file infatti contiene per ciasuna riga l'id del documento separato poi con un ':' dal relativo testo. La map ritorna quindi, senza considerare le stopword, coppie del tipo : (word, doc_id );
* **reduce**: elabora il risultato della map e ritorna in output, per ogni parola, la corrispondente posting list contenente, senza duplicati, i documenti a cui fa parte.  

Di seguito è riportato ciò che si vuole ottenere.  

<img src='./img/inverted_index.jpg' width="500" height="500">

In [161]:
%%file inverted_index.py
from mrjob.job import MRJob

# Non utilizzo NLTK dal momento che si tratta di codice di esempio
stop_words = ['il', 'con', 'i', 'a', 'e', 'al', 'con', '.', ',', 'nella', 'nei', 'nel', 'per', 'di', 'la', 'va', 'alle']

class InvertedIndexMR(MRJob):
    
    def mapper(self, _, line):
        """
        Input : righe del file.
        Return : coppie (word, doc_id)
        """
        doc_id, doc_text = line.split(':')
        doc_id = doc_id.strip()
        for word in doc_text.split():
            word = word.lower()
            if word not in stop_words:
                yield word, doc_id

    def reducer(self, word, doc_list): 
        """
        Input : coppie (word, [doc_id_1, doc_id_1, ... , doc_id_n] )
        Return : coppie (word, [doc_id_1, ... , doc_id_n] )            --> senza duplicati di documenti
        """
        unique_doc = list(set(doc_list))
        
        yield word, unique_doc

if __name__ == '__main__':
    InvertedIndexMR.run()

Overwriting inverted_index.py


## Documento da elaborare  

Il documento su cui viene eseguita la funzione di MapReduce è salvato in *./data/documents.txt*.

In [162]:
! cat ./data/documents.txt

doc1 : La storia iniziò nella città di Berlino nel lontano 1790
doc2 : Oggi Luca va con tutti i suoi amici a giocare a calcetto
doc3 : Questo fine settimana esco con alcuni amici di amici
doc4 : Durante la notte alcuni animali escono per cercare cibo
doc5 : Matteo e Luca torneranno a casa alle 23

## Run in locale  

In questo caso ci si riferisce al documento in locale presente in *./data*. 

In [163]:
! python inverted_index.py ./data/documents.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/z3/yl30gjf55_x2qqgttnn1gjjc0000gn/T/inverted_index.gabrielesavoia.20211130.171917.118742
Running step 1 of 1...
job output is in /var/folders/z3/yl30gjf55_x2qqgttnn1gjjc0000gn/T/inverted_index.gabrielesavoia.20211130.171917.118742/output
Streaming final output from /var/folders/z3/yl30gjf55_x2qqgttnn1gjjc0000gn/T/inverted_index.gabrielesavoia.20211130.171917.118742/output...
"oggi"	["doc2"]
"questo"	["doc3"]
"settimana"	["doc3"]
"storia"	["doc1"]
"cercare"	["doc4"]
"cibo"	["doc4"]
"citt\u00e0"	["doc1"]
"durante"	["doc4"]
"esco"	["doc3"]
"escono"	["doc4"]
"fine"	["doc3"]
"giocare"	["doc2"]
"inizi\u00f2"	["doc1"]
"lontano"	["doc1"]
"luca"	["doc5", "doc2"]
"matteo"	["doc5"]
"notte"	["doc4"]
"animali"	["doc4"]
"berlino"	["doc1"]
"calcetto"	["doc2"]
"casa"	["doc5"]
"suoi"	["doc2"]
"torneranno"	["doc5"]
"tutti"	["doc2"]
"1790"	["doc1"]
"23"	["doc5"]
"alcuni"	["doc4

## Run in Hadoop  

In questo caso invece, ci si riferisce al file presente nell'HDFS. Per l'esecuzione è necessario inoltre definire la posizione del file *hadoop-streaming-3.3.1.jar*. 


In [158]:
! hdfs dfs -rm -R /output

2021-11-30 11:35:14,467 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted /output


In [159]:
! python inverted_index.py -r hadoop hdfs:///documents.txt --hadoop-streaming-jar hadoop-streaming-3.3.1.jar --output hdfs:///output

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/local/bin/hadoop
Using Hadoop version 3.3.1
Creating temp directory /var/folders/z3/yl30gjf55_x2qqgttnn1gjjc0000gn/T/inverted_index.gabrielesavoia.20211130.103519.667336
uploading working dir files to hdfs:///user/gabrielesavoia/tmp/mrjob/inverted_index.gabrielesavoia.20211130.103519.667336/files/wd...
Copying other local files to hdfs:///user/gabrielesavoia/tmp/mrjob/inverted_index.gabrielesavoia.20211130.103519.667336/files/
Running step 1 of 1...
  Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  packageJobJar: [/var/folders/z3/yl30gjf55_x2qqgttnn1gjjc0000gn/T/hadoop-unjar5602148106066165414/] [] /var/folders/z3/yl30gjf55_x2qqgttnn1gjjc0000gn/T/streamjob8707569005565255841.jar tmpDir=null
  Connecting to ResourceManager at /127.0.0.1:8032
  Connecting to ResourceManager at /1

In [160]:
! hadoop fs -cat hdfs:///output/part-00000

2021-11-30 11:38:18,857 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
"1790"	["doc1"]
"23"	["doc5"]
"alcuni"	["doc3", "doc4"]
"amici"	["doc2", "doc3"]
"animali"	["doc4"]
"berlino"	["doc1"]
"calcetto"	["doc2"]
"casa"	["doc5"]
"cercare"	["doc4"]
"cibo"	["doc4"]
"citt\u00e0"	["doc1"]
"durante"	["doc4"]
"esco"	["doc3"]
"escono"	["doc4"]
"fine"	["doc3"]
"giocare"	["doc2"]
"inizi\u00f2"	["doc1"]
"lontano"	["doc1"]
"luca"	["doc2", "doc5"]
"matteo"	["doc5"]
"notte"	["doc4"]
"oggi"	["doc2"]
"questo"	["doc3"]
"settimana"	["doc3"]
"storia"	["doc1"]
"suoi"	["doc2"]
"torneranno"	["doc5"]
"tutti"	["doc2"]


# YARN  

Durante l'esecuzione del job MapReduce di prima, tramite l'interfaccia web di YARN, è stata monitorata l'esecuzione dell'applicazione. In particolare nella figura di seguito sono riportate tutte le applicazioni eseguite (la prima è quella in esecuzione non ancora terminata):  

<img src='./img/note_all_app.png' width="900" height="900">  

E' inoltre possibile avere dettagli maggiori riferiti alle singole applicazioni :  

<img src='./img/note_single_app.png' width="900" height="900"> 