## MIDS UC Berkeley, Machine Learning at Scale   
 
__W261__ Summer 2016    
__Week 2__: Total Sort, Partial Sort, and Secondary Sort using Hadoop Streaming     

__James G. Shanahan__

James.Shanahan@gmail.com  
MIDS w261 Machine Learning at Scale September 15, 2014
Data used

Self-contained datasets generated using code
***

# Contents:
1. Generate numbers
2. Single Reducer Sort
3. Partial Sort
4. Total Order Sort

* Hadoop always produces a total sort on the key (the key part of the key-value pairs emitted by the mappers) when the job uses a single reducer.
*  With multiple reducers, Hadoop by default produces a partial sort: the records within each partition (each reducer's output file) are sorted by key, but there is no order across partitions.
*  To achieve a total sort with multiple reducers, write a custom mapper that prepends a partition key to each record, and then do a secondary sort on the resulting records (this can be done with ONE MapReduce job).

Hadoop Streaming provides an extensive command-line interface for sorting. Its key-field options (e.g., `-k1,1nr`) use the same syntax as the `-k` option of the Unix `sort` command.

This notebook provides examples of the following:

* Total Sort using the Unix sort command
* Total Sort using Hadoop Streaming with a single-reducer MapReduce job
* Partial Sort using Hadoop Streaming with a multiple-reducer MapReduce job
* Secondary Sort
* Total Sort using Hadoop Streaming with a multiple-reducer MapReduce job (single job)



## Generate data records with column 1 being the primary key of interest

In [164]:
%pwd


u'/Users/jshanahan/Dropbox/Lectures-UC-Berkeley-ML-Class-2015/Notebooks'

In [7]:
%pwd
%mkdir SortData
%mkdir SortCode

mkdir: SortData: File exists


In [165]:
%%writefile SortCode/generate_numbers.py
#!/usr/bin/python

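import random
random.seed(9001)   # fixed seed for reproducibility
N = 13
# N random (key, value) rows; print with commas inserts a space after the
# key, so these rows look like "0 \t2"
for n in range(N):
    print random.randint(0,N),"\t",random.randint(0,N)

# fixed rows that share the keys 2, 6 and 3, to show how ties are ordered
print "2\tA Group"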
print "2\tB Group"
print "2\tC Group"
print "2\tD Group"
print "6\tA Group"
print "6\tC Group"
print "6\tB Group"
print "6\tD Group"
print "3\tA Group"
print "3\tC Group"
print "3\tB Group"
print "3\tD Group"

Overwriting SortCode/generate_numbers.py


In [166]:
!chmod +x SortCode/generate_numbers.py;
!./SortCode/generate_numbers.py > SortData/thirtyNumbersDataSet.txt
!wc -l SortData/thirtyNumbersDataSet.txt

      25 SortData/thirtyNumbersDataSet.txt


In [158]:
cat SortData/thirtyNumbersDataSet.txt

0 	2
6 	6
6 	5
7 	9
6 	5
8 	10
3 	6
0 	4
8 	6
8 	4
2	A Group
2	B Group
2	C Group
2	D Group
6	A Group
6	C Group
6	B Group
6	D Group
3	A Group
3	C Group
3	B Group
3	D Group


In [2]:
#stop old cluster
!/usr/local/Cellar/hadoop/2.6.0/sbin/stop-yarn.sh;/usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh
# start cluster
!/usr/local/Cellar/hadoop/2.6.0/sbin/start-dfs.sh; /usr/local/Cellar/hadoop/2.6.0/sbin/start-yarn.sh

stopping yarn daemons
no resourcemanager to stop
localhost: no nodemanager to stop
no proxyserver to stop
16/05/31 14:57:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Stopping namenodes on [localhost]
localhost: no namenode to stop
localhost: no datanode to stop
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: no secondarynamenode to stop
16/05/31 14:57:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/31 14:57:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-jshanahan-namenode-JAMES-SHANAHANs-MacBook-Pro.local.out
localhost: starting datanode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-js

In [3]:
!hdfs dfs -ls  /user/jshanahan
!echo "-------"
!hdfs dfs -ls 

16/05/31 14:58:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - jshanahan supergroup          0 2015-02-26 21:03 /user/jshanahan/gutenberg-output
-rw-r--r--   1 jshanahan supergroup      87483 2015-02-26 19:36 /user/jshanahan/historical_tours.txt
-------
16/05/31 14:58:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - jshanahan supergroup          0 2015-02-26 21:03 gutenberg-output
-rw-r--r--   1 jshanahan supergroup      87483 2015-02-26 19:36 historical_tours.txt


In [4]:
# put file in hdfs
!hdfs dfs -rm -r Sort
!hdfs dfs -mkdir Sort
!hdfs dfs -mkdir Sort
!hdfs dfs -put SortData/thirtyNumbersDataSet.txt Sort

16/05/31 14:58:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `Sort': No such file or directory
16/05/31 14:58:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/31 14:58:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
mkdir: `Sort': File exists
16/05/31 14:58:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Total Sort using Unix Sort command

In [6]:
# Total sort of the column 1 (numeric reverse) via command line
!echo "=============UNSorted============"
!hdfs dfs -cat Sort/thirtyNumbersDataSet.txt 
!echo "================================="
!hdfs dfs -cat Sort/thirtyNumbersDataSet.txt | sort -k1,1nr 

16/05/31 14:59:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0 	3
7 	7
8 	7
9 	12
8 	7
10 	13
4 	8
1 	6
10 	8
10 	5
11 	3
12 	4
2 	8
2	A Group
2	B Group
2	C Group
2	D Group
6	A Group
6	C Group
6	B Group
6	D Group
3	A Group
3	C Group
3	B Group
3	D Group
16/05/31 14:59:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12 	4
11 	3
10 	13
10 	5
10 	8
9 	12
8 	7
8 	7
7 	7
6	A Group
6	B Group
6	C Group
6	D Group
4 	8
3	A Group
3	B Group
3	C Group
3	D Group
2	A Group
2	B Group
2	C Group
2	D Group
2 	8
1 	6
0 	3
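The order above can be reproduced with a small Python 3 sketch of `sort -k1,1nr`, assuming the C locale (plain byte order). When the numeric first fields tie, Unix sort by default falls back to comparing whole lines in ascending order; that is why the four `6\t... Group` rows come out as A, B, C, D, and why `2\tA Group` sorts before `2 \t8` (the random rows carry a space after the key, and a tab sorts before a space). The function name `unix_sort_k1_nr` is ours, not part of any tool.

```python
def unix_sort_k1_nr(lines):
    """Sketch of `sort -k1,1nr` in the C locale (not GNU sort itself).

    Primary key: the first whitespace-delimited field, compared numerically
    and in reverse. Ties fall back to an ascending comparison of the whole
    line, which Unix sort does by default when all keys compare equal.
    Assumes every line is non-empty and starts with a number.
    """
    def sort_key(line):
        first_field = line.split()[0]
        return (-float(first_field), line)

    return sorted(lines, key=sort_key)


# A few records copied from the input above (note the space after random keys)
records = ["6\tC Group", "10 \t8", "2 \t8", "6\tA Group",
           "10 \t13", "2\tB Group", "0 \t3"]
for line in unix_sort_k1_nr(records):
    print(repr(line))
```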


# Total Sort using Single Reducer MapReduce Job





In [22]:
%%writefile SortCode/identityFunction.py
#!/usr/bin/python
import sys

# input comes from STDIN (standard input); write each key-value pair back out unchanged
for line in sys.stdin:
    key, _, value = line.rstrip("\n").partition("\t")
    print '%s\t%s' % (key, value)

Writing SortCode/identityMapper.py


In [7]:
!hdfs dfs -rm -r Sort/Output
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D mapred.text.key.comparator.options=-nr \
   -mapper /bin/cat \
   -reducer /bin/cat \
   -input Sort/thirtyNumbersDataSet.txt  -output Sort/Output 

16/05/31 14:59:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `Sort/Output': No such file or directory
16/05/31 14:59:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/31 14:59:29 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/31 14:59:29 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/31 14:59:29 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/31 14:59:29 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/31 14:59:30 INFO mapreduce.JobSubmitter: number of splits:1
16/05/31 14:59:30 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/05/31 14

In [8]:
#have a look at the input file
!echo "=============  UNSorted  ============"
!hdfs dfs -cat Sort/thirtyNumbersDataSet.txt
!echo  "\nTOTAL Sort primary key numeric decreasing\n"
# sorted output of the single reducer
!hdfs dfs -cat Sort/Output/part-00000

16/05/31 14:59:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0 	3
7 	7
8 	7
9 	12
8 	7
10 	13
4 	8
1 	6
10 	8
10 	5
11 	3
12 	4
2 	8
2	A Group
2	B Group
2	C Group
2	D Group
6	A Group
6	C Group
6	B Group
6	D Group
3	A Group
3	C Group
3	B Group
3	D Group

TOTAL Sort primary key numeric decreasing

16/05/31 14:59:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12 	4
11 	3
10 	13
10 	8
10 	5
9 	12
8 	7
8 	7
7 	7
6	D Group
6	A Group
6	C Group
6	B Group
4 	8
3	A Group
3	D Group
3	C Group
3	B Group
2	C Group
2	D Group
2	A Group
2	B Group
2 	8
1 	6
0 	3
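Unlike Unix sort, the Hadoop comparator looks only at the key, so records with equal keys have no guaranteed order: the input lists key 6 as A, C, B, D, and the reducer emitted D, A, C, B. The Python 3 sketch below models the single-reducer job, assuming Streaming's default of splitting each line at its first tab into key and value. Python's `sort` is stable, so here ties keep their input order; that is a property of the sketch, not of Hadoop. The function names are hypothetical.

```python
def split_key_value(line, sep="\t"):
    """Split a Streaming record at the first separator into (key, value).

    Models Streaming's default of one key field; a line without the
    separator becomes the key with an empty value.
    """
    key, _, value = line.partition(sep)
    return key, value


def single_reducer_sort(lines):
    """Sketch of the one-reducer job with `-nr`: every record goes to the
    same partition, which is ordered by the numeric key only."""
    pairs = [split_key_value(line.rstrip("\n")) for line in lines]
    # Only the key is compared. Python's sort is stable, so ties keep their
    # input order here; Hadoop makes no such promise.
    pairs.sort(key=lambda kv: -float(kv[0]))
    return [key + "\t" + value for key, value in pairs]


records = ["6\tC Group", "10 \t8", "6\tA Group", "10 \t13", "0 \t3"]
for line in single_reducer_sort(records):
    print(repr(line))
```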


## Default MapReduce Job with Multiple Reducers Leads to a Partial Sort


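Set the number of reduce tasks to 3 (`-numReduceTasks 3`). With the default hash partitioner, each reducer's output file is sorted, but the files together do not form one ordered sequence.

* CASE 1. Use the default key-value behavior (the key is everything up to the first tab), but treat the key as numeric and reverse the order: `-D mapred.text.key.comparator.options=-nr`
* CASE 2. Name the key field explicitly: `-D mapreduce.partition.keypartitioner.options="-k1,1"` and `-D mapreduce.partition.keycomparator.options="-k1,1nr"`. The deprecated `mapred.text.key.comparator.options` maps onto `mapreduce.partition.keycomparator.options` (see the deprecation message in the job log), so one of the two settings is enough. The partitioner options are only read by `KeyFieldBasedPartitioner`; because this job does not pass `-partitioner`, the default hash partitioner is still used, and the output matches CASE 1. Using `-k1,1n` instead of `-k1,1nr` would sort each partition in increasing order.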
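Why the output is only partially sorted can be sketched in Python 3. Hadoop's default `HashPartitioner` picks a reducer from the hash of the key modulo the number of reducers, so every reducer receives a scattered subset of keys and sorts only that subset. The sketch uses `zlib.crc32` as a stand-in hash (an assumption; Hadoop uses the key's Java `hashCode`), so its partition assignments will not match the part files shown here. `hash_partition` and `partial_sort` are hypothetical names.

```python
import zlib


def hash_partition(key, num_reducers):
    """Stand-in for Hadoop's HashPartitioner: hash(key) mod num_reducers.

    CRC32 is an assumption made for this sketch; Hadoop hashes the key with
    its Java hashCode, so real partition assignments differ.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partial_sort(lines, num_reducers=3):
    """Route each record by the hash of its key, then sort each partition
    by numeric key, decreasing (what each reducer's part file contains)."""
    partitions = [[] for _ in range(num_reducers)]
    for line in lines:
        key = line.partition("\t")[0]
        partitions[hash_partition(key, num_reducers)].append(line)
    for part in partitions:
        part.sort(key=lambda rec: -float(rec.partition("\t")[0]))
    return partitions


records = ["0 \t3", "7 \t7", "12 \t4", "2\tA Group", "6\tB Group",
           "10 \t13", "3\tC Group", "9 \t12"]
for i, part in enumerate(partial_sort(records)):
    print("part-%05d" % i, part)
```

Each list is sorted on its own, but reading the lists one after another generally does not give a single decreasing sequence.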


In [186]:
!hdfs dfs -rm -r Sort/Output
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D mapred.text.key.comparator.options=-nr \
   -mapper /bin/cat \
   -reducer /bin/cat \
   -numReduceTasks 3 \
   -input Sort/thirtyNumbersDataSet.txt  -output Sort/Output 

16/05/27 18:45:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 18:45:24 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted Sort/Output
16/05/27 18:45:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 18:45:26 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 18:45:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 18:45:26 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 18:45:27 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 18:45:27 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 18:45:27 INFO Configuration.deprecation: mapr

In [187]:

#have a look at the input file
!echo  "\n-------- UNSORTED INPUT-------------------\n"
!hdfs dfs -cat Sort/thirtyNumbersDataSet.txt
!echo  "\nPartial Sort:  primary key numeric decreasing\n"
# 
!echo  "\n --- Partial Sort:  part-00000 ----- \n"
!hdfs dfs -cat Sort/Output/part-00000
!echo  "\n --- Partial Sort:  part-00001 ----- \n"
!hdfs dfs -cat Sort/Output/part-00001
!echo  "\n --- Partial Sort:  part-00002 ----- \n"
!hdfs dfs -cat Sort/Output/part-00002



-------- UNSORTED INPUT-------------------

16/05/27 18:45:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0 	3
7 	7
8 	7
9 	12
8 	7
10 	13
4 	8
1 	6
10 	8
10 	5
11 	3
12 	4
2 	8
2	A Group
2	B Group
2	C Group
2	D Group
6	A Group
6	C Group
6	B Group
6	D Group
3	A Group
3	C Group
3	B Group
3	D Group

Partial Sort:  primary key numeric decreasing


 --- Partial Sort:  part-00000 ----- 

16/05/27 18:45:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12 	4
9 	12
2	C Group
2	D Group
2	A Group
2	B Group
0 	3

 --- Partial Sort:  part-00001 ----- 

16/05/27 18:45:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10 	8
10 	13
10 	5
7 	7
6	C Group
6	B Group
6	D Group
6	A Group
4 	8
3	D Group
3	B Group
3	C Group
3	A Group
1 	6

 --- Partial Sort:  

In [188]:
# CASE 2: specify the key field explicitly
!hdfs dfs -rm -r Sort/Output
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D mapred.text.key.comparator.options=-nr \
   -D mapreduce.partition.keypartitioner.options="-k1,1" \
   -D mapreduce.partition.keycomparator.options="-k1,1nr" \
   -mapper /bin/cat \
   -reducer /bin/cat \
   -numReduceTasks 3 \
   -input Sort/thirtyNumbersDataSet.txt  -output Sort/Output 

16/05/27 18:46:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 18:46:07 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted Sort/Output
16/05/27 18:46:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 18:46:09 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 18:46:09 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 18:46:09 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 18:46:09 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 18:46:09 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 18:46:09 INFO Configuration.deprecation: mapr

In [189]:
#have a look at the input file
!echo  "\n-------- UNSORTED INPUT-------------------\n"
!hdfs dfs -cat Sort/thirtyNumbersDataSet.txt
!echo  "\nPartial Sort:  primary key numeric decreasing\n"
# 
!echo  "\n --- Partial Sort:  part-00000 ----- \n"
!hdfs dfs -cat Sort/Output/part-00000
!echo  "\n --- Partial Sort:  part-00001 ----- \n"
!hdfs dfs -cat Sort/Output/part-00001
!echo  "\n --- Partial Sort:  part-00002 ----- \n"
!hdfs dfs -cat Sort/Output/part-00002



-------- UNSORTED INPUT-------------------

16/05/27 18:48:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0 	3
7 	7
8 	7
9 	12
8 	7
10 	13
4 	8
1 	6
10 	8
10 	5
11 	3
12 	4
2 	8
2	A Group
2	B Group
2	C Group
2	D Group
6	A Group
6	C Group
6	B Group
6	D Group
3	A Group
3	C Group
3	B Group
3	D Group

Partial Sort:  primary key numeric decreasing


 --- Partial Sort:  part-00000 ----- 

16/05/27 18:48:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12 	4
9 	12
2	C Group
2	D Group
2	A Group
2	B Group
0 	3

 --- Partial Sort:  part-00001 ----- 

16/05/27 18:48:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10 	8
10 	13
10 	5
7 	7
6	C Group
6	B Group
6	D Group
6	A Group
4 	8
3	D Group
3	B Group
3	C Group
3	A Group
1 	6

 --- Partial Sort:  

# MULTIPLE REDUCERS TOTAL SORT 
### __Mapper: prepend a partition key to each record so that each record is routed to the partition for its key range (this creates order across the partition files)__

In [252]:
%%writefile SortCode/prependPartitionKeyMapper.py
#!/usr/bin/env python

import sys
import logging
logging.basicConfig(filename='prependPartitionKeyMapper.log',level=logging.DEBUG)

for line in sys.stdin:
    line = line.strip()
    key, value = line.split("\t", 1)

    # route each record by key range: key < 5 -> "a", 5 <= key < 10 -> "b", key >= 10 -> "c"
    partitionKey = "c"
    if int(key) < 5:
        partitionKey = "a"
    elif int(key) < 10:
        partitionKey = "b"

    logging.debug("%s\t%s\t%s" % (partitionKey, key, value))
    print "%s\t%s\t%s" % (partitionKey, key, value)


Writing SortCode/prependPartitionKeyMapper.py
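The mapper hard-codes two cut points. Below is a minimal Python 3 sketch of the same range partitioning with the cut points kept in a list, assuming integer keys; `BOUNDARIES`, `LABELS`, `partition_key` and `prepend_partition_key` are hypothetical names. `bisect.bisect_right` finds the range a key falls into. Because the labels a < b < c follow the key ranges, ordering by label is consistent with ordering by key.

```python
import bisect

# Hypothetical cut points reproducing prependPartitionKeyMapper.py:
# key < 5 -> "a", 5 <= key < 10 -> "b", key >= 10 -> "c"
BOUNDARIES = [5, 10]
LABELS = ["a", "b", "c"]


def partition_key(key, boundaries=BOUNDARIES, labels=LABELS):
    """Return the label of the key range that contains the integer `key`."""
    # bisect_right counts the boundaries <= key, i.e. the index of the range
    return labels[bisect.bisect_right(boundaries, key)]


def prepend_partition_key(line):
    """Turn 'key<TAB>value' into 'partitionKey<TAB>key<TAB>value'."""
    key, value = line.rstrip("\n").split("\t", 1)
    return "%s\t%s\t%s" % (partition_key(int(key)), key, value)


for line in ["0 \t3\n", "7 \t7\n", "10 \t13\n", "6\tA Group\n"]:
    print(repr(prepend_partition_key(line)))
```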


### __UNIT TEST - run the mapper on its own as a map-only job by specifying 0 reducers__

In [202]:
!hdfs dfs -rm -r Sort/Output-Intermediate
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D mapred.reduce.tasks=0 \
   -mapper SortCode/prependPartitionKeyMapper.py \
   -input Sort/thirtyNumbersDataSet.txt  -output Sort/Output-Intermediate 

16/05/27 18:58:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 18:58:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted Sort/Output-Intermediate
16/05/27 18:58:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 18:58:32 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 18:58:32 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 18:58:32 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 18:58:32 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 18:58:32 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 18:58:32 INFO Configuration.depr

In [203]:
print "Output from the prependPartitionKeyMapper mapper job"
!hdfs dfs -cat Sort/Output-Intermediate/*

Output from the prependPartitionKeyMapper mapper job
16/05/27 18:58:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	0 	3
b	7 	7
b	8 	7
b	9 	12
b	8 	7
c	10 	13
a	4 	8
a	1 	6
c	10 	8
c	10 	5
c	11 	3
c	12 	4
a	2 	8
a	2	A Group
a	2	B Group
a	2	C Group
a	2	D Group
b	6	A Group
b	6	C Group
b	6	B Group
b	6	D Group
a	3	A Group
a	3	C Group
a	3	B Group
a	3	D Group


### __UNIT TEST - an example of a secondary sort MapReduce job__
 

In [247]:
#-----------------------------------------------------------
#     Basic Secondary SORT (works!)
#     but it still has the partition key in each record
#-----------------------------------------------------------
!hdfs dfs -rm -r Sort/Output
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D stream.map.output.field.separator="\t" \
   -D mapreduce.partition.keypartitioner.options="-k1,1" \
   -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2nr" \
   -mapper /bin/cat \
   -reducer /bin/cat \
   -numReduceTasks 3 \
   -input Sort/Output-Intermediate  -output Sort/Output \
   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 


#have a look at the input file (the output of the mapper-only job)
!echo  "\n-------- UNSORTED INPUT-------------------\n"
!hdfs dfs -cat Sort/Output-Intermediate/*
!echo  "\nSecondary Sort:  partition key, then primary key numeric decreasing\n"
# 
!echo  "\n --- Total Sort:  part-00000 ----- \n"
!hdfs dfs -cat Sort/Output/part-00000
!echo  "\n --- Total Sort:  part-00001 ----- \n"
!hdfs dfs -cat Sort/Output/part-00001
!echo  "\n --- Total Sort:  part-00002 ----- \n"
!hdfs dfs -cat Sort/Output/part-00002

16/05/27 19:46:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 19:46:32 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted Sort/Output
16/05/27 19:46:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 19:46:34 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 19:46:34 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 19:46:34 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 19:46:34 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 19:46:34 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 19:46:34 INFO Configuration.deprecation: mapr
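A plain-Python 3 sketch of what the shuffle in the job above does to the `partitionKey\tkey\tvalue` records: records are grouped on field 1 (the `-k1,1` partitioner option), and each group is ordered by field 1 ascending, then field 2 numeric decreasing (`-k1,1 -k2,2nr`). The function name is hypothetical, and for simplicity each partition key gets its own group rather than being hashed onto 3 reducers.

```python
from collections import defaultdict


def secondary_sort(records):
    """Sketch of the shuffle above for 'partitionKey<TAB>key<TAB>value' records.

    Groups records by field 1 (the -k1,1 partitioner option) and orders each
    group by field 1, then field 2 numeric decreasing (-k1,1 -k2,2nr).
    Simplification: each partition key gets its own group; Hadoop would hash
    the partition keys onto the configured number of reducers.
    """
    groups = defaultdict(list)
    for rec in records:
        fields = rec.rstrip("\n").split("\t")
        groups[fields[0]].append(fields)
    result = {}
    for pkey, rows in groups.items():
        rows.sort(key=lambda f: (f[0], -float(f[1])))
        result[pkey] = ["\t".join(f) for f in rows]
    return result


intermediate = ["a\t0 \t3", "b\t7 \t7", "c\t10 \t13", "a\t4 \t8",
                "c\t12 \t4", "b\t9 \t12", "a\t2\tA Group"]
for pkey, rows in sorted(secondary_sort(intermediate).items()):
    print(pkey, rows)
```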

In [249]:
#-----------------------------------
#     Basic Secondary SORT (Works)
#     Select a subset of the fields
#     i.e., drop the partition key
#-----------------------------------
!hdfs dfs -rm -r Sort/Output
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D stream.map.output.field.separator="\t" \
   -D mapreduce.partition.keypartitioner.options="-k1,1" \
   -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2nr" \
   -D reduce.output.key.value.fields.spec=1:2- \
   -mapper /bin/cat \
   -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
   -numReduceTasks 3 \
   -input Sort/Output-Intermediate  -output Sort/Output \
   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 


#have a look at the input file
!echo  "\n-------- UNSORTED INPUT-------------------\n"
!hdfs dfs -cat Sort/Output-Intermediate/*
!echo  "\nSecondary Sort (partition key dropped):  primary key numeric decreasing\n"
# 
!echo  "\n --- Total Sort:  part-00000 ----- \n"
!hdfs dfs -cat Sort/Output/part-00000
!echo  "\n --- Total Sort:  part-00001 ----- \n"
!hdfs dfs -cat Sort/Output/part-00001
!echo  "\n --- Total Sort:  part-00002 ----- \n"
!hdfs dfs -cat Sort/Output/part-00002

16/05/27 19:50:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 19:50:09 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted Sort/Output
16/05/27 19:50:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 19:50:11 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 19:50:11 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 19:50:11 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 19:50:11 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 19:50:11 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 19:50:11 INFO Configuration.deprecation: redu
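The reducer spec `1:2-` is easy to misread because `FieldSelectionMapReduce` numbers fields from 0: field 0 is the partition key, field 1 is the original key, and `2-` means field 2 and everything after it. The Python 3 sketch below applies such a spec to one tab-separated record; `select_fields` is a simplified, hypothetical re-implementation, not Hadoop's code.

```python
def select_fields(record, spec="1:2-", sep="\t"):
    """Simplified sketch of a 'keyFields:valueFields' field-selection spec.

    Fields are numbered from 0. Each comma-separated item is a single field
    ("3"), a closed range ("1-3") or an open range ("2-", field 2 onwards).
    Returns the (key, value) pair, each joined with `sep`.
    """
    fields = record.rstrip("\n").split(sep)

    def pick(part):
        chosen = []
        for item in part.split(","):
            if not item:
                continue
            if item.endswith("-"):
                chosen.extend(fields[int(item[:-1]):])
            elif "-" in item:
                lo, hi = item.split("-")
                chosen.extend(fields[int(lo):int(hi) + 1])
            else:
                chosen.append(fields[int(item)])
        return chosen

    key_spec, value_spec = spec.split(":")
    return sep.join(pick(key_spec)), sep.join(pick(value_spec))


for rec in ["c\t10 \t13", "b\t6\tA Group"]:
    key, value = select_fields(rec)
    print(repr(key + "\t" + value))
```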

In [253]:
#-----------------------------------
#     TOTAL SORT (Works)
#     Mapper: Prepend partition key USING SortCode/prependPartitionKeyMapper.py
#     Shuffle: partition and do secondary sort
#     Reduce: drop the partition key USING -D reduce.output.key.value.fields.spec=1:2- 
#-----------------------------------
!hdfs dfs -rm -r Sort/Output
!hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming*.jar \
   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
   -D stream.map.output.field.separator="\t" \
   -D mapreduce.partition.keypartitioner.options="-k1,1" \
   -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2nr" \
   -D reduce.output.key.value.fields.spec=1:2- \
   -mapper SortCode/prependPartitionKeyMapper.py \
   -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
   -numReduceTasks 3 \
   -input Sort/thirtyNumbersDataSet.txt  -output Sort/Output \
   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 


#have a look at the input file
!echo  "\n-------- UNSORTED INPUT-------------------\n"
!hdfs dfs -cat Sort/thirtyNumbersDataSet.txt
!echo  "\nTotal Sort:  primary key numeric decreasing (read the part files in key-range order)\n"
# 
!echo  "\n --- Total Sort:  part-00000 ----- \n"
!hdfs dfs -cat Sort/Output/part-00000
!echo  "\n --- Total Sort:  part-00001 ----- \n"
!hdfs dfs -cat Sort/Output/part-00001
!echo  "\n --- Total Sort:  part-00002 ----- \n"
!hdfs dfs -cat Sort/Output/part-00002

16/05/27 19:56:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 19:56:07 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted Sort/Output
16/05/27 19:56:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 19:56:08 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 19:56:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 19:56:08 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 19:56:09 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 19:56:09 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 19:56:09 INFO Configuration.deprecation: redu

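### __Step - Combine the secondary sort output files in the appropriate order__

The reducer that receives a given partition key is chosen by hashing that key, so the part files are not numbered in key order: in this run `part-00000` holds the `c` records (keys of 10 and above), `part-00002` the `b` records, and `part-00001` the `a` records. The cell below therefore reads the first key of each part file and prints the files in decreasing order of that key. This relies on each partition key having been routed to its own reducer, as happened here; if two partition keys shared a reducer, that file would hold two key ranges (sorted by partition key first), and reordering whole files would not give a total sort.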

In [320]:
import subprocess
import re

# List the reducer output files; WARN lines merged from stderr are ignored by the regex below
p = subprocess.Popen(["hdfs", "dfs", "-ls", "Sort/Output/part-*"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
lines = p.stdout.read()

# Extract the part-file paths from the listing
regex = re.compile(r'(Sort/Output/part-\d*)')
outputPARTFiles = [match.group(0) for match in re.finditer(regex, lines)]

# The first key in each part file identifies the key range that the file holds
# (-cat rather than -tail: -tail returns only the last kilobyte of a file)
d = {}
for i, f in enumerate(outputPARTFiles):
    p = subprocess.Popen(["hdfs", "dfs", "-cat", f], stdout=subprocess.PIPE)
    firstLine = p.stdout.readline()   # only the first record is needed
    p.stdout.close()
    p.wait()
    if not firstLine.strip():         # skip empty part files
        continue
    firstKey = int(firstLine.split('\t')[0].strip())
    print "i is %d, %d, %s" % (i, firstKey, f)
    d[firstKey] = f

# TOTAL sort in decreasing order: print the part files ordered by their first key
for k in sorted(d.items(), key=lambda x: x[0], reverse=True):
    print "%d:%s" % (k[0], k[1])
    p = subprocess.Popen(["hdfs", "dfs", "-cat", k[1]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    for line in p.stdout.readlines():
        print line,


i is 0, 12, Sort/Output/part-00000
i is 1, 4, Sort/Output/part-00001
i is 2, 9, Sort/Output/part-00002
12:Sort/Output/part-00000
16/05/27 22:38:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12 	4
11 	3
10 	13
10 	8
10 	5
9:Sort/Output/part-00002
16/05/27 22:38:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
9 	12
8 	7
8 	7
7 	7
6	A Group
6	C Group
6	B Group
6	D Group
4:Sort/Output/part-00001
16/05/27 22:38:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
4 	8
3	C Group
3	A Group
3	B Group
3	D Group
2	D Group
2	C Group
2	B Group
2	A Group
2 	8
1 	6
0 	3
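The combining step, and the assumption it relies on, can be checked without HDFS. The Python 3 sketch below assumes the part files have already been read into a dict mapping file name to list of lines (a hypothetical input; in the notebook the lines come from `hdfs dfs -cat`). It orders the files by their first key, concatenates them, and reports whether the result is globally non-increasing. The second example models a reducer that received both the `a` and `c` ranges.

```python
def order_part_files(parts):
    """Order part files by their first key (decreasing) and test the result.

    `parts` maps a file name to its list of 'key<TAB>value' lines (assumed to
    be read beforehand, e.g. with hdfs dfs -cat). Empty files are skipped.
    Returns (ordered file names, combined lines, True if globally non-increasing).
    """
    def key_of(line):
        return float(line.split("\t")[0])

    non_empty = dict((name, lines) for name, lines in parts.items() if lines)
    ordered = sorted(non_empty, key=lambda name: key_of(non_empty[name][0]), reverse=True)
    combined = [line for name in ordered for line in non_empty[name]]
    keys = [key_of(line) for line in combined]
    is_total = all(a >= b for a, b in zip(keys, keys[1:]))
    return ordered, combined, is_total


# One key range per reducer, as in this run
good = {"part-00000": ["12 \t4", "10 \t13"],
        "part-00001": ["4 \t8", "0 \t3"],
        "part-00002": ["9 \t12", "6\tA Group"]}
# Ranges "a" and "c" hashed to the same reducer (sorted by partition key first)
clashed = {"part-00000": ["4 \t8", "0 \t3", "12 \t4", "10 \t13"],
           "part-00001": ["9 \t12", "6\tA Group"],
           "part-00002": []}
print(order_part_files(good)[0], order_part_files(good)[2])
print(order_part_files(clashed)[0], order_part_files(clashed)[2])
```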


In [301]:
!hdfs dfs -ls Sort/Output/*

16/05/27 22:11:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 jshanahan supergroup          0 2016-05-27 19:56 Sort/Output/_SUCCESS
-rw-r--r--   1 jshanahan supergroup         31 2016-05-27 19:56 Sort/Output/part-00000
-rw-r--r--   1 jshanahan supergroup        100 2016-05-27 19:56 Sort/Output/part-00001
-rw-r--r--   1 jshanahan supergroup         61 2016-05-27 19:56 Sort/Output/part-00002
