# Sample code to demonstrate the following on Hadoop Streaming
- Word count
- Combiners
- Sorts, Secondary sorts, total sort (under construction)
- Custom partitions

### Written by James G. Shanahan
MIDS w261 Machine Learning at Scale
September 15, 2014

### Data used
- Self-contained datasets generated by code
- Gutenberg dataset, available at http://www.gutenberg.org/cache/epub/48054/pg48054.txt


In [1]:
%pwd

u'C:\\Anaconda2\\Scripts'

In [2]:
mkdir WordCount

# Get Gutenberg dataset 

In [3]:
#run any unix command
!curl http://www.gutenberg.org/cache/epub/48054/pg48054.txt > WordCount/historical_tours.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1318  100  1318    0     0   4013      0 --:--:-- --:--:-- --:--:--  5491


# Wordcount Example in map reduce

See the following for a detailed presentation of the word count example:
- http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
- This notebook covers further nuances and more intricate extensions

In [20]:
%%writefile WordCount/mapper.py
#!/usr/bin/env python

import sys
#sys.stderr.write("reporter:counter:Tokens,Total,1") # NOTE: missing the trailing newline, so this won't work
sys.stderr.write("reporter:counter:Mapper Counters,Calls,1\n")
sys.stderr.write("reporter:status:processing my message...how are you\n")

long_len = 0

for line in sys.stdin:
    for word in line.split():
        if len(word) >= 5:
            long_len = 1
        else:
            long_len = 0
        print '%s\t%s\t%s\t%s' % (word, 1, len(word), long_len)
        if word == "debt":
            sys.stderr.write("reporter:counter:EDA Counters,Calls,1\n")          


Overwriting WordCount/mapper.py
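
Hadoop Streaming picks up counter and status updates from a task's stderr, one per newline-terminated line, in the forms `reporter:counter:<group>,<counter>,<amount>` and `reporter:status:<message>`. A minimal sketch of helpers that always append the newline, written for Python 3 (the function names are illustrative and not part of Hadoop):

```python
import sys


def counter_line(group, counter, amount=1):
    """Format a Hadoop Streaming counter update, including the trailing newline."""
    return "reporter:counter:%s,%s,%d\n" % (group, counter, amount)


def status_line(message):
    """Format a Hadoop Streaming status update, including the trailing newline."""
    return "reporter:status:%s\n" % message


def increment_counter(group, counter, amount=1, stream=sys.stderr):
    """Write a counter update to stderr, where the streaming framework looks for it."""
    stream.write(counter_line(group, counter, amount))


increment_counter("Mapper Counters", "Calls")
```

Routing every update through one helper makes it hard to forget the newline, which is the mistake flagged in the commented-out line of the mapper.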


In [25]:
%%writefile WordCount/reducer.py
#!/usr/bin/env python

import sys

cur_key = None
cur_count = 0
cur_len = 0
cur_long = 0
sys.stderr.write("reporter:counter:Reducer Counters,Calls,1\n")
for line in sys.stdin:

    key, value, lgth, lgth_size = line.split()

    if key == cur_key:
        cur_count += int(value)
    else:
        if cur_key:
            print '%s\t%s\t%s\t%s' % (cur_key, cur_count, cur_len, cur_long)
        cur_key = key
        cur_count = int(value)
        cur_len = lgth
        cur_long = lgth_size

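# Emit the last key, unless there was no input at all (e.g. a reducer that received no records)
if cur_key is not None:
    print '%s\t%s\t%s\t%s' % (cur_key, cur_count, cur_len, cur_long)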


Overwriting WordCount/reducer.py
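
The reducer relies on its input arriving sorted by key, so records with equal keys are adjacent. The same logic can be sketched with `itertools.groupby`, assuming Python 3 and the mapper's four whitespace-separated fields; `reduce_counts` is a hypothetical name, not part of the notebook's scripts:

```python
from itertools import groupby


def reduce_counts(sorted_lines):
    """Sum the counts of adjacent records that share a key.

    Each input line is 'word<TAB>count<TAB>length<TAB>is_long' and the lines
    must already be sorted by word, as Hadoop guarantees for reducer input.
    """
    records = (line.split() for line in sorted_lines if line.strip())
    for word, group in groupby(records, key=lambda rec: rec[0]):
        group = list(group)
        total = sum(int(rec[1]) for rec in group)
        # Length and long-word flag are identical for every record of a word
        yield "%s\t%d\t%s\t%s" % (word, total, group[0][2], group[0][3])


mapped = ["foo\t1\t3\t0", "bar\t1\t3\t0", "foo\t1\t3\t0", "sample\t1\t6\t1"]
for out in reduce_counts(sorted(mapped)):
    print(out)
```

Because the output has the same four fields as the input, the same logic can also run as a combiner, which is what passing `reducer.py` to `-combiner` relies on.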


## Test mapper.py and reducer.py locally from the Unix command line

In [26]:
!chmod a+x WordCount/mapper.py 
!chmod a+x WordCount/reducer.py 

In [27]:
!echo "foo foo quux sample labs foo bar quux sample" | WordCount/mapper.py 

reporter:counter:Mapper Counters,Calls,1
reporter:status:processing my message...how are you
foo	1	3	0
foo	1	3	0
quux	1	4	0
sample	1	6	1
labs	1	4	0
foo	1	3	0
bar	1	3	0
quux	1	4	0
sample	1	6	1


In [29]:
!echo "foo foo quux sample labs foo bar quux sample" | WordCount/mapper.py | sort -k1,1 | WordCount/reducer.py| sort -k2,2nr 

reporter:counter:Mapper Counters,Calls,1
reporter:counter:Reducer Counters,Calls,1
reporter:status:processing my message...how are you
foo	3	3	0
quux	2	4	0
sample	2	6	1
bar	1	3	0
labs	1	4	0


## Is the Hadoop cluster's HDFS available?

In [10]:
!hdfs dfs

Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
	[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-count [-q] [-h] <path> ...]
	[-cp [-f] [-p | -p[topax]] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] <path> ...]
	[-expunge]
	[-find <path> ... <expression> ...]
	[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-getfacl [-R] <path>]
	[-getfattr [-R] {-n name | -d} [-e en] <path>]
	[-getmerge [-nl] <src> <localdst>]
	[-help [cmd ...]]
	[-ls [-d] [-h] [-R] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] 

## Pay attention to the order in which you specify the options for hdfs commands

Generic options supported are
- -conf <configuration file>     specify an application configuration file
- -D <property=value>            use value for given property
- -fs <local|namenode:port>      specify a namenode
- -jt <local|resourcemanager:port>    specify a ResourceManager
- -files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
- -libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
- -archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
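
For streaming jobs this means generic options such as `-D` and `-files` go before streaming options such as `-mapper` or `-input`. A small Python 3 sketch that assembles a command in that order; `build_streaming_command` is a hypothetical helper, and the jar path and file names are the ones used elsewhere in this notebook:

```python
def build_streaming_command(jar, generic_options, streaming_options):
    """Return an argv list with generic options placed before streaming options."""
    command = ["hadoop", "jar", jar]
    for flag, value in generic_options:
        command += [flag, value]
    for flag, value in streaming_options:
        command += [flag, value]
    return command


command = build_streaming_command(
    "/usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar",
    generic_options=[
        ("-D", "mapreduce.job.reduces=3"),
        ("-files", "WordCount/mapper.py,WordCount/reducer.py"),
    ],
    streaming_options=[
        ("-mapper", "mapper.py"),
        ("-reducer", "reducer.py"),
        ("-input", "testWordCountInput.txt"),
        ("-output", "wordcount-output"),
    ],
)
print(" ".join(command))
```

Keeping the two groups in separate arguments makes the required ordering explicit instead of depending on how the command line happens to be typed.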


In [11]:
!hdfs dfs -ls

Found 14 items
drwxr-xr-x   - hadoop hadoop          0 2016-06-04 18:07 HW2_2
drwxr-xr-x   - hadoop hadoop          0 2016-06-07 03:15 HW2_3
drwxr-xr-x   - hadoop hadoop          0 2016-06-03 02:19 Sort
drwxr-xr-x   - hadoop hadoop          0 2016-06-13 18:19 WordCount
-rw-r--r--   1 hadoop hadoop     204678 2016-06-04 18:06 enronemail_1h.txt
drwxr-xr-x   - hadoop hadoop          0 2016-06-14 01:59 gutenberg-wordCount
-rw-r--r--   1 hadoop hadoop       1318 2016-06-14 01:58 historical_tours.txt
drwxr-xr-x   - hadoop hadoop          0 2016-06-12 16:11 integers-sorted
-rw-r--r--   1 hadoop hadoop      98794 2016-06-12 02:10 integers.txt
-rw-r--r--   1 hadoop hadoop        110 2016-06-14 01:59 ipAddresses.txt
-rw-r--r--   1 hadoop hadoop         56 2016-06-14 01:58 testWordCountInput.txt
drwxr-xr-x   - hadoop hadoop          0 2016-06-10 22:42 tmp
drwxr-xr-x   - hadoop hadoop          0 2016-06-14 01:58 wordcount-output
drwxr-xr-x   - hadoop hadoop          0 2016-06-14 02:0

# Small test for Word Count (one input file)

In [12]:
%%writefile testWordCountInput.txt
hello this is Jimi
jimi who Jimi three Jimi 
Hello
hello

Writing testWordCountInput.txt


In [13]:
!hdfs dfs -rm testWordCountInput.txt 
!hdfs dfs -copyFromLocal testWordCountInput.txt 
!hdfs dfs -rm -r wordcount-output
#usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib
#dataDir = "/Users/jshanahan/Dropbox/lectures-uc-berkeley-ml-class-2015/Notebooks/WordCount"

!hadoop jar /usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar \
   -files WordCount/mapper.py,WordCount/reducer.py \
   -mapper mapper.py \
   -reducer reducer.py \
   -combiner reducer.py \
   -input testWordCountInput.txt \
   -output wordcount-output  \
   -numReduceTasks 3
# Alternative to -numReduceTasks: -D mapreduce.job.reduces=3 (a generic option, so it must precede the streaming options)
# -input historical_tours.txt   (the Gutenberg file on HDFS)


#output directory on Hadoop 

16/06/14 02:09:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted testWordCountInput.txt
16/06/14 02:09:36 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted wordcount-output
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar] /tmp/streamjob8127314882734979367.jar tmpDir=null
16/06/14 02:09:39 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-7-251.us-west-1.compute.internal/172.31.7.251:8032
16/06/14 02:09:39 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-7-251.us-west-1.compute.internal/172.31.7.251:8032
16/06/14 02:09:40 INFO metrics.MetricsSaver: MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: true maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1464726748890 
16/06/14 02:09:40 INFO metrics.MetricsSaver: 

In [14]:
#have a look at the input
!echo  "\n---------------------------\n"
!hdfs dfs -cat testWordCountInput.txt
!echo  "\n---------------------------\n"
# Wordcount output
!hdfs dfs -cat wordcount-output/part-0000*

\n---------------------------\n
hello this is Jimi
jimi who Jimi three Jimi 
Hello
hello\n---------------------------\n
Hello	1
jimi	1
this	1
three	1
Jimi	3
hello	2
is	1
who	1


## Sort the words in decreasing order of frequency, but break ties with alphabetical sorting

In [86]:
# Sort the word count output in decreasing order of count and increasing order of token,
# i.e., with the sort options "-k2,2nr -k1,1"
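
The ordering that `-k2,2nr -k1,1` asks for (count descending, then word ascending) can be checked locally. A Python 3 sketch using a composite sort key; `sort_by_count_then_word` is an illustrative name, and ties are broken by code point, which may differ from a locale-aware Unix `sort`:

```python
def sort_by_count_then_word(lines):
    """Order 'word<TAB>count' lines by count descending, then word ascending."""
    def key(line):
        word, count = line.split("\t")[:2]
        return (-int(count), word)
    return sorted(lines, key=key)


counts = ["Hello\t1", "jimi\t1", "Jimi\t3", "hello\t2", "is\t1"]
for line in sort_by_count_then_word(counts):
    print(line)
```

Negating the count turns the descending numeric sort into an ascending one, so a single tuple key expresses both sort fields.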

In [87]:
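# This call to Hadoop does NOT work as anticipated.
# With IdentityMapper and IdentityReducer the records are not sorted on the count:
# the output below suggests the key is each line's byte offset, so the comparator never sees the count field.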

!hdfs dfs -rm -r wordcount-output-sorted
!hadoop jar /usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
   -input wordcount-output \
   -output wordcount-output-sorted  \
   -numReduceTasks 1 \
   -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
   -reducer org.apache.hadoop.mapred.lib.IdentityReducer

#DOES not work in streaming mode
#   -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
#  -reducer org.apache.hadoop.mapred.lib.IdentityReducer \


16/06/12 17:50:27 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted wordcount-output-sorted
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar] /tmp/streamjob5263851993333263738.jar tmpDir=null
16/06/12 17:50:30 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-7-251.us-west-1.compute.internal/172.31.7.251:8032
16/06/12 17:50:30 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-7-251.us-west-1.compute.internal/172.31.7.251:8032
16/06/12 17:50:31 INFO metrics.MetricsSaver: MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: true maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1464726748890 
16/06/12 17:50:31 INFO metrics.MetricsSaver: Created MetricsSaver j-ZAC3GQDMC0E6:i-610a91d4:RunJar:20372 period:60 /mnt/var/em/raw/i-610a91d4_20160612_RunJar_20372_raw.bin
16/06/12 17:50:31 INFO lzo.GPLNati

In [39]:
!hdfs dfs -cat wordcount-output-sorted/part-00000| head -10

8	is	1
0	hello	2
22	three	1
0	Hello	1
8	jimi	1
15	this	1
0	Jimi	3
13	who	1


In [40]:
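# Does NOT work: the copied file has three columns (byte offset, word, count), so -k2,2nr compares the words numerically instead of the counts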
!hdfs dfs -ls wordcount-output-sorted/*
!rm -r wordcount-output-sorted
!hdfs dfs -copyToLocal wordcount-output-sorted 

!sort -k2,2nr <wordcount-output-sorted/part-00000 >wordcount-output-sorted/part-00000.SORTED.txt
!head -n 100 wordcount-output-sorted/part-00000.SORTED.txt


-rw-r--r--   1 hadoop hadoop          0 2016-06-01 02:07 wordcount-output-sorted/_SUCCESS
-rw-r--r--   1 hadoop hadoop         75 2016-06-01 02:07 wordcount-output-sorted/part-00000
rm: cannot remove ‘wordcount-output-sorted’: No such file or directory
0	Hello	1
0	hello	2
0	Jimi	3
13	who	1
15	this	1
22	three	1
8	is	1
8	jimi	1


## identityMapper.py
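
The source of `identityMapper.py` is not shown in this notebook. A minimal sketch of what such a script might look like, assuming it simply copies each input line to its output unchanged and leaves the key/value split to Hadoop Streaming (written for Python 3; `identity_map` is a hypothetical name):

```python
import io
import sys


def identity_map(lines, out):
    """Copy every input line to the output stream unchanged."""
    for line in lines:
        out.write(line)


# Local check on an in-memory stream; as a streaming mapper the call
# would be identity_map(sys.stdin, sys.stdout) instead.
buffer = io.StringIO()
identity_map(["Jimi\t3\n", "hello\t2\n"], buffer)
sys.stdout.write(buffer.getvalue())
```

A script mapper like this receives only the text of each line, whereas the Java `IdentityMapper` also passes through whatever key the input format supplies.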

# Word count for Gutenberg

In [43]:
!hdfs dfs -rm historical_tours.txt 
!hdfs dfs -copyFromLocal WordCount/historical_tours.txt 
!hdfs dfs -ls
!hdfs dfs -rm testWordCountInput.txt 
!hdfs dfs -copyFromLocal testWordCountInput.txt 
!hdfs dfs -rm -r gutenberg-wordCount
#usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib
dataDir = "/Users/jshanahan/Dropbox/lectures-uc-berkeley-ml-class-2015/Notebooks/WordCount"

!hadoop jar /usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar \
   -mapper WordCount/mapper.py \
   -reducer WordCount/reducer.py \
   -combiner WordCount/reducer.py \
   -input testWordCountInput.txt \
   -output gutenberg-wordCount  \
   -numReduceTasks 3


16/06/04 15:43:18 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted historical_tours.txt
Found 8 items
drwxr-xr-x   - hadoop hadoop          0 2016-06-03 02:41 Enron_email_reading_output
drwxr-xr-x   - hadoop hadoop          0 2016-06-03 02:19 Sort
-rw-r--r--   1 hadoop hadoop     204678 2016-06-03 02:41 enronemail_1h.txt
drwxr-xr-x   - hadoop hadoop          0 2016-06-04 15:42 gutenberg-wordCount
-rw-r--r--   1 hadoop hadoop       1318 2016-06-04 15:43 historical_tours.txt
-rw-r--r--   1 hadoop hadoop         56 2016-06-04 15:41 testWordCountInput.txt
drwxr-xr-x   - hadoop hadoop          0 2016-06-04 15:39 wordcount-output
drwxr-xr-x   - hadoop hadoop          0 2016-06-01 02:07 wordcount-output-sorted
16/06/04 15:43:27 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted testWordCountInput.txt
16/06/04 15:43:33 INFO fs.TrashPolicyDefa

In [364]:
#Sort the words in decreasing order of frequency, but break ties with alphabetical sorting
!hdfs dfs -rm -r gutenberg-output-sorted

!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
   -mapper identityMapper.py \
   -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
   -input gutenberg-output \
   -output gutenberg-output-sorted  \
   -numReduceTasks 1

16/02/02 10:50:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:50:52 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted gutenberg-output-sorted
16/02/02 10:50:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:50:54 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/02 10:50:54 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/02 10:50:54 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/02 10:50:54 INFO mapred.FileInputFormat: Total input paths to process : 4
16/02/02 10:50:54 INFO mapreduce.JobSubmitter: number of splits:4
16/02/02 10:50:54 INFO mapreduce.JobSubmit

In [366]:
!hdfs dfs -ls "gutenberg-output-sorted/part-*"
!rm -r gutenberg-output-sorted
!hdfs dfs -copyToLocal "gutenberg-output-sorted" 
!head -n 20 gutenberg-output-sorted/part-00000


16/02/02 10:51:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 jshanahan supergroup      36548 2016-02-02 10:50 gutenberg-output-sorted/part-00000
16/02/02 10:51:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
States,	6
Starting	3
Start:	1
Start	1
Students	1
Station,	2
Standish	2
Square;	1
Swing	2
Sunday.	1
Sumner	2
Sullivan	1
Standish's	1
Tavern"	1
Shows	1
Sheraton	1
Sewall"	1
Sewall	1
Simmons	2
Sidney	2


In [362]:
!head -n 10 gutenberg-output-sorted/part-00000

States,	6
Starting	3
Start:	1
Start	1
Students	1
Station,	2
Standish	2
Square;	1
Swing	2
Sunday.	1


In [231]:
!hdfs dfs -tail  gutenberg-output/part-00000 |tail -n 10

16/02/02 08:40:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
windows.	1
winter.	1
wish	2
within	6
words	1
work.	5
would	2
wrote	3
your	52
yourself	1


# IP Address secondary sort and partitioning

In [322]:
%%writefile ipAddresses.txt
11.12.1.2
11.14.2.3
11.11.4.1
11.11.3.1
11.11.6.1
11.11.7.1
11.12.1.1
11.14.2.2
11.12.2.5222
11.9999999.2.5222


Overwriting ipAddresses.txt


In [307]:
!hdfs dfs -rm ipAddresses.txt 
!hdfs dfs -copyFromLocal ipAddresses.txt 
!hdfs dfs -rm -r myOutputDirForIPAddresses 
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k1,1n -k2,2nr" \
    -input ipAddresses.txt \
    -output myOutputDirForIPAddresses \
    -mapper /bin/cat \
    -reducer /bin/cat

16/02/02 10:06:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:06:55 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted ipAddresses.txt
16/02/02 10:06:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:06:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:06:58 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted myOutputDirForIPAddresses
16/02/02 10:06:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:06:59 INFO Configuration.deprecation: session.id is deprecated. 
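
The comparator options `-k1,1n -k2,2nr` ask for the first field in increasing numeric order and the second in decreasing numeric order. A local Python 3 sketch of that ordering on some of the sample addresses, splitting on `.`; it models only the intended order, not Hadoop's shuffle, and `secondary_sort` is an illustrative name:

```python
def secondary_sort(addresses):
    """Sort dotted addresses by field 1 ascending, then field 2 descending."""
    def key(address):
        fields = address.split(".")
        return (int(fields[0]), -int(fields[1]))
    return sorted(addresses, key=key)


sample = ["11.12.1.2", "11.14.2.3", "11.11.4.1", "11.9999999.2.5222"]
for address in secondary_sort(sample):
    print(address)
```

Comparing the fields as integers is what the `n` modifier requests; a plain text comparison would place `9999999` differently relative to `14`.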

## Custom partitioner and sort work locally and on an AWS EMR cluster

In [325]:
# Custom partitioner works:
# partition on the first two fields,
# sort numerically decreasing on the third field
!hdfs dfs -rm ipAddresses.txt 
!hdfs dfs -copyFromLocal ipAddresses.txt 
!hdfs dfs -rm -r myOutputDirForIPAddresses 
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
  -D stream.num.map.output.key.fields=4 \
  -D map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k3,3nr" \
  -D mapreduce.job.reduces=3 \
  -input ipAddresses.txt \
  -output myOutputDirForIPAddresses \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

16/02/02 10:23:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:23:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted ipAddresses.txt
16/02/02 10:23:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:23:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:23:47 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted myOutputDirForIPAddresses
16/02/02 10:23:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:23:48 INFO Configuration.deprecation: session.id is deprecated. 

In [326]:
!hdfs dfs -cat myOutputDirForIPAddresses/part-00000
!echo "=========================="
!hdfs dfs -cat myOutputDirForIPAddresses/part-00001
!echo "=========================="
!hdfs dfs -cat myOutputDirForIPAddresses/part-00002

16/02/02 10:23:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11.12.2.5222	
11.12.1.1	
11.12.1.2	
16/02/02 10:23:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 10:23:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11.11.7.1	
11.11.6.1	
11.11.4.1	
11.11.3.1	
11.9999999.2.5222	
11.14.2.2	
11.14.2.3	
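
What this job does can be imitated locally: route each record to a reducer by hashing its first two fields, then sort each partition in decreasing numeric order of the third field. In this Python 3 sketch `zlib.crc32` is only a stand-in for Hadoop's own hash function, so the partition numbers will not match the cluster's. What carries over is that records sharing fields 1 and 2 always land in the same partition.

```python
import zlib
from collections import defaultdict


def partition_and_sort(addresses, num_reducers=3):
    """Group addresses by a hash of fields 1-2, then sort each group on field 3."""
    partitions = defaultdict(list)
    for address in addresses:
        fields = address.split(".")
        partition_key = ".".join(fields[:2]).encode("utf-8")
        # Stand-in hash; Hadoop's KeyFieldBasedPartitioner uses its own function
        partitions[zlib.crc32(partition_key) % num_reducers].append(address)
    for records in partitions.values():
        # list.sort is stable, so ties on field 3 keep their input order
        records.sort(key=lambda a: -int(a.split(".")[2]))
    return dict(partitions)


sample = ["11.12.1.2", "11.14.2.3", "11.11.4.1", "11.11.3.1", "11.11.6.1",
          "11.11.7.1", "11.12.1.1", "11.14.2.2", "11.12.2.5222"]
for reducer, records in sorted(partition_and_sort(sample).items()):
    print(reducer, records)
```

Partitioning and sorting are independent steps here, just as the partitioner options (`-k1,2`) and comparator options (`-k3,3nr`) are set separately in the job.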


# Run a job on AWS by launching an EMR cluster
**Custom partition and sort work**

In [None]:
ls /usr/lib/hadoop/hadoop-streaming-2.7.2-amzn-1.jar

In [None]:
# Run a job on AWS by launching an EMR cluster
# Log in to the cluster remotely

# On AWS, generate a sample data file called ipAddresses.txt
echo -e "11.12.1.2" > ipAddresses.txt
echo -e "11.14.2.3" >> ipAddresses.txt
echo -e "11.11.4.1" >> ipAddresses.txt
echo -e "11.11.7.1" >> ipAddresses.txt
echo -e "11.11.6.1" >> ipAddresses.txt
echo -e "11.11.5.1" >> ipAddresses.txt
echo -e "11.11.4.1" >> ipAddresses.txt
echo -e "11.11.9.1" >> ipAddresses.txt
echo -e "11.12.9.1" >> ipAddresses.txt
echo -e "11.14.2.2" >> ipAddresses.txt
echo -e "11.12.2.5222" >> ipAddresses.txt
echo -e "11.9999999.2.5222" >> ipAddresses.txt


#NOTE Hadoop is already running!
#
hdfs dfs -rm ipAddresses.txt 
hdfs dfs -copyFromLocal ipAddresses.txt 
hdfs dfs -ls
# No need to start Hadoop on an EMR cluster; it starts at cluster launch time
# Check by typing hdfs dfs -ls
hdfs dfs -rm -r myOutputDirForIPAddresses

hadoop jar /usr/lib/hadoop/hadoop-streaming-2.7.1-amzn-0.jar \
  -D stream.num.map.output.key.fields=4 \
  -D map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k3,3nr" \
  -D mapreduce.job.reduces=3 \
  -input ipAddresses.txt \
  -output myOutputDirForIPAddresses \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

hdfs dfs -cat myOutputDirForIPAddresses/*
# This job partitioned the records correctly (i.e., by columns 1 and 2),
# as required by
#         mapreduce.partition.keypartitioner.options=-k1,2
[hadoop@ip-172-31-15-64 ~]$ hdfs dfs -cat myOutputDirForIPAddresses/*
11.12.9.1	
11.12.2.5222	
11.12.1.2	
11.11.9.1	
11.11.7.1	
11.11.6.1	
11.11.5.1	
11.11.4.1	
11.11.4.1	
11.14.2.3	
11.14.2.2	
11.9999999.2.5222

In [241]:
# scp ipAddresses.txt   (incomplete: scp needs a destination and, in a notebook, a leading "!")
!sort -t"." -k1,1 -k2,2 <ipAddresses.txt

In [243]:
!cat ipAddresses.txt

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
11.12.2.5222
11.9999999.2.5222

In [242]:
!hdfs dfs -rm ipAddresses.txt 
!hdfs dfs -copyFromLocal ipAddresses.txt 
!hdfs dfs -ls

16/02/02 08:47:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 08:47:50 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted ipAddresses.txt
16/02/02 08:47:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 08:47:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 15 items
-rw-r--r--   1 jshanahan supergroup     888190 2016-02-01 08:32 1901
-rw-r--r--   1 jshanahan supergroup     888978 2016-02-01 08:43 1902
drwxr-xr-x   - jshanahan supergroup          0 2016-02-01 09:21 SecondarySort
drwxr-xr-x   - jshanahan supergroup          0 2016-02-01 16:57 gutenberg-output
drwxr-xr-x   - jshanahan supergroup          0 2016-02-02 08:45 gutenberg-output-so

In [66]:
%%writefile ipAddressMapper.py
#!/usr/bin/python
import sys

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the dotted address into its four fields
    f1, f2, f3, f4 = line.split('.')
    # emit the fields tab-separated, tagging the last one so the mapper's output is recognizable
    print '%s\t%s\t%s\t%s mapper' % (f1, f2, f3, f4)

Overwriting ipAddressMapper.py


In [67]:
%%writefile ipAddressReducer.py
#!/usr/bin/python
import sys

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the tab-separated fields emitted by ipAddressMapper.py
    f1, f2, f3, f4 = line.split('\t')
    # rebuild the dotted address, tagged so the reducer's output is recognizable
    print '%s.%s.%s.%s jimi' % (f1, f2, f3, f4)

Overwriting ipAddressReducer.py


In [31]:
!cat ipAddresses.txt| python ipAddressReducer.py

11.12.1.2 jimi
11.14.2.3 jimi
11.11.4.1 jimi
11.12.1.1 jimi
11.14.2.2 jimi
11.12.2.5222 jimi
11.9999999.2.5222 jimi


## Word count with a Custom partitioner step 
**To enable total sorting of the word count frequency when multiple reducers are used**

In [250]:
%%writefile WordCount/mapperWithPartitionTable.py
#!/usr/bin/env python

import sys
#sys.stderr.write("reporter:counter:Tokens,Total,1") # NOTE: missing the trailing newline, so this won't work
sys.stderr.write("reporter:counter:Mapper Counters,Calls,1\n")
sys.stderr.write("reporter:status:processing my message...how are you\n")

group1 = "abcdefghijklm"
group2 = "nopqrstuvwxyz"

for line in sys.stdin:
    for word in line.split():
        firstChar = word[0].lower()
        if firstChar in group1:
            print "group1\t%s\t%d" %(word, 1)
        elif firstChar in group2:
            print "group2\t%s\t%d" %(word, 1)
        else:
            print "group3\t%s\t%d" %(word, 1)
 

Overwriting WordCount/mapperWithPartitionTable.py


In [254]:
#partition the word count records
!echo "foo foo quux labs foo #funky st#ff bar quux" | python WordCount/mapperWithPartitionTable.py | sort -k1,1 -k2,2 

reporter:counter:Mapper Counters,Calls,1
reporter:status:processing my message...how are you
group1	bar	1
group1	foo	1
group1	foo	1
group1	foo	1
group1	labs	1
group2	quux	1
group2	quux	1
group2	st#ff	1
group3	#funky	1


In [259]:
%%writefile WordCount/reducerWithPartitionKey.py
#!/usr/bin/env python

import sys

cur_key = None
cur_count = 0
sys.stderr.write("reporter:counter:Reducer Counters,Calls,1\n")
for line in sys.stdin:
    group, key, value = line.split()   # one minor modification: read the partition key and drop it
    if key == cur_key:
        cur_count += int(value)
    else:
        if cur_key:
            print '%s\t%s' % (cur_key, cur_count)
        cur_key = key
        cur_count = int(value)

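# Emit the last key, unless there was no input at all (e.g. a reducer that received no records)
if cur_key is not None:
    print '%s\t%s' % (cur_key, cur_count)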

Overwriting WordCount/reducerWithPartitionKey.py


In [263]:
!echo "foo foo quux labs foo #funky st#ff bar quux" | python WordCount/mapperWithPartitionTable.py | \
 sort -k1,1 -k2,2 | \
 python WordCount/reducerWithPartitionKey.py |\
 sort -k2,2nr -k1,1

reporter:counter:Mapper Counters,Calls,1
reporter:counter:Reducer Counters,Calls,1
reporter:status:processing my message...how are you
foo	3
quux	2
#funky	1
bar	1
labs	1
st#ff	1
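
The `group1`/`group2`/`group3` prefix is meant to act as a partition key: if each group goes to its own reducer, every reducer handles a disjoint range of first letters. A local Python 3 sketch of that idea, assuming one reducer per group (the notebook does not show the corresponding Hadoop job, and `partitioned_word_count` is a hypothetical name):

```python
from collections import defaultdict

GROUP1 = "abcdefghijklm"
GROUP2 = "nopqrstuvwxyz"


def group_for(word):
    """Return the partition label the mapper above assigns to a word."""
    first = word[0].lower()
    if first in GROUP1:
        return "group1"
    if first in GROUP2:
        return "group2"
    return "group3"


def partitioned_word_count(words):
    """Count words per partition and return the partitions in label order."""
    partitions = defaultdict(lambda: defaultdict(int))
    for word in words:
        partitions[group_for(word)][word] += 1
    # Each inner list is what one reducer would write, sorted by word
    return [(label, sorted(partitions[label].items()))
            for label in sorted(partitions)]


text = "foo foo quux labs foo #funky st#ff bar quux"
for label, counts in partitioned_word_count(text.split()):
    print(label, counts)
```

Reading the partitions in label order yields all a-m words before all n-z words, with everything else last, so no merge across reducer outputs is needed to keep that grouping.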


In [76]:
# Partition into 3 reducers (the first 2 fields are used as keys for the partition).
# Sorting within each partition for the reducer (all 4 fields are used for sorting).
# In the examples above, "-D stream.map.output.field.separator=." specifies "." as the field separator
# for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of
# the line (excluding the fourth ".") will be the value. If a line has fewer than four "."s, then the
# whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).

# Similarly, you can use "-D stream.reduce.output.field.separator=SEP" and "-D stream.num.reduce.output.fields=NUM"
# to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.

# Similarly, you can specify "stream.map.input.field.separator" and "stream.reduce.input.field.separator" as
# the input separator for Map/Reduce inputs. By default the separator is the tab character.

# The record 11.12.1.2 is treated as a key only, with no value.

!hdfs dfs -rm -r myOutputDirForIPAddresses 
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.num.map.output.key.fields=4 \
    -D mapreduce.partition.keypartitioner.options=-k1,2 \
    -input ipAddresses.txt \
    -output myOutputDirForIPAddresses \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -numReduceTasks 3 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner     
 

16/02/01 12:53:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:53:46 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted myOutputDirForIPAddresses
16/02/01 12:53:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:53:48 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 12:53:48 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 12:53:48 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 12:53:49 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 12:53:49 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 12:53:49 INFO mapreduce.JobSubm
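
The rule described in the comments of the cell above (the key is everything up to the n-th separator, the value is the rest) can be sketched in Python 3 as follows. `split_key_value` is an illustrative function, not Hadoop's implementation:

```python
def split_key_value(line, separator="\t", num_key_fields=1):
    """Split a line into (key, value) at the n-th occurrence of the separator.

    If the line has fewer than n separators, the whole line is the key and
    the value is empty.
    """
    position = -1
    for _ in range(num_key_fields):
        position = line.find(separator, position + 1)
        if position == -1:
            return line, ""
    return line[:position], line[position + len(separator):]


print(split_key_value("11.12.1.2", separator=".", num_key_fields=4))
print(split_key_value("a.b.c.d.rest", separator=".", num_key_fields=4))
print(split_key_value("word\t3"))
```

The first call shows why a record such as `11.12.1.2` ends up as a key with an empty value: it contains only three separators, so the fourth is never found.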

In [82]:
!hdfs dfs -rm -r myOutputDirForIPAddresses 
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.num.map.output.key.fields=4 \
    -D mapred.text.key.partitioner.options=-k1,2 \
    -input ipAddresses.txt \
    -output myOutputDirForIPAddresses \
    -mapper ipAddressMapper.py \
    -reducer ipAddressReducer.py \
    -numReduceTasks 5 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

16/02/01 12:56:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:56:59 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted myOutputDirForIPAddresses
16/02/01 12:57:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:57:00 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 12:57:00 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 12:57:00 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 12:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 12:57:00 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 12:57:01 INFO Configuration.dep

In [None]:
#Sorting within each partition for the reducer(all 4 fields used for sorting)
# desired output 
11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3
11.14.2.5222

In [83]:
!hdfs dfs -ls myOutputDirForIPAddresses
!hdfs dfs -cat myOutputDirForIPAddresses/part-00000
!echo "----" 
!hdfs dfs -cat myOutputDirForIPAddresses/part-00001
!echo "----" 
!hdfs dfs -cat myOutputDirForIPAddresses/part-00002

16/02/01 12:57:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 6 items
-rw-r--r--   1 jshanahan supergroup          0 2016-02-01 12:57 myOutputDirForIPAddresses/_SUCCESS
-rw-r--r--   1 jshanahan supergroup         23 2016-02-01 12:57 myOutputDirForIPAddresses/part-00000
-rw-r--r--   1 jshanahan supergroup         72 2016-02-01 12:57 myOutputDirForIPAddresses/part-00001
-rw-r--r--   1 jshanahan supergroup         31 2016-02-01 12:57 myOutputDirForIPAddresses/part-00002
-rw-r--r--   1 jshanahan supergroup         46 2016-02-01 12:57 myOutputDirForIPAddresses/part-00003
-rw-r--r--   1 jshanahan supergroup          0 2016-02-01 12:57 myOutputDirForIPAddresses/part-00004
16/02/01 12:57:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11.11.4.1 mapper jimi	
----
16/02/01 12:57:07 WARN util.NativeCodeLoader: Unable to lo

In [95]:
!hdfs dfs -rm -r myOutputDirForIPAddresses 
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D map.output.key.field.separator=. \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k1n -k2nr" \
    -input ipAddresses.txt \
    -output myOutputDirForIPAddresses \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -numReduceTasks 3 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 
    
    
 

16/02/01 10:08:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 10:08:41 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted myOutputDirForIPAddresses
16/02/01 10:08:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 10:08:43 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 10:08:43 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 10:08:43 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 10:08:43 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 10:08:43 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 10:08:43 INFO Configuration.dep

In [79]:
!hdfs dfs -rm -r myOutputDirForIPAddresses 
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k1n -k2nr" \
    -input ipAddresses.txt \
    -output myOutputDirForIPAddresses \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer ipAddressReducer.py \
    -numReduceTasks 3 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

# Option left out of the command above: -D map.output.key.field.separator=.


16/02/01 12:55:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:55:34 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted myOutputDirForIPAddresses
16/02/01 12:55:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:55:35 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 12:55:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 12:55:35 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 12:55:35 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 12:55:35 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 12:55:35 INFO mapreduce.JobSubm

In [97]:
!hdfs dfs -cat myOutputDirForIPAddresses/part-0000*

16/02/01 10:09:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0	11.12.1.2
30	11.12.1.1
10	11.14.2.3
40	11.14.2.2
20	11.11.4.1
50	11.14.2.5


# Secondary Sort: Stock Price

In [60]:
%%writefile stockprice.txt
Google,2015-08-06,647.32
Apple,2015-08-01,117.83
Facebook,2015-08-05,92.67
Facebook,2015-08-01,90.96
Oracle,2015-08-04,38.55
Apple,2015-08-04,113.77
Google,2015-08-05,677.95
Facebook,2015-08-08,90.43
Oracle,2015-08-03,35.78
Apple,2015-08-11,110.09
Oracle,2015-08-07,39.67
Google,2015-08-09,656.63
ABXXXX,2015-08-07,39.67
ABXXXX,2015-08-09,656.63
Google,2000-08-09,0
ABXXXX,2015-08-08,6569999999999.63

Overwriting stockprice.txt


In [81]:
%%writefile stockPriceMapper.py
#!/usr/bin/python
import sys
import datetime
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line
    name, date, price = line.split(",")
    # unix time is for secondary sort
    # NOTE: "%s" is a platform-specific strftime extension and uses the local time zone
    unix_time = datetime.datetime.strptime(date, '%Y-%m-%d').strftime("%s")
    # output each record
    print '%s\t%s\t%s' % (name, unix_time, price)

Overwriting stockPriceMapper.py
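
The mapper relies on `strftime("%s")`, which is not available on every platform and depends on the local time zone. A possible portable alternative, sketched here under the assumption that treating each date as midnight UTC is acceptable, uses `calendar.timegm` from the standard library. `to_epoch_utc` is a hypothetical helper name.

```python
import calendar
import datetime

def to_epoch_utc(date_str):
    """Convert 'YYYY-MM-DD' to seconds since the Unix epoch, treating it as midnight UTC."""
    parsed = datetime.datetime.strptime(date_str, "%Y-%m-%d")
    return calendar.timegm(parsed.timetuple())

print(to_epoch_utc("2015-08-06"))
```

The values differ from the ones the mapper prints in this notebook by the local UTC offset (1438844400 for 2015-08-06 is 7 hours later than the UTC value), but the ordering of dates is the same. A reducer paired with this helper would need a matching UTC conversion when turning the timestamp back into a date.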


In [80]:
%%writefile stockPriceReducer.py
#!/usr/bin/python
import sys
import datetime

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the tab-separated record emitted by stockPriceMapper.py
    name, unix_time, price = line.split('\t')
    # get date from unix time
    date = datetime.datetime.fromtimestamp(int(unix_time)).strftime('%Y-%m-%d')
    print '%s\t%s\t%s' % (name, date, price)

Overwriting stockPriceReducer.py


In [94]:
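# Sort by stock name (ascending), then attempt a secondary sort on field 2 (unix time), NUMERICALLY
!cat stockprice.txt| python stockPriceMapper.py |sort -k1 -k2n   #Unix sort
# WHY does it NOT work? "-k1" has no end field, so the first key runs from field 1 to the end of
# the line. The whole line is compared as text, and the numeric key on field 2 only breaks ties
# between identical lines (see Google 965804400, which ends up last).

# The following line has the same problem because it also uses -k1 instead of -k1,1
#!cat stockprice.txt| python stockPriceMapper.py |sort -k1 -k2,2n   #Unix sort BAD SORT key for key 1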

ABXXXX	1438930800	39.67
ABXXXX	1439103600	656.63
Apple	1438412400	117.83
Apple	1438671600	113.77
Apple	1439276400	110.09
Facebook	1438412400	90.96
Facebook	1438758000	92.67
Facebook	1439017200	90.43
Google	1438758000	677.95
Google	1438844400	647.32
Google	1439103600	656.63
Google	965804400	0
Oracle	1438585200	35.78
Oracle	1438671600	38.55
Oracle	1438930800	39.67


In [92]:
# Sort by stock name (ascending), then do a secondary sort on field 2 (unix time), NUMERICALLY
!cat stockprice.txt| python stockPriceMapper.py |sort -k1,1 -k2,2n   #Unix sort
# This is CORRECT: "-k1,1" limits the first key to field 1, so "-k2,2n" orders each stock by time.
# Compare with the following line, which uses -k1 instead of -k1,1
#!cat stockprice.txt| python stockPriceMapper.py |sort -k1 -k2,2n   #Unix sort BAD SORT key for key 1

ABXXXX	1438930800	39.67
ABXXXX	1439103600	656.63
Apple	1438412400	117.83
Apple	1438671600	113.77
Apple	1439276400	110.09
Facebook	1438412400	90.96
Facebook	1438758000	92.67
Facebook	1439017200	90.43
Google	965804400	0
Google	1438758000	677.95
Google	1438844400	647.32
Google	1439103600	656.63
Oracle	1438585200	35.78
Oracle	1438671600	38.55
Oracle	1438930800	39.67


In [93]:
#!cat stockprice.txt| python stockPriceMapper.py |sort -k1,1 -k2,2n   #Unix sort
# Compare the correct -k1,1 sort above with the following line, which uses -k1
!cat stockprice.txt| python stockPriceMapper.py |sort -k1 -k2,2n   #Unix sort BAD SORT key for key 1

ABXXXX	1438930800	39.67
ABXXXX	1439103600	656.63
Apple	1438412400	117.83
Apple	1438671600	113.77
Apple	1439276400	110.09
Facebook	1438412400	90.96
Facebook	1438758000	92.67
Facebook	1439017200	90.43
Google	1438758000	677.95
Google	1438844400	647.32
Google	1439103600	656.63
Google	965804400	0
Oracle	1438585200	35.78
Oracle	1438671600	38.55
Oracle	1438930800	39.67
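
The difference between the two key specifications can be reproduced in Python, as a small sketch. It approximates `sort -k1` by comparing whole lines as text and `sort -k1,1 -k2,2n` by comparing the name as text and the timestamp as a number. Python compares by code point while `sort` follows the locale, so this is an approximation; for the rows below (taken from the output above) the two agree.

```python
rows = [
    ("Google", "1438758000", "677.95"),
    ("Google", "965804400", "0"),
    ("Apple", "1438412400", "117.83"),
    ("Google", "1439103600", "656.63"),
]

# Like "sort -k1": the first key spans the whole line, so timestamps compare as text.
whole_line = sorted(rows, key=lambda r: "\t".join(r))

# Like "sort -k1,1 -k2,2n": name as text first, then the timestamp as a number.
by_fields = sorted(rows, key=lambda r: (r[0], int(r[1])))

for row in by_fields:
    print("\t".join(row))
```

With the whole-line key, `965804400` lands after `1439103600` because the character `9` sorts after `1`. With the field-limited keys it comes first within Google, as expected.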


In [63]:
!hdfs dfs -rm stockprice.txt
!hdfs dfs -copyFromLocal stockprice.txt /user/jshanahan

16/02/01 12:46:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:46:19 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted stockprice.txt
16/02/01 12:46:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [64]:
#NOTE: in "-k1,1 -k2,2nr" the -k1,1 part is redundant
!hdfs dfs -rm -r stockprice
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
        -D stream.num.map.output.key.fields=3 \
        -D mapreduce.partition.keypartitioner.options=-k1,1 \
        -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
        -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2nr" \
        -mapper stockPriceMapper.py \
        -reducer stockPriceReducer.py \
        -input stockprice.txt -output stockprice \
        -numReduceTasks 2 \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

16/02/01 12:46:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:46:40 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted stockprice
16/02/01 12:46:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 12:46:42 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 12:46:42 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 12:46:42 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 12:46:42 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 12:46:42 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 12:46:42 INFO mapreduce.JobSubmitter: Submitti

In [65]:
!hdfs dfs -ls stockprice
!hdfs dfs -cat stockprice/part-0000*

16/02/01 12:46:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   1 jshanahan supergroup          0 2016-02-01 12:46 stockprice/_SUCCESS
-rw-r--r--   1 jshanahan supergroup        222 2016-02-01 12:46 stockprice/part-00000
-rw-r--r--   1 jshanahan supergroup        179 2016-02-01 12:46 stockprice/part-00001
16/02/01 12:46:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Apple	2015-08-11	110.09
Apple	2015-08-04	113.77
Apple	2015-08-01	117.83
Facebook	2015-08-08	90.43
Facebook	2015-08-05	92.67
Facebook	2015-08-01	90.96
Oracle	2015-08-07	39.67
Oracle	2015-08-04	38.55
Oracle	2015-08-03	35.78
ABXXXX	2015-08-09	656.63
ABXXXX	2015-08-08	6569999999999.63
ABXXXX	2015-08-07	39.67
Google	2015-08-09	656.63
Google	2015-08-06	647.32
Google	2015-08-05	677.95
Google	2000-08-09	0
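
What the job does can be approximated locally: records are routed to a reducer by stock name, and each reducer sees its records ordered by name and then by time, newest first. The sketch below is only a simulation. `toy_partition` is a hypothetical stand-in for the hash that `KeyFieldBasedPartitioner` applies, so the assignment of stocks to part files will not match Hadoop's.

```python
from collections import defaultdict

def toy_partition(name, num_reducers):
    """Hypothetical stand-in for a hash partitioner; Hadoop's real hash differs."""
    return sum(bytearray(name.encode("utf-8"))) % num_reducers

def simulate(records, num_reducers=2):
    """Route (name, unix_time, price) records by name, then sort each partition."""
    partitions = defaultdict(list)
    for name, unix_time, price in records:
        partitions[toy_partition(name, num_reducers)].append((name, unix_time, price))
    for part in partitions.values():
        # name ascending, unix time descending (numeric), like "-k1,1 -k2,2nr"
        part.sort(key=lambda r: (r[0], -r[1]))
    return dict(partitions)

records = [
    ("Google", 1438844400, "647.32"),
    ("Apple", 1438412400, "117.83"),
    ("Google", 1438758000, "677.95"),
    ("Apple", 1439276400, "110.09"),
]
for part_id, part in sorted(simulate(records).items()):
    print(part_id, part)
```

Because the partition depends only on the name, all records for one stock end up in the same part file, which is what makes the per-stock ordering meaningful.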


In [91]:
!hdfs dfs -cat stockprice/part-00001

16/02/01 09:39:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ABXXXX	2015-08-09	656.63
ABXXXX	2015-08-07	39.67
Google	2015-08-09	656.63
Google	2015-08-06	647.32
Google	2015-08-05	677.95
Google	2000-08-09	0


In [None]:
!hdfs dfs -rm -r stockprice
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapreduce.partition.keypartitioner.options=-k1,1 \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k1n -k2nr" \
    -files secondary_sort_map.py,secondary_sort_reduce.py \
    -input SecondarySort \
    -output output-secondarysort-streaming \
    -mapper secondary_sort_map.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -reducer secondary_sort_reduce.py

In [42]:
!hdfs dfs -ls stockprice


16/01/27 22:39:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   1 jshanahan supergroup          0 2016-01-27 22:39 stockprice/_SUCCESS
-rw-r--r--   1 jshanahan supergroup         75 2016-01-27 22:39 stockprice/part-00000
-rw-r--r--   1 jshanahan supergroup        222 2016-01-27 22:39 stockprice/part-00001


In [44]:
!hdfs dfs -cat stockprice/part-00001

16/01/27 22:40:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Apple	2015-08-11	110.09
Apple	2015-08-04	113.77
Apple	2015-08-01	117.83
Facebook	2015-08-08	90.43
Facebook	2015-08-01	90.96
Facebook	2015-08-05	92.67
Oracle	2015-08-07	39.67
Oracle	2015-08-03	35.78
Oracle	2015-08-04	38.55


## Secondary Sort from Tom White (Chapter 9)

In [63]:
%%writefile SecondarySort/otherYears.txt
0029029070999991903010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
0029029070999991904010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999
0029029070999991907010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00941+99999102001ADDGF108991999999999999999999
002902907099999190610106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
0029029070999991906010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999
0029029070999991906010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00941+99999102001ADDGF108991999999999999999999
0029029070999991907010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
0029029070999991907010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999


Overwriting SecondarySort/otherYears.txt


In [64]:
#copy year 1901 and 1902 to HDFS
#val[15:19], int(val[87:92]), val[92:93]
# year, temp, q
#002902907099999  1901  010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
#0029029070999991901010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999
#0029029070999991901010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00941+99999102001ADDGF108991999999999999999999


!hdfs dfs -copyFromLocal SecondarySort
!hdfs dfs -ls

16/02/01 09:23:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
copyFromLocal: `SecondarySort/1901': File exists
copyFromLocal: `SecondarySort/1902': File exists
copyFromLocal: `SecondarySort/otherYears.txt': File exists
16/02/01 09:23:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 9 items
-rw-r--r--   1 jshanahan supergroup     888190 2016-02-01 08:32 1901
-rw-r--r--   1 jshanahan supergroup     888978 2016-02-01 08:43 1902
drwxr-xr-x   - jshanahan supergroup          0 2016-02-01 09:21 SecondarySort
drwxr-xr-x   - jshanahan supergroup          0 2016-01-30 09:23 gutenberg-output
-rw-r--r--   1 jshanahan supergroup      87483 2015-02-26 19:36 historical_tours.txt
-rw-r--r--   1 jshanahan supergroup         59 2016-01-31 20:01 ipAddresses.txt
drwxr-xr-x   - jshanahan supergroup          0 2016-01-31 20:36 myOutpu

In [190]:
%%writefile secondary_sort_map.py
#!/usr/bin/env python
#Example 9-7. Map function for secondary sort in Python
import re
import sys
for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], int(val[87:92]), val[92:93])
    if temp == 9999:
        sys.stderr.write("reporter:counter:Temperature,Missing,1\n")
    elif re.match("[01459]", q):
        print "%s\t%s" % (year, temp)

Overwriting secondary_sort_map.py
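
To see what the fixed-width slices pick out, here is a small sketch that applies the mapper's slicing to two records listed earlier in this notebook. `parse_record` is a hypothetical helper that mirrors the mapper but returns a value instead of printing; it returns `None` for records the mapper would not emit.

```python
import re

def parse_record(line):
    """Return (year, temp) for a usable record, or None if the mapper would skip it."""
    val = line.strip()
    year, temp, q = val[15:19], int(val[87:92]), val[92:93]
    if temp == 9999:
        return None          # missing temperature
    if re.match("[01459]", q):
        return (year, temp)  # quality code accepted
    return None

# A well-formed 1901 record (from the commented lines in the copy-to-HDFS cell).
good = "0029029070999991901010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999"
# The fourth record of otherYears.txt as listed above; it is one character shorter.
short = "002902907099999190610106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999"

print(parse_record(good))
print(parse_record(short))
```

Because the format is positional, a record that is one character short shifts every later slice: the quality slice lands on `+`, which fails the `[01459]` check, so that record is silently dropped.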


In [192]:
%%writefile secondary_sort_reduce.py
#!/usr/bin/env python
#Example 9-8. Reducer function for secondary sort in Python
import sys
last_group = None
for line in sys.stdin:
    val = line.strip()
    (year, temp) = val.split("\t")
    group = year
    if last_group != group:   #print the first record ONLY for each year; skip all other records for that year
        print val
        last_group = group

Overwriting secondary_sort_reduce.py


In [10]:
!cat SecondarySort/1902| python secondary_sort_map.py |sort|python secondary_sort_reduce.py




1902	-100
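
The local pipeline above uses a plain `sort`, which orders lines as text rather than by numeric temperature, so the first record the reducer sees for 1902 is not the maximum. The secondary-sort idea (year ascending, temperature descending, keep the first record per year) can be sketched in Python as follows. `max_temp_per_year` is a hypothetical helper and the sample pairs are illustrative values, not the full dataset.

```python
from itertools import groupby

def max_temp_per_year(pairs):
    """Sort (year, temp) pairs by year ascending and temp descending; keep the first per year."""
    ordered = sorted(pairs, key=lambda p: (int(p[0]), -p[1]))
    return [next(group) for _, group in groupby(ordered, key=lambda p: p[0])]

# Illustrative sample only.
sample = [("1902", -100), ("1902", 244), ("1901", 317), ("1902", 11), ("1901", -72)]
print(max_temp_per_year(sample))
```

Taking the first record of each group works only because the sort has already placed the largest temperature first, which is the job of the comparator options in the Hadoop command.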


In [84]:
#To do a secondary sort in Streaming, we can take advantage of a couple of library classes
#that Hadoop provides. Here’s the driver that we can use to do a secondary sort:
!hdfs dfs -rm -r output-secondarysort-streaming
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapreduce.partition.keypartitioner.options=-k1,1 \
    -D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k1,1n -k2,2nr" \
    -files secondary_sort_map.py,secondary_sort_reduce.py \
    -input SecondarySort \
    -output output-secondarysort-streaming \
    -mapper secondary_sort_map.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -reducer secondary_sort_reduce.py \
    -numReduceTasks 3

16/02/01 09:36:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 09:36:09 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted output-secondarysort-streaming
16/02/01 09:36:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 09:36:11 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 09:36:11 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 09:36:11 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 09:36:11 INFO mapred.FileInputFormat: Total input paths to process : 3
16/02/01 09:36:11 INFO mapreduce.JobSubmitter: number of splits:3
16/02/01 09:36:11 INFO mapreduce.Jo

In [71]:
!hdfs dfs -ls output-secondarysort-streaming/*
!hdfs dfs -cat output-secondarysort-streaming/part-0000*

16/02/01 09:25:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 jshanahan supergroup          0 2016-02-01 09:24 output-secondarysort-streaming/_SUCCESS
-rw-r--r--   1 jshanahan supergroup          9 2016-02-01 09:24 output-secondarysort-streaming/part-00000
-rw-r--r--   1 jshanahan supergroup         18 2016-02-01 09:24 output-secondarysort-streaming/part-00001
-rw-r--r--   1 jshanahan supergroup         27 2016-02-01 09:24 output-secondarysort-streaming/part-00002
16/02/01 09:25:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1902	244
1903	-78
1906	-72
1901	317
1904	-72
1907	-72


In [114]:
#stop hadoop/yarn cluster on my local machine (in this case)
#!alias hstop="/usr/local/Cellar/hadoop/2.6.0/sbin/stop-yarn.sh;/usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh"
!/usr/local/Cellar/hadoop/2.6.0/sbin/stop-yarn.sh;/usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh

stopping yarn daemons
stopping resourcemanager
localhost: stopping nodemanager
no proxyserver to stop
16/01/30 09:12:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Stopping namenodes on [localhost]
localhost: stopping namenode
localhost: stopping datanode
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode
16/01/30 09:13:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Chris Caldwell Homework

In [9]:
#Chris Caldwell Homework
# make a work directory on my local machine
!mkdir ChrisC


In [24]:
import random

#getting random ints for indexes
indices = random.sample(range(1, 100), 10)

#writing out the file: one index per line, each followed by a tab
with open('ChrisC/hw2_1.txt', 'w') as intIndFile:
    #for x in indices:
    #intIndFile.write("{}\t".format(x))
    intIndFile.write("\t\n".join(str(x) for x in indices)+'\t')

#delete any previously existing file
#!hdfs dfs -rm  /user/ubuntu/hw2_1.txt
#moving file to hdfs
#!hdfs dfs -put hw2_1.txt /user/ubuntu

In [25]:
!head ChrisC/hw2_1.txt

77	
62	
64	
59	
27	
41	
18	
19	
90	
94	

In [15]:
%%writefile ChrisC/mapper.py
#!/usr/bin/python
import sys

#Input from standard in
for ent in sys.stdin:
    print(ent.strip())
    # split entity into key / value
    #key, val = ent.split('\t')    
    #print("{}\t{}".format(key,val))

Writing ChrisC/mapper.py


In [16]:
%%writefile ChrisC/reducer.py
#!/usr/bin/python
import sys
#print("109\t")
#Input from standard in
for ent in sys.stdin:
    print(ent.strip())
    # split entity into key / value
    #key, val = ent.split('\t')    
    #print("{}\t{}".format(key,val))

Overwriting ChrisC/reducer.py


In [17]:
!chmod a+x ChrisC/mapper.py
!chmod a+x ChrisC/reducer.py

In [20]:
!echo "1\t\n5\t\n3\t\n8\t\n9\t" | python ChrisC/mapper.py |sort -k1,1 | python ChrisC/reducer.py

1
3
5
8
9


In [26]:
#Make directory on HDFS and copy the data file up there
!hdfs dfs -mkdir ChrisC
!hdfs dfs -put ChrisC/hw2_1.txt ChrisC

16/05/28 19:23:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/28 19:23:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [32]:
#!hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
!hdfs dfs -rm -r hw2_1_out
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -mapper ChrisC/mapper.py \
   -reducer ChrisC/reducer.py \
   -input ChrisC/hw2_1.txt -output hw2_1_out

16/05/28 19:36:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/28 19:36:08 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw2_1_out
16/05/28 19:36:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/28 19:36:10 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/28 19:36:10 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/28 19:36:10 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/28 19:36:10 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/28 19:36:10 INFO mapreduce.JobSubmitter: number of splits:1
16/05/28 19:36:11 INFO mapreduce.JobSubmitter: Submittin

In [31]:
!hdfs dfs -cat hw2_1_out/part-0000*

16/05/28 19:35:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18	
19	
27	
41	
59	
62	
64	
77	
90	
94	
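
The output above looks numerically sorted, but every key in this run has two digits. Hadoop Streaming compares keys as text by default, so a run that includes single-digit keys would order them differently. A minimal sketch of the difference, using illustrative keys from the same 1 to 99 range:

```python
keys = ["77", "9", "18", "5"]

as_text = sorted(keys)              # text order, as with the default key comparison
as_numbers = sorted(keys, key=int)  # numeric order

print(as_text)
print(as_numbers)
```

To get numeric ordering in the job itself, one option is the `KeyFieldBasedComparator` with a numeric key option such as `-k1,1n`, as used in the secondary sort examples earlier in this notebook.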
