__Enable python3 kernel__

### Step 0: Setting up the environment

Before starting the MapReduce study, we download the necessary datasets into the `data/` directory.

In [None]:

!wget https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/forestier_mayotte.txt -O data/forestier_mayotte.txt

--2019-11-10 20:00:04--  https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/forestier_mayotte.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.120.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.120.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1381 (1.3K) [text/plain]
Saving to: ‘data/forestier_mayotte.txt’


2019-11-10 20:00:04 (6.30 MB/s) - ‘data/forestier_mayotte.txt’ saved [1381/1381]



In [None]:
# national police ethics
!wget https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/deontologie_police_nationale.txt -O data/deontologie_police_nationale.txt

--2019-11-10 20:00:06--  https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/deontologie_police_nationale.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.120.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.120.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7488 (7.3K) [text/plain]
Saving to: ‘data/deontologie_police_nationale.txt’


2019-11-10 20:00:06 (17.6 MB/s) - ‘data/deontologie_police_nationale.txt’ saved [7488/7488]



In [None]:
# river public domain
!wget https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/domaine_public_fluvial.txt -O data/domaine_public_fluvial.txt

--2019-11-10 20:00:08--  https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/domaine_public_fluvial.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.120.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.120.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 71173 (70K) [text/plain]
Saving to: ‘data/domaine_public_fluvial.txt’


2019-11-10 20:00:08 (5.60 MB/s) - ‘data/domaine_public_fluvial.txt’ saved [71173/71173]



In [None]:
# public health
!wget https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/sante_publique.txt -O data/sante_publique.txt

--2019-11-10 20:00:09--  https://raw.githubusercontent.com/legifrance/Les-codes-en-vigueur/master/sante_publique.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.120.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.120.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18111630 (17M) [text/plain]
Saving to: ‘data/sante_publique.txt’


2019-11-10 20:00:11 (18.9 MB/s) - ‘data/sante_publique.txt’ saved [18111630/18111630]



In [None]:
!wget https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-13/segments/1490218189495.77/wet/CC-MAIN-20170322212949-00140-ip-10-233-31-227.ec2.internal.warc.wet.gz\
    -O data/big-file.gz
!cd data && gunzip big-file.gz

--2019-11-10 20:00:11--  https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-13/segments/1490218189495.77/wet/CC-MAIN-20170322212949-00140-ip-10-233-31-227.ec2.internal.warc.wet.gz
Resolving commoncrawl.s3.amazonaws.com (commoncrawl.s3.amazonaws.com)... 52.216.112.3
Connecting to commoncrawl.s3.amazonaws.com (commoncrawl.s3.amazonaws.com)|52.216.112.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 154369177 (147M) [application/octet-stream]
Saving to: ‘data/big-file.gz’


2019-11-10 20:00:21 (16.6 MB/s) - ‘data/big-file.gz’ saved [154369177/154369177]



__Enable java kernel__

### <font color="red">Step 1</font> : Make a non-parallelized sequential program that counts the number of occurrences of words in a file.

In [None]:
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import static java.util.stream.Collectors.*;

public class WordsCount{
    
    public static ArrayList<String> read_file(String filename){
        ArrayList<String> lines = new ArrayList<>();
        try {
            // Copy into an ArrayList: readAllLines only promises a List, so a cast is unsafe
            lines = new ArrayList<>(Files.readAllLines(Paths.get(filename)));
        } catch (IOException e) {
            System.out.println("Problem when loading file");
            return null;
        }
        return lines;
    }
    
    public static ArrayList<String[]> tokenize(ArrayList<String> lines){
        ArrayList<String[]> tokenize_corpus = new ArrayList<>();
        for (String line : lines) {
            // removes punctuations
            line = line.replaceAll("\\p{Punct}","").toLowerCase().trim();

            tokenize_corpus.add(line.split(" "));
        }
        return tokenize_corpus;
    }
    
    public static HashMap<String, Double> words_count(String filename){
        // Function for the question 1
        ArrayList<String> lines = WordsCount.read_file(filename);
        ArrayList<String[]> lines_tokenized = WordsCount.tokenize(lines);
        HashMap<String, Double> words_count = new HashMap<>();

        for (String[] line_tokenized: lines_tokenized) {
            for (String word:line_tokenized) {
                if (!word.isEmpty()) {
                    // insert 1.0 for a new word, otherwise add 1.0 to its current count
                    words_count.merge(word, 1.0, Double::sum);
                }
            }
        }
        return words_count;
    }
    
    public static HashMap<String, Double> sorted_map_by_numeric_value(HashMap<String, Double> hash_map){
        // Function for question 2
        HashMap<String, Double> sorted_map = hash_map
                                            .entrySet()
                                            .stream()
                                            .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                                            .collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2,
                                                    LinkedHashMap::new));
        return sorted_map;
    }
    
    public static HashMap<String, Double> sort(HashMap<String, Double> hash_map){
        // Function for question 3
        HashMap<String, Double> sorted_map = hash_map
                                                .entrySet()
                                                .stream()
                                                .sorted(new Comparator<Entry<String, Double>>() {
                                                    @Override
                                                    public int compare(Entry<String, Double> e1, Entry<String, Double> e2) {
                                                        if (e1.getValue().equals(e2.getValue())) {
                                                                return e1.getKey().compareTo(e2.getKey());
                                                        } else {
                                                            return e2.getValue().compareTo(e1.getValue());
                                                        }
                                                    }
                                                }).collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1,
                                                        LinkedHashMap::new));
        return sorted_map;
    }
    
    public static void print_map(Map<String, Double> hash_map) {
        for (Entry<String, Double> el:hash_map.entrySet()) {
                System.out.println(el.getKey() + " " + el.getValue());
        }
    }
    
    public static void print_map(Map<String, Double> hash_map, Integer n) {
        int count = 0; // int, so that count == n compares values rather than Integer references
        for (Entry<String, Double> el:hash_map.entrySet()) {
            if(count == n){
                break;
            } else {
              System.out.println(el.getKey() + " " + el.getValue());
                count++;
            }
        }
    }
    
    public static void main(String[] args){
        System.out.print("Hello Jupyter from java");
    }
}

__1. First pure sequential counting__

Implement a Java program that counts the number of occurrences of words in an input file in a non-parallelized way (single-threaded), using a single processor.<br>
Which data structure is the most relevant to store the results: List, HashMap, HashSet, or another? Why?

 

Test your program with an input file input.txt with content: 
```sh 
Deer Beer River 
Car Car River 
Deer Car Beer
```
Results:
```sh 
Deer 2 
Beer 2 
River 2 
Car 3
```

In [None]:
WordsCount.print_map(WordsCount.words_count("data/input.txt"));

deer 2.0
car 3.0
river 2.0
beer 2.0
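
On the data-structure question, a `HashMap` is a natural fit because each word is a key whose count can be looked up and updated in constant time on average. The following is a minimal sketch of that idea, not the notebook's `WordsCount` class: the class name `WordCountSketch` is hypothetical, and it stores `Integer` counts where the notebook uses `Double`.

```java
import java.util.HashMap;
import java.util.Map;

class WordCountSketch {
    // Counts lower-cased, whitespace-separated words across the given lines.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    // insert 1 for a new word, otherwise add 1 to the existing count
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count("Deer Beer River", "Car Car River", "Deer Car Beer");
        System.out.println("car " + counts.get("car"));
    }
}
```

A `List` would need a linear scan per word and a `HashSet` cannot hold the counts, which is why the map is the usual choice here.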


__2. First pure sequential sort__

Modify your program to sort by number of occurrences:
Results:
```sh
Car 3 
Deer 2 
Beer 2 
River 2
```

In [None]:
HashMap<String, Double> words_count = WordsCount.words_count("data/input.txt");
WordsCount.print_map(WordsCount.sorted_map_by_numeric_value(words_count));

car 3.0
deer 2.0
river 2.0
beer 2.0


__3. Second alphabetical sort in pure sequential__

Modify the program to sort alphabetically for words with equal number of occurrences: 

Result:
```sh
Car 3 
Beer 2 
Deer 2
River 2 
```

In [None]:
HashMap<String, Double> words_count = WordsCount.words_count("data/input.txt");
WordsCount.print_map(WordsCount.sort(words_count));

car 3.0
beer 2.0
deer 2.0
river 2.0
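
The two-level ordering (count descending, then word ascending) can also be written by chaining comparators instead of an anonymous `Comparator` class. This is a sketch under assumptions: the class name `SortSketch` is hypothetical, counts are `Integer`, and `Map.of` requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class SortSketch {
    // Returns the entries ordered by count (descending), then by word (ascending).
    static List<Map.Entry<String, Integer>> sorted(Map<String, Integer> counts) {
        Comparator<Map.Entry<String, Integer>> byCountDesc =
                Map.Entry.comparingByValue(Comparator.reverseOrder());
        Comparator<Map.Entry<String, Integer>> order =
                byCountDesc.thenComparing(Map.Entry.comparingByKey());
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(order);
        return entries;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("deer", 2, "beer", 2, "river", 2, "car", 3);
        for (Map.Entry<String, Integer> e : sorted(counts)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Returning a list avoids relying on a `LinkedHashMap` to carry the order, at the cost of losing direct lookup by word.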


__4. Test of the sequential program on the forest code of Mayotte__

Then test your program with the Mayotte forestry code, forestier_mayotte.txt, available on GitHub:
https://github.com/legifrance/Les-codes-en-vigueur 

Did your program work the first time?

Check by opening the text file that it contains text and not HTML code.

Do not waste time correcting any errors due to special characters or suspicious or illegible words (anyway later there will be Chinese in the text).

The program worked the first time, but special characters and punctuation skewed the results. As the statement requests, I spent little time on data cleaning and only added the Java regex `\\p{Punct}` to strip punctuation.

In [None]:
HashMap<String, Double> words_count = WordsCount.words_count("data/forestier_mayotte.txt");
WordsCount.print_map(WordsCount.sort(words_count), 15);

de 12.0
biens 8.0
ou 8.0
code 6.0
forestier 6.0
des 5.0
partie 5.0
agroforestiers 4.0
aux 4.0
communes 4.0
dispositions 4.0
forestiers 4.0
gestion 4.0
le 4.0
les 4.0


__5. The 50 words of the code of ethics of the national police__

Test your program with the national police code of ethics available on github deontologie_police_nationale.txt: https://github.com/legifrance/Les-codes-en-vigueur

Again, don't waste time filtering out special characters or other odd words. Why? Because we will later work on texts in Chinese, Japanese, Arabic and other languages, so a filtering step written here for French would be useless afterwards. What are the first 5 words (that look like words) in the top 50 of the sorted result list? Keep the answer to include in the report.

The first 5 words in the top 50 of the sorted list are "de, la, police, et, des"

In [None]:
HashMap<String, Double> words_count = WordsCount.words_count("data/deontologie_police_nationale.txt");
WordsCount.print_map(WordsCount.sort(words_count), 50);

de 98.0
la 51.0
police 38.0
et 36.0
des 33.0
le 27.0
à 25.0
les 24.0
article 20.0
nationale 20.0
↬ 19.0
en 13.0
est 13.0
titre 13.0
ou 12.0
qui 11.0
fonctionnaires 10.0
lautorité 10.0
aux 9.0
code 9.0
fonctionnaire 9.0
par 9.0
commandement 8.0
du 8.0
leur 8.0
ses 8.0
au 7.0
devoirs 7.0
déontologie 7.0
il 7.0
ne 7.0
a 6.0
dans 6.0
cas 5.0
faire 5.0
lordre 5.0
ordres 5.0
pour 5.0
sa 5.0
se 5.0
si 5.0
sont 5.0
tout 5.0
doit 4.0
droits 4.0
elle 4.0
exécution 4.0
missions 4.0
pas 4.0
personne 4.0


__6. The 50 words of the fluvial public domain code__

Test your program with the fluvial public domain code, domaine_public_fluvial.txt.

What are the first 5 words (that look like words) in the top 50 of the sorted result list? Keep the answer to include in the report.

The first 5 words in the top 50 of the sorted list are "de, le, la, du, et"

In [None]:
HashMap<String, Double> words_count = WordsCount.words_count("data/domaine_public_fluvial.txt");
WordsCount.print_map(WordsCount.sort(words_count), 50);

de 630.0
le 429.0
la 370.0
du 347.0
et 295.0
les 240.0
des 222.0
à 209.0
est 173.0
dans 150.0
par 124.0
sur 123.0
ou 120.0
en 109.0
article 107.0
bateau 106.0
↬ 103.0
au 97.0
un 79.0
pour 73.0
tribunal 72.0
lieu 69.0
larticle 68.0
aux 66.0
il 66.0
dimmatriculation 64.0
qui 54.0
bureau 50.0
code 48.0
titre 48.0
navigation 46.0
que 46.0
dun 44.0
bateaux 43.0
où 43.0
son 43.0
domicile 42.0
se 41.0
créanciers 40.0
juge 38.0
a 37.0
propriétaire 37.0
commerce 36.0
saisie 36.0
intérieure 35.0
délai 34.0
nom 34.0
prix 34.0
sil 33.0
cas 32.0


__7. The 50 words of the public health code__
Test your program with the public health code sante_publique.txt.

What are the first 5 words (that look like words) in the top 50 of the sorted result list? Keep the answer to include in the report.

The first 5 words in the top 50 of the sorted list are "de, la, des, à, et"

In [None]:
HashMap<String, Double> words_count = WordsCount.words_count("data/sante_publique.txt");
WordsCount.print_map(WordsCount.sort(words_count), 50);

de 190889.0
la 82599.0
des 67813.0
à 65546.0
et 62702.0
les 62474.0
le 56800.0
du 48514.0
ou 40848.0
en 31742.0
par 30718.0
au 25730.0
dans 25104.0
article 23844.0
larticle 22998.0
↬ 21810.0
est 21792.0
un 20536.0
santé 20197.0
l 18762.0
pour 16876.0
aux 16144.0
sont 15048.0
une 13868.0
sur 12624.0
que 11348.0
r 11236.0
qui 10150.0
dun 9672.0
peut 8608.0
directeur 8604.0
lagence 8284.0
conditions 8094.0
être 8070.0
cas 8040.0
ne 7714.0
a 7622.0
conseil 7560.0
dune 7374.0
son 7298.0
général 6912.0
il 6666.0
ce 6352.0
ces 6188.0
1° 6114.0
dispositions 5926.0
2° 5872.0
leur 5766.0
pas 5702.0
sécurité 5282.0


__8. Sequential timing__

Time your program on the public health code.

In [None]:
long start_time = System.currentTimeMillis();
HashMap<String, Double> words_count = WordsCount.words_count("data/sante_publique.txt");
long end_time = System.currentTimeMillis();
double total_time = (end_time - start_time) / 1000.0; // milliseconds to seconds
System.out.print(String.format("Word count duration: %f s", total_time));

Word count duration: 0.961000 s
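
For measuring elapsed time, `System.nanoTime()` is generally preferred over `System.currentTimeMillis()` because it is not affected by adjustments of the system clock. The sketch below wraps that pattern in a helper; the class and method names are hypothetical, and the loop is only a placeholder workload standing in for the word count.

```java
class TimingSketch {
    // Runs the task once and returns the elapsed time in seconds.
    static double timeSeconds(Runnable task) {
        long start = System.nanoTime();
        task.run();
        long elapsedNanos = System.nanoTime() - start;
        return elapsedNanos / 1_000_000_000.0;
    }

    public static void main(String[] args) {
        double seconds = timeSeconds(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i; // placeholder workload
            }
        });
        System.out.printf("Elapsed: %f s%n", seconds);
    }
}
```

A single run also includes JIT warm-up, so repeating the measurement a few times gives a more representative figure.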

__9. Work on large files__

Test your program on a real case: an extract of web pages converted to plain text (WET format). Web pages in text format are available from [commoncrawl](http://commoncrawl.org/the-data/get-started/): every month, around 3 billion web pages, or 250 TB of data, are stored. This data is available in slices of roughly 1 GB or less; you will work on a slice of 380 MB.
I chose a particular slice so that we can compare results (you can test on other slices if you want). Download this slice here:
[large file](https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-13/segments/1490218189495.77/wet/CC-MAIN-20170322212949-00140-ip-10-233-31-227.ec2.internal.warc.wet.gz)
Unzip it to get the file CC-MAIN-20170322212949-00140-ip-10-233-31-227.ec2.internal.warc.wet
This is a slice containing a set of websites in plain text (WET) format.
Test your program with this file as input. Time it.

In [None]:
long start_time = System.currentTimeMillis();
HashMap<String, Double> words_count = WordsCount.words_count("data/big-file");
long end_time = System.currentTimeMillis();
double total_time = (end_time - start_time) / 1000.0; // milliseconds to seconds
System.out.print(String.format("Word count duration: %f s", total_time));

Word count duration: 61.482000 s

__Enable python3 kernel__



### <font color="red">Step 2</font> : Work with multiple computers in a network.

__1. Short name, long name__

What is the SHORT name of your computer (the simple name without the domain)? What is the LONG name of your computer (the name with the domain)? How to find them on the command line? On the school computers, is it possible to obtain these names other than on the command line? Add the responses to your report.

In the current Docker setup, the short name and the long name of a machine are identical.

In [None]:
!echo Long name: `hostname -f`
!echo Short name: `hostname`

Long name: master
Short name: master


__2. IP address__

How to know the (several) IP addresses of your computer from the command line? Otherwise (through a website for example)? Add the responses to your report.

In [None]:
!echo IP address: `hostname -I`

IP address: 172.21.0.5


__3. From name to IP__

How to get IP addresses from the command line from a computer name? Add the responses to your report.

In [None]:
!host worker-1

worker-1 has address 172.21.0.3
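
The same name-to-address lookup is available from Java through `java.net.InetAddress`. This is a minimal sketch with a hypothetical class name; it resolves `localhost` so that it runs anywhere, whereas in the cluster one would pass a name such as `worker-1`.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

class ResolveSketch {
    // Returns every IP address the local resolver knows for the given host name.
    static List<String> resolve(String host) throws UnknownHostException {
        List<String> addresses = new ArrayList<>();
        for (InetAddress address : InetAddress.getAllByName(host)) {
            addresses.add(address.getHostAddress());
        }
        return addresses;
    }

    public static void main(String[] args) throws UnknownHostException {
        // "localhost" is an assumption made so the sketch runs outside the cluster
        System.out.println(resolve("localhost"));
    }
}
```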


__4. From IP to name__

How, from an IP address, to obtain the associated names on the command line? Add the responses to your report.

In [None]:
!host 172.19.0.4

Host 4.0.19.172.in-addr.arpa. not found: 3(NXDOMAIN)


__5. Ping pong inside!__

Test communication with other computers (not yours) on the school network using the ping command followed by the short name, the long name, and then the IP address (press CTRL + C to stop the ping). Do all three methods work? Add the responses to your report.

In [None]:
!ping worker-1

PING worker-1 (172.21.0.3) 56(84) bytes of data.
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=1 ttl=64 time=4.23 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=2 ttl=64 time=0.069 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=3 ttl=64 time=0.098 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=4 ttl=64 time=0.126 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=5 ttl=64 time=0.105 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=6 ttl=64 time=0.067 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=7 ttl=64 time=0.055 ms
64 bytes from worker-1.docker_default (172.21.0.3): icmp_seq=8 ttl=64 time=0.060 ms
^C

--- worker-1 ping statistics ---
8 packets transmitted, 8 received, 0% packet loss, time 7275ms
rtt min/avg/max/mdev = 0.055/0.601/4.233/1.373 ms


In [None]:
!ping 172.19.0.4

PING 172.19.0.4 (172.19.0.4) 56(84) bytes of data.
^C

--- 172.19.0.4 ping statistics ---
3 packets transmitted, 0 received, 100% packet loss, time 2118ms



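Pinging by name works, and the short and long names are identical in this Docker setup. The ping by IP shown above fails only because 172.19.0.4 is not worker-1's current address (question 3 resolves it to 172.21.0.3).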

__7. Calculate in command line on local computer__

How to launch a command line calculation on your computer (for example 2 + 3)? Among the multiple possible answers, which ones allow you to launch the calculation and obtain the result by pressing the <Enter> key only once? Add the responses to your report.

In [None]:
!echo $((3+5)) 

8


__8. Calculate from the command line on a remote computer__

How do I run a calculation (e.g. 2+3) from the command line on another computer (remotely)? You will certainly need to authenticate yourself with a password. How to get the result of the calculation immediately after typing your password? Add the responses to your report.

Passwordless ssh communication between docker containers is already configured.

In [None]:
!ssh root@worker-1 'echo $((3+5))'

8


### <font color="red">Step 3</font> : Work with local files or on an NFS server.

__1. Absolute path__

What is the absolute path of your personal directory, your home directory? (“cd” then “pwd” commands)

In [None]:
!cd && pwd

/home/jovyan


__2. A file in the home directory__

Create a file fperso.txt containing the text “bonjour” in your home directory (on a school computer).
Check the contents of the file with this command exactly:
```sh
cat ~/fperso.txt
```

In [None]:
!echo bonjour > $HOME/fperso.txt && cat $HOME/fperso.txt

bonjour


__3. Where is the file located in the home directory?__

Is this file on the hard drive of the computer or somewhere else? How can I find out where this file is physically stored, using which command?

In [None]:
!df -k $HOME/fperso.txt

Filesystem     1K-blocks     Used Available Use% Mounted on
overlay        263174212 25483060 224252996  11% /


__4. A folder and a file in the temporary directory__

Create a folder `/tmp/<your username>` replacing <your username> (do not put the characters < and > ).
Create an ftemp.txt file in the `/tmp/<your username>` directory.
Check the contents of the file with this command exactly:
```sh
cat /tmp/<your username>/ftemp.txt
```
Is this folder and file on the computer's hard drive or somewhere else? How do I know where these items are physically stored, using which command?

In [None]:
!mkdir -p /tmp/$NB_USER && echo bonjour > /tmp/$NB_USER/ftemp.txt
!cat /tmp/$NB_USER/ftemp.txt

# To know where the file is stored
!df -k /tmp/$NB_USER/ftemp.txt

bonjour
Filesystem     1K-blocks     Used Available Use% Mounted on
overlay        263174212 25483068 224252988  11% /


__File copy between remote computers__

In [None]:
!ssh root@worker-1 mkdir -p /tmp/$NB_USER
!scp /tmp/$NB_USER/ftemp.txt root@worker-1:/tmp/$NB_USER
!ssh root@worker-1 ls /tmp/$NB_USER

ftemp.txt                                     100%    8     6.6KB/s   00:00    
ftemp.txt


### <font color="red">Step 4</font> : Launch java programs remotely manually.

__Enable java kernel__

__A first SLAVE program under Eclipse__

Make a Java program named “SLAVE” which calculates 3+5, displays the result, under Eclipse (To launch Eclipse: Menu applications> development), launch this program in Eclipse (green arrow “run”)

In [None]:
public class Slave {

    // Standard entry point, required for a runnable JAR
    public static void main(String[] args) {
        System.out.println(3 + 5);
    }
}

In [None]:
Slave.main(new String[0]);

8


__1. Export to JAR__

Export the Java program in executable slave.jar (Java ARchive called Runnable) under Eclipse. Be careful to check that the JAR is of the “Runnable”/”executable” type.

The `slave.jar` file is located in the `hadoop-from-scratch/jar` folder.

__2. Run on local hard disk__

Create the directory `/tmp/<your username>/`
Copy slave.jar executable to `/tmp/<your username>/` directory
Test and Run the slave.jar from the command line on your local computer.

__Enable python3 kernel__

In [None]:
!cp jar/slave.jar /tmp/$NB_USER && java -jar /tmp/$NB_USER/slave.jar

8


__3. JAR copy and remote execution__

From machine A containing `/tmp/<your username>/slave.jar`
Create remotely on machine B (if it does not exist) a directory `/tmp/<your user name>/`
Copy slave.jar to machine B in the directory `/tmp/<your username>/`
Run remotely (from A on machine B) the slave.jar.
What is the command typed to perform this last action?

In [None]:
!ssh root@worker-1 mkdir -p /tmp/$NB_USER
!scp jar/slave.jar root@worker-1:/tmp/$NB_USER
!ssh root@worker-1 java -jar /tmp/$NB_USER/slave.jar

slave.jar                                     100%   10KB   4.6MB/s   00:00    
8


__Enable java kernel__

### <font color="red">Step 5</font> : Run command line programs from java and show standard output and error output.

__1. A MASTER java program that launches another command line program!__

Write a java program named “MASTER” that runs the following command using ProcessBuilder:
```sh
ls -al /tmp
```
(you can also test this command in a terminal before)
Retrieve and display the output of this command.
You should use ProcessBuilder this way:

```java
ProcessBuilder pb = new ProcessBuilder("ls", "-al", "/tmp");
```

In [None]:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class Master {
    public static void run_shell_command(String[] command) {
        ProcessBuilder pb = new ProcessBuilder(command);

        try {
            Process process = pb.start();
            BufferedReader reader_input = new BufferedReader(new InputStreamReader(process.getInputStream()));

            String line;
            while ((line = reader_input.readLine()) != null) {
                System.out.println(line);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Master.run_shell_command(args[0].split(" "));
    }
}

In [None]:
new Master().main(new String[]{"ls -al /tmp"});

total 32
drwxrwxrwt 1 root   root  4096 Nov 10 20:03 .
drwxr-xr-x 1 root   root  4096 Nov 10 19:56 ..
drwxr-xr-x 2 jovyan users 4096 Aug  3 03:25 .empty
-rw-r--r-- 1 jovyan users  413 Nov 10 12:34 gradle-worker-classpath17894997741142749954txt
drwxr-xr-x 1 jovyan users 4096 Nov 10 20:04 hsperfdata_jovyan
drwxr-xr-x 2 root   root  4096 Nov 10 12:34 hsperfdata_root
drwxr-xr-x 2 jovyan users 4096 Nov 10 20:03 jovyan


__2. A MASTER java program that handles errors when launching another program from the command line.__

Modify your "MASTER" program to show error output if there is an error while executing the command. Test the error output with a command that fails with an error output. For example, try running the command “ls /jesuisunhero”.
Explanations: if you type the command “ls /jesuisunhero”, the /jesuisunhero folder does not exist, you will get an error like “cannot access /jesuisunhero: no file or folder of this type.” which is displayed in the error output. Indeed, there are two outputs: the standard outputs (without error) and the error outputs.
You should use ProcessBuilder this way:
```java
ProcessBuilder pb = new ProcessBuilder("ls", "/jesuisunhero");
```
On the `Process` returned by `pb.start()` you can get the standard output stream and the error output stream.

In order to handle possible errors when launching the process, we can use the `getErrorStream` method of the `Process` object. This method returns the error message stream of the process.

In [None]:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class Master {
    public static void run_shell_command(String[] command) {
        ProcessBuilder pb = new ProcessBuilder(command);

        try {
            Process process = pb.start();
            BufferedReader reader_input = new BufferedReader(new InputStreamReader(process.getInputStream()));

            String line;
            while ((line = reader_input.readLine()) != null) {
                System.out.println(line);
            }
            
            BufferedReader reader_error = new BufferedReader(new InputStreamReader(process.getErrorStream()));

            while ((line = reader_error.readLine()) != null) {
                System.out.println(line);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Master.run_shell_command(args[0].split(" "));
    }
}

In [None]:
new Master().main(new String[]{"ls -al /fr"});

ls: cannot access '/fr': No such file or directory
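
The `Master` above reads standard output to the end before it reads the error output, which can stall if the child fills the error pipe first. One alternative, sketched here under assumptions (hypothetical class name, not the notebook's `Master`), is to merge both streams with `redirectErrorStream(true)` and use the exit code from `waitFor()` to detect failure.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

class MergedOutputSketch {
    // Runs the command, appends its combined stdout/stderr lines to 'output',
    // and returns the exit code of the process.
    static int run(List<String> command, List<String> output)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // stderr is merged into stdout
        Process process = pb.start();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.add(line);
            }
        }
        return process.waitFor(); // by convention, non-zero signals an error
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        List<String> output = new ArrayList<>();
        int exitCode = run(List.of("ls", "/jesuisunhero"), output);
        output.forEach(System.out::println);
        System.out.println("exit code: " + exitCode);
    }
}
```

Merging loses the distinction between the two streams, so this only suits cases where the exit code is enough to tell success from failure.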


__3. A MASTER java program that launches a slave.jar from the command line.__

Modify your “MASTER” program so that it launches “SLAVE”, i.e. slave.jar located on the same machine as “MASTER” in the folder
```sh
/tmp/<your username>/slave.jar
```

In [None]:
new Master().main(new String[]{"java -jar jar/slave.jar"});

8


### <font color="red">Step 6</font> : Manage MASTER timeouts.

__1. A SLAVE that simulates a 10 second calculation.__

Modify your SLAVE program so that it simulates a 10 second wait before displaying the result of the 3+5 calculation. For this use
`Thread.sleep(10000);`
Check the proper functioning of the SLAVE and note that there are 10 seconds between the start of the SLAVE and the display of the result. Be careful not to display anything before the 10 seconds for the following question to work correctly.
Generate the slave.jar again. Copy it overwriting the slave.jar in the `/tmp/<your username>/slave.jar` folder
Test launch from MASTER.

In [None]:
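public class Slave {

    // Standard entry point, required for a runnable JAR
    public static void main(String[] args) throws InterruptedException {
        Thread.sleep(10000); // simulate a 10-second computation
        System.out.println(3 + 5);
    }
}

In [None]:
Slave.main(new String[0]);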

8


__2. Manage timeouts at MASTER level__

Modify the MASTER so that it waits for something to be written to the standard output (without error) or to the error output of the SLAVE for a certain maximum time. At the end of the allotted time, the MASTER considers a timeout.
It stops any threads (if you use threads - not mandatory) dealing with outputs and/or stops the process created with ProcessBuilder.

You will need to check the following TESTs:
- TEST 1: Test the proper functioning of the timeout by launching the SLAVE with a timeout of 2 seconds on the outputs (standard and error). Since the MASTER timeout (2 seconds) is shorter than the SLAVE calculation time (10 seconds), the MASTER must stop the threads (if you use any) and the process.
- TEST 2: Then test with a timeout of 15 seconds, there should be no timeout.
- TEST 3: Then test by changing the SLAVE so that it no longer writes to the standard output (without error) but to the error output. To do this, replace in the Slave the System.out.print… by System.err.print…

In [None]:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Master {

    static class ProcessLauncher {
        Process process;
        Integer timeout;
        String[] command;
        boolean error = false;
        ThreadReaderStream input_stream;
        ThreadReaderStream error_stream;

        ProcessLauncher(String command, Integer timeout) {
            this.timeout = timeout;
            this.command = command.split(" ");
            ProcessBuilder builder = new ProcessBuilder(this.command);
            try {
                this.process = builder.start();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            assert process != null;
            input_stream = new ThreadReaderStream(process.getInputStream());
            error_stream = new ThreadReaderStream(process.getErrorStream());
            input_stream.start();
            error_stream.start();
        }

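        boolean launch_process() throws InterruptedException {
            String line;
            // poll() returns null when nothing was written within the timeout
            while (((line = input_stream.queue.poll(this.timeout, TimeUnit.SECONDS)) != null)) {
                System.out.print(line + "\n");
            }
            while (((line = error_stream.queue.poll()) != null)) {
                System.out.print(line + "\n");
                error = true;
            }
            // Still running after a silent period: treat it as a timeout and stop the process
            if (process.isAlive()) {
                System.out.println("Timeout: stopping the process");
                process.destroy();
                error = true;
            }
            return !error;
        }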
    }

    static class ThreadReaderStream extends Thread {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BufferedReader reader;

        ThreadReaderStream(InputStream stream) {
            this.reader = new BufferedReader(new InputStreamReader(stream));
        }

        @Override
        public void run() {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line);
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) throws InterruptedException {
        // Each argument is one command line to run
        String[] pc = args;
        // Launch one ProcessLauncher per command in parallel; forEach returns once all have finished
        Arrays.asList(pc).parallelStream()
                .forEach(command -> {
                    try {
                        new ProcessLauncher(command, 2).launch_process();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }});
    }
}

### <font color="red">Step 7</font> Automatically deploy the SLAVE program on a set of machines.

From this step onward, the classes in the `src.mapreduce` package of the project are used to answer the questions. To keep the report short, some questions are answered only briefly. The `mapreduce` package contains 5 classes:

- Master: This is the master program of the mapreduce.
- Slave: Corresponds to the code executed on the workers (map, shuffle and reduce).
- Deploy: Used to deploy items to workers
- Clean: Used to clean environments on workers
- Utils: Contains useful functions common between all classes

All process launches on the machines are done in parallel via the methods of the `Utils` class:

- `launch_actions_with_return`: launches the processes and returns, for each one, whether it completed successfully.
- `launch_actions_without_return`: launches the processes without returning a result.

The `parallelStream()` method allows us to browse and launch each command in our list in parallel.

In [None]:
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Utils {


    static ArrayList<String> read_file(String filename) {
        ArrayList<String> lines;
        try {
            // Copy into an ArrayList: readAllLines only promises a List, so a cast is unsafe
            lines = new ArrayList<>(Files.readAllLines(Paths.get(filename)));
        } catch (IOException e) {
            System.out.println("Problem when loading file");
            return null;
        }
        return lines;
    }

    static ArrayList<String[]> tokenize(ArrayList<String> lines) {
        ArrayList<String[]> tokenize_corpus = new ArrayList<>();
        for (String line : lines) {
            // normalize case and trim surrounding whitespace (punctuation is kept)
            line = line.toLowerCase().trim();

            tokenize_corpus.add(line.split(" "));
        }

        return tokenize_corpus;
    }

    static void launch_actions_without_return(ArrayList<String> actions) throws InterruptedException {
        ArrayList<ProcessLauncher> launchers = new ArrayList<>();
        for (String command : actions) {
            launchers.add(new ProcessLauncher(command, 2));
            System.out.println(command);
        }

        launchers.parallelStream().forEach(ProcessLauncher::launch_process);

        for (ProcessLauncher launcher : launchers)
            launcher.input_stream.join();
    }

    static List<Boolean> launch_actions_with_return(ArrayList<String> actions) throws InterruptedException {
        ArrayList<ProcessLauncher> launchers = new ArrayList<>();
        for (String command : actions) {
            launchers.add(new ProcessLauncher(command, 2));
        }
        List<Boolean> returnValue = launchers.parallelStream()
                .map(ProcessLauncher::launch_process).collect(Collectors.toCollection(ArrayList::new));
        for (ProcessLauncher launcher : launchers)
            launcher.input_stream.join();
        return returnValue;
    }

    static List<String> list_directory(String path_to_directory){
        try (Stream<Path> walk = Files.walk(Paths.get(path_to_directory))) {

            return walk.filter(Files::isRegularFile)
                    .map(Path::toString).collect(Collectors.toList());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return Collections.emptyList();
    }

    static void write_file(List<String> word_count, String filename, String mode) throws IOException {
        if (mode.equals("a")){
            try{
                FileWriter fstream = new FileWriter(filename,true);
                BufferedWriter writer = new BufferedWriter(fstream);
                for (String line:word_count){
                    writer.write(line + "\n");
                }
                writer.close();
            }catch (Exception e){
                System.err.println("Error while writing to file: " +
                        e.getMessage());
            }
        } else if (mode.equals("w")){
            BufferedWriter writer = new BufferedWriter(new FileWriter(filename));
            for (String line:word_count){
                writer.write(line + "\n");
            }
            writer.close();
        }
    }

    static class ProcessLauncher {
        Process process;
        Integer timeout;
        String[] command;
        ThreadReaderStream input_stream;
        ThreadReaderStream error_stream;

        ProcessLauncher(String command, Integer timeout) {
            this.timeout = timeout;
            this.command = command.split(" ");
            ProcessBuilder builder = new ProcessBuilder(this.command);
            try {
                this.process = builder.start();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            assert process != null;
            input_stream = new ThreadReaderStream(process.getInputStream());
            error_stream = new ThreadReaderStream(process.getErrorStream());
            input_stream.start();
            error_stream.start();
        }

        boolean launch_process() {
            String line;
            boolean running = true;
            boolean tooLong = false;
            while (running) {
                try {
                    // waitFor returns true if the process has exited
                    boolean stillRunning = !process.waitFor(5, TimeUnit.SECONDS);
                    // Read the standard output. If something was written, keep going.
                    if (!input_stream.queue.isEmpty()) {
                        // The queue holds lines: drain and print them.
                        // To discard them instead, one could do a "reset":
                        // reader.reset();
                        while (((line = input_stream.queue.poll()) != null)) {
                            System.out.println(line);
                        }
                    } else if (!error_stream.queue.isEmpty()) {
                        // The error queue holds lines: drain and print them.
                        // To discard them instead, one could do a "reset":
                        // reader.reset();
                        while (((line = error_stream.queue.poll()) != null)) {
                            System.out.println(line);
                        }
                    } else if (stillRunning) {
                        // The process wrote nothing during the 5 seconds: kill it
                        tooLong = true;
                        process.destroy();
                    }
                    running = stillRunning && !tooLong;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return !tooLong && process.exitValue()==0;
        }
    }

    static class ThreadReaderStream extends Thread {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BufferedReader reader;

        ThreadReaderStream(InputStream stream) {
            this.reader = new BufferedReader(new InputStreamReader(stream));
        }

        @Override
        public void run() {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line);
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

__1. A DEPLOY program: Multiple SSH connection test__

By hand, create a text file containing the IP addresses and/or names of the machines we want to use for our distributed system (for example, all the machines in this lab room), with one name or IP per line.
Create a new Java program, DEPLOY, that reads this file line by line and tests whether the SSH connection works on each machine. Note that this is a new program, separate from MASTER and SLAVE; you only run it when the SLAVE is updated. It lets you check, for example, that a machine is not turned off and that there is no connection problem.
To verify that the connection works, you can display the name of the remote machine (by executing the `hostname` command remotely) and check that it is displayed without errors. Reuse parts of the code from the fifth step.

Does your DEPLOY program launch connections sequentially (one after another) or in parallel?


__2. A DEPLOY program: copying slave.jar to multiple machines__

Modify your DEPLOY program to copy the slave.jar to `/tmp/<your username>/` on computers with a working SSH connection.

To do this, use the `mkdir -p` command to create the directories in /tmp if they do not already exist, wait for the mkdir to finish, and then copy the slave.jar file with `scp`. How do you wait for the mkdir to complete successfully?
Then manually verify that the file has been copied to the remote computers.
Be careful to wait for the mkdir to finish before launching the scp: the copy must not start before the folder actually exists.
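One possible way to enforce this ordering is to block on `Process.waitFor()` and check the exit code before starting the next command. The sketch below is a minimal illustration, not the code used by `Deploy`: the class name `SequentialSteps` and its two helpers are hypothetical, and `main` uses harmless local commands (it assumes a POSIX environment with `mkdir` and `touch`) in place of the real `ssh ... mkdir -p` and `scp` commands.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class SequentialSteps {

    // Starts the command, blocks until it exits, and returns its exit code.
    static int runAndWait(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO()   // forward the child's output to this JVM's console
                .start();
        return process.waitFor();
    }

    // Runs the second command only if the first one exited with code 0.
    static boolean runInOrder(List<String> first, List<String> second)
            throws IOException, InterruptedException {
        if (runAndWait(first) != 0) {
            return false;
        }
        return runAndWait(second) == 0;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Local placeholder commands (assumption: POSIX tools are available).
        // In DEPLOY these would be the ssh "mkdir -p" and the scp command for one host.
        String dir = System.getProperty("java.io.tmpdir") + "/deploy-demo";
        boolean ok = runInOrder(
                Arrays.asList("mkdir", "-p", dir),
                Arrays.asList("touch", dir + "/slave.jar"));
        System.out.println(ok ? "copy done" : "deployment failed");
    }
}
```

Compared with a fixed `Thread.sleep`, waiting on the exit code does not depend on how long the remote command takes, and a failed `mkdir` stops the copy instead of letting it run against a missing folder.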

When copying, pay attention to the “/” character at the end of a path:

- `/tmp/foo` is a path to a file named foo.
- `/tmp/foo/` is a path to a folder named foo.

Does your DEPLOY program launch the copies sequentially (one after another) or in parallel?


In [None]:
import java.util.*;

public class Deploy {

    public static Boolean deploy(List<String> hostnames, String host_path, String remote_path) throws InterruptedException {
        ArrayList<String> health_checks = new ArrayList<>();
        ArrayList<String> check_dir = new ArrayList<>();
        ArrayList<String> create_dirs = new ArrayList<>();
        ArrayList<String> run_copy = new ArrayList<>();

        // Create the list of commands for each machine
        assert hostnames != null;
        for (String hostname : hostnames) {
            health_checks.add("ssh -o StrictHostKeyChecking=no root@" + hostname + " hostname");
            create_dirs.add("ssh root@" + hostname + " if test ! -d " + remote_path + "; then mkdir -p " + remote_path + "; fi");
            check_dir.add("ssh root@" + hostname + " ls /tmp/root");
            run_copy.add("scp -r " + host_path + " root@" + hostname + ":" + remote_path);
        }

        // Apply health checker
        List<Boolean> returnValue = Utils.launch_actions_with_return(health_checks);

        // Check all machine are alive and deploy jar
        boolean isNodesOk = returnValue.stream().allMatch(x -> x);
        if (isNodesOk) {
            Utils.launch_actions_without_return(create_dirs);

            // wait for directories creation and check the creation
            Thread.sleep(3000);
            returnValue = Utils.launch_actions_with_return(check_dir);

            // if all directories are created, deploy jar file
            if (returnValue.stream().allMatch(x -> x)) {
                Utils.launch_actions_without_return(run_copy);
            } else {
                System.out.println("Something went wrong during the directories creation");
                return false;
            }
        } else {
            System.out.println("Not all nodes are safe, please check connection");
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello from deploy!");
    }
}

In [None]:
ArrayList<String> hostnames = Utils.read_file("data/hostnames.txt");
String path_to_jar = "jar/job.jar";
String path_to_remote_host = "/tmp/root/";

Deploy.deploy(hostnames,
          path_to_jar,
          path_to_remote_host);

worker-2
worker-3
worker-1
ssh root@worker-1 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-2 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-3 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
job.jar
job.jar
job.jar
scp -r jar/job.jar root@worker-1:/tmp/root/
scp -r jar/job.jar root@worker-2:/tmp/root/
scp -r jar/job.jar root@worker-3:/tmp/root/


true

### <font color="red">Step 8</font> : clean a set of machines with CLEAN.

__1. A “CLEAN” program that cleans up remote machines.__

Create a new CLEAN program (different from MASTER, SLAVE, or DEPLOY) that clears your /tmp/<your username>/ folder on computers with a working SSH connection. For this, you will use the same text file written by hand and used by DEPLOY containing: the IP addresses and/or the names of the machines that we want to use for our distributed system (for example all the machines in this lab room), with one name or IP per line in the file.

CLEAN reads this file line by line and, on each machine, deletes the folder you created in the temporary folder by launching the remote command
```sh
rm -rf /tmp/<your username>/
```
Be careful to wait for the end of the `rm -rf` command to be sure that the deletion has been carried out.

Does your CLEAN program run erase commands sequentially (one after another) or in parallel?

__2. Verification of DEPLOY and CLEAN__

Manually verify that the deletion of folders takes place. This program allows you to clean a set of machines before restarting a calculation. From now on you can deploy your application using DEPLOY and you can clean up your application using CLEAN. So check that DEPLOY followed by CLEAN works correctly.

The commands are launched on the machines in parallel via the methods of the `Utils` class:

- `launch_actions_with_return`: launches the processes and returns, for each one, whether it completed successfully.
- `launch_actions_without_return`: launches the processes without returning a result.

The `parallelStream()` method allows us to browse and launch each command in our list in parallel.
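As a small illustration of this pattern, the sketch below maps a list of commands through a parallel stream and collects one result per command. It is a simplified stand-in, not the `Utils` code: `ParallelCommands` and `fakeRun` are hypothetical names, and `fakeRun` only builds a label instead of starting a process.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelCommands {

    // Hypothetical stand-in for running a command: it only builds a label
    // containing the command and the name of the thread that handled it.
    static String fakeRun(String command) {
        return command + " handled by " + Thread.currentThread().getName();
    }

    // Processes the commands in parallel and collects the results;
    // the collected list keeps the order of the input list.
    static List<String> runAll(List<String> commands) {
        return commands.parallelStream()
                .map(ParallelCommands::fakeRun)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> commands = Arrays.asList(
                "ssh root@worker-1 hostname",
                "ssh root@worker-2 hostname",
                "ssh root@worker-3 hostname");
        runAll(commands).forEach(System.out::println);
    }
}
```

Parallel streams run on the common fork-join pool, so the number of commands handled at the same time is bounded by that pool's parallelism rather than by the length of the list.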

In [None]:
import java.util.ArrayList;
import java.util.List;

public class Clean {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> hostnames = Utils.read_file(args[0]);
        ArrayList<String> remove_folder = new ArrayList<>();
        ArrayList<String> check_remove = new ArrayList<>();

        // Create list of commands for each machines
        assert hostnames != null;
        for (String hostname : hostnames){
            remove_folder.add("ssh root@" + hostname + " rm -rf /tmp/root");
            check_remove.add("ssh root@" + hostname + " ls /tmp/root");
        }

        // Remove the folders, then check that they are gone (ls is expected to fail)
        List<Boolean> returnValue = Utils.launch_actions_with_return(remove_folder);
        System.out.println(returnValue);
        returnValue = Utils.launch_actions_with_return(check_remove);
        System.out.println(returnValue);
    }
}

In [None]:
String[] args = {"data/hostnames.txt"};

Clean.main(args);

[true, true, true]
ls: cannot access '/tmp/root': No such file or directory
ls: cannot access '/tmp/root': No such file or directory
ls: cannot access '/tmp/root': No such file or directory
[false, false, false]


### <font color="red">Step 10</font> : MAP.

![mapreduce](https://fxjollois.github.io/cours-2017-2018/slides/example-mapreduce-wordcount.png)

__1. A MASTER that deploys the splits__

Create three files corresponding to “splits” in the temporary directory. First, create these files manually in `/tmp/<your username>/splits`: S0.txt S1.txt S2.txt.

- `S0.txt` contains: `Deer Beer River`
- `S1.txt` contains: `Car Car River`
- `S2.txt` contains: `Deer Car Beer`

Modify your MASTER so that it copies the 3 split files to 3 different computers, with only one split per machine. For example, with 3 machines and 3 splits, the first machine receives the first split, the second machine the second split, and so on. For this, use the previously created file that lists the machines you want to use for your project.
Note that the `/tmp/<your username>/splits` directory must be created on the 3 computers if it does not exist. This can be done programmatically. Be careful to wait until the folders have been created before starting to copy the splits. How do you wait for the folders to be created before actually copying the files? As with the DEPLOY program, the MASTER copies these splits to 3 computers whose SSH connection works.
Does your MASTER program launch the copies sequentially (one after the other) or in parallel?
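The "first machine gets the first split" rule can be sketched as a round-robin assignment. This is an illustrative alternative under stated assumptions: the `Master` class in this notebook picks a host with `Random`, which can place two splits on the same machine, whereas the hypothetical `SplitAssignment.assign` helper below sends split `i` to host `i % n`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitAssignment {

    // Maps each split to a host in round-robin order: split i goes to host i % n.
    static Map<String, String> assign(List<String> splits, List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("no hosts available");
        }
        // LinkedHashMap keeps the splits in insertion order when iterating.
        Map<String, String> assignment = new LinkedHashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            assignment.put(splits.get(i), hosts.get(i % hosts.size()));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("S0.txt", "S1.txt", "S2.txt");
        List<String> hosts = Arrays.asList("worker-1", "worker-2", "worker-3");
        // Prints:
        // S0.txt -> worker-1
        // S1.txt -> worker-2
        // S2.txt -> worker-3
        assign(splits, hosts).forEach((split, host) -> System.out.println(split + " -> " + host));
    }
}
```

With more splits than hosts, the assignment simply wraps around, so each host receives roughly the same number of splits.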

__Enable python3 kernel__

The next cell creates the files needed for our mapreduce

In [None]:
!mkdir -p /tmp/root/splits
!echo "Deer Beer River" > /tmp/root/splits/S0.txt
!echo "Car Car River" > /tmp/root/splits/S1.txt
!echo "Deer Car Beer" > /tmp/root/splits/S2.txt

__Enable the java kernel and recompile the classes (Deploy, Clean and Utils)__

The `Master` class below launches the different phases of the mapreduce by timing each step. The `Master` waits for the completion of each step via the `join` method of the `Thread` class in Java. The `Master` waits for each process launched with the `launch_actions_with_return` and `launch_actions_without_return` methods to complete before continuing.
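The idea of waiting with `join` and timing a phase can be sketched independently of SSH. In the example below, `TimedPhase` and `runPhase` are hypothetical names, and the tasks are plain `Runnable`s standing in for the remote commands.

```java
import java.util.ArrayList;
import java.util.List;

public class TimedPhase {

    // Starts one thread per task, waits for all of them with join(),
    // and returns the elapsed wall-clock time in milliseconds.
    static long runPhase(List<Runnable> tasks) throws InterruptedException {
        long start = System.nanoTime();
        List<Thread> threads = new ArrayList<>();
        for (Runnable task : tasks) {
            Thread thread = new Thread(task);
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join();   // blocks until this thread has finished
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            tasks.add(() -> System.out.println("task " + id + " done"));
        }
        long elapsed = runPhase(tasks);
        System.out.println("phase finished in " + elapsed + " ms");
    }
}
```

Because `runPhase` returns only after every `join` has returned, anything printed afterwards is guaranteed to come after all tasks of the phase.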

In [None]:
import java.util.*;

public class Master {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> hostnames = Utils.read_file(args[0]);
        Set<String> slaves = new HashSet<>();
        List<String> files = Utils.list_directory(args[1]);
        ArrayList<String> health_checks = new ArrayList<>();
        ArrayList<String> create_dirs = new ArrayList<>();
        ArrayList<String> check_dir = new ArrayList<>();
        ArrayList<String> run_copy = new ArrayList<>();
        ArrayList<String> run_map = new ArrayList<>();
        ArrayList<String> run_shuffle = new ArrayList<>();
        ArrayList<String> run_reduce = new ArrayList<>();
        ArrayList<String> copy_hostnames_file = new ArrayList<>();
        Random rand = new Random();

        for (String file:files){
            assert hostnames != null;
            String slave = hostnames.get(rand.nextInt(hostnames.size()));
            slaves.add(slave);
            health_checks.add("ssh -o StrictHostKeyChecking=no root@" + slave + " hostname");
            create_dirs.add("ssh root@" + slave + " if test ! -d " + args[2] + "; then mkdir -p " + args[2] + "; fi");
            run_copy.add("scp -r " + file + " root@" + slave + ":" + args[2]);
            check_dir.add("ssh root@" + slave + " ls " + args[2]);
            run_map.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 0 " + file);
            copy_hostnames_file.add("scp " + args[0] + " root@" + slave + ":/tmp/root");

        }
        for (String slave:slaves)
            run_shuffle.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 1 ");

        assert hostnames != null;
        for (String slave:hostnames)
            run_reduce.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 2 ");

        // Apply health checker
        List<Boolean> returnValue = Utils.launch_actions_with_return(health_checks);

        // Check all machine are alive and deploy jar
        boolean isNodesOk = returnValue.stream().allMatch(x -> x);
        if (isNodesOk) {
            Utils.launch_actions_without_return(create_dirs);

            // wait for directories creation and check the creation
            Thread.sleep(3000);
            returnValue = Utils.launch_actions_with_return(check_dir);

            // if all directories are created, copy the split files
            if (returnValue.stream().allMatch(x -> x)) {
                Utils.launch_actions_without_return(run_copy);
            } else {
                System.out.println("Something went wrong during the directories creation");
            }
        } else {
            System.out.println("Not all nodes are safe, please check connection");
        }
    }
}

In [None]:
String[] args = {"data/hostnames.txt", "/tmp/root/splits", "/tmp/root/splits"};

Master.main(args);

worker-1
worker-3
worker-2
ssh root@worker-1 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-2 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-3 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
scp -r /tmp/root/splits/S0.txt root@worker-1:/tmp/root/splits
scp -r /tmp/root/splits/S1.txt root@worker-2:/tmp/root/splits
scp -r /tmp/root/splits/S2.txt root@worker-3:/tmp/root/splits


The files are now present on the machines. You can check by connecting from the command line of the Jupyter notebook:

```sh
ssh root@worker-[1-2-3]
```

__2. A SLAVE that does the map phase.__

Modify the SLAVE so that it calculates a map from a split.

To do this, it takes an operating mode as its first argument, where 0 means computing the map from a split. It then takes a file name “Sx.txt” from the splits folder as input and writes a file “UMx.txt” to the maps folder, with x varying (here from 0 to 2). As before, the maps folder must be created before you can write files to it, and you must wait for it to be created before working with it. How do you wait for the maps folder to be created before working with it?

The file name is passed through the `args` argument of the main method, `public static void main(String[] args)`. Test slave.jar in a terminal as follows:
```sh
cd /tmp/<your username>/
java -jar slave.jar 0 /tmp/<your username>/splits/S0.txt
```

File `/tmp/<your username>/maps/UM0.txt` should be created containing
- Deer 1
- Beer 1
- River 1

Test the operation of your SLAVE with the S1.txt file containing: `Car Car River`

> Ask yourself: why do we get two lines, `Car 1` and `Car 1`, instead of a single line `Car 2`?
> A hint: the reduce phase, which comes later, adds up the values. Here the reduce "function" is very simple: it is one big addition. Now imagine a much more complex reduce function that applies an elaborate algorithm to all the values...
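To make the split of responsibilities concrete, here is a minimal in-memory sketch in which map only emits one pair per occurrence and reduce does all the aggregation. `WordCountSketch` is a hypothetical class written for this illustration; it is not the `Slave` implementation, which works on files and writes values as `1.0`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Map: emit one "word 1" pair per occurrence, without aggregating.
    static List<String> map(String line) {
        List<String> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(word + " 1");
            }
        }
        return pairs;
    }

    // Reduce: sum the values of all pairs that share the same key.
    static Map<String, Integer> reduce(List<String> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String pair : pairs) {
            String[] keyValue = pair.split(" ");
            counts.merge(keyValue[0], Integer.parseInt(keyValue[1]), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> pairs = map("Car Car River");
        System.out.println(pairs);          // [Car 1, Car 1, River 1]
        System.out.println(reduce(pairs));  // {Car=2, River=1}
    }
}
```

Keeping map free of aggregation means the same map output can feed any reduce function, not only a sum.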

In [None]:
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.net.InetAddress;

import static java.util.stream.Collectors.*;

public class Slave{

    private static ArrayList<String> transform_key_value(String filename){
        // Function for the question 1
        ArrayList<String> lines = Utils.read_file(filename);
        assert lines != null;
        ArrayList<String[]> lines_tokenized = Utils.tokenize(lines);
        ArrayList<String> words_count = new ArrayList<>();

        for (String[] line_tokenized: lines_tokenized) {
            for (String word:line_tokenized) {
                if (!word.isEmpty()){
                    words_count.add(word + " 1.0");
                }
            }
        }
        return words_count;
    }

    private static HashMap<String, Double> words_count(String filename){
        // Function for the question 1
        ArrayList<String> lines = Utils.read_file(filename);
        assert lines != null;
        ArrayList<String[]> lines_tokenized = Utils.tokenize(lines);
        HashMap<String, Double> words_count = new HashMap<>();

        for (String[] line_tokenized: lines_tokenized) {
            String word = line_tokenized[0];
            if (!word.isEmpty()){
                if (words_count.get(word) == null) {
                    words_count.put(word, 1.0);
                } else {
                    words_count.put(word, words_count.get(word)+1);
                }
            }
        }
        return words_count;
    }

    public static HashMap<String, Double> sorted_map_by_numeric_value(HashMap<String, Double> hash_map){
        // Function for question 2
        return hash_map
                .entrySet()
                .stream()
                .sorted(Collections.reverseOrder(Entry.comparingByValue()))
                .collect(toMap(Entry::getKey, Entry::getValue, (e1, e2) -> e2,
                        LinkedHashMap::new));
    }

    public static HashMap<String, Double> sort(HashMap<String, Double> hash_map){
        // Function for question 3
        return hash_map
                .entrySet()
                .stream()
                .sorted(new Comparator<Entry<String, Double>>() {
                    @Override
                    public int compare(Entry<String, Double> e1, Entry<String, Double> e2) {
                        if (e1.getValue().equals(e2.getValue())) {
                            return e1.getKey().compareTo(e2.getKey());
                        } else {
                            return e2.getValue().compareTo(e1.getValue());
                        }
                    }
                }).collect(toMap(Entry::getKey, Entry::getValue, (e1, e2) -> e1,
                        LinkedHashMap::new));
    }

    public static void print_map(Map<String, Double> hash_map) {
        for (Entry<String, Double> el:hash_map.entrySet()) {
            System.out.println(el.getKey() + " " + el.getValue());
        }
    }

    public static void print_map(Map<String, Double> hash_map, Integer n) {
        Integer count = 0;
        for (Entry<String, Double> el:hash_map.entrySet()) {
            if(count.equals(n)){
                break;
            } else {
                System.out.println(el.getKey() + " " + el.getValue());
                count++;
            }
        }
    }

    private static void map(String src_file, String map_file) throws IOException {
        ArrayList<String> words_count = Slave.transform_key_value(src_file);
        Utils.write_file(words_count, map_file, "a");
    }

    private static void shuffle(String file, ArrayList<String> hostnames) throws IOException, InterruptedException {
        String current_host = InetAddress.getLocalHost().getHostName();
        int nb_slaves = hostnames.size();

        ArrayList<String[]> keys_values = Utils.tokenize(Objects.requireNonNull(Utils.read_file(file)));
        for (String[] key_value:keys_values){
            int hash_key = key_value[0].hashCode();
            // floorMod keeps the index non-negative even when hashCode() is negative
            int compute_slave = Math.floorMod(hash_key, nb_slaves);
            if (!hostnames.get(compute_slave).equals(current_host)){
                File shuffle_file = new File ("/tmp/root/shuffle" + "/" + String.format("%d_%d_%d.txt",
                                                                                            hash_key,
                                                                                            compute_slave,
                                                                                            current_host.hashCode()));
                Utils.write_file(Collections.singletonList(key_value[0] + " " + key_value[1]),
                        shuffle_file.toString(),
                        "a");
                Deploy.deploy(Collections.singletonList(hostnames.get(compute_slave)),
                        shuffle_file.toString(), "/tmp/root/shuffle");
                shuffle_file.delete();
            } else {
                File shuffle_file = new File ("/tmp/root/shuffle" + "/" + hash_key + "_" + compute_slave + ".txt");
                Utils.write_file(Collections.singletonList(key_value[0] + " " + key_value[1]),
                         shuffle_file.toString(),
                        "a");
            }
        }
    }

    private static void reduce(String reduce_directory, String shuffle_directory) throws IOException {
        List<String> files = Utils.list_directory(shuffle_directory);
        HashMap<String, Double> words_count = new HashMap<>();

        for (String file:files) {
            words_count(file).forEach(
                    (key, value) -> words_count.merge(key, value, Double::sum)
            );
        }
        for (Entry<String, Double> key_value:words_count.entrySet()){
            int hash = key_value.getKey().hashCode();
            File reduce_file = new File(reduce_directory + "/" + hash + ".txt");
            Utils.write_file(Collections.singletonList(key_value.getKey() + " " + key_value.getValue()),
                                                    reduce_file.toString(), "w");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args[0].equals("0")){
            File split_file = new File(args[1]);
            String num = args[1].replaceAll("[^\\d]", "");
            File map_directory = new File(new File(split_file.getParent()).getParent() + "/maps");
            if (!map_directory.exists()){
                boolean result = map_directory.mkdir();
                if (!result){
                    System.out.println("Something goes wrong when creating maps folder");
                } else {
                    map(split_file.toString(), map_directory + "/UM" + num + ".txt");
                }
            } else {
                map(split_file.toString(), map_directory + "/UM" + num + ".txt");
            }
        } else if (args[0].equals("1")){
            ArrayList<String> hostnames = Utils.read_file("/tmp/root/hostnames.txt");
            ArrayList<String> health_checks = new ArrayList<>();
            assert hostnames != null;
            for (String hostname : hostnames)
                health_checks.add("ssh -o StrictHostKeyChecking=no root@" + hostname + " hostname");
            File shuffle_directory = new File("/tmp/root/shuffle");
            if (!shuffle_directory.exists()){
                boolean result = shuffle_directory.mkdir();
                if (!result){
                    System.out.println("Something goes wrong when creating shuffle folder");
                } else {
                    for (String file: Utils.list_directory("/tmp/root/maps")){
                        shuffle(file, hostnames);
                    }
                }
            } else {
                for (String file: Utils.list_directory("/tmp/root/maps")){
                    shuffle(file, hostnames);
                }
            }
        } else if (args[0].equals("2")){
            File reduce_directory = new File("/tmp/root/reduce");
            File shuffle_directory = new File("/tmp/root/shuffle");
            if (!reduce_directory.exists()){
                boolean result = reduce_directory.mkdir();
                if (!result){
                    System.out.println("Something goes wrong when creating reduce folder");
                } else {
                    reduce(reduce_directory.toString(), shuffle_directory.toString());
                }
            } else {
                reduce(reduce_directory.toString(), shuffle_directory.toString());
            }
        }

    }
}

The `Slave` class above is used to launch the different stages of the mapreduce. The job.jar file is generated from this class. Let's run the map phase on the `master` server to understand how it works.

In [None]:
String[] files = {"S0.txt", "S1.txt", "S2.txt"};

for (String file:files){
    String[] args = {"0", String.format("/tmp/root/splits/%s",file)};
    Slave.main(args);
}

Via the jupyterlab command line, you can check the `/tmp/root/maps` folder which contains the created files and examine them to fully understand the process.

__3. A MASTER that launches the SLAVE for the map phase.__

Modify the MASTER so that it launches the map phase on several machines and displays “MAP FINISHED”. For this you will use the previously created file which contains the list of machines you want to use for your project.

To properly synchronize the MASTER with the SLAVES, make sure to display “MAP FINISHED” only once all the SLAVEs have finished and only when all the SLAVEs have finished, NOT BEFORE! The MASTER must therefore wait for SLAVEs to terminate correctly.
How do you make a process launched with ProcessBuilder in Java wait for the process to finish executing?
Does your MASTER program launch the SLAVEs sequentially (one after the other) or in parallel?
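
As a minimal sketch of one possible answer to the two questions above: `Process.waitFor()` blocks until the process has exited, and starting every process before waiting on any of them makes them run in parallel. The class `ParallelLauncher` and its method are hypothetical names, not part of the `Utils` class used in this notebook.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the notebook's Utils class.
class ParallelLauncher {

    // Starts every command first, then waits for each one, so the commands
    // run in parallel. Returns the exit codes in the order of the commands.
    static List<Integer> runAllAndWait(List<List<String>> commands)
            throws IOException, InterruptedException {
        List<Process> running = new ArrayList<>();
        for (List<String> command : commands) {
            ProcessBuilder builder = new ProcessBuilder(command);
            builder.inheritIO();              // forward the child's output to this console
            running.add(builder.start());     // start() returns immediately
        }
        List<Integer> exitCodes = new ArrayList<>();
        for (Process process : running) {
            exitCodes.add(process.waitFor()); // blocks until this process has exited
        }
        return exitCodes;
    }
}
```

With this shape, “MAP FINISHED” would be printed only after `runAllAndWait` returns and every exit code is 0. Calling `waitFor()` immediately after each `start()` would instead run the SLAVEs one after the other.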

In [None]:
import java.util.*;

public class Master {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> hostnames = Utils.read_file(args[0]);
        Set<String> slaves = new HashSet<>();
        List<String> files = Utils.list_directory(args[1]);
        ArrayList<String> health_checks = new ArrayList<>();
        ArrayList<String> create_dirs = new ArrayList<>();
        ArrayList<String> check_dir = new ArrayList<>();
        ArrayList<String> run_copy = new ArrayList<>();
        ArrayList<String> run_map = new ArrayList<>();
        ArrayList<String> run_shuffle = new ArrayList<>();
        ArrayList<String> run_reduce = new ArrayList<>();
        ArrayList<String> copy_hostnames_file = new ArrayList<>();
        Random rand = new Random();

        for (String file:files){
            assert hostnames != null;
            String slave = hostnames.get(rand.nextInt(hostnames.size()));
            slaves.add(slave);
            health_checks.add("ssh -o StrictHostKeyChecking=no root@" + slave + " hostname");
            create_dirs.add("ssh root@" + slave + " if test ! -d " + args[2] + "; then mkdir -p " + args[2] + "; fi");
            run_copy.add("scp -r " + file + " root@" + slave + ":" + args[2]);
            check_dir.add("ssh root@" + slave + " ls " + args[2]);
            run_map.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 0 " + file);
            copy_hostnames_file.add("scp " + args[0] + " root@" + slave + ":/tmp/root");

        }
        for (String slave:slaves)
            run_shuffle.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 1 ");

        assert hostnames != null;
        for (String slave:hostnames)
            run_reduce.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 2 ");

        // Apply health checker
        List<Boolean> returnValue = Utils.launch_actions_with_return(health_checks);

        // Check that all machines are alive before copying the splits
        boolean isNodesOk = returnValue.stream().allMatch(x -> x);
        if (isNodesOk) {
            Utils.launch_actions_without_return(create_dirs);

            // wait for directories creation and check the creation
            Thread.sleep(3000);
            returnValue = Utils.launch_actions_with_return(check_dir);

            // if all directories were created, copy the split files
            if (returnValue.stream().allMatch(x -> x)) {
                Utils.launch_actions_without_return(run_copy);
            } else {
                System.out.println("Something went wrong during the directories creation");
            }
        } else {
            System.out.println("Not all nodes are safe, please check connection");
        }

        // Map
        double start_map_time = System.currentTimeMillis();
        Deploy.deploy(hostnames,
                "jar/job.jar", "/tmp/root/");
        Utils.launch_actions_without_return(run_map);
        System.out.println("MAP FINISHED");
        double end_map_time = System.currentTimeMillis();
        double total_map_time = end_map_time - start_map_time;
        System.out.println(String.format("map time: %f", total_map_time/1000));
    }
}

In [None]:
String[] args = {"data/hostnames.txt", "/tmp/root/splits", "/tmp/root/splits"};

Master.main(args);

worker-3
worker-3
ssh root@worker-2 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-3 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-3 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
S1.txt
S2.txt
S2.txt
scp -r /tmp/root/splits/S0.txt root@worker-2:/tmp/root/splits
scp -r /tmp/root/splits/S1.txt root@worker-3:/tmp/root/splits
scp -r /tmp/root/splits/S2.txt root@worker-3:/tmp/root/splits
worker-1
worker-3
worker-2
ssh root@worker-1 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-2 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-3 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
splits
splits
splits
scp -r jar/job.jar root@worker-1:/tmp/root/
scp -r jar/job.jar root@worker-2:/tmp/root/
scp -r jar/job.jar root@worker-3:/tmp/root/
ssh root@worker-2 java -jar /tmp/root/job.jar 0 /tmp/root/splits/S0.txt
ssh root@worker-3 java -jar /tmp/root/job.jar 0 /tmp/ro

The files are now present on the machines, in the `/tmp/root/maps` folder. You can check by connecting via the command line of the jupyter notebook:

```sh
ssh root@worker-[1-2-3]
```

### <font color="red">Step 11</font> : SHUFFLE.

__1. The MASTER that prepares the SLAVEs for the shuffle phase.__

Modify the MASTER so that it sends the previously created file which contains the list of machines you want to use for your project to all the SLAVES used for the map phase. Copy this file to the following target for all SLAVES:
`/tmp/<your username>/machines.txt`

In [None]:
import java.util.*;

public class Master {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> hostnames = Utils.read_file(args[0]);
        Set<String> slaves = new HashSet<>();
        List<String> files = Utils.list_directory(args[1]);
        ArrayList<String> health_checks = new ArrayList<>();
        ArrayList<String> create_dirs = new ArrayList<>();
        ArrayList<String> check_dir = new ArrayList<>();
        ArrayList<String> run_copy = new ArrayList<>();
        ArrayList<String> run_map = new ArrayList<>();
        ArrayList<String> run_shuffle = new ArrayList<>();
        ArrayList<String> run_reduce = new ArrayList<>();
        ArrayList<String> copy_hostnames_file = new ArrayList<>();
        Random rand = new Random();

        for (String file:files){
            assert hostnames != null;
            String slave = hostnames.get(rand.nextInt(hostnames.size()));
            slaves.add(slave);
            health_checks.add("ssh -o StrictHostKeyChecking=no root@" + slave + " hostname");
            create_dirs.add("ssh root@" + slave + " if test ! -d " + args[2] + "; then mkdir -p " + args[2] + "; fi");
            run_copy.add("scp -r " + file + " root@" + slave + ":" + args[2]);
            check_dir.add("ssh root@" + slave + " ls " + args[2]);
            run_map.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 0 " + file);
            copy_hostnames_file.add("scp " + args[0] + " root@" + slave + ":/tmp/root");

        }
        for (String slave:slaves)
            run_shuffle.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 1 ");

        assert hostnames != null;
        for (String slave:hostnames)
            run_reduce.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 2 ");

        // Apply health checker
        List<Boolean> returnValue = Utils.launch_actions_with_return(health_checks);

        // Check that all machines are alive before copying the splits
        boolean isNodesOk = returnValue.stream().allMatch(x -> x);
        if (isNodesOk) {
            Utils.launch_actions_without_return(create_dirs);

            // wait for directories creation and check the creation
            Thread.sleep(3000);
            returnValue = Utils.launch_actions_with_return(check_dir);

            // if all directories were created, copy the split files
            if (returnValue.stream().allMatch(x -> x)) {
                Utils.launch_actions_without_return(run_copy);
            } else {
                System.out.println("Something went wrong during the directories creation");
            }
        } else {
            System.out.println("Not all nodes are safe, please check connection");
        }

        // Map
        double start_map_time = System.currentTimeMillis();
        Deploy.deploy(hostnames,
                "jar/job.jar", "/tmp/root/");
        Utils.launch_actions_without_return(run_map);
        System.out.println("MAP FINISHED");
        double end_map_time = System.currentTimeMillis();
        double total_map_time = end_map_time - start_map_time;
        System.out.println(String.format("map time: %f", total_map_time/1000));

        // copy hostnames file
        Utils.launch_actions_without_return(copy_hostnames_file);
    }
}

__2. The SLAVE that prepares the shuffle phase.__

Modify the SLAVE so that it prepares for the shuffle phase by grouping the keys, calculating the hash of each key, and creating a file named `<hash>-<hostname>.txt` in the shuffles folder. As before, the shuffles folder must exist before you can write files to it. How do you wait for the shuffles folder to be successfully created before working with it?
The name of the input file is passed through the `args` parameter of `public static void main(String[] args)`.

Warning: if the `<hash>-<hostname>.txt` file already exists, do not overwrite it; append to it instead.
The hash in the file name is computed from the key with the `hashCode()` method of the `String` class (explanations here: https://fr.wikipedia.org/wiki/Java_hashCode()#La_fonction_de_hachage_de_la_classe_java.lang.String ). The name of the machine is obtained with the following Java instruction: `java.net.InetAddress.getLocalHost().getHostName()`

For this, your SLAVE takes an operating mode as its first argument: `1`, which corresponds to the hash calculation, followed by the name of an input file `UMx.txt` from the maps folder. It writes its output to a file `<hash>-<hostname>.txt` in the shuffles folder. Keep the previous mode: `0` for the map phase.

Test your SLAVE with a `UM1.txt` file containing:
- Car 1
- Car 1
- River 1

Test the JAR in a terminal as follows:

```sh
cd /tmp/<your username>/
java -jar slave.jar 1 /tmp/<your username>/maps/UM1.txt
```

The following files must be created:

`/tmp/<your username>/shuffles/67508-c127-12.txt` containing
- Car 1
- Car 1

`/tmp/<your username>/shuffles/78973420-c127-12.txt` containing
- River 1
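
The preparation step described above can be sketched as follows. This is an illustration under assumptions, not the `Slave` implementation of this notebook: the class `ShufflePreparer`, its method names, and the choice of passing the map lines and the hostname as parameters are all hypothetical. It relies on `Files.createDirectories`, which returns only once the folder exists, and on the `CREATE` and `APPEND` open options so that an existing file is extended rather than overwritten.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper, not the notebook's Slave implementation.
class ShufflePreparer {

    // File name for a key: <hash>-<hostname>.txt, with hash = String.hashCode().
    static String shuffleFileName(String key, String hostname) {
        return key.hashCode() + "-" + hostname + ".txt";
    }

    // Appends each "key count" line of a map output to the file of its key.
    static void prepare(List<String> mapLines, Path shufflesDir, String hostname)
            throws IOException {
        // createDirectories returns only once the folder exists (or throws),
        // so no extra waiting is needed before writing into it.
        Files.createDirectories(shufflesDir);
        for (String line : mapLines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore blank lines
            }
            String key = trimmed.split("\\s+")[0];
            Path target = shufflesDir.resolve(shuffleFileName(key, hostname));
            // CREATE + APPEND: an existing file is extended, never overwritten.
            Files.write(target, List.of(trimmed), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }
}
```

For the `UM1.txt` example, `"Car".hashCode()` is 67508 and `"River".hashCode()` is 78973420, which gives the two file names listed above when the hostname is `c127-12`.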

__3. The SLAVE that performs the shuffle phase.__

Modify the SLAVE to perform the shuffle phase.

To do this, for each key calculated during the map, in addition to creating the file in the shuffles folder, you must send this file to the shufflesreceived folder of one of the machines listed in the file `/tmp/<your username>/machines.txt`.

To decide which machine to send it to, we use the hash calculated from the key (an integer value) and the number of machines in the file. Compute the hash modulo the number of machines, numbering the first machine in the file 0, the second 1, and so on. The formula for the target machine is therefore `numeroMachine = hash % nbMachines`.
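
A small sketch of this formula, with hypothetical names (`MachineSelector`, `machineIndex`, `targetMachine`). One caveat it accounts for: `String.hashCode()` can return a negative value, and Java's `%` operator keeps the sign of the dividend, so a plain `hash % nbMachines` could yield a negative index. `Math.floorMod` gives the same result as `%` for non-negative hashes and stays within the valid range otherwise.

```java
import java.util.List;

// Hypothetical helper illustrating the formula; the names are not from the notebook.
class MachineSelector {

    // Index of the target machine for a key, always in [0, nbMachines).
    static int machineIndex(String key, int nbMachines) {
        // String.hashCode() can be negative, and Java's % keeps the sign of the
        // dividend; Math.floorMod always returns a non-negative index here.
        return Math.floorMod(key.hashCode(), nbMachines);
    }

    // Machine name at that index, the first machine of the list being number 0.
    static String targetMachine(String key, List<String> machines) {
        return machines.get(machineIndex(key, machines.size()));
    }
}
```

With three machines, the key `Car` (hash 67508) maps to index 2 and the key `River` (hash 78973420) maps to index 1.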

__Enable python3 kernel__

In [None]:
!cp data/hostnames.txt /tmp/root

__Enable the java kernel and recompile the classes (Slave, Master, Deploy, Clean and Utils)__

Let's reuse the `Slave` class implemented previously and change its input arguments. Parts __2__ and __3__ are already implemented in it.

In [None]:
String[] args = {"1"};

Slave.main(args);

worker-3
ssh root@worker-3 if test ! -d /tmp/root/shuffle; then mkdir -p /tmp/root/shuffle; fi
job.jar
maps
shuffle
splits
scp -r /tmp/root/shuffle/3079406_2_-1081267614.txt root@worker-3:/tmp/root/shuffle
worker-1
ssh root@worker-1 if test ! -d /tmp/root/shuffle; then mkdir -p /tmp/root/shuffle; fi
job.jar
shuffle
splits
scp -r /tmp/root/shuffle/3019824_0_-1081267614.txt root@worker-1:/tmp/root/shuffle
worker-1
ssh root@worker-1 if test ! -d /tmp/root/shuffle; then mkdir -p /tmp/root/shuffle; fi
job.jar
shuffle
splits
scp -r /tmp/root/shuffle/108526092_0_-1081267614.txt root@worker-1:/tmp/root/shuffle
worker-3
ssh root@worker-3 if test ! -d /tmp/root/shuffle; then mkdir -p /tmp/root/shuffle; fi
job.jar
maps
shuffle
splits
scp -r /tmp/root/shuffle/3079406_2_-1081267614.txt root@worker-3:/tmp/root/shuffle
worker-2
ssh root@worker-2 if test ! -d /tmp/root/shuffle; then mkdir -p /tmp/root/shuffle; fi
job.jar
maps
shuffle
splits
scp -r /tmp/root/shuffle/98260_1_-1081267614.txt root@worker-

__4. A MASTER that launches the shuffle phase and waits for it to end.__

Modify the MASTER so that it launches the shuffle phase on several machines once the map phase is finished, and displays “SHUFFLE FINISHED”. For this, use the previously created file that contains the list of machines you want to use for your project.
To properly synchronize the MASTER with the SLAVEs, display “SHUFFLE FINISHED” only once all the SLAVEs have finished, NOT BEFORE! The MASTER must therefore wait for every SLAVE to terminate correctly.
How do you make a Java program wait for a process launched with ProcessBuilder to finish executing?
Does your MASTER program launch the SLAVEs sequentially (one after the other) or in parallel?
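
One way to enforce the ordering between phases is to treat each phase as a barrier: run all of its tasks in parallel, and start the next phase only after every task has returned successfully. The sketch below uses `ExecutorService.invokeAll`, which blocks until all submitted tasks have completed. `PhaseRunner` is a hypothetical name, and each `Callable<Integer>` is assumed to stand for one SLAVE launch that returns its exit code.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper; each task stands for one SLAVE launch returning its exit code.
class PhaseRunner {

    // Runs all tasks of one phase in parallel and returns true only if every
    // task finished with exit code 0.
    static boolean runPhase(List<Callable<Integer>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            boolean allOk = true;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                allOk &= result.get() == 0;
            }
            return allOk;
        } finally {
            pool.shutdown();
        }
    }
}
```

A MASTER built this way would call `runPhase` on the map tasks, print “MAP FINISHED” if it returns `true`, and only then call `runPhase` on the shuffle tasks before printing “SHUFFLE FINISHED”.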

In [None]:
import java.util.*;

public class Master {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> hostnames = Utils.read_file(args[0]);
        Set<String> slaves = new HashSet<>();
        List<String> files = Utils.list_directory(args[1]);
        ArrayList<String> health_checks = new ArrayList<>();
        ArrayList<String> create_dirs = new ArrayList<>();
        ArrayList<String> check_dir = new ArrayList<>();
        ArrayList<String> run_copy = new ArrayList<>();
        ArrayList<String> run_map = new ArrayList<>();
        ArrayList<String> run_shuffle = new ArrayList<>();
        ArrayList<String> run_reduce = new ArrayList<>();
        ArrayList<String> copy_hostnames_file = new ArrayList<>();
        Random rand = new Random();

        for (String file:files){
            assert hostnames != null;
            String slave = hostnames.get(rand.nextInt(hostnames.size()));
            slaves.add(slave);
            health_checks.add("ssh -o StrictHostKeyChecking=no root@" + slave + " hostname");
            create_dirs.add("ssh root@" + slave + " if test ! -d " + args[2] + "; then mkdir -p " + args[2] + "; fi");
            run_copy.add("scp -r " + file + " root@" + slave + ":" + args[2]);
            check_dir.add("ssh root@" + slave + " ls " + args[2]);
            run_map.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 0 " + file);
            copy_hostnames_file.add("scp " + args[0] + " root@" + slave + ":/tmp/root");

        }
        for (String slave:slaves)
            run_shuffle.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 1 ");

        assert hostnames != null;
        for (String slave:hostnames)
            run_reduce.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 2 ");

        // Apply health checker
        List<Boolean> returnValue = Utils.launch_actions_with_return(health_checks);

        // Check that all machines are alive before copying the splits
        boolean isNodesOk = returnValue.stream().allMatch(x -> x);
        if (isNodesOk) {
            Utils.launch_actions_without_return(create_dirs);

            // wait for directories creation and check the creation
            Thread.sleep(3000);
            returnValue = Utils.launch_actions_with_return(check_dir);

            // if all directories were created, copy the split files
            if (returnValue.stream().allMatch(x -> x)) {
                Utils.launch_actions_without_return(run_copy);
            } else {
                System.out.println("Something went wrong during the directories creation");
            }
        } else {
            System.out.println("Not all nodes are safe, please check connection");
        }

        // Map
        double start_map_time = System.currentTimeMillis();
        Deploy.deploy(hostnames,
                "jar/job.jar", "/tmp/root/");
        Utils.launch_actions_without_return(run_map);
        System.out.println("MAP FINISHED");
        double end_map_time = System.currentTimeMillis();
        double total_map_time = end_map_time - start_map_time;
        System.out.println(String.format("map time: %f", total_map_time/1000));

        // shuffle
        double start_shuffle_time = System.currentTimeMillis();
        Utils.launch_actions_without_return(copy_hostnames_file);
        Utils.launch_actions_without_return(run_shuffle);
        System.out.println("SHUFFLE FINISHED");
        double end_shuffle_time = System.currentTimeMillis();
        double total_shuffle_time = end_shuffle_time - start_shuffle_time;
        System.out.println(String.format("shuffle time: %f", total_shuffle_time/1000));
    }
}

In [None]:
String[] args = {"data/hostnames.txt", "/tmp/root/splits", "/tmp/root/splits"};

Master.main(args);

worker-1
worker-1
worker-2
ssh root@worker-1 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-1 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-2 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
S0.txt
S1.txt
S0.txt
S0.txt
scp -r /tmp/root/splits/S0.txt root@worker-1:/tmp/root/splits
scp -r /tmp/root/splits/S1.txt root@worker-1:/tmp/root/splits
scp -r /tmp/root/splits/S2.txt root@worker-2:/tmp/root/splits
worker-1
worker-2
worker-3
ssh root@worker-1 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-2 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-3 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
job.jar
shuffle
splits
job.jar
maps
shuffle
splits
job.jar
maps
shuffle
splits
scp -r jar/job.jar root@worker-1:/tmp/root/
scp -r jar/job.jar root@worker-2:/tmp/root/
scp -r jar/job.jar root@worker-3:/tmp/root/
ssh root@worker-1 java -jar /tmp/root/job.jar 0 /tmp/

The files are now present on the machines, in the `/tmp/root/shuffle` folder. You can check by connecting via the command line of the jupyter notebook:

```sh
ssh root@worker-[1-2-3]
```

### <font color="red">Step 12</font> : REDUCE.

__1. The SLAVE that executes the reduce phase.__

Modify the SLAVE so that it performs the reduce phase by grouping the files with the same hash, calculating the reduce for each key, and creating a file named `<hash>.txt` in the reduces folder. As before, the reduces folder must exist before you can write files to it.

For this, your SLAVE takes an operating mode as an argument: `2`, which corresponds to the calculation of the reduce. Keep the previous modes: `0` for the map phase, `1` for the shuffle phase.

Test how your SLAVE works with the files received during the shuffle. For example, given the following input files:

`/tmp/root/shufflesreceived/67508-cxxx-12.txt` containing
- Car 1
- Car 1

`/tmp/root/shufflesreceived/67508-cxxx-13.txt` containing
- Car 1

`/tmp/root/shufflesreceived/78973420-cxxx-12.txt` containing
- River 1

The reduces folder should contain the following files:

`/tmp/root/reduces/67508.txt` containing
- Car 3

`/tmp/root/reduces/78973420.txt` containing
- River 1
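
The reduce computation on this example can be sketched as below. This is an assumption-laden illustration, not the notebook's `Slave` code: `Reducer`, `reduce`, and `reduceFileName` are hypothetical names, the lines of all received files are assumed to have been read into one list, and counts are kept as integers (the notebook's own output prints them as decimals).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not the notebook's Slave implementation.
class Reducer {

    // Sums the counts of "key count" lines gathered from the received shuffle files.
    static Map<String, Integer> reduce(List<String> shuffleLines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : shuffleLines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // skip blank or malformed lines
            }
            totals.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
        }
        return totals;
    }

    // Name of the reduce output file for a key: <hash>.txt.
    static String reduceFileName(String key) {
        return key.hashCode() + ".txt";
    }
}
```

Applied to the three `Car 1` lines and the single `River 1` line above, this yields `Car 3` and `River 1`, to be written to `67508.txt` and `78973420.txt` respectively.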

The final version of the `Master` class below runs the reduce phase on the different nodes.

In [None]:
import java.util.*;

public class Master {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> hostnames = Utils.read_file(args[0]);
        Set<String> slaves = new HashSet<>();
        List<String> files = Utils.list_directory(args[1]);
        ArrayList<String> health_checks = new ArrayList<>();
        ArrayList<String> create_dirs = new ArrayList<>();
        ArrayList<String> check_dir = new ArrayList<>();
        ArrayList<String> run_copy = new ArrayList<>();
        ArrayList<String> run_map = new ArrayList<>();
        ArrayList<String> run_shuffle = new ArrayList<>();
        ArrayList<String> run_reduce = new ArrayList<>();
        ArrayList<String> copy_hostnames_file = new ArrayList<>();
        Random rand = new Random();

        for (String file:files){
            assert hostnames != null;
            String slave = hostnames.get(rand.nextInt(hostnames.size()));
            slaves.add(slave);
            health_checks.add("ssh -o StrictHostKeyChecking=no root@" + slave + " hostname");
            create_dirs.add("ssh root@" + slave + " if test ! -d " + args[2] + "; then mkdir -p " + args[2] + "; fi");
            run_copy.add("scp -r " + file + " root@" + slave + ":" + args[2]);
            check_dir.add("ssh root@" + slave + " ls " + args[2]);
            run_map.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 0 " + file);
            copy_hostnames_file.add("scp " + args[0] + " root@" + slave + ":/tmp/root");

        }
        for (String slave:slaves)
            run_shuffle.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 1 ");

        assert hostnames != null;
        for (String slave:hostnames)
            run_reduce.add("ssh root@" + slave + " java -jar /tmp/root/job.jar 2 ");

        // Apply health checker
        List<Boolean> returnValue = Utils.launch_actions_with_return(health_checks);

        // Check that all machines are alive before copying the splits
        boolean isNodesOk = returnValue.stream().allMatch(x -> x);
        if (isNodesOk) {
            Utils.launch_actions_without_return(create_dirs);

            // wait for directories creation and check the creation
            Thread.sleep(3000);
            returnValue = Utils.launch_actions_with_return(check_dir);

            // if all directories were created, copy the split files
            if (returnValue.stream().allMatch(x -> x)) {
                Utils.launch_actions_without_return(run_copy);
            } else {
                System.out.println("Something went wrong during the directories creation");
            }
        } else {
            System.out.println("Not all nodes are safe, please check connection");
        }

        // Map
        double start_map_time = System.currentTimeMillis();
        Deploy.deploy(hostnames,
                "jar/job.jar", "/tmp/root/");
        Utils.launch_actions_without_return(run_map);
        System.out.println("MAP FINISHED");
        double end_map_time = System.currentTimeMillis();
        double total_map_time = end_map_time - start_map_time;
        System.out.println(String.format("map time: %f", total_map_time/1000));

        // shuffle
        double start_shuffle_time = System.currentTimeMillis();
        Utils.launch_actions_without_return(copy_hostnames_file);
        Utils.launch_actions_without_return(run_shuffle);
        System.out.println("SHUFFLE FINISHED");
        double end_shuffle_time = System.currentTimeMillis();
        double total_shuffle_time = end_shuffle_time - start_shuffle_time;
        System.out.println(String.format("shuffle time: %f", total_shuffle_time/1000));

        // reduce
        double start_reduce_time = System.currentTimeMillis();
        Utils.launch_actions_without_return(run_reduce);
        System.out.println("REDUCE FINISHED");
        double end_reduce_time = System.currentTimeMillis();
        double total_reduce_time = end_reduce_time - start_reduce_time;
        System.out.println(String.format("reduce time: %f", total_reduce_time/1000));
    }
}

In [None]:
String[] args = {"data/hostnames.txt", "/tmp/root/splits", "/tmp/root/splits"};

Master.main(args);

worker-1
worker-2
worker-2
ssh root@worker-1 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-2 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
ssh root@worker-2 if test ! -d /tmp/root/splits; then mkdir -p /tmp/root/splits; fi
scp -r /tmp/root/splits/S0.txt root@worker-1:/tmp/root/splits
scp -r /tmp/root/splits/S1.txt root@worker-2:/tmp/root/splits
scp -r /tmp/root/splits/S2.txt root@worker-2:/tmp/root/splits
worker-2
worker-1
worker-3
ssh root@worker-1 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-2 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
ssh root@worker-3 if test ! -d /tmp/root/; then mkdir -p /tmp/root/; fi
splits
splits
scp -r jar/job.jar root@worker-1:/tmp/root/
scp -r jar/job.jar root@worker-2:/tmp/root/
scp -r jar/job.jar root@worker-3:/tmp/root/
ssh root@worker-1 java -jar /tmp/root/job.jar 0 /tmp/root/splits/S0.txt
ssh root@worker-2 java -jar /tmp/root/job.jar 0 /tmp/root/splits/S1.txt
ss

Below is the computation time for each phase:

- map: 3.988000 seconds
- shuffle: 11.685000 seconds
- reduce: 0.277000 seconds

As expected, the shuffle phase takes the most time, because it requires network transfers to send each key to the correct node.
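
Timings like these are meaningful only if the call that launches a phase blocks until all SLAVEs have finished. Under that assumption, a phase can be timed with a small wrapper such as the hypothetical `PhaseTimer` below, which uses `System.nanoTime()` because it is a monotonic clock intended for measuring intervals.

```java
// Hypothetical helper for timing one phase; not part of the notebook's classes.
class PhaseTimer {

    // Runs the phase and returns its wall-clock duration in seconds.
    static double timeSeconds(Runnable phase) {
        long start = System.nanoTime();   // monotonic clock, suited to intervals
        phase.run();                      // must block until the phase is complete
        return (System.nanoTime() - start) / 1_000_000_000.0;
    }
}
```

For instance, `PhaseTimer.timeSeconds(() -> runShuffle())` would return the duration of a blocking `runShuffle()` method, where `runShuffle` is a placeholder name.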

__Enable python3 kernel__

The cell below displays the files present in the reduce folder of each worker. The expected result is:

- beer 2
- river 2
- car 3
- deer 2

In [None]:
!echo worker-1 && ssh root@worker-1 find /tmp/root/reduce -type f -exec cat {} +
!echo worker-2 && ssh root@worker-2 find /tmp/root/reduce -type f -exec cat {} +
!echo worker-3 && ssh root@worker-3 find /tmp/root/reduce -type f -exec cat {} +

worker-1
beer 2.0
river 2.0
worker-2
car 3.0
worker-3
deer 2.0
