#DATASCI W261: Machine Learning at Scale

#This notebook shows a Hadoop MapReduce job for counting pairs of words.

##Create four documents

In [1]:
%%writefile doc1.txt
A B C A C

Overwriting doc1.txt


In [2]:
%%writefile doc2.txt
D A C B E D E A

Overwriting doc2.txt


In [3]:
%%writefile doc3.txt
B A C E A

Overwriting doc3.txt


In [4]:
%%writefile doc4.txt
D D B A C

Overwriting doc4.txt


##Mapper

In [5]:
%%writefile mapper.py
#!/usr/bin/python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # emit every ordered pair of distinct words that co-occur on the line
    for word1 in words:
        for word2 in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited: word1, word2, and a count of 1
            if word1 != word2:
                print('%s\t%s\t%s' % (word1, word2, 1))

Overwriting mapper.py
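
The mapper logic can be tried locally before running it on Hadoop. The following is a minimal sketch, not part of the job itself: `map_pairs` is a hypothetical helper that mirrors the loop in mapper.py, written for Python 3, and it is applied to the contents of doc1.txt.

```python
def map_pairs(line):
    """Yield (word1, word2, 1) for every ordered pair of distinct words on a line."""
    words = line.strip().split()
    for word1 in words:
        for word2 in words:
            if word1 != word2:
                yield (word1, word2, 1)


# Contents of doc1.txt from the cell above.
records = list(map_pairs("A B C A C"))

# Print the records in the same tab-delimited form the mapper writes to STDOUT.
for word1, word2, one in records:
    print("%s\t%s\t%s" % (word1, word2, one))
```

Because the loops run over word positions, a repeated word contributes one record per occurrence, so doc1.txt yields 16 records rather than one per distinct pair.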


##Reducer

In [6]:
%%writefile reducer.py
#!/usr/bin/python
import sys

current_words = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    try:
        word1, word2, count = line.split('\t')
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # malformed line or non-numeric count, so silently
        # ignore/discard this line
        continue
    words = (word1, word2)
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: the word pair) before it is passed to the reducer
    if current_words == words:
        current_count += count
    else:
        if current_words:
            # write result to STDOUT
            print('%s\t%s' % (current_words, current_count))
        current_count = count
        current_words = words

# do not forget to output the last pair if needed!
if current_words:
    print('%s\t%s' % (current_words, current_count))

Overwriting reducer.py
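
The reducer depends on its input arriving sorted by the word pair. As a rough local stand-in for the whole map, sort, and reduce flow, the sketch below (Python 3, standard library only) sorts the mapped records and groups them with `itertools.groupby`. It is an illustration under the assumption that a plain in-memory sort approximates Hadoop's shuffle; the names `docs`, `map_pairs`, `pair_key`, and `counts` are hypothetical and do not appear in the job.

```python
from itertools import groupby

# Contents of doc1.txt to doc4.txt from the cells above.
docs = ["A B C A C", "D A C B E D E A", "B A C E A", "D D B A C"]


def map_pairs(line):
    """Mirror of mapper.py: one record per ordered pair of distinct words."""
    words = line.strip().split()
    for word1 in words:
        for word2 in words:
            if word1 != word2:
                yield (word1, word2, 1)


def pair_key(record):
    """The first two fields form the key, as in the streaming job."""
    return (record[0], record[1])


# Map phase over all documents.
mapped = [record for doc in docs for record in map_pairs(doc)]

# Stand-in for Hadoop's sort: order the records by the two-field key.
mapped.sort(key=pair_key)

# Reduce phase: consecutive records with the same key are summed.
counts = {}
for key, group in groupby(mapped, key=pair_key):
    counts[key] = sum(count for _, _, count in group)

for key in sorted(counts):
    print("%s\t%s" % (key, counts[key]))
```

`groupby` only merges adjacent records, which is the same constraint the reducer's `current_words` comparison relies on.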


## Run the code in Hadoop

###Start YARN and HDFS

In [7]:
!/usr/local/Cellar/hadoop/2.6.0/sbin/start-yarn.sh
!/usr/local/Cellar/hadoop/2.6.0/sbin/start-dfs.sh

starting yarn daemons
starting resourcemanager, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/yarn-liang-resourcemanager-ldai.out
localhost: starting nodemanager, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/yarn-liang-nodemanager-ldai.out
15/08/21 08:20:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-liang-namenode-ldai.out
localhost: starting datanode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-liang-datanode-ldai.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-liang-secondarynamenode-ldai.out
15/08/21 08:20:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


###Create a new folder

In [10]:
!hdfs dfs -mkdir pairs

15/08/21 08:23:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


###Upload files to HDFS

In [11]:
!hdfs dfs -put doc*.txt pairs

15/08/21 08:23:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Hadoop streaming command

<pre>
hadoop jar hadoopstreamingjarfile \
    -D stream.num.map.output.key.fields=n \
    -mapper mapperfile \
    -reducer reducerfile \
    -input inputfile \
    -output outputfile</pre>

**By default, stream.num.map.output.key.fields is 1, which means only the first tab-separated column is the key. In the pairs example, the first two columns (word1 and word2) form the key, so stream.num.map.output.key.fields should be 2.**
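
To make the effect of this setting concrete, here is a small sketch of how a tab-delimited mapper line divides into key and value for a given number of key fields. `split_key_value` is a hypothetical helper written for illustration in Python 3; it imitates the documented behaviour and is not Hadoop's own code.

```python
def split_key_value(line, num_key_fields=1, sep="\t"):
    """Split a mapper output line into (key, value).

    The key is the first num_key_fields separator-delimited fields;
    everything after them is the value.
    """
    fields = line.split(sep)
    key = sep.join(fields[:num_key_fields])
    value = sep.join(fields[num_key_fields:])
    return key, value


line = "A\tB\t1"

# Default: only the first word is the key.
default_split = split_key_value(line, num_key_fields=1)

# Pairs job: both words form the key, the count is the value.
pairs_split = split_key_value(line, num_key_fields=2)

print(default_split)
print(pairs_split)
```

With one key field, the records for ('A', 'B') and ('A', 'C') share the key `A`, so sorting by key alone would not be expected to group them by pair. With two key fields, each pair has its own key.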

The Hadoop streaming jar file can be found in your Hadoop folder or downloaded from
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-streaming/2.6.0

In [12]:
!hadoop jar hadoop-*streaming*.jar -D stream.num.map.output.key.fields=2 -mapper mapper.py -reducer reducer.py -input pairs -output pairsoutput

15/08/21 08:23:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/08/21 08:23:15 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/08/21 08:23:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/21 08:23:15 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/08/21 08:23:15 INFO mapred.FileInputFormat: Total input paths to process : 4
15/08/21 08:23:15 INFO mapreduce.JobSubmitter: number of splits:4
15/08/21 08:23:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local705544776_0001
15/08/21 08:23:16 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/08/21 08:23:16 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/08/21 08:23:16 INFO mapreduce.Job: Running job: job_local705544776_0001
15/08/21 08:23:16 

###Show the results

In [13]:
!hdfs dfs -cat pairsoutput/part-00000

15/08/21 08:23:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
('A', 'B')	7
('A', 'C')	9
('A', 'D')	6
('A', 'E')	6
('B', 'A')	7
('B', 'C')	5
('B', 'D')	4
('B', 'E')	3
('C', 'A')	9
('C', 'B')	5
('C', 'D')	4
('C', 'E')	3
('D', 'A')	6
('D', 'B')	4
('D', 'C')	4
('D', 'E')	4
('E', 'A')	6
('E', 'B')	3
('E', 'C')	3
('E', 'D')	4
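
The counts above can be cross-checked without MapReduce. Since the mapper emits one record per pair of positions, the contribution of a document to the pair (a, b) is the number of occurrences of a times the number of occurrences of b. The sketch below (Python 3, with the hypothetical names `docs` and `expected`) recomputes the table that way. It also shows why the output is symmetric: (a, b) and (b, a) always get the same count.

```python
from collections import Counter
from itertools import permutations

# Contents of doc1.txt to doc4.txt.
docs = ["A B C A C", "D A C B E D E A", "B A C E A", "D D B A C"]

expected = Counter()
for doc in docs:
    freq = Counter(doc.split())
    # Every ordered pair of distinct words contributes freq[a] * freq[b].
    for a, b in permutations(freq, 2):
        expected[(a, b)] += freq[a] * freq[b]

for pair in sorted(expected):
    print("%s\t%s" % (pair, expected[pair]))
```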


###Remove the folders created

In [14]:
!hdfs dfs -rm -r pairsoutput
!hdfs dfs -rm -r pairs

15/08/21 08:23:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/08/21 08:23:27 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted pairsoutput
15/08/21 08:23:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/08/21 08:23:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted pairs


###Stop YARN and HDFS

In [15]:
!/usr/local/Cellar/hadoop/2.6.0/sbin/stop-yarn.sh
!/usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh

stopping yarn daemons
stopping resourcemanager
localhost: stopping nodemanager
no proxyserver to stop
15/08/21 08:23:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Stopping namenodes on [localhost]
localhost: stopping namenode
localhost: stopping datanode
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode
15/08/21 08:24:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
