# 04 基于Hadoop Streaming的Python大数据分析
---
## 1. 概述

Hadoop Streaming是Hadoop提供的一种编程工具，允许用户用任何可执行程序和脚本作为**Mapper**和**Reducer**来完成Map/Reduce任务，这意味完全用户可以用Hadoop Streaming+Python/Ruby/Golang/C++ 等任何熟悉的语言来完成大数据处理和分析的任务。

## 2. Hadoop Streaming的工作方式

**Hadoop Streaming**的工作方式如下图所示。与标准的MapReduce(以下简称MR)一样，整个MR过程依然由**Mapper**、\[Combiner\]、**Reducer**组成(其中Combiner为可选加入)。用户可以像使用Java一样去用其他语言编写MR，只不过Mapper/Reducer的输入和输出并与Java API打交道，而是通过该语言下的标准输入输出函数来进行。图中标注了用户编写的Mapper和Reducer的位置。
![Hadoop Streaming的工作方式](images/hadoopstream.jpg)

### Mapper的角色

Hadoop将用户提交的**Mapper**可执行程序或脚本作为一个单独的进程加载起来，这个进程我们称之为mapper进程，hadoop不断地将文件片段转换为行，传递到我们的**Mapper**进程中，**Mapper**进程通过**标准输入**(STDIN) 的方式一行一行地获取这些数据，然后设法将其转换为**键值对**(Key/Value)，再通过**标准输出**(STDOUT)的形式将这些**键值对**按照**一对一行**的方式输出。

虽然在**Mapper**程序中，我们自己能分得清**Key/Value**(比如：string key,int value)，但是当我们采用标准输出之后，Key/Value是打印到一行作为结果输出的(比如`print("{0}\{1}".format(key,value))`，因此,为了保证Hadoop能从中鉴别出键和值，**键值对**一定要以分隔符'\t'即Tab(也可自定义分隔符)字符分隔，这样才能保证hadoop正确地进行分区（Partition）、重排（shuffle）等过程。

### Reducer的角色

Hadoop将用户提交的**Reducer**可执行程序或脚本同样作为一个单独的进程加载起来，这个进程被称为**Reducer**进程，hadoop不断地将**键值对**(按键排序)按照**一对儿一行**的方式传递到**Reducer**进程中，Reducer进程同样通过标准输入的方式按行获取这些**键值对**，进行**自定义计算**后将结果通过标准输出的形式输出。

在**Reducer**这个过程中需要注意的是：传递到**Reducer**的键值对是按照键排过序的，这一点是由MR框架的**Sort**过程保证的，因此如果读到一个键与前一个键不同，我们就可以知道当前Key对应的**键值对**已经结束了，接下来将是新的Key对应的**键值对**。

## 3. Hadoop Streaming的使用方式

Hadoop Streaming的使用方式形如：
```
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/.../hadoop-streaming.jar [genericOptions] [streamingOptions]
```
在这行命令中，其实是有先后顺序的，一定要保证 `[genericOptions]`写在`[streamingOptions]`之前，否则Hadoop Streaming命令将失效。

按照以上的格式使用该命令需要在安装hadoop时设置好环境变量HADOOP_HOME，将其值设置为hadoop的安装目录，当然如果觉得以上的调用方式还是麻烦的话，也可以把hadoop设置进系统的PATH环境变量并用`hadoop jar $HADOOP_HOME/.../hadoop-streaming.jar [genericOptions] [streamingOptions]`的格式调用。

另外在指定hadoop-streaming.jar时，可能由于装的hadoop版本不同，那这个jar的位置也不同，需要根据自己的实际情况来确定这个hadoop-streaming.jar的位置，并将其填入命令中。

### genericOptions ###

常用的genericOptions如下：

* **-D property=value**: 指定额外的配置信息变量，详情在后文介绍
* **-files file1,file2,...**: 指定需要拷贝到集群节点的文件，以逗号分隔，通常为自己编写的Mapper和Reducer脚本或可执行程序。Mapper和Reducer通常要由集群中不同的节点来执行，而很脚本或可执行程序可能仅存在于提交任务时所用的那个节点上，因此需要将其分发出去。

### streamingOptions ###

常用的streamingOptions如下：

* **-file filename**: 指定需要拷贝到集群节点的文件，与-files的功能类似，只不过如果使用-file的话，就需要一个文件一个文件地去上传，比方说如果我要将我的mapper.py，reducer.py上传到集群上去运行，那就得需要两个-file参数。在实际使用-file时，hadoop似乎并不希望我们使用-file参数，会出现如下警告：
```
warning。“18/03/26 20:17:40 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
```
* **-input myInputDirs**: 指定给MapReduce任务输入的文件位置，通常为hdfs上的文件路径，多个文件或目录用逗号隔开
* **-output myOutputDir**: 指定给mapreduce任务输出的目录，通常为hdfs上的文件路径
* **-mapper**: executable or JavaClassName，用于Mapper的可执行程序或Java类，如果是脚本文件，应该以命令行完整调用的格式作为可执行程序参数并且须加引号，比如`-mapper "python mapper.py"`
* **-reducer**: executable or JavaClassName，用于Reducer的可执行程序或Java类，要求同上
* **-partitioner**: JavaClassName,自定义的partitioner Java类
* **-combiner**: streamingCommandor JavaClassName,自定义的combiner类或命令

### -D property=value

常用的-D property=value如下：

* **-D mapred.job.name=jobname**: 指定作业名称
* **-D mapred.map.tasks=numofmap**: 每个Job运行map task的数量
* **-D mapred.reduce.tasks=numofreduce**: 每个Job运行reduce task的数量，如果指定为0，则意味着提交了一个map only的任务
* **-D stream.map.input.field.separator**: 指定map输入时的分隔符，默认为"\t"
* **-D stream.map.output.field.separator**: 指定map输出时使用的Key/Value分隔符，默认为"\t"，比如在我们的mapper中，输出Key/Value pairs的标准输出语句很可能是这样的 print("{0},{1}".format(k,v))，由于使用了非默认的分隔符，因此需要额外指定分隔符","
* **-D stream.reduce.input.field.separator**: 指定reduce输入时的分隔符，默认为"\t"
* **-D stream.reduce.output.field.separator**: 指定reduce输出时的分隔符，默认为"\t"
* **-D stream.num.map.output.key.fields=num**: 指定map输出中第几个分隔符作为key和value的分隔点，默认为1
* **-D stream.num.reduce.output.fields=num**: 指定reduce输出中第几个分隔符作为key和value的分隔点，默认为1
* **-D stream.non.zero.exit.is.failure=false/true**: 指定当mapper和reducer未返回0时，hadoop是否该认为此任务执行失败。默认为true。当mapper和reducer的返回值不是0或没有返回值时，hadoop将认为该任务为异常任务，将被再次执行，默认尝试4次都不是0，整个job都将失败。因此，如果我们在编写mapper和reducer未返回0时，则应该将该参数设置为false，否则hadoop streaming任务将报出异常。

## 5. 示例数据与程序

### 示例数据

该数据为部分儿童的样本数据sample_data.csv，每行由user_id（用户ID）,birthday（生日），gender（性别，0为女，1为男，2为未知）组成。假设我们的问题是：在这些儿童中，每一年出生的男孩和女孩各是多少？

In [1]:
!cat data/sample_data.csv

user_id,birthday,gender
27571234,20130311,1
41597123,20121111,0
13725721,20120130,1
10339332,20110910,0
10642245,20130213,0
10923201,20110830,1
11768880,20120107,1
12519465,20130705,1
12950574,20090708,0
13735440,20120323,0
14510892,20140812,1
14905422,20110429,1
15786531,20080922,0
16265490,20091209,0
17431245,20110115,0
18190851,20110101,0
20087991,20100808,0
20570454,20081017,1
21137271,20110204,1
21415917,20060801,1
21887268,20100526,0
22602471,20090601,1
23208537,20080416,1
23927133,20081029,0
24829944,20140826,1
52529655,20130611,2

### 流程解析

首先梳理下使用此数据的MapReduce场景，并思考**Mapper**接收到的数据是什么样的，又应该将处理后的数据输出成什么样的。流程如下图（有缩减）:

![解析流程](images/mrflow.jpg)

#### 编写Mapper的思路
一行一行的获取MR传入的原始数据记录，然后将记录分割成多个字段，获取其中的生年和性别字段，之后将结果打印到标准输出中。需要注意的是该段数据是有header的,要想办法跳过这段header。

#### 编写Reducer的思路
一行一行地获取到按key排过序的Key/Value对(“排序行为”是MR框架为做的，不需要我们自己指定)，由于MR框架已经为我们排好序，因此只要观察到当前行获得的key与上一行获得的key不一样，即可判断是新的生年/性别组，然后累加每一组的男孩和女孩数，遇到新的组时将上一生年/性别组的男孩和女孩数目打印出来。

### 示例程序

#### Mapper（sample_mapper.py）

In [None]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys

for line in sys.stdin:
    line = line.strip()
    row = line.split(',')
    user_id = row[0]
    if user_id == "user_id":
        continue
    birth_year = row[1][0:4]
    gender = row[2]
    print("{0}\t{0}"format(birth_year,gender))

#### Reducer（sample_reducer.py）

In [2]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys

num_by_gender = {'0': 0, '1': 0, '2': 0}
last_key = False
for line in sys.stdin:
    line = line.strip()    
    row = line.split('\t')
    cur_key = row[0]
    gender = row[1]
    if last_key and cur_key != last_key:
        print("Y:{0},F:{1},M:{2}".
              format(last_key,num_by_gender['0'],num_by_gender['1']))
        last_key = cur_key
        num_by_gender = {'0': 0, '1': 0, '2': 0}
        num_by_gender[gender] += 1
    else:
        last_key = cur_key
        num_by_gender[gender] += 1
        
if last_key:
    print("Y:{0},F:{1},M:{2}".
          format(last_key,num_by_gender['0'],num_by_gender['1']))


### 本地测试

在提交MapReduce作业之前，可以使用Linux的管道操作来测试一下编写的Mapper和Reducer。使用Linux的管道，可以轻易地连接两个毫不相关的程序，把一个程序的结果交给另一个来处理，甚至，不停地交接处理。

In [15]:
!cat data/sample_data.csv | \
python input_files/sample_mapper.py | \
sort -t '\t' -k 1 | \
python input_files/sample_reducer.py

Y:2006,F:0,M:1
Y:2008,F:2,M:2
Y:2009,F:2,M:1
Y:2010,F:2,M:0
Y:2011,F:3,M:3
Y:2012,F:2,M:2
Y:2013,F:1,M:2
Y:2014,F:0,M:2


### 在集群上运行MapReduce程序

接着把sample_data.csv上传到hdfs上，并使用Hadoop Streaming命令来在集群上运行我们的程序。请注意一定要将-D，-files参数放在所有参数前面，因为genericOptions一定要放在streamingOptions前面，而-files -D都属于genericOptions。

In [16]:
# 操作代码
!docker cp ./data/sample_data.csv master:/root/

In [17]:
!docker exec master \
hadoop fs -put /root/sample_data.csv /input/

In [19]:
!docker cp ./input_files/sample_mapper.py master:/root/
!docker cp ./input_files/sample_reducer.py master:/root/

In [22]:
!docker exec master \
hadoop fs -rm -f -r /output/sample_data
!docker exec master \
hadoop jar /usr/hadoop-2.8.3/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar \
-D stream.num.map.output.key.fields=1 \
-D num.key.fields.for.partition=1 \
-D mapred.map.tasks=10 \
-D mapred.reduce.tasks=1 \
-D mapred.job.name="Sample Data" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input /input/sample_data.csv \
-output /output/sample_data \
-mapper /root/sample_mapper.py \
-reducer /root/sample_reducer.py \
-file /root/sample_mapper.py \
-file /root/sample_reducer.py
!docker exec master \
hadoop fs -ls /output/sample_data/

19/08/14 12:59:58 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
Deleted /output/sample_data
19/08/14 13:00:00 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/root/sample_mapper.py, /root/sample_reducer.py] [] /tmp/streamjob147488410626420435.jar tmpDir=null
19/08/14 13:00:01 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
19/08/14 13:00:01 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
19/08/14 13:00:01 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
19/08/14 13:00:02 INFO mapred.FileInputFormat: Total input files to process : 1
19/08/14 13:00:02 INFO mapreduce.JobSubmitter: number of splits:1
19/08/14 13:00:02 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/08

In [23]:
!docker exec master \
hadoop fs -cat /output/sample_data/part-00000

Y:2006,F:0,M:1	
Y:2008,F:2,M:2	
Y:2009,F:2,M:1	
Y:2010,F:2,M:0	
Y:2011,F:3,M:3	
Y:2012,F:2,M:2	
Y:2013,F:1,M:2	
Y:2014,F:0,M:2	


在集群上运行的结果和预期一致，并与管道测试结果一致。当然也有可能出现不一致的情况，如果是这样我们该关注的点应该放到分布式架构上面而不是代码的计算逻辑上，因为在集群上运行和本机测试主要的不同是：在集群上是多个Mapper和Reducer共同运行，而在本机你可以理解为一个Mapper和Reducer，这种不同可能会导致不同的结果。

## 7. 注意事项

### Hadoop Streaming与python环境

hadoop是基于集群的，因此MR任务是运行于集群中的各个节点上的，正如使用集群时需要为集群中的节点安装Java环境一样，如果你想用Python来实现MapReduce，当然也需要为各个节点配置好python环境。

那么问题就来了，我应该为节点配置什么样的环境，如何让hadoop命令或者python脚本与该环境一致。这里将涉及三个方面:

#### 需要保证python安装正确并且能使用python命令行的方式启动python
  
-files参数表示将mapper.py和reducer.py发送到集群中的各个节点去运行，-mapper "python mapper.py"表示发送到这些节点的.py文件是使用"python xxx.py"的命令行方式启动成进程的。因此必须保证这些节点拥有这样的能力。检验的方法是在节点的Terminal下面输入python -V，如果能正确显示python版本则表示安装正确，否则可能需要花时间把python重新安装好。

#### 需要保证python命令启动的是节点中的哪个版本

如果一定要使用python3作为MR，那么要在/usr/bin下面建立python到python3安装目录下bin/python3的软连接。

#### 需要保证代码语法适应所有节点安装的python版本

如果集群上安装的是python2的环境，而Mapper和Reducer使用的是python3的语法编写的，那一定跑不起来，会报一堆难以排查的错误

### 打包python环境到集群

有一个办法可以解决为各个节点配置环境的问题，那就是将环境打包到集群，并在执行命令时分发到各个节点供其使用。为此，首先将python安装目录压缩，然后上传到hdfs，之后在hadoop streaming命令中通过设置`-archives "hdfs://master/env/python35.tar.gz#py`参数即可把这个hdfs上的Python环境分发到集群上各个节点上去运行。其中的#py表示将hdfs上的这个文件分发到集群中各个节点之后再解压到名为py的文件夹中，因此需要在-mapper中使用解压后文件夹中的python程序来启动脚本。

## 8. 附录

### Hadoop Streaming的官方Jar包注释

In [10]:
JAR_PACKAGE = '/usr/hadoop-2.8.3/share/\
hadoop/tools/lib/hadoop-streaming-2.8.3.jar'
!docker exec master hadoop jar $JAR_PACKAGE -info > hadoopStreaming_doc.txt

In [11]:
!cat hadoopStreaming_doc.txt

Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  -input          <path> DFS input file(s) for the Map step.
  -output         <path> DFS output directory for the Reduce step.
  -mapper         <cmd|JavaClassName> Optional. Command to be run as mapper.
  -combiner       <cmd|JavaClassName> Optional. Command to be run as combiner.
  -reducer        <cmd|JavaClassName> Optional. Command to be run as reducer.
  -file           <file> Optional. File/dir to be shipped in the Job jar file.
                  Deprecated. Use generic option "-files" instead.
  -inputformat    <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
                  Optional. The input format class.
  -outputformat   <TextOutputFormat(default)|JavaClassName>
                  Optional. The output format class.
  -partitioner    <JavaClassName>  Optional. The partitioner class.
  -numReduceTasks <num> Optional. Number of reduce tasks.
  -inputreader    <spec> Optional. 