# pair RDD
Spark为包含键值对类型的RDD提供了一些专有的操作，这些RDD被称为pair RDD。

In [2]:
import pyspark

from pyspark import SparkConf,SparkContext

In [3]:
conf = SparkConf().setMaster("local").setAppName("My APP")
sc = SparkContext(conf = conf)

lines = sc.textFile('file:///Users/fire/jupyter/data/ball2018.txt')
lines

file:///Users/fire/jupyter/data/ball2018.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

## 1.pair RDD的操作

In [6]:
# 使用第一个元素作为键创建一个pair RDD
pairs = lines.map(lambda x: (x.split(" ")[0],x))
pairs.first()

('2018010',
 '2018010 2018.01.23  01081720212203  351327690   6注8085375元  71注325919元  奖池:3.9亿 详情走势')

In [9]:
# 对第二个元素进行筛选
pairs.filter(lambda keyValue: len(keyValue[1] < 20))

PythonRDD[7] at RDD at PythonRDD.scala:49

In [11]:
# 使用reduceByKey() 和 mapValues()计算每个键对应的平均值

# reduceByKey(): 合并含有相同键的值
# mapValues(): 对pair RDD中的每个值应用一个函数而不改变键

# 这里的x都是值 不是键
pairs.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

PythonRDD[12] at RDD at PythonRDD.scala:49

In [14]:
#### 实现单词计数
words = lines.flatMap(lambda x: x.split(" "))
#flatMap： Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
result = words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
result.first()

('2018010', 1)

In [17]:
# comineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)：使用不同的返回类型合并具有相同键的值

#求每个键的平均值
sumCount = words.combineByKey((lambda x:(x,1)),
                             (lambda x, y: (x[0] + y, x[1] + 1)),
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key,xy: (key,xy[0]/xy[1]))
#原书最后还有个 .collectAsMap() 但是会报错，估计和count()一样的原因，先忽略。

PythonRDD[36] at RDD at PythonRDD.scala:49

In [18]:
# 自定义reduceBy的并行度
data = [('a',3),('b',4),('c',5)]
sc.parallelize(data).reduceByKey(lambda x,y:x+y) #默认并行度
sc.parallelize(data).reduceByKey(lambda x,y:x+y, 10) #自定义并行度

PythonRDD[47] at RDD at PythonRDD.scala:49

In [20]:
# 以字符串顺序对整数进行自定义排序

linesort = lines.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))
linesort.first()

'2018010 2018.01.23  01081720212203  351327690   6注8085375元  71注325919元  奖池:3.9亿 详情走势'

In [24]:
# 额外的行动操作    暂时还是会报错
#words.countByKey()  返对每个键对应的元素分别计数 
#words.lookup('2018010') 返回给定键对应的所有值

## 2.数据分区
spark许多操作都引入了将数据根据键跨节点进行混洗的过程，所有这些操作都可以从数据分区中获益。

分布式程序中通信代价很大，spark可以通过控制RDD分区方式来减少通信开销。

In [28]:
# 自定义分区方式
import urllib.parse

def hash_domin(url):
    return hash(urllib.parse.urlparse(url).netloc)

lines.partitionBy(20, hash_domin) #创建20个分区

MapPartitionsRDD[55] at mapPartitions at PythonRDD.scala:129