# Tutorial 2: Hadoop and HDFS

In this tutorial, you will:

* Import a spam dataset into the HDFS filesystem
* Run a MapReduce task using the Hadoop "Streaming" API (Python)
* Run a machine learning algorithm using Mahout in order to create a simple spam filter using the naive Bayes algorithm.

Remember:

* If you are not on the wired network, you will need to connect to the VPN
* You do not have to use Jupyter.  If you prefer, you can do everything in the PuTTY terminal.  However, if you do use Jupyter, you should bind the service to `0.0.0.0` on port `8888`, and enter the token as the password.

## Start a new terminal
In addition to using notebooks, other features of Jupyter include running a terminal.  On the main menu on the `Home` page, you can start a new terminal by clicking on `New` -> `Terminal`.  Do this now, so that you can run interactive bash commands.

## Check that Hadoop is running

The first thing to do is to check that we have Hadoop installed.  In the terminal, type `hadoop version`, which should report that the version you have is Hadoop 2.7.4.

## Importing data

The first thing we're going to do is download some data.  This stands in for data which could be remote or in a datacentre somewhere.  Run the following code:

In [None]:
%%bash

# YOUR CODE HERE
wget https://archive.ics.uci.edu/ml/machine-learning-databases/00380/YouTube-Spam-Collection-v1.zip
unzip YouTube-Spam-Collection-v1.zip
ls -lh *.csv

## MapReduce

Having downloaded the data, we want to be able to run a machine learning algorithm over it.  To do this, we will use the Hadoop Streaming API, which allows us to write the map and reduce steps in Python.  When we call Hadoop, we pass two Python files to the command: one which maps, and one which reduces.
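As a rough illustration of what happens to the two scripts, the sketch below simulates the map, sort and reduce phases in a single Python process.  The function names (`map_line`, `reduce_pairs`, `run_job`) are made up for this example and are not part of Hadoop; a real streaming job runs the phases as separate processes connected by tab-delimited text.

```python
from itertools import groupby
from operator import itemgetter


def map_line(line):
    """Map phase: emit a (word, 1) pair for every word in one line."""
    return [(word, 1) for word in line.split()]


def reduce_pairs(pairs):
    """Reduce phase: sum the counts for each word.

    Expects the pairs to be sorted by word, which is what the
    sort step between the two phases provides.
    """
    return {
        word: sum(count for _, count in group)
        for word, group in groupby(pairs, key=itemgetter(0))
    }


def run_job(lines):
    """Simulate map -> sort -> reduce in a single process."""
    mapped = [pair for line in lines for pair in map_line(line)]
    return reduce_pairs(sorted(mapped))


# Two made-up lines, not taken from the dataset
print(run_job(['talk to me', 'listen to me']))
```

The sort in the middle is what brings identical words next to each other, so the reduce step only ever has to look at one word at a time.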

Let's look at the data:

In [None]:
%%bash

head -n 10 Youtube04-Eminem.csv

## Word counting

The first thing we want to do is to set up a MapReduce job which counts the occurrences of each individual word in the `comment` field.

In [None]:
# YOUR CODE HERE
#TODO: Get rid of code below
#!/usr/bin/env python
# MAPPER

import csv
import sys

input_text = open('Youtube04-Eminem.csv', 'r')
# When we move to the actual MapReduce job, we will need to read from STDIN
# input_text = sys.stdin

reader = csv.reader(input_text)
# Skip the column header
next(reader)
for row in reader:
    tokens = row[3].split(' ')
    for t in tokens:
        # Print tab-delimited output here;
        # it will be the input for the reducer
        print('%s\t%d' % (t, 1))
    

input_text.close()
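The mapper relies on `csv.reader` rather than splitting each line on commas, because a comment can itself contain commas inside a quoted field.  The sketch below shows the difference on a made-up row; the layout (comment text in the fourth column) is an assumption based on the `row[3]` lookup in the mapper.

```python
import csv
import io

# A made-up row, assumed to have the comment in the fourth column;
# the comment itself contains a comma.
sample = 'id1,someone,2015-01-01,"hello, world",0\n'

# Naive approach: the quoted comment is cut in two
naive = sample.strip().split(',')

# csv.reader respects the quotes and keeps the comment intact
parsed = next(csv.reader(io.StringIO(sample)))

print(len(naive))
print(len(parsed))
print(parsed[3])
```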

In [None]:
# YOUR CODE HERE
#TODO: Get rid of code below
# REDUCER

import sys
# Keep simple example in for now, switch to stdin later
# input_text = ['+447935454150', 'lovely', 'girl', 'talk', 'to', 'me', 'xxx\ufeff']

input_text = [
    '+447935454150	1',
    'lovely	1',
    'girl	1',
    'talk	1',
    'to	1',
    'me	1',
    'xxx﻿	1',
    # Add an extra one to test that it works
    'to\t1'
]

# input_text = sys.stdin
words = {}

for line in input_text:
    word, count = line.split('\t', 1)
#     print('word: %s count: %s' % (word, count))
    
    # Convert count to an integer
    try:
        count = int(count)
    except ValueError:
        # We can safely ignore, so keep calm and carry on
        continue
        
        
    # Add the parsed count rather than assuming it is always 1
    if word in words:
        words[word] += count
    else:
        words[word] = count
        
for w in words:
    print('%s\t%s' % (w, words[w]))
    


Does it work in principle?  We can test the scripts without Hadoop by piping the data through them:


In [None]:
%%bash
cat Youtube04-Eminem.csv | ./mapper.py | ./reducer.py
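The reducer above collects every word in a dictionary, which works regardless of input order.  In a Hadoop Streaming job the reducer receives its input sorted by key, so an alternative is to sum one word at a time without holding all of them in memory.  The following is a minimal sketch of that approach; `parse` and `reduce_stream` are hypothetical names, and the sample lines are made up.

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs from tab-delimited lines, skipping bad ones."""
    for line in lines:
        word, sep, count = line.rstrip('\n').partition('\t')
        if not sep:
            continue
        try:
            yield word, int(count)
        except ValueError:
            # Not a number, so ignore this line
            continue


def reduce_stream(lines):
    """Yield (word, total) pairs; the input must already be sorted by word."""
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


# Made-up, already sorted mapper output; the last line is deliberately invalid.
# In a streaming job you would pass sys.stdin instead of this list.
sample = ['me\t1\n', 'to\t1\n', 'to\t1\n', 'xxx\toops\n']
for word, total in reduce_stream(sample):
    print('%s\t%d' % (word, total))
```

When testing a reducer like this locally, a `sort` between the two scripts stands in for Hadoop's sorting step, for example `./mapper.py | sort | ./reducer.py`.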

In [None]:
%%bash

# start-dfs.sh
# start-yarn.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input Youtube04-Eminem.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output output

## Setting up HDFS

There are a few more things you need to make Hadoop work nicely.  We are going to set up pseudo-distributed mode, which requires passwordless SSH to be set up.  To do this, we need to run the following commands:

    ssh-keygen -t rsa -P ''
    cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
    ssh localhost

For the first command, leave all the options as default (press enter for each one to do this).  The second command adds your new public key to the list of keys which are allowed to log in.  With the third, you are checking that you are able to SSH into your own machine, localhost.  Normally you would SSH into a different computer, but here we connect to localhost.  You will have output like the following text.  Type `yes`, because you do still want to connect.


    The authenticity of host 'localhost (127.0.0.1)' can't be established.
    ECDSA key fingerprint is 18:6e:42:bd:0c:8c:35:bc:d9:e8:3c:c6:a3:08:56:43.
    Are you sure you want to continue connecting (yes/no)? yes
    
Type `exit` to leave that inner shell, and then try to SSH to `0.0.0.0`.  This also needs to work for some of the tasks ahead.

    ssh 0.0.0.0
    
Now, we need to set up some configuration for the various parts of Hadoop.  Firstly, download the following file, and make sure it has permissions to be executed.  It's good practice to check what unknown files from the Internet are doing, so have a read of the code.

In [None]:
%%bash
rm -f hadoop-config*
wget https://raw.githubusercontent.com/huwf/data-science-vm/master/hadoop-config.sh
cat hadoop-config.sh
chmod 755 hadoop-config.sh

Open a terminal, and run the following command which will execute the file: **YOU WILL NEED TO RUN THIS AS SUDO**

    sudo ./hadoop-config.sh

We should now have HDFS configured for pseudo-distributed mode.  We now need to format a new HDFS filesystem, which will use the configuration we just set:

    hdfs namenode -format

## Starting services

Now we need to start the different services and we can get to work!  Run the following command in the terminal to start DFS:

    start-dfs.sh

To see what this has started, you can list the running Java processes with the `jps` command:

In [None]:
%%bash 
jps
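In a pseudo-distributed setup, `start-dfs.sh` is expected to start a `NameNode`, a `DataNode` and a `SecondaryNameNode`, so those are the names to look for in the `jps` listing.  The sketch below checks some `jps` output for them; the helper name `missing_daemons` and the sample output (including the process IDs) are made up for illustration.

```python
EXPECTED = {'NameNode', 'DataNode', 'SecondaryNameNode'}


def missing_daemons(jps_output, expected=EXPECTED):
    """Return the expected daemon names that do not appear in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # Each jps line is "<pid> <name>"; ignore anything else
        if len(parts) >= 2:
            running.add(parts[1])
    return set(expected) - running


# Made-up jps output; the process IDs are illustrative only
sample = """4321 NameNode
4467 DataNode
4912 Jps
"""
print(missing_daemons(sample))
```

An empty set means all three services were found; anything else names a service that did not start.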

Now that we have an HDFS filesystem and the appropriate Hadoop services running, we can start to import the data into HDFS and run the MapReduce task there.  This allows us to move from standalone mode to pseudo-distributed mode.  Following the same principles, we could run MapReduce across several machines.

In order to achieve this, we need to: 

* Create a directory for the input
* Import the data from the local file to the HDFS datanode
* Run the MapReduce job
* View the output

The commands on HDFS are similar to standard Linux CLI commands, except that they are prefixed by either `hadoop fs` or `hdfs dfs`.

The `hadoop fs` command is more general, as it can cope with different types of filesystem, such as the one on the local disk.  As such, `hdfs dfs` is the better choice for commands relating solely to HDFS.

The command to create a directory is `-mkdir`.  Create a directory `/input` on the HDFS system, using the `hdfs dfs` command.

In [None]:
%%bash
# YOUR CODE HERE
# hdfs dfs -mkdir /input


To import our data, we are dealing with two different filesystems: the local system and the HDFS node.  We will therefore use `hadoop fs` with the `-copyFromLocal` command, whose two arguments are the source file and the destination.

HDFS filesystems are identified by a URI prefixed by `hdfs://`, and the `hdfs dfs` and `hadoop fs` commands will normally expect to see one.

If the URI is not specified, the default filesystem location is taken from `core-site.xml`, which is one of the config files we set up earlier.  The value can be seen from the following command:

In [None]:
%%bash 
cat $HADOOP_HOME/etc/hadoop/core-site.xml
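To make the role of the default filesystem concrete, here is a small sketch which reads the setting from some `core-site.xml` text and uses it to turn a bare path into a full URI.  The helper names `default_fs` and `qualify` are hypothetical, the XML is an illustrative sample rather than the contents of your file, and the sketch assumes the setting is stored under `fs.defaultFS` (or the older name `fs.default.name`).

```python
import xml.etree.ElementTree as ET


def default_fs(core_site_xml):
    """Return the default filesystem URI from core-site.xml text, or None."""
    root = ET.fromstring(core_site_xml)
    for prop in root.iter('property'):
        # fs.defaultFS is the current name; fs.default.name is the older one
        if prop.findtext('name') in ('fs.defaultFS', 'fs.default.name'):
            return prop.findtext('value')
    return None


def qualify(path, fs_uri):
    """Prefix an absolute HDFS path with the filesystem URI if it has none."""
    if '://' in path:
        return path
    return fs_uri.rstrip('/') + path


# Illustrative config text; your file may contain other properties
sample = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>"""

fs_uri = default_fs(sample)
print(qualify('/input', fs_uri))
```

To read the real file instead of a string, `ET.parse(path).getroot()` could replace the `ET.fromstring` call.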

So, for `-copyFromLocal` we can either specify `hdfs://localhost:9000` or leave it out.  The local file can be specified with a relative path, leaving the import command as one of the following two.  Pick one and execute it in the cell below.

In [None]:
%%bash
# With fully specified URI
hadoop fs -copyFromLocal *.csv hdfs://localhost:9000/input
# Implied URI based on default
# hadoop fs -copyFromLocal *.csv /input


Perform the same for the `mapper.py` and `reducer.py` files we created for the MapReduce task earlier, keeping those in the `input` directory as well:

In [None]:
%%bash
# YOUR CODE HERE
hadoop fs -copyFromLocal mapper.py /input/mapper.py
hadoop fs -copyFromLocal reducer.py /input/reducer.py

Now run the streaming command again, this time reading the input from HDFS:

In [31]:
%%bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input hdfs://localhost:9000/input/Youtube04-Eminem.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output hdfs://localhost:9000/output_8

17/11/30 11:23:01 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/11/30 11:23:01 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/11/30 11:23:01 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
17/11/30 11:23:02 INFO mapred.FileInputFormat: Total input paths to process : 2
17/11/30 11:23:02 INFO mapreduce.JobSubmitter: number of splits:2
17/11/30 11:23:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local197003866_0001
17/11/30 11:23:02 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/11/30 11:23:02 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/11/30 11:23:02 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
17/11/30 11:23:02 INFO mapreduce.Job: Running job: job_local197003866_0001
17/11/30 11:23:02 INFO output.FileOutputCommitter: File 

In [23]:
%%bash

hdfs dfs -ls /output_7
# hdfs dfs -cat output_7/_SUCCESS


Found 2 items
-rw-r--r--   1 user supergroup          0 2017-11-30 11:02 /output_7/_SUCCESS
-rw-r--r--   1 user supergroup      25580 2017-11-30 11:02 /output_7/part-00000


The `_SUCCESS` file indicates that the job was a success, which is good.  The other file, `part-00000`, contains the result.  Write code in the cell below to get the output (from HDFS).

In [34]:
%%bash
# YOUR CODE HERE
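Once the contents of `part-00000` are available as text, the word counts are easier to read when sorted.  The sketch below assumes the tab-delimited `word<TAB>count` format written by the reducer; `top_words` is a hypothetical helper and the sample text is made up, not real output from the dataset.

```python
def top_words(output_text, n=3):
    """Return the n most frequent (word, count) pairs from reducer output."""
    counts = []
    for line in output_text.splitlines():
        word, sep, count = line.partition('\t')
        # Skip lines which are not in "word<TAB>number" form
        if sep and count.strip().isdigit():
            counts.append((word, int(count)))
    # Sort by count, highest first; ties are ordered alphabetically
    return sorted(counts, key=lambda pair: (-pair[1], pair[0]))[:n]


# Made-up reducer output, not real results from the dataset
sample = 'the\t12\nto\t9\nme\t9\nxxx\t1\n'
print(top_words(sample, n=3))
```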

Alternatively, you can view this 

You can include multiple `-input` parameters to operate on more than one file.  Update the streaming command above to include all 5 files in the cell below.  Make sure you include a new output directory!



In [None]:
%%bash

# Update this command to include multiple files
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input hdfs://localhost:9000/input/Youtube04-Eminem.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output hdfs://localhost:9000/output_8
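If you would rather generate the repeated `-input` options than type them, the following sketch builds the list of options which follow the streaming jar on the command line.  `streaming_args` is a hypothetical helper, and the second file name and the output directory are placeholders to be replaced with your own values.

```python
def streaming_args(inputs, output, mapper='./mapper.py', reducer='./reducer.py'):
    """Build the option list that follows the streaming jar on the command line."""
    args = []
    for path in inputs:
        # One -input option per file
        args += ['-input', path]
    args += ['-mapper', mapper, '-reducer', reducer, '-output', output]
    return args


# The second file name and the output directory are placeholders
args = streaming_args(
    ['/input/Youtube04-Eminem.csv', '/input/another-file.csv'],
    '/output_new',
)
print(' '.join(args))
```

The printed options can be pasted after the `hadoop jar ...hadoop-streaming-*.jar` part of the command.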