# RDD Programming
## RDD Programming Basics

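### Creating RDDs
1. Creating an RDD by loading data from a file system
- Spark uses the textFile() method to load data from a file system and create an RDD.
- The method takes the URI of the file as its argument. The URI can be:
    - a path on the local file system,
    - a path on the HDFS distributed file system,
    - or an Amazon S3 address, among others.

(1) Loading data from the local file system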

In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local')
sc = SparkContext(conf=conf)

In [2]:
lines = sc.textFile('file:///opt/code/SparkProgramming-PySpark/word.txt')
lines.foreach(print)

foreach(print) runs inside the Spark worker processes, so its output appears in the terminal where Jupyter Notebook was started rather than in the notebook itself.

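(2) Loading data from the HDFS distributed file system

The three statements below are equivalent, assuming that the default file system is hdfs://localhost:9000 and that the current user's HDFS home directory is /user/spark.

```
lines = sc.textFile('hdfs://localhost:9000/user/spark/stocks/part-m-00000')
lines = sc.textFile('/user/spark/stocks/part-m-00000')
lines = sc.textFile('stocks/part-m-00000')
```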

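2. Creating an RDD from a parallelized collection (list)  
Call the parallelize method of SparkContext to create an RDD from a collection (list) that already exists in the driver program.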

In [4]:
array = [1, 2, 3, 4, 5]
rdd = sc.parallelize(array)
rdd.collect()

[1, 2, 3, 4, 5]

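### RDD Operations
1. Transformations
- Each transformation on an RDD produces a new RDD, which is then fed to the next transformation.
- Transformations are evaluated lazily: the chain of transformations only records the lineage, and no computation takes place until an action is encountered.

| Operation         | Meaning |
| ----------------- | ------- |
| filter(func)      | Keeps the elements for which func returns true and returns them as a new dataset |
| map(func)         | Passes each element to func and returns the results as a new dataset |
| flatMap(func)     | Similar to map(), but each input element can be mapped to zero or more output elements |
| groupByKey()      | When applied to a dataset of (K, V) pairs, returns a new dataset of (K, Iterable) pairs, where the Iterable holds all values of that key |
| reduceByKey(func) | When applied to a dataset of (K, V) pairs, returns a new dataset of (K, V) pairs in which the values of each key are aggregated with func |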

In [11]:
#filter(func)
lines = sc.textFile('file:///opt/code/SparkProgramming-PySpark/word.txt')
linesWithSpark = lines.filter(lambda line: "Spark" in line)
for i in linesWithSpark.collect():
    print(i)

Spark is fast
Spark is better


In [12]:
#map(func)
data = [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x: x+10)
# Iterate over rdd2 (the mapped RDD) to see the transformed values
for r in rdd2.collect():
    print(r)

11
12
13
14
15


In [9]:
#map(func), another example
lines = sc.textFile('file:///opt/code/SparkProgramming-PySpark/word.txt')
words = lines.map(lambda line: line.split(" "))
for word in words.collect():
    print(word)

['Hadoop', 'is', 'good']
['Spark', 'is', 'fast']
['Spark', 'is', 'better']


In [13]:
#flatMap(func)
words = lines.flatMap(lambda line: line.split(" "))
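The difference between map and flatMap can be illustrated without Spark. The sketch below is a plain-Python analogy, not PySpark code: it applies the same split function to the three lines of word.txt shown above, once keeping one result per line (like map) and once flattening the results (like flatMap). The helper name split_words is an assumption made for this illustration.

```python
from itertools import chain

# The three lines of word.txt, as shown in the map() output above
lines = ["Hadoop is good", "Spark is fast", "Spark is better"]

def split_words(line):
    return line.split(" ")

# map: one output element per input element (here, one list per line)
mapped = [split_words(line) for line in lines]

# flatMap: the per-line results are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(split_words(line) for line in lines))

print(mapped)
print(flat_mapped)
```

With map the result has three elements (one list per line); with flatMap it has nine (one per word).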

In [14]:
#groupByKey()
words = sc.parallelize([("Hadoop", 1), ("is", 1), ("good", 1), ("Spark", 1), ("is", 1), ("fast", 1), ("Spark", 1),
                       ("is", 1), ("better", 1)])
words1 = words.groupByKey()
# Print the grouped RDD (words1); each value is an iterable, so convert it to a list.
# sorted() makes the key order deterministic.
for k, v in sorted(words1.mapValues(list).collect()):
    print((k, v))

('Hadoop', [1])
('Spark', [1, 1])
('better', [1])
('fast', [1])
('good', [1])
('is', [1, 1, 1])


In [15]:
#reduceByKey(func)
words2 = words.reduceByKey(lambda a, b: a + b)
for w in words2.collect():
    print(w)

('Hadoop', 1)
('is', 3)
('good', 1)
('Spark', 2)
('fast', 1)
('better', 1)


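2. Actions  
Actions are where the computation is actually triggered.

| Operation     | Meaning |
| ------------- | ------- |
| count()       | Returns the number of elements in the dataset |
| collect()     | Returns all elements of the dataset as a list |
| first()       | Returns the first element of the dataset |
| take(n)       | Returns the first n elements of the dataset as a list |
| reduce(func)  | Aggregates the elements of the dataset using func, which takes two arguments and returns one value |
| foreach(func) | Passes each element of the dataset to func and runs it |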



In [17]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())
print(rdd.first())
print(rdd.take(3))
print(rdd.collect())

5
1
[1, 2, 3]
[1, 2, 3, 4, 5]


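3. Lazy evaluation  
Lazy evaluation means that a chain of transformations only records the lineage of the computation; nothing is actually computed until an action is encountered, and the action is what triggers the real computation.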
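The lazy behavior described above can be imitated in plain Python with a generator. This is only an analogy and does not use Spark: building the generator plays the role of a transformation, and consuming it plays the role of an action. The names add_ten and calls are invented for this sketch.

```python
calls = []

def add_ten(x):
    calls.append(x)  # record that the function really ran
    return x + 10

data = [1, 2, 3, 4, 5]

# "Transformation": building the generator does not call add_ten yet
pipeline = (add_ten(x) for x in data)
calls_before_action = len(calls)

# "Action": consuming the generator triggers the computation
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

Before the "action" the function has run zero times; afterwards it has run once per element.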

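### Persistence
In Spark, RDDs are evaluated lazily, and every action recomputes the RDD from the beginning of its lineage. In the example below, count() and collect() each trigger a full computation of rdd.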

In [18]:
data = ["Hadoop", "Spark", "Hive"]  # avoid shadowing the built-in name list
rdd = sc.parallelize(data)
print(rdd.count())
print(','.join(rdd.collect()))

3
Hadoop,Spark,Hive


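- The persistence (caching) mechanism avoids the cost of this repeated computation.
- Call persist() on an RDD to mark it as persistent.
- It is only "marked" because persist() does not compute or store the RDD immediately; the result is persisted once the first action has triggered the actual computation.
- A persisted RDD is kept in the memory of the compute nodes and reused by subsequent actions.

persist() accepts a storage level (in PySpark, a member of StorageLevel such as StorageLevel.MEMORY_ONLY):

- persist(MEMORY_ONLY): stores the RDD as deserialized objects in the JVM; if memory runs short, cached content is evicted according to the LRU policy.
- persist(MEMORY_AND_DISK): stores the RDD as deserialized objects in the JVM; if memory runs short, the partitions that do not fit are stored on disk.
- Call unpersist() to remove a persisted RDD from the cache manually.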

In [19]:
data = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(data)
rdd.cache() # calls persist(MEMORY_ONLY); nothing is cached yet at this point
print(rdd.count()) # first action: triggers a full computation and puts rdd into the cache
print(','.join(rdd.collect())) # second action: reuses the cached rdd

3
Hadoop,Spark,Hive
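The effect of caching on the number of computations can be made visible with a small plain-Python model. This is a toy stand-in for an RDD written for this illustration, not Spark's implementation: the class LazyDataset and its computations counter are assumptions, and the model ignores storage levels and eviction.

```python
class LazyDataset:
    """Toy stand-in for an RDD: recomputes on every action unless cached."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.persist_requested = False
        self.cached = None
        self.computations = 0  # number of full computations that were run

    def cache(self):
        # Only marks the dataset; nothing is computed here
        self.persist_requested = True
        return self

    def _compute(self):
        if self.cached is not None:
            return self.cached
        self.computations += 1
        result = [self.func(x) for x in self.source]
        if self.persist_requested:
            self.cached = result  # stored by the first action only
        return result

    def count(self):
        return len(self._compute())

    def collect(self):
        return list(self._compute())

words = ["Hadoop", "Spark", "Hive"]

uncached = LazyDataset(words, str.upper)
uncached.count()
uncached.collect()

cached = LazyDataset(words, str.upper).cache()
computations_after_cache_call = cached.computations
cached.count()
cached.collect()

print(uncached.computations, cached.computations)
```

Two actions cost two computations without caching and one with caching, and the cache() call by itself computes nothing.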


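### Partitioning

- Why partition

  An RDD is a resilient distributed dataset. RDDs are usually large, so they are split into multiple partitions that are stored on different nodes.

  Partitioning an RDD serves two purposes: it increases parallelism, and it reduces communication overhead.

- Partitioning guideline

  A rule of thumb is to make the number of partitions as close as possible to the number of CPU cores in the cluster. In every Spark deployment mode (Local, Standalone, YARN, Mesos), the default number of partitions can be configured through the spark.default.parallelism parameter. If it is not set, the defaults are:

  - Local mode: the number of CPUs of the local machine, or N if local[N] is set
  - Standalone or YARN mode: the larger of "the total number of CPU cores in the cluster" and 2
  - Mesos mode: 8

- Setting the number of partitions
    - Specify the number of partitions manually when creating the RDD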


In [20]:
nums = [1, 2, 3, 4, 5]
rdd = sc.parallelize(nums, 2)

- Use the repartition method to change the number of partitions

In [23]:
data = sc.parallelize(nums, 2)
print(len(data.glom().collect()))
rdd = data.repartition(1)
print(len(rdd.glom().collect()))

2
1


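- Custom partitioning

Spark ships with a hash partitioner (HashPartitioner) and a range partitioner (RangePartitioner), which cover most use cases. Spark also supports custom partitioning: by supplying your own Partitioner object you control how an RDD is partitioned, and can use domain knowledge to reduce communication overhead further.

A custom partitioner (in the Scala/Java API) implements the following members:

- numPartitions: Int 	returns the number of partitions to create
- getPartition(key: Any): Int  returns the partition ID for a given key (0 to numPartitions-1)
- equals():  the standard Java equality method

In PySpark, a partitioning function that maps a key to an integer is passed to partitionBy() instead, as in the example below.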

In [26]:
# An example
from pyspark import SparkConf, SparkContext

def MyPartitioner(key):
    print("My Partitioner is running")
    print('The key is %d' % key)
    return key % 10

def main():
    print("The main func is running")
#     conf = SparkConf().setMaster('local').setAppName('App')
#     sc = SparkContext(conf=conf)
    data = sc.parallelize(range(10), 5)
    data.map(lambda x: (x, 1))\
    .partitionBy(10, MyPartitioner)\
    .map(lambda x: x[0])\
    .saveAsTextFile('file:///opt/code/SparkProgramming-PySpark/partitioner')
    
main()

The main func is running


The print output of MyPartitioner appears in the terminal, because the function runs in the Spark worker processes.
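To see which keys end up in which partition, the placement rule can be simulated in plain Python. The sketch below does not use Spark; partition_by is a hypothetical helper that assumes each pair goes to the partition numbered partition_func(key) modulo the number of partitions, which mirrors the key % 10 function used above.

```python
def my_partitioner(key):
    return key % 10

def partition_by(pairs, num_partitions, partition_func):
    """Place each (key, value) pair in the bucket chosen by partition_func."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions

# Same data as above: the keys 0..9, each paired with the value 1
pairs = [(x, 1) for x in range(10)]
partitions = partition_by(pairs, 10, my_partitioner)
keys_per_partition = [[k for k, _ in part] for part in partitions]

print(keys_per_partition)
```

Under this assumption every one of the ten partitions receives exactly one key, so each output part file would hold a single number.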

## Pair RDDs
### Creating Pair RDDs

1. Loading from a file

In [28]:
lines = sc.textFile('file:///opt/code/SparkProgramming-PySpark/word.txt')
pairRDD = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
for p in pairRDD.collect():
    print(p)

('Hadoop', 1)
('is', 1)
('good', 1)
('Spark', 1)
('is', 1)
('fast', 1)
('Spark', 1)
('is', 1)
('better', 1)


2. Creating from a parallelized collection (list)

In [30]:
word_list = ["Hadoop", "Hive", "Spark", "Spark"]
rdd = sc.parallelize(word_list)
pairRDD = rdd.map(lambda word: (word, 1))
for p in pairRDD.collect():
    print(p)

('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)


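### Common Pair RDD Transformations

Common transformations on pair RDDs include reduceByKey(func), groupByKey(), keys, values, sortByKey(), mapValues(func), join and combineByKey.

- reduceByKey(func)

reduceByKey(func) merges the values that share the same key using the function func.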

In [31]:
pairRDD = sc.parallelize([("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)])
pairRDD.reduceByKey(lambda a,b: a+b).collect()

[('Hadoop', 1), ('Spark', 2), ('Hive', 1)]

- groupByKey()
groupByKey() groups the values that share the same key.

In [34]:
pairs = [("spark", 1), ('spark', 2), ('hadoop', 3), ('hadoop', 5)]
pairRDD = sc.parallelize(pairs)
pairRDD.groupByKey()
pairRDD.groupByKey().collect()

[('spark', <pyspark.resultiterable.ResultIterable at 0x7fe9dc3a1940>),
 ('hadoop', <pyspark.resultiterable.ResultIterable at 0x7fe9dc3a1be0>)]

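**Difference between groupByKey and reduceByKey**  
- reduceByKey aggregates the multiple values of each key, and the aggregation is customized through the function func.
- groupByKey also operates per key, but it only produces one list of values for each key. groupByKey itself takes no custom function; to apply one, first build the grouped RDD with groupByKey and then apply the function to it with map.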

In [36]:
words = ['one', 'two', 'three', 'three', 'three']
wordPairsRDD = sc.parallelize(words).map(lambda word: (word, 1))
wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b: a+b)
wordCountsWithReduce.collect()

[('one', 1), ('two', 1), ('three', 3)]

In [37]:
wordCountsWithGroup = wordPairsRDD.groupByKey().map(lambda t: (t[0], sum(t[1])))
wordCountsWithGroup.collect()

[('one', 1), ('two', 1), ('three', 3)]
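The two routes to the same word count can be mirrored in plain Python. The sketch below does not use Spark; reduce_by_key and group_by_key are hypothetical helpers written for this illustration. The first folds each value into a running result per key, while the second only collects the values and leaves the aggregation to a later step.

```python
from collections import defaultdict

pairs = [(w, 1) for w in ['one', 'two', 'three', 'three', 'three']]

def reduce_by_key(pairs, func):
    """Fold each new value into the running result for its key."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

def group_by_key(pairs):
    """Collect all values of a key into one list, without aggregating."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

counts_with_reduce = reduce_by_key(pairs, lambda a, b: a + b)
groups = group_by_key(pairs)
# The aggregation is a separate step, like the map after groupByKey
counts_with_group = {key: sum(values) for key, values in groups.items()}

print(counts_with_reduce)
print(groups)
print(counts_with_group)
```

Both routes give the same counts; the grouped version first materializes the full list of values for every key.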

- keys
keys returns a new RDD that contains only the keys of the pair RDD.

In [38]:
pairs = [('hadoop', 1), ('spark', 1), ('hive', 1), ('spark', 1)]
pairRDD = sc.parallelize(pairs)
pairRDD.keys().collect()

['hadoop', 'spark', 'hive', 'spark']

- values
values returns a new RDD that contains only the values of the pair RDD.

In [39]:
pairRDD.values().collect()

[1, 1, 1, 1]

- sortByKey()
sortByKey() returns an RDD sorted by key.

In [41]:
pairRDD.sortByKey().collect()

[('hadoop', 1), ('hive', 1), ('spark', 1), ('spark', 1)]

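- sortByKey() and sortBy()
sortByKey() always sorts by key, whereas sortBy(keyfunc) sorts by whatever keyfunc returns. Passing False as the ascending argument sorts in descending order.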

In [42]:
d1 = sc.parallelize([('c', 8), ('b', 25), ('c', 17), ('a', 42), ('b', 42), ('d', 17)])
d1.reduceByKey(lambda a,b: a+b).sortByKey(False).collect()

[('d', 17), ('c', 25), ('b', 67), ('a', 42)]

In [44]:
print(d1.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x, False).collect())
print(d1.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x[0], False).collect())
print(d1.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x[1], False).collect())

[('d', 17), ('c', 25), ('b', 67), ('a', 42)]
[('d', 17), ('c', 25), ('b', 67), ('a', 42)]
[('b', 67), ('a', 42), ('c', 25), ('d', 17)]


- mapValues(func)
Applies a function to every value of the pair RDD while leaving the keys unchanged.

In [45]:
pairs = [('hadoop', 1), ('spark', 1), ('hive', 1), ('spark', 1)]
pairRDD = sc.parallelize(pairs)
pairRDD1 = pairRDD.mapValues(lambda x: x+1)
pairRDD1.collect()

[('hadoop', 2), ('spark', 2), ('hive', 2), ('spark', 2)]

- join  
join() performs an inner join: given two input datasets of (K, V1) and (K, V2) pairs, only keys present in both datasets are output, yielding a dataset of (K, (V1, V2)) pairs.

In [46]:
pairRDD1 = sc.parallelize([('spark', 1), ('spark', 2), ('hadoop', 3), ('hadoop', 5)])
pairRDD2 = sc.parallelize([('spark', 'fast')])
pairRDD3 = pairRDD1.join(pairRDD2)
pairRDD3.collect()

[('spark', (1, 'fast')), ('spark', (2, 'fast'))]
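The inner-join semantics can be reproduced in a few lines of plain Python. This sketch does not use Spark; inner_join is a hypothetical helper that pairs every left value with every right value of the same key and drops keys that occur on one side only.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair up values of keys that occur in both lists of (key, value) pairs."""
    right_values = defaultdict(list)
    for key, value in right:
        right_values[key].append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_values.get(key, [])]

pairs1 = [('spark', 1), ('spark', 2), ('hadoop', 3), ('hadoop', 5)]
pairs2 = [('spark', 'fast')]
joined = inner_join(pairs1, pairs2)

print(joined)
```

The 'hadoop' pairs disappear because 'hadoop' has no match in the second dataset.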

## Reading and Writing Data
### Reading and Writing Files

1. Reading and writing on the local file system  
(1) Creating an RDD by reading a file

In [50]:
textFile = sc.textFile('file:///opt/code/SparkProgramming-PySpark/word.txt')
textFile.first()

'Hadoop is good'

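(2) Writing an RDD to a text file

The argument of saveAsTextFile() is a directory, not a file name; Spark writes one part file per partition into it.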

In [51]:
textFile.saveAsTextFile('file:///opt/code/SparkProgramming-PySpark/word')

In [52]:
!ls ./word

part-00000  _SUCCESS


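2. Reading and writing on the HDFS distributed file system  
Reading from HDFS also uses the textFile() method, which accepts the address of an HDFS file or directory. Given a file address it loads that file; given a directory it loads all files in that directory.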

In [53]:
textFile = sc.textFile('hdfs://localhost:9000/user/spark/stocks/part-m-00000')
textFile.first()

'688316,青云科技-U'

In [54]:
textFile.saveAsTextFile('writeback')

In [55]:
!hadoop fs -ls /user/spark

Found 2 items
drwxr-xr-x   - spark supergroup          0 2021-12-09 03:44 /user/spark/stocks
drwxr-xr-x   - spark supergroup          0 2021-12-10 20:13 /user/spark/writeback


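### Reading and Writing HBase Data
0. Introduction to HBase  
- HBase is a highly reliable, high-performance, column-oriented, scalable distributed database, used mainly to store loosely structured unstructured and semi-structured data.  
- Each value is an uninterpreted string with no data type.
- Users store data in tables; every row has a sortable row key and an arbitrary number of columns.
- A table is made up of one or more column families. A column family can contain any number of columns of any type; all columns are stored as strings, and users must convert data types themselves.
- An update in HBase does not delete the old version of the data; it creates a new version instead.


- Table: HBase organizes data in tables. A table consists of rows and columns, and the columns are divided into column families.
- Row: every HBase table consists of rows, each identified by a row key.
- Column family: an HBase table is grouped into a set of column families, which are the basic unit of access control.
- Column qualifier: data within a column family is located through a column qualifier (or column).
- Cell: in an HBase table, a row, a column family and a column qualifier together identify a cell. The data stored in a cell has no data type and is always treated as a byte array byte[].
- Timestamp: each cell keeps multiple versions of the same data, indexed by timestamp.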

1. [Install HBase](http://dblab.xmu.edu.cn/blog/install-hbase/)  

- Create an HBase table
```
hbase> create 'student','info'
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','25'
```

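2. Configuring Spark  
Copy some of the jar files from the lib directory of the HBase installation into the Spark installation directory; these are the packages the program needs.

```shell
$ cd $SPARK_HOME/jars
$ mkdir hbase && cd hbase
$ cp $HBASE_HOME/lib/hbase*.jar ./
$ cp $HBASE_HOME/lib/guava* ./
$ cp $HBASE_HOME/lib/htrace-core*.jar ./
$ cp $HBASE_HOME/lib/protobuf*.jar ./
```

In addition, Spark 2.0 and later no longer ship the jar that converts HBase data into a form Python can read, so it has to be downloaded separately and placed on the Spark classpath, for example in the jars/hbase directory created above.  
https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001

Then open the spark-env.sh file and add the following line:
```shell
export SPARK_DIST_CLASSPATH=$(/opt/software/hadoop/bin/hadoop classpath):$(/opt/software/hbase/bin/hbase classpath):/opt/software/spark/jars/hbase/*
```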

3. Write a program that reads data from HBase

In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('ReadHbase')
sc = SparkContext(conf=conf)
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                               "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                              "org.apache.hadoop.hbase.client.Result",
                              keyConverter=keyConv, valueConverter=valueConv, conf=conf)
hbase_rdd.cache()  # mark for caching before the first action so that collect() can reuse the result
count = hbase_rdd.count()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)

1 {"qualifier" : "age", "timestamp" : "1639187056536", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
{"qualifier" : "gender", "timestamp" : "1639187037733", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
{"qualifier" : "name", "timestamp" : "1639187002111", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
2 {"qualifier" : "age", "timestamp" : "1639187125950", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "25"}
{"qualifier" : "gender", "timestamp" : "1639187104846", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
{"qualifier" : "name", "timestamp" : "1639187078184", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}


You can also submit the job with spark-submit.

Create a file named SparkOperateHBase.py in your working directory and paste in the following code:
```python
#!/home/spark/miniconda3/envs/bigdata/bin/python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('ReadHbase')
sc = SparkContext(conf=conf)
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                               "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                              "org.apache.hadoop.hbase.client.Result",
                              keyConverter=keyConv, valueConverter=valueConv, conf=conf)
hbase_rdd.cache()  # mark for caching before the first action so that collect() can reuse the result
count = hbase_rdd.count()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
```

After a successful run, you will see:
```
1 {"qualifier" : "age", "timestamp" : "1639187056536", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
{"qualifier" : "gender", "timestamp" : "1639187037733", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
{"qualifier" : "name", "timestamp" : "1639187002111", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
2 {"qualifier" : "age", "timestamp" : "1639187125950", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "25"}
{"qualifier" : "gender", "timestamp" : "1639187104846", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
{"qualifier" : "name", "timestamp" : "1639187078184", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}

```

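4. Write a program that writes data to HBase  
The following application inserts the two student records in the table below into the student table in HBase.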

<table>
  <tr>
    <th rowspan="2">id</th>
    <th colspan="9">info</th>
  </tr>
  <tr>
    <td colspan="3"><center>name</center></td>
    <td colspan="3"><center>gender</center></td>
    <td colspan="3"><center>age</center></td>
  </tr>
  <tr>
    <td rowspan="1"><center>3</center></td>
    <td colspan="3"><center>Rongcheng</center></td>
    <td colspan="3"><center>M</center></td>
    <td colspan="3"><center>26</center></td>
  </tr>
  <tr>
    <td rowspan="1"><center>4</center></td>
    <td colspan="3"><center>Guanhua</center></td>
    <td colspan="3"><center>M</center></td>
    <td colspan="3"><center>27</center></td>
  </tr>
  </table>


In [1]:
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("WriteHBase")
sc = SparkContext(conf = conf)
host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
rawData = ['3,info,name,Rongcheng','3,info,gender,M','3,info,age,26',
           '4,info,name,Guanhua','4,info,gender,M','4,info,age,27']
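# Use the first comma-separated field as the row key; x[0] would only take the first character
sc.parallelize(rawData).map(lambda x: (x.split(',')[0], x.split(',')))\
    .saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)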

In [None]:
# Run the read script written earlier to check that the write succeeded
!spark-submit /home/spark/pyspark-pro/SparkOperateHBase.py | less

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/software/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/software/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
21/12/10 23:13:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/10 23:13:29 INFO spark.SparkContext: Running Spark version 2.4.2
21/12/10 23:13:29 INFO spark.SparkContext: Submitted application: ReadHbase
21/12/10 23:13:29 INFO spark.SecurityManager: Changing view acls to: spark
21/12/10 23:13:29 INFO spark.SecurityManager: Changing modify acls to: spark
21/12/10 23:13:29 INFO spark.SecurityManager: Changing view acls groups to: 
21/12

21/12/10 23:13:30 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x4de3729f connecting to ZooKeeper ensemble=localhost:2181
21/12/10 23:13:30 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
21/12/10 23:13:30 INFO zookeeper.ZooKeeper: Client environment:host.name=alone
21/12/10 23:13:30 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_162
21/12/10 23:13:30 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
21/12/10 23:13:30 INFO zookeeper.ZooKeeper: Client environment:java.home=/opt/software/jdk/jre
21/12/10 23:13:30 INFO zookeeper.ZooKeeper: Client environment:java.class.path=/opt/software/spark/conf/:/opt/software/spark/jars/commons-lang3-3.5.jar:/opt/software/spark/jars/arrow-format-0.10.0.jar:/opt/software/spark/jars/json4s-jackson_2.11-3.5.3.jar:/opt/software/spark/jars/jersey-container-servlet-2.22.2.jar:/opt/software/spark/jars/pyrolite-4.13.jar:/opt/soft

21/12/10 23:13:31 INFO util.RegionSizeCalculator: Calculating region sizes for table "student".
21/12/10 23:13:31 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
21/12/10 23:13:31 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x17da40c8726001a
21/12/10 23:13:31 INFO zookeeper.ZooKeeper: Session: 0x17da40c8726001a closed
21/12/10 23:13:31 INFO zookeeper.ClientCnxn: EventThread shut down
21/12/10 23:13:31 INFO spark.SparkContext: Starting job: take at SerDeUtil.scala:239
21/12/10 23:13:31 INFO scheduler.DAGScheduler: Got job 0 (take at SerDeUtil.scala:239) with 1 output partitions
21/12/10 23:13:31 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (take at SerDeUtil.scala:239)
21/12/10 23:13:31 INFO scheduler.DAGScheduler: Parents of final stage: List()
21/12/10 23:13:31 INFO scheduler.DAGScheduler: Missing parents: List()
21/12/10 23:13:31 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (M

21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 24
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 41
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 8
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 44
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 7
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 18
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 23
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 13
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 4
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 42
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 40
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 46
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 32
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleaned accumulator 36
21/12/10 23:13:31 INFO spark.ContextCleaner: Cleane

## Worked Examples
### Case 1: Finding the Top Values
#### Task

Each line of the input files has four comma-separated fields: orderid, userid, payment, productid.

file1.txt
```shell
1,1768,50,155 
2,1218, 600,211 
3,2239,788,242 
4,3101,28,599 
5,4899,290,129 
6,3110,54,1201
7,4436,259,877 
8,2369,7890,27
```

file2.txt
```shell
100,4287,226,233 
101,6562,489,124 
102,1124,33,17 
103,3267,159,179 
104,4569,57,125
105,1438,37,116
```

Find the top N payment values.

In [10]:
from pyspark import SparkConf, SparkContext

# conf = SparkConf().setMaster('local').setAppName('ReadHBase')
# sc = SparkContext(conf=conf)
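lines = sc.textFile('file:///opt/code/SparkProgramming-PySpark/RDD-p1')
# Keep only non-empty lines that have exactly four comma-separated fields
result1 = lines.filter(lambda line: len(line.strip())>0 and (len(line.split(",")) == 4))
# Extract the payment field (third column)
result2 = result1.map(lambda x: x.split(',')[2])
# Turn each payment into a (key, value) pair so that sortByKey can be used
result3 = result2.map(lambda x: (int(x), ''))
# Sort in descending order within a single partition
result4 = result3.repartition(1).sortByKey(False)
result5 = result4.map(lambda x: x[0])
result5.take(5)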

[7890, 788, 600, 489, 290]

Alternatively, you can write it as a script:

```python
#!/home/spark/miniconda3/envs/bigdata/env/bin/python
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('TopN')
sc = SparkContext(conf=conf)
lines = sc.textFile('file:///opt/code/SparkProgramming-PySpark/RDD-p1')
result1 = lines.filter(lambda line: len(line.strip())>0 and (len(line.split(",")) == 4))
result2 = result1.map(lambda x: x.split(',')[2])
result3 = result2.map(lambda x: (int(x), ''))
result4 = result3.repartition(1).sortByKey(False)
result5 = result4.map(lambda x: x[0]).take(5)

for a in result5:
    print(a)
```
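The same top-N logic can be checked without Spark on the sample data above. The sketch below is plain Python and swaps the full sort for heapq.nlargest from the standard library; top_payments is a hypothetical helper, and the trailing empty string is added only to exercise the blank-line filter.

```python
import heapq

# Lines of file1.txt and file2.txt as listed above, plus one blank line
lines = [
    "1,1768,50,155", "2,1218, 600,211", "3,2239,788,242", "4,3101,28,599",
    "5,4899,290,129", "6,3110,54,1201", "7,4436,259,877", "8,2369,7890,27",
    "100,4287,226,233", "101,6562,489,124", "102,1124,33,17",
    "103,3267,159,179", "104,4569,57,125", "105,1438,37,116", "",
]

def top_payments(lines, n):
    payments = []
    for line in lines:
        fields = line.split(",")
        # Same filter as above: skip blank lines and lines without four fields
        if line.strip() and len(fields) == 4:
            payments.append(int(fields[2]))  # int() tolerates the space in " 600"
    return heapq.nlargest(n, payments)

top5 = top_payments(lines, 5)
print(top5)
```

heapq.nlargest avoids sorting the whole list when only the first few values are needed.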

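### Case 2: File Sorting
#### Task

There are several input files, and every line of each file contains one integer. Sort all the integers and write them to a new file, with two integers per output line: the first is the rank of the second in the sorted order, and the second is the original integer.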

file1.txt
```
33
37
12
40
```

file2.txt
```
4
16
39
5
```

file3.txt
```
1
45
25
```

In [37]:
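index = 0

def getindex():
    # Running counter; this only yields correct ranks because all data sits in a single partition
    global index
    index += 1
    return index

def main():
    lines  = sc.textFile('file:///opt/code/SparkProgramming-PySpark/RDD-p2/')
    # Drop blank lines and turn every integer into a (key, value) pair for sortByKey
    result = lines.filter(lambda line:(len(line.strip()) > 0)).map(lambda x: (int(x.strip()), ''))
    # repartition(1) puts everything in one partition, so the sort is global and the counter is sequential
    result2 = result.repartition(1).sortByKey(True)
    result3 = result2.map(lambda x: x[0]).map(lambda x: (getindex(), x))
    print(result3.collect())
    result3.saveAsTextFile('file:///opt/code/SparkProgramming-PySpark/sortresult')


main()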

[(1, 1), (2, 4), (3, 5), (4, 12), (5, 16), (6, 25), (7, 33), (8, 37), (9, 39), (10, 40), (11, 45)]


In [40]:
!cat /opt/code/SparkProgramming-PySpark/sortresult/pa*

(1, 1)
(2, 4)
(3, 5)
(4, 12)
(5, 16)
(6, 25)
(7, 33)
(8, 37)
(9, 39)
(10, 40)
(11, 45)
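The expected ranking can be verified independently of Spark. The sketch below is plain Python: it holds the contents of the three sample files in a dictionary (an assumption made so that the example needs no files on disk) and pairs each sorted integer with its rank using enumerate. rank_integers is a hypothetical helper.

```python
# Contents of the three sample files listed above
files = {
    "file1.txt": "33\n37\n12\n40\n",
    "file2.txt": "4\n16\n39\n5\n",
    "file3.txt": "1\n45\n25\n",
}

def rank_integers(contents):
    """Sort all integers found in the given texts and pair each with its rank."""
    numbers = [int(line)
               for text in contents
               for line in text.splitlines()
               if line.strip()]
    return list(enumerate(sorted(numbers), start=1))

ranked = rank_integers(files.values())
for rank, value in ranked:
    print(rank, value)
```

The pairs match the contents of the sortresult output shown above.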
