public
Description: Support libraries for writing Hadoop Streaming-compatible map/reduce tasks.
Clone URL: git://github.com/codahale/hadoop-streaming.git
Search Repo:
Changed Collector to KeyValueCollector to better describe it, and added 
some documentation to stuff.
codahale (author)
Tue Mar 25 12:09:15 -0700 2008
commit  53c462ecee0da108f330fdbe2791641078081f36
tree    9c2ceda7823be92a81d56b3b6e931a778caa752e
parent  d1b3bc3dffb8ee3690dce306b55b67ddff82159c
...
25
26
27
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
29
30
31
32
 
33
34
35
36
37
...
39
40
41
 
 
 
 
 
 
 
 
 
 
 
42
43
44
 
 
45
46
47
48
 
49
50
51
...
57
58
59
60
 
61
62
63
...
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
 
68
69
70
71
72
73
...
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
 
 
90
91
92
93
94
 
95
96
97
98
...
104
105
106
 
107
108
109
110
0
@@ -25,11 +25,47 @@
0
 
0
 """
0
   A set of classes which make writing map/reduce tasks for Hadoop easy.
0
+
0
+ An example job, which reads countries from a Tab-Separated Value file and
0
+ outputs the number of times each country appears:
0
+
0
+ country_count.py:
0
+
0
+ from collections import defaultdict
0
+ from hadoop import Job
0
+ from hadoop.parsers import TSVParser
0
+
0
+ COUNTRY_COLUMN = 3
0
+
0
+ class CountryCount(Job):
0
+ def __init__(self):
0
+ super(CountryCount, self).__init__()
0
+ self.map_parser = TSVParser
0
+
0
+ def map(self, key, values, collector):
0
+ collector.collect(values[COUNTRY_COLUMN], 1)
0
+
0
+ def reduce(self, keys_and_values, collector):
0
+ countries = defaultdict(int)
0
+ for country, count in keys_and_values:
0
+ countries[country] += int(count)
0
+ for country, count in countries.iteritems():
0
+ collector.collect(country, count)
0
+
0
+ To run locally:
0
+
0
+ cat data.tsv | python country_count.py --map | python country_count.py --reduce
0
+
0
+ To run via Hadoop Streaming:
0
+
0
+ bin/hadoop jar contrib/streaming/hadoop-streaming-0.16.0.jar \\
0
+ -input my_countries.tsv -output country_counts \\
0
+ -mapper "country_count.py --map" -reducer "country_count.py --reduce"
0
 """
0
 
0
 import sys
0
 
0
-from hadoop.collectors import Collector
0
+from hadoop.collectors import KeyValueCollector
0
 from hadoop.parsers import LineParser, KeyValueParser
0
 from hadoop.runner import Runner
0
 
0
0
0
@@ -39,13 +75,24 @@
0
     implement map() and reduce().
0
   """
0
   def __init__(self):
0
+ """
0
+ Creates a new job instance.
0
+
0
+ Override this to change the parser and collector types for your map()
0
+ and reduce() methods. They default to:
0
+
0
+ map_parser = LineParser
0
+ map_collector = KeyValueCollector
0
+ reduce_parser = KeyValueParser
0
+ reduce_collector = KeyValueCollector
0
+ """
0
     super(Job, self).__init__()
0
- self.map_parser, self.map_collector = LineParser, Collector
0
- self.reduce_parser, self.reduce_collector = KeyValueParser, Collector
0
+ self.map_parser, self.map_collector = LineParser, KeyValueCollector
0
+ self.reduce_parser, self.reduce_collector = KeyValueParser, KeyValueCollector
0
   
0
   def start_map(self, parser_stream=sys.stdin, collector_stream=sys.stdout):
0
     """
0
- Starts the mapping process..
0
+ Starts the mapping process. Should only be called by the Runner.
0
     """
0
     parser = self.map_parser(parser_stream)
0
     collector = self.map_collector(collector_stream)
0
@@ -57,7 +104,7 @@
0
   
0
   def start_reduce(self, parser_stream=sys.stdin, collector_stream=sys.stdout):
0
     """
0
- Starts the reducing process..
0
+ Starts the reducing process. Should only be called by the Runner.
0
     """
0
     parser = self.reduce_parser(parser_stream)
0
     collector = self.reduce_collector(collector_stream)
...
1
2
3
4
 
5
6
7
...
13
14
15
16
 
17
18
19
...
23
24
25
26
 
27
28
29
...
1
2
3
 
4
5
6
7
...
13
14
15
 
16
17
18
19
...
23
24
25
 
26
27
28
29
0
@@ -1,7 +1,7 @@
0
 """
0
   Output collectors for Hadoop tasks.
0
 """
0
-class Collector(object):
0
+class KeyValueCollector(object):
0
   """
0
     A basic string/string collector for key/value pairs.
0
     
0
@@ -13,7 +13,7 @@
0
       Creates a new Collector instance which outputs data to the provided
0
       stream.
0
     """
0
- super(Collector, self).__init__()
0
+ super(KeyValueCollector, self).__init__()
0
     self.stream = stream
0
   
0
   def collect(self, key, value):
0
@@ -23,7 +23,7 @@
0
     self.stream.write('%s\t%s\n' % (key, value))
0
   
0
 
0
-class TSVCollector(Collector):
0
+class TSVCollector(KeyValueCollector):
0
   """
0
     A collector which outputs multiple, tab-separated values.
0
     
...
11
12
13
 
 
 
 
 
14
15
16
...
37
38
39
 
 
 
 
40
41
42
...
58
59
60
 
 
 
 
61
62
63
...
11
12
13
14
15
16
17
18
19
20
21
...
42
43
44
45
46
47
48
49
50
51
...
67
68
69
70
71
72
73
74
75
76
0
@@ -11,6 +11,11 @@
0
     >>> p = LineParser(sys.stdin)
0
     >>> lines = [line for line in p]
0
     ['blah', 'blee', 'blorg']
0
+
0
+ Your map() or reduce() method should have the following profile:
0
+
0
+ f(self, line, collector)
0
+
0
   """
0
   def __init__(self, stream):
0
     """
0
@@ -37,6 +42,10 @@
0
   """
0
     A key/value parser. Each key and value are separated by a tab, as per
0
     Hadoop Streaming.
0
+
0
+ Your map() or reduce() method should have the following profile:
0
+
0
+ f(self, key, value, collector)
0
   """
0
   def parse_line(self, line):
0
     """
0
@@ -58,6 +67,10 @@
0
     >>> p = TSVParser(sys.stdin)
0
     >>> lines = [line for line in p]
0
     [('key', ('1', '2', '3')), ('another', ('4', '5', '6'))]
0
+
0
+ Your map() or reduce() method should have the following profile:
0
+
0
+ f(self, key, values, collector)
0
   """
0
   def parse_line(self, line):
0
     """
...
3
4
5
6
 
7
8
9
...
24
25
26
27
 
28
29
30
...
3
4
5
 
6
7
8
9
...
24
25
26
 
27
28
29
30
0
@@ -3,7 +3,7 @@
0
 import unittest
0
 from helpers import test
0
 
0
-from hadoop.collectors import Collector, TSVCollector
0
+from hadoop.collectors import KeyValueCollector, TSVCollector
0
 
0
 class MockStdOut(object):
0
   def __init__(self):
0
@@ -24,7 +24,7 @@
0
   
0
   @test
0
   def collector_should_output_key_and_value_to_stdout(self):
0
- collector = Collector(stream=self.stdout)
0
+ collector = KeyValueCollector(stream=self.stdout)
0
     collector.collect('key', 'value')
0
     self.assertEqual(['key\tvalue\n'], self.stdout.read_lines())
0
   
...
6
7
8
9
 
10
11
12
13
...
47
48
49
50
 
51
52
53
54
 
55
56
57
...
6
7
8
 
9
10
11
12
13
...
47
48
49
 
50
51
52
53
 
54
55
56
57
0
@@ -6,7 +6,7 @@
0
 
0
 from hadoop import Job
0
 from hadoop.parsers import LineParser, KeyValueParser
0
-from hadoop.collectors import Collector
0
+from hadoop.collectors import KeyValueCollector
0
 import hadoop.runner
0
 
0
 
0
0
@@ -47,11 +47,11 @@
0
   
0
   @test
0
   def job_should_have_a_map_collector(self):
0
- self.assertEqual(Collector, self.job.map_collector)
0
+ self.assertEqual(KeyValueCollector, self.job.map_collector)
0
   
0
   @test
0
   def job_should_have_a_reduce_collector(self):
0
- self.assertEqual(Collector, self.job.reduce_collector)
0
+ self.assertEqual(KeyValueCollector, self.job.reduce_collector)
0
   
0
   @test
0
   def job_should_map_parser_output_to_collector_input(self):

Comments

    No one has commented yet.