# Hadoop

## hadoop 概念

![The Hadoop Ecosystem](./images/the-hadoop-ecosystem.png)

- hadoop 起源于 Doug Cutting，是行业大数据标准开源软件

用处
- 海量数据存储（HDFS）
- 海量数据分析（MapReduce）(YARN)

版本
- Apache ／ Cloudera(CDH) / HDP(Hortonworks)

核心
- HDFS：Hadoop Distributed File System
- MapReduce：并行计算框架
- YARN：Yet Another Resource Negotiator(资源管理调度系统)
    - 可运行 Storm／Spark 等

### HDFS 架构

主从结构
- 主节点：只有一个，namenode
- 从节点：有多个，datanodes

namenode
- 接收用户操作请求
- 维护文件系统的目录结构
- 管理文件与 block 之间的关系，block 与 datanode 之间的关系

datanode
- 存储文件
- 文件被分成 block 存储在磁盘上
- 为保证数据的安全，文件会有多个副本

### Linux 环境

主机名 hostname 修改
``` bash
vim /etc/sysconfig/network
```
host 映射修改
``` bash
vim /etc/hosts
```

ip 修改
``` bash
vim /etc/sysconfig/network-scripts/ifcfg-eth0
```
文件末尾添加：
``` txt
BOOTPROTO = "static"
IPADDR = "192.168.177.1"
NETMASK = "255.255.255.0"
GATEWAY = "192.168.177.1"
DNS1 = "8.8.8.8"
```

关闭防火墙
``` bash
service iptables stop
```

关闭防火墙开机启动
``` bash
chkconfig iptables off
```

三种虚拟网络模式
- bridge: 局域网的一台独立主机，手动 TCP/IP
- nat: 通过宿主机器所在网络访问公网 VMnet8(NAT), 不用任何手动配置
- host-only: 与真实网络隔离 VMnet1， 虚拟网络的 DHCP 服务器动态分配

### 环境变量

``` bash
vim /etc/profile
```
或
``` bash
export JAVA_HOME = /usr/java/jdk1.8.0_181
export PATH = $PATH:$JAVA_HOME/bin
```
刷新配置
``` bash
source /etc/profile
```

### hadoop 配置

- 版本：hadoop-2.9.1
- 路径：/hadoop
- 配置文件（/hadoop/hadoop-2.9.1/etc/hadoop）：
    1. 依赖的jdk:(hadoop-env.sh)
    ``` bash
    export JAVA_HOME=/usr/java/jdk1.8.0_181
    ```
    
    2. core-site.xml:
        ``` xml
        <!--单机-->
        <!--HDFS的NameNode地址-->
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://centos-base:9000</value>
        </property>
        <!--Runtime files-->
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/hadoop/hadoop-2.9.1/tmp</value>
        </property>
        ```
        
        ``` xml
        <!--集群-->
        <!--HDFS NameNode-->
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://ns1</value>
        </property>
        <!--runtime files-->
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/hadoop/hadoop-2.9.1/tmp</value>
        </property>
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>node04:2181,node05:2181,node06:2181</value>
        </property>
        ```
        
    3. hdfs.site.xml
        ``` xml
        <!--单机-->
        <!--hdfs data 保存副本数量-->
        <property>
                <name>dfs.replication</name>
                <value>1</value>
        </property>
        ```
        
        ``` xml
        <!--集群-->        
        <!--这里抽象出两个NameService实际上就是给这个HDFS集群起了个别名-->
        <property>
            <name>dfs.nameservices</name>
            <value>ns1</value>
        </property>

        <!--指定NameService是cluster1时的namenode有哪些,这里的值也是逻辑名称，名字随便起，相互不重复即可-->
        <property>
            <name>dfs.ha.namenodes.ns1</name>
            <value>nn1,nn2</value>
        </property>

        <!--指定nn1,nn2的RPC地址 8020 9000-->
            <property>
                <name>dfs.namenode.rpc-address.ns1.nn1</name>
                <value>node01:9000</value>
            </property>
            <property>
                <name>dfs.namenode.rpc-address.ns1.nn2</name>
                <value>node02:9000</value>
            </property>

        <!--指定nn1、nn2的http地址-->
            <property>
                <name>dfs.namenode.http-address.ns1.nn1</name>
                <value>node01:50070</value>
            </property>
            <property>
                <name>dfs.namenode.http-address.ns1.nn2</name>
                <value>node02:50070</value>
            </property>

        <!--指定cluster1的两个NameNode共享edits文件目录时，使用的JournalNode集群信息-->
            <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://node04:8485;node05:8485;node06:8485/ns1</value>
            </property>

        <!--指定JournalNode集群在对NameNode的目录进行共享时，自己存储数据的磁盘路径-->
            <property>
              <name>dfs.journalnode.edits.dir</name>
              <value>/hadoop/hadoop-2.9.1/data/jn</value>
            </property>

        <!--指定是否启动自动故障恢复，即当NameNode出故障时，是否自动切换到另一台NameNode-->
            <property>
               <name>dfs.ha.automatic-failover.enabled</name>
               <value>true</value>
             </property>

        <!--指定cluster1出故障时，哪个实现类负责执行故障切换-->
            <property>
              <name>dfs.client.failover.proxy.provider.ns1</name>
              <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
            </property>

        <!--一旦需要NameNode切换，使用ssh方式进行操作-->
            <property>
              <name>dfs.ha.fencing.methods</name>
              <value>
                sshfence
                shell(/bin/true)
              </value>
            </property>

        <!--如果使用ssh进行故障切换，使用ssh通信时用的密钥存储的位置-->
            <property>
              <name>dfs.ha.fencing.ssh.private-key-files</name>
              <value>/home/beifeng/.ssh/id_rsa</value>
            </property>

        <!--sshfence 超时时间-->
            <property>
                <name>dfs.ha.fencing.ssh.connect-timeout</name>
                <value>30000</value>
            </property>

        <!--指定DataNode存储block的副本数量-->
            <property>
                <name>dfs.replication</name>
                <value>3</value>
            </property>
        <!--解释：hadoop 守护进程一般同时运行RPC 和HTTP两个服务器，RPC服务器支持守护进程间的通信，HTTP服务器则提供与用户交互的Web页面。-->
        ```
        
    4. mapred-site.xml
        ``` xml
        <!--mapreduce运行在yarn上-->
        <property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>
        ```
        
    5. yarn-site.xml
        ``` xml
        <!--制定数据获取方式是shuffle-->
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>
        <!--指定yarn的resourcemanager地址-->
        <property>
                <name>yarn.resourcemanager.hostname</name>
                <value>centos-base</value>
        </property>
        ```

### 环境变量

``` bash
export HADOOP_HOME=/hadoop/hadoop-2.9.1
export PATH=$PATH:$HADOOP_HOME/bin
source /etc/profile
```

### hadoop 文件系统初始化

``` bash
hdfs namenode -format
```

### 启动hadoop

初始化 HDFS
``` 
bin/hadoop namenode -format
```
启动 HDFS
```
sbin/start-dfs.sh
```
启动 YARN
```
sbin/start-yarn.sh
```
直接一步启动，切换到 sbin 目录(不推荐)
``` bash
./start-all.sh
```
查看进程(java)快照
``` bash
jps
```

### 配置 ssh 免密码登陆

（hadoop 启停免登录）
``` bash
cd ~/.ssh/
```

- 生成密钥

``` bash
ssh-keygen -t rsa
```

- 拷贝公钥到已认证文件

``` bash
cp id_rsa.pub authorized_keys
```

- 或者

``` bash
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
```

- 拷贝公钥到其他机器

``` bash
ssh-copyid address/hostname
```

### 测试 HDFS

#### 上传文件
``` bash
hadoop fs -put file address
```

#### 下载文件
``` bash
hadoop fs -get address file
```

### HDFS 的 shell 查看
``` bash
hadoop fs
```
或：
``` bash
hdfs dfs
```

查看帮助：
``` bash 
hadoop fs -help <cmd>
``` 

### 测试 MapReduce/YARN

``` bash
hadoop jar hadoop-mapreduce-exp.jar wordcount hdfs://hostname:/words hdfs://hostname:/out
```

## Hadoop Clusters

### 初始化

1. 启动 ZK（4，5，6）
``` bash
./zkServer.sh start
```

2. 启动 journalnode（4，5，6）
``` bash
c /path/hadoop-2.2.0
sbin/hadoop-daemon.sh start journalnode
```

3. 格式化 HDFS（在h1 上格式化，再拷贝tmp 到其他主机h2）
``` bash
hdfs namenode -format
```

4. 格式化 ZK（只需要在h1 上格式化）
``` bash
hdfs zkfc -formatZK
```

5. 启动／停止 HDFS（在h1上）
``` bash
sbin/start-dfs.sh
sbin/stop-dfs.sh
```

6. 启动／停止 YARN（在h3）
``` bash
sbin/start-yarn.sh
sbin/stop-yarn.sh
```

7. 官方文档
``` txt
hadoop-ver\share\doc\index
```

### 集群测试
``` java
public static void main(String[] args) throw Exception{
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices","ns1");
    conf.set("dfs.ha.namenode.ns1","nn1,nn2");
    conf.set("dfs.namenode.rpc-address.ns1.nn1","h1:9000");
    conf.set("dfs.namenode.rpc-address.ns1.nn2","h2:9000");
    conf.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvicer");
    FileSystem fs = FileSystem.get(new URI("hdfs:ns1"), conf,"root");
    // 上传
    InputStream in = new FileInputStream("test");
    OutputStream out = fs.create(new Path("/test"));
    // 下载
    // InputStream in = fs.open(new Path("/test"));
    // OutputStream out = new FileOutputStream("D://test");
    IOUtils.copyBytes(in, out, 4096, true);
}
```

## HDFS

### 分布式文件系统
- GFS
- HDFS
- Lustre
- Ceph
- mogileFS
- GridFS
- TFS

### HDFS 原理
![hdfs theory](./images/hdfs-theory.png)

### HDFS Architecture
![hdfs architecture](./images/hdfs-architecture.png)

### 元数据存储
```
NameNode Metadata
NameNode(FileName, Replication, block-ids, idzhost...)
/test/a.log,3,{blk_1,blk_2},[{blk_1:[h0,h1,h3]},{blk_2:[h0,h2,h4]}]
```
![hdfs-datanodes-data](./images/hdfs-datanodes-data.png)

### NameNode 介绍
整个文件系统的管理节点
- 文件目录树管理（元数据与数据块列表管理）。
- 接收用户操作请求，执行命令。

成员：
- fsimage：元数据镜像文件（hdfs.site.xml中的 dfs.name.dir 属性），NameNode 内存元数据信息。
- edits：操作日志文件。
- fstime：保存最近一次checkpoint还原点的时间。

以上文件是保存在Linux 文件系统的（../hadoop-2.20/tmp/dfs/name/current/）

### NameNode 特点
1. Metadata 始终保存在内存，用于处理“读请求”。
2. “写请求”时，先向edits 写日志，成功后改内存元数据，最后向客户端返回，并写入 DataNode。
3. fsimage 由 edits 与旧的 fsimage 同步合并生成。

### 数据存储服务
1. 文件块 block，HDFS 默认 128 MB。
2. 不同与普通文件系统， HDFS 中文件小于 128 MB时，并不占用整块空间
3. Replication 多副本，默认三个（hdfs-site.xml的dfs.replication 属性）

### Secondary NameNode
伪分布式独有，用于生成 fsimage

- checkpoint 的时机
    1. fs.checkpoint.period 间隔默认 3600s
    2. fs.checkpoint.size edits 最大值默认 64M
![secondary-namenode](./images/secondary-namenode.png)

### HDFS 的 Java 接口
通过 RPC 代理实现通行

#### Eclipse
1. 新建 Java Project
2. lib 文件夹引入 hadoop.jar
3. Build Path-> Add to buildPath
4. 新建 class

#### 演示
``` java
public class HDFSDemo{
    FileSystem fs = null;
    
    @Before
    public void init() throw IOException,URIException{
        fs = FileSystem(new URI("hdfs://hostname:9000"), new Configuration(), "root");
    }
    
    @Test //上传
    public void testUpload() throw Exception{
        InputStream in = new FileInputSystem("c://test.txt");
        OutputStream out = fs.Create(new Path("/text"));
        IOUtils.copyBytes(in, out, 4096, true);
    }
    
    @Test //下载
    public void testDownload() throw Exception{
        fs.copyToLocalFile(new Path(""), new Path(""));
    }
    
    @Test //删除
    public void testdel() throw Exception{
        boolean flag = fs.delete(new Path("/test", false);
    }
    
    @Test //创建
    public void testMKDir() throw Exception{
        boolean flag = fs.mkdirs(new Path(""));
    }
    
    // 下载
    public static void main(String[] args) throws ... {
        InputStream in = fs.open(new Path("/test"));
        OutputStream out = new FileOutputStream("c://test.txt");
        IOUtils.copyBytes(in, out, 4096, true);
    }    
}
```

### RPC(Remote Procedure Call)
远程过程调用协议，底层使用 socket

#### 定义接口
``` java
public interface Bizable{
    public static final long versionId = 10000;
    public String sayHi(String name);
}
```

#### 实现
``` java
public class RPCServer implements Bizable{
    public String sayHi(String name){
        return "hi ~" + name;
    }
    
    public static void main(String[] args) throw Exception{
        Configuration conf = new Configuration();
        Server server = new RPC.Builder(conf).SetProtocal(Bizable.class).SetInstance(new RPCServer()).SetBindAddress("ip").SetPort(port).build();
        server.start();
    }
}

public class RPCClient{
    public static void main(String[] args) throw Exception{
        Bizable proxy = RPC.getProxy(Bizable.class, 10000, new InetSocketAddress("ip", port), new Configuration());
        String result = proxy.sayHi(" tomcat ");
        System.out.printIn(result);
        RPC.stopProxy(proxy);
    }
}
```

## MapReduce 
``` 
    分布式计算模型，解决海量数据计算问题，Map() 和 Reduce() 实现分布式计算，两个函数的形参都是key，value 对。
```

### MapReduce 框架
![mapreduce-architecture](./images/mapreduce-architecture.png)

### MapReduce 原理
![mapreduce-theory](./images/mapreduce-theory.png)
Mappers must complete before Reducers can begin

### MapReduce 执行过程
#### map
1. 读取文件，每行解析成一堆key value， 每对调用一次 map 函数
2. map 函数生成新的 keyvalue 对
3. 对新的 keyvalue 分区
4. 对不同分区的数据按 key 排序分组
5. 分组数据归约

#### reduce
1. 对多个 map 的输出按分区 copy 到不同的reduce 节点
2. 对多个 map 的输出进行行合并排序，通过reduce 函数生成新 keyvalue对
3. 将 reduce 的输出保存到文件中

### example 1: WordCount

#### 执行流程
|HDFS | mapper input | mapper output | recude output | 
|-|-|-|-|
| hello tom | <0, "hello tom"> | <hello, {1,1,1,1}> | <hello, 4> |
| hello jerry | <10, "hello jerry"> | <jerry, {1}> | <jerry, 1> |
| hello kitty | <22, "hello kitty"> | <kitty,{1}> | <kitty, 1> |
| hello world | <34, "hello world"> | <tom, {1}> | <tom, 1> |
| hello tom | <46, "hello tom"> | <world, {1}> | <world, 1> |

***
``` java
map(){
    String vlaue = v1;
    String[] words = value.split(" ");
    for(String w:words){
        context.write(w, 1);
    }
}
```
``` java
reduce(){
    String key = k2;
    int counter =0;
    for(int i: v2s){
        counter +=1;
    }
    context.write(key, counter);
}
```

#### 代码实现
``` java
public class WCMapper extends Mapper<LongWritable, Text, Test, LongWritable>{
    @override
    protecte void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String line = value.toString();
        String[] words = line.split(" ");
        for(String w: words){
            context.write(new Text(w), new LongWritable(1));
        }
    }
}

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    @override
    protected void reduce(Text key, Interable<LongWritable> v2s, Context context) throws IOException, InterruptedException{
        long counter = 0;
        for(LongWritable i : v2s){
            counter += i.get();
        }
            context.write(key, new LongWritable(counter));
    }
}
```

#### 封装 WordCount 的 MapReduce
``` java
public class WordCount{
    public static void main(String[] args) throws Exception{
        Job job =Job.getInstance(new Configuration());
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WCMapper.class);
        // 当 Map 与 Reduce 的输出类型一致时
        // 可只 setOutputValue／Key，不设 setMapOutputKey／Value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path("..."));
        job.setReducerClass(WCReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("..."));
        // 提交任务
        job.waitForCompletion(true);
    }
}
```

#### 执行命令
``` bash
hadoop jar /root/exam.jar groupid.class /data/dout
```

### example 2: DataCount
``` java
public class DataCount{
    public static void main(String[] args) throws Exception{
        // 同1.3.6
    }
    
    public static class DCMapper extents Mapper<LongWritable, Text, Text, DataBean>{
        @override
        protected void map(LongWritable key, Text value, Context context) throws Exception{
            String[] fields = value.toString().split("\t");
            DataBean bean = new DataBean(fields[1], fields[8], fields[9]);
            context.write(new Text(fields[1], bean);
        }
    }
                          
    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{
        @override
        protected void reduce(Text key, Iterable<DataBean> vs, Context context) throws Exception{
            long up_sum = 0, down_sum = 0;
            for( DataBean bean:vs){
                up_sum += bean.getUpPayLoad();
            }
            down_sum += bean.getDownPayLoad();
            Context.write(key, new DataBean(key, up_sum, down_sum, up_sum+down_sum));
        }
    }
}
                          
public class DataBean impliment Writable{
    private String telNum;
    private long upload;
    ...
        
    @override
    public wirte(out){
    }
    
    @override
    public read(in){
    }
    
    @override
    public String toString(){
        return "";
    }
}
```

### 远程调试
``` bash
# hadoop 远程 debug 配置
# 在 ../hadoop-2.9.1/etc/hadoop/hadoop-env.sh 追加
# 远程调试 Namenode
export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=...,server=y,suspend=y"
# 远程调试 Datanode
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=...,server=y,suspend=y"
# 远程调试 ResourceManager
export HADOOP_RESOURCEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=...,server=y,suspend=y"
# 远程调试 NodeManager
export HADOOP_NODEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=...,server=y,suspend=y"
```
```
"-agentlib: jdwp=transportdt_sorket,address=8888, server=y, suspend=y"
参数说明：
启动 JDWP 实现， dt_socket 套接字
JVM 在 8888 端口监听请求
server=y 表示 JVM 是被调试者，n 表示调试器
suspend=y 表示会被挂起
```

### Partitioner
#### example
``` java
public static class ProviderPartitioner extends Partitioner<Text, DataBean>{
    @override
    public int getPartition(Text key, DataBean bean, int partitions){
        String sub_acc = key.toString.subString(0, 3);
        Integer code = providerMap.get(sub_acc);
        if(code==null){
            code=0;
        }
        return code;
    }
}
```

``` java
//main
job.setPartitionerClass(ProviderPartitioner.Class);
// 决定 Reducer 数和 Partitioner 数，也就是结果数
// Reducer 数必须大于等于 Partitioner 数
job.setNumReduceTask(args[2]);
job.waitForcompletion(true);
```

#### sort
```
通过 tocompare 重载，将要排序的对象作为 Reducer 的 key, 可以实现排序
```

### Combiner

```
可插拔类的 Combiner
只是可以提效，不改变结果
    在 map 端对输出先做一次合并，以减少传输到Reducer 的数据量，相对提升效率；Combiner 只应用于 Reduce 的输入和输出类型完全一致的场景，比如累加，最大值等
```
``` java
// 直接是用 Reducer 的实现
job.setCombinerClass(WCReducer.Class);
```
|HDFS | mapper | combiner |
|-|-|-|
| hello tom | <hello, {1,1,1}> | <hello, 3> |
| hello tom | <tom, {1,1}> | <tom, 2> |
| hello kitty | <kitty,{1}> | <kitty, 1> |
|
| hello tom | <hello, {1,1}> | <hello,2> |
| hello tom | <tom, {1}> | <tom, 2> |

### Shuffle

分区、排序、Combiner、分组

### example 3: 倒排索引
```
    倒排索引，也常被称为反向索引，置入档案或反向档案，是一种索引方式，被用来存储在全文搜索下某个单词或者一组文档中的存储位置的映射。
    它是文档系统中最常用的数据结构。通过倒排索引，可以根据单词快速获取包含这个单词的文档列表。
    倒排索引主要有两部分组成：“单词词典”和“倒排文件”
```

伪代码
``` java
// mapper
context.write("hello, a.txt", 1);
context.write("hello, a.txt", 1);
context.write("hello, b.txt", 1);
```
***
``` java
// combiner
context.write("hello, a.txt", 2);
context.write("hello, b.txt", 1);
```
***
``` java
// reducer 1
context.write("hello", "a.txt 2");
context.write("hello", "b.txt 1");
```
***
```
空 mapper
```
***
``` java
// reducer 2
context.write("hello", {"a.txt, 2","b.txt, 1"});
```

### mapper 数量因素
```
split 大小 = Math.max(minSize, Math.min(maxSize, blockSize));
mapper 数 = split结果 block 块数
```