# Key-Value Pair Operations
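### 1. Motivation
- Pair RDDs: provide interfaces for operating on each key in parallel and for regrouping data across nodes.
- reduceByKey(): reduces the data for each key separately.
- join(): combines the elements that share a key in two RDDs into a single RDD.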

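### 2. Creating a PairRDD
```
# Each line is "<key> <rest>": key "打招呼" (greeting) and key "约" (date)
rawRDD = sc.parallelize(["打招呼 你好啊~~吃了吗??", "约 约吗美女?"])
# Split only on the first space so the value is the rest of the line
pairRDD = rawRDD.map(lambda line: tuple(line.split(' ', 1)))
pairRDD.collect()
```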
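### 3. PairRDD Transformations
#### 0. Basic functions: see the code below
Omitted.
#### 1. Aggregation
- Aggregation on ordinary RDDs: reduce(), fold(), aggregate()
- Aggregation on pair RDDs: reduceByKey(), combineByKey(), foldByKey()
- Grouping: groupByKey(). groupByKey() followed by mapValues() can produce the same result as reduceByKey(), but it is less efficient: reduceByKey() combines values within each partition before the shuffle, whereas groupByKey() sends every individual value across the network.
- Joins: join(), leftOuterJoin(), rightOuterJoin()
- Sorting: sortByKey()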
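To illustrate the efficiency claim about groupByKey() versus reduceByKey(), here is a small pure-Python model (no Spark involved). The two in-memory "partitions" and the helper names `group_then_sum` and `reduce_by_key` are hypothetical; the model assumes reduceByKey() combines values inside each partition before the shuffle and simply counts how many records would cross the network in each approach.

```python
from collections import defaultdict

# Hypothetical data: two "partitions" of (word, 1) pairs, a local stand-in
# for an RDD spread over two nodes. This models the idea, not Spark's code.
partitions = [
    [("spark", 1), ("rdd", 1), ("spark", 1)],
    [("spark", 1), ("rdd", 1), ("rdd", 1)],
]

def group_then_sum(parts):
    """groupByKey + mapValues(sum): every record is shuffled as-is."""
    shuffled = [pair for part in parts for pair in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return {k: sum(vs) for k, vs in groups.items()}, len(shuffled)

def reduce_by_key(parts, func):
    """reduceByKey: combine within each partition first, then shuffle."""
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    result = {}
    for key, value in shuffled:
        result[key] = func(result[key], value) if key in result else value
    return result, len(shuffled)

grouped, grouped_moved = group_then_sum(partitions)
reduced, reduced_moved = reduce_by_key(partitions, lambda a, b: a + b)
print(grouped, grouped_moved)   # {'spark': 3, 'rdd': 3} 6
print(reduced, reduced_moved)   # {'spark': 3, 'rdd': 3} 4
```

Both give the same totals, but the map-side combine ships 4 records instead of 6; the gap grows with the number of repeated keys per partition.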
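### 4. PairRDD Actions
- countByKey(): counts the elements for each key
- countByValue(): counts how many times each distinct element occurs; a shortcut for word count
- collectAsMap(): returns the result as a map (a Python dict)
- lookup(key): returns all values associated with the given key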
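The four actions can be mimicked on a plain Python list to check what each one returns. This is a local sketch, not PySpark: the helper functions are hypothetical stand-ins named after the actions, and `pairs` is made-up data.

```python
from collections import Counter

# Hypothetical local data standing in for the contents of a pair RDD
pairs = [("a", 1), ("b", 2), ("a", 3)]

def count_by_key(pairs):
    # countByKey(): number of elements per key
    return dict(Counter(key for key, _ in pairs))

def count_by_value(elements):
    # countByValue(): number of occurrences of each distinct element
    return dict(Counter(elements))

def collect_as_map(pairs):
    # collectAsMap(): a dict, so only one value survives per key (here the last)
    return dict(pairs)

def lookup(pairs, key):
    # lookup(key): every value stored under the key
    return [v for k, v in pairs if k == key]

print(count_by_key(pairs))              # {'a': 2, 'b': 1}
print(count_by_value(["x", "y", "x"]))  # {'x': 2, 'y': 1}
print(collect_as_map(pairs))            # {'a': 3, 'b': 2}
print(lookup(pairs, "a"))               # [1, 3]
```

Note how collectAsMap() loses information when a key repeats, while lookup() keeps every value.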
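### 5. Data Partitioning (Advanced)
Custom partitioning can improve efficiency. When a table that does not change is joined repeatedly, partitioning it once with rdd.partitionBy() (and persisting the result) avoids reshuffling it on every join.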
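A simplified, pure-Python model of why co-partitioning helps. It assumes a hash partitioner that sends a key to partition `hash(key) % num_partitions` (PySpark's `partitionBy()` uses its own `portable_hash` by default, so real placement may differ). The helper `partition_by` and the `users`/`events` data are hypothetical. Because both datasets are partitioned the same way, matching keys land at the same partition index and can be joined partition by partition without moving records.

```python
# Simplified hash partitioner; integer keys keep Python's hash() stable across runs
def partition_by(pairs, num_partitions):
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[hash(key) % num_partitions].append((key, value))
    return parts

# Hypothetical tables: a static "users" table and a stream of "events"
users = partition_by([(1, "alice"), (2, "bob"), (3, "carol"), (4, "dave")], 2)
events = partition_by([(3, "click"), (1, "view"), (4, "click")], 2)

# Same partitioner on both sides: join each pair of partitions locally
joined = []
for user_part, event_part in zip(users, events):
    names = dict(user_part)
    joined.extend((k, (names[k], e)) for k, e in event_part if k in names)

print(sorted(joined))
# [(1, ('alice', 'view')), (3, ('carol', 'click')), (4, ('dave', 'click'))]
```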



```
from pyspark import SparkConf, SparkContext
sc = SparkContext('local', 'PairRDD')
```

```
# WordCount
import os
words = sc.textFile('file://' + os.path.abspath('.') +
                    '/quickstart.txt').flatMap(lambda line: line.split(' '))

# Equivalent long form: map each word to (word, 1), then sum per key
# wordCount = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# wordCount.collect()
wordCount = words.countByValue()
wordCount
```

```
defaultdict(int,
            {'Quick': 1,
             'Start': 2,
             'Interactive': 2,
             'Analysis': 2,
             'with': 13,
             'the': 41,
             'Spark': 21,
             'Shell': 2,
             'Basics': 2,
             'More': 2,
             'on': 9,
             'RDD': 9,
             'Operations': 2,
             'Caching': 2,
             'Self-Contained': 2,
             'Applications': 2,
             'Where': 2,
             'to': 28,
             'Go': 2,
             'from': 5,
             'Here': 2,
             'This': 5,
             'tutorial': 1,
             'provides': 2,
             'a': 34,
             'quick': 1,
             'introduction': 1,
             'using': 3,
             'Spark.': 1,
             'We': 5,
             'will': 3,
             'first': 4,
             'introduce': 1,
             'API': 1,
             'through': 2,
             'Spark’s': 3,
             'interactive': 1,
             'shell'
```

```
rawRDD = sc.parallelize(["打招呼 你好啊~~吃了吗??", "约 约吗美女?"])
pairRDD = rawRDD.map(lambda line: (line.split(' ')[0], line.split(' ')[1]))
pairRDD.collect()
```

```
[('打招呼', '你好啊~~吃了吗??'), ('约', '约吗美女?')]
```

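```
rawRDD = sc.parallelize(["打招呼 你好啊~~吃了吗??", "约 约吗美女?"])
wordsRDD = rawRDD.flatMap(lambda line: line.split(' '))
# Pitfall: these elements are plain strings, not (key, value) pairs.
# countByKey() uses each element's first item as the key, so it counts first characters here.
wordsRDD.countByKey()
```

```
defaultdict(int, {'打': 1, '你': 1, '约': 2})
```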
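The result above follows from how countByKey() reads its input: it counts the first item of every element, which for a string is its first character. A pure-Python sketch of that behaviour (the helper `count_by_key` and the sample words are hypothetical):

```python
from collections import Counter

def count_by_key(elements):
    # Model of countByKey(): count element[0] for every element
    return dict(Counter(x[0] for x in elements))

words = ["hello", "world", "hi"]   # plain strings, not (key, value) pairs
pairs = [(w, 1) for w in words]    # proper pairs keyed by the whole word

print(count_by_key(words))  # {'h': 2, 'w': 1}
print(count_by_key(pairs))  # {'hello': 1, 'world': 1, 'hi': 1}
```

Mapping each word to `(word, 1)` first, as in the second call, turns the result into a per-word count.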

### PairRDD Transformations: Examples

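```
# Two-element lists act as (key, value) pairs; operations that only reorder
# elements (such as sortByKey()) return the original lists unchanged
pairRDD = sc.parallelize([[1, 2], [3, 4], [3, 6], [4, 6]])
```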

```
# 1. reduceByKey: merge the values that share a key
pair1 = pairRDD.reduceByKey(lambda x, y: x + y)
pair1.collect()
```

```
[(1, 2), (3, 10), (4, 6)]
```

```
# 2. groupByKey: group the values that share a key
pair2 = pairRDD.groupByKey()
# Each group is a ResultIterable; pair2.mapValues(list).collect() shows the values
pair2.collect()
```

```
[(1, <pyspark.resultiterable.ResultIterable at 0x122e68908>),
 (3, <pyspark.resultiterable.ResultIterable at 0x122e68748>),
 (4, <pyspark.resultiterable.ResultIterable at 0x122e685f8>)]
```
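What the ResultIterable objects above contain can be modelled in plain Python with a dict of lists. The helper `group_by_key` is a hypothetical stand-in, not PySpark code; it uses the same data as `pairRDD`.

```python
from collections import defaultdict

def group_by_key(pairs):
    # Collect every value under its key, keeping first-seen key order
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

pairs = [(1, 2), (3, 4), (3, 6), (4, 6)]  # same data as pairRDD
print(group_by_key(pairs))  # [(1, [2]), (3, [4, 6]), (4, [6])]
```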

```
# 3. mapValues: apply a function to each value without changing the key
pair3 = pairRDD.mapValues(lambda x: x + 5)
pair3.collect()
```

```
[(1, 7), (3, 9), (3, 11), (4, 11)]
```

```
# 4. flatMapValues: apply a function that returns an iterable to each value;
# every item it yields becomes a separate record paired with the original key
pair4 = pairRDD.flatMapValues(lambda x: range(x))
pair4.collect()
```

```
[(1, 0),
 (1, 1),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 5),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4),
 (4, 5)]
```
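flatMapValues() behaves like a nested loop over each value's iterable. A pure-Python sketch with a hypothetical helper `flat_map_values`, using the same data as `pairRDD`:

```python
def flat_map_values(pairs, func):
    # One output record per item produced by func, keeping the original key
    return [(key, item) for key, value in pairs for item in func(value)]

pairs = [(1, 2), (3, 4), (3, 6), (4, 6)]  # same data as pairRDD
result = flat_map_values(pairs, range)
print(len(result))  # 18  (2 + 4 + 6 + 6)
print(result[:3])   # [(1, 0), (1, 1), (3, 0)]
```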

```
# 5. keys: return an RDD of the keys
keysRDD = pairRDD.keys()
keysRDD.collect()
```

```
[1, 3, 3, 4]
```

```
# 6. values: return an RDD of the values
valuesRDD = pairRDD.values()
valuesRDD.collect()
```

```
[2, 4, 6, 6]
```

```
# 7. sortByKey: sort by key
sortedRDD = pairRDD.sortByKey()
sortedRDD.collect()
```

```
[[1, 2], [3, 4], [3, 6], [4, 6]]
```

### Transformations on Two PairRDDs

```
# Two pair RDDs used by the examples below
rdd1 = sc.parallelize([[1, 2], [3, 4], [3, 6]])
rdd2 = sc.parallelize([[3, 9], [4, 5]])
```

```
# 1. subtractByKey: remove the elements whose key also appears in the other RDD
subRDD = rdd1.subtractByKey(rdd2)
subRDD.collect()
```

```
[(1, 2)]
```

```
# 2. join: inner join
joinRDD = rdd1.join(rdd2)
joinRDD.collect()
```

```
[(3, (4, 9)), (3, (6, 9))]
```

```
# Swapping the RDDs swaps the order of the values in each tuple
joinRDD = rdd2.join(rdd1)
joinRDD.collect()
```

```
[(3, (9, 4)), (3, (9, 6))]
```

```
# 3. rightOuterJoin: right outer join; every key of the second RDD is kept,
# and a missing value from the first RDD becomes None
rightOuterRDD = rdd1.rightOuterJoin(rdd2)
rightOuterRDD.collect()
```

```
[(4, (None, 5)), (3, (4, 9)), (3, (6, 9))]
```

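```
# 4. leftOuterJoin: left outer join; every key of the first RDD is kept,
# and a missing value from the second RDD becomes None.
# Note: this cell joins rdd1 with itself, so every key matches and no None appears;
# rdd1.leftOuterJoin(rdd2) would keep key 1 as (1, (2, None))
leftOuterRDD = rdd1.leftOuterJoin(rdd1)
leftOuterRDD.collect()
```

```
[(1, (2, 2)), (3, (4, 4)), (3, (4, 6)), (3, (6, 4)), (3, (6, 6))]
```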
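The join family (and subtractByKey()) can be modelled on plain lists to make the None-filling rules explicit. The helpers below are hypothetical pure-Python stand-ins, not PySpark; the left outer join here runs against `rdd2`, so key 1 shows up with None. Result order follows the input lists, whereas Spark's order depends on partitioning.

```python
from collections import defaultdict

def _index(pairs):
    # key -> list of values, for fast matching
    index = defaultdict(list)
    for key, value in pairs:
        index[key].append(value)
    return index

def join(left, right):
    r = _index(right)
    return [(k, (v, w)) for k, v in left for w in r.get(k, [])]

def left_outer_join(left, right):
    r = _index(right)
    return [(k, (v, w)) for k, v in left for w in r.get(k, [None])]

def right_outer_join(left, right):
    # Left outer join with the sides swapped, then swap each value pair back
    return [(k, (v, w)) for k, (w, v) in left_outer_join(right, left)]

def subtract_by_key(left, right):
    keys = {k for k, _ in right}
    return [(k, v) for k, v in left if k not in keys]

rdd1 = [(1, 2), (3, 4), (3, 6)]
rdd2 = [(3, 9), (4, 5)]
print(join(rdd1, rdd2))              # [(3, (4, 9)), (3, (6, 9))]
print(left_outer_join(rdd1, rdd2))   # [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
print(right_outer_join(rdd1, rdd2))  # [(3, (4, 9)), (3, (6, 9)), (4, (None, 5))]
print(subtract_by_key(rdd1, rdd2))   # [(1, 2)]
```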

```
# 5. cogroup: group the data that shares a key across both RDDs
coRDD = rdd1.cogroup(rdd2)
# Each value is a pair of ResultIterables; to see their contents use
# coRDD.mapValues(lambda vs: (list(vs[0]), list(vs[1]))).collect()
coRDD.collect()
```

```
[(4,
  (<pyspark.resultiterable.ResultIterable at 0x1224dceb8>,
   <pyspark.resultiterable.ResultIterable at 0x1224dc6d8>)),
 (1,
  (<pyspark.resultiterable.ResultIterable at 0x1224dcf98>,
   <pyspark.resultiterable.ResultIterable at 0x1224dc358>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x1224dce48>,
   <pyspark.resultiterable.ResultIterable at 0x1224dcac8>))]
```
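What cogroup() puts inside each pair of ResultIterables can be modelled in plain Python. The hypothetical `cogroup` helper below returns lists instead, with keys in first-seen order; it is a sketch of the semantics, not PySpark code.

```python
def cogroup(left, right):
    # Every key from either side, in first-seen order
    keys = dict.fromkeys(k for k, _ in left + right)
    return [
        (k, ([v for lk, v in left if lk == k], [w for rk, w in right if rk == k]))
        for k in keys
    ]

rdd1 = [(1, 2), (3, 4), (3, 6)]
rdd2 = [(3, 9), (4, 5)]
print(cogroup(rdd1, rdd2))
# [(1, ([2], [])), (3, ([4, 6], [9])), (4, ([], [5]))]
```

Unlike join(), a key present on only one side still appears, with an empty group for the other side.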

### Filtering a PairRDD by Key

```
lines = sc.textFile('file://{}/quickstart.txt'.format(os.path.abspath('.')))
# Key = first word of the line, value = the whole line
pairsRDD = lines.map(lambda line: (line.split(' ')[0], line))
# Keep the pairs whose key is longer than 7 characters
limitLengthRDD = pairsRDD.filter(lambda keyValue: len(keyValue[0]) > 7)
limitLengthRDD.collect()
```

```
[('Interactive', 'Interactive Analysis with the Spark Shell'),
 ('Self-Contained', 'Self-Contained Applications'),
 ('Interactive', 'Interactive Analysis with the Spark Shell'),
 ('./bin/spark-shell', './bin/spark-shell'),
 ('textFile:', 'textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3'),
 ('linesWithSpark:',
  'linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09'),
 ('wordCounts:',
  'wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8'),
 ('Self-Contained', 'Self-Contained Applications'),
 ('scalaVersion', 'scalaVersion := "2.10.5"'),
 ('libraryDependencies',
  'libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"'),
 ('./simple.sbt', './simple.sbt'),
 ('./src/main', './src/main'),
 ('./src/main/scala', './src/main/scala'),
 ('./src/main/scala/SimpleApp.scala', './src/main/scala/SimpleApp.scala'),
 ('Congratulations',
  'Congratulations on running your first Spark application!'),
 ('Finally,',
  'Finally, Spark includes several
```

### Aggregation: Computing the Average per Key

```
pairRDD = sc.parallelize(
    [['a', 1], ['b', 2], ['c', 3], ['a', 2], ['b', 0], ['c', 10]])
# Turn each value into (sum, count), add them up per key, then divide
aveRDD = pairRDD.mapValues(lambda x: (x, 1)).reduceByKey(
    lambda x, y: (x[0] + y[0], x[1] + y[1])).mapValues(lambda x: x[0]/x[1])
aveRDD.collect()
```

```
[('a', 1.5), ('b', 1.0), ('c', 6.5)]
```
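The (sum, count) trick is needed because an average of partial averages is generally not the overall average, so the mean itself cannot serve as the reduce function. A pure-Python illustration with made-up values for a single key split across two hypothetical partitions:

```python
from functools import reduce

# Hypothetical values for ONE key, split across two partitions
parts = [[1, 2], [9]]

def mean(xs):
    return sum(xs) / len(xs)

# Averaging the per-partition averages depends on the split (wrong answer)
naive = mean([mean(p) for p in parts])  # (1.5 + 9.0) / 2

def merge(a, b):
    # (sum, count) pairs combine associatively, like the reduceByKey lambda above
    return (a[0] + b[0], a[1] + b[1])

# Each partition reduces its own (value, 1) pairs, then the partials merge
partials = [reduce(merge, [(x, 1) for x in p]) for p in parts]
total, count = reduce(merge, partials)
print(naive, total / count)  # 5.25 4.0
```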

```
# Same computation split into steps so the intermediate (sum, count) pairs can be inspected
sumRDD = pairRDD.mapValues(lambda x: (x, 1)).reduceByKey(
    lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(sumRDD.collect())
aveRDD = sumRDD.mapValues(lambda x: x[0]/x[1])
aveRDD.collect()
```

```
[('a', (3, 2)), ('b', (2, 2)), ('c', (13, 2))]

[('a', 1.5), ('b', 1.0), ('c', 6.5)]
```

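```
# combineByKey(createCombiner, mergeValue, mergeCombiners):
#   createCombiner: first value of a key within a partition -> (sum, count)
#   mergeValue: add another value of that key to the partition's (sum, count)
#   mergeCombiners: merge (sum, count) pairs of the same key from different partitions
sumCount = pairRDD.combineByKey((lambda x: (x, 1)),
                                (lambda x, y: (x[0]+y, x[1]+1)),
                                (lambda x, y: (x[0]+y[0], x[1]+y[1])))
sumCount.getNumPartitions()
```

```
1
```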
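The role of combineByKey()'s three functions can be modelled in plain Python. This sketch is a simplification (no real shuffle or spilling), the helper `combine_by_key` is hypothetical, and the two-partition split of the data is also hypothetical, since the notebook's RDD has only 1 partition. It reproduces the (sum, count) and average values shown in the surrounding cells.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # Step 1: inside each partition, build one combiner per key
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)
    # Step 2: after the (simulated) shuffle, merge combiners of the same key
    result = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result

partitions = [[("a", 1), ("b", 2), ("c", 3)], [("a", 2), ("b", 0), ("c", 10)]]
sum_count = combine_by_key(
    partitions,
    lambda x: (x, 1),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {k: s / n for k, (s, n) in sum_count.items()}
print(sum_count)  # {'a': (3, 2), 'b': (2, 2), 'c': (13, 2)}
print(averages)   # {'a': 1.5, 'b': 1.0, 'c': 6.5}
```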

```
aveRDD = sumCount.mapValues(lambda x: x[0]/x[1])
aveRDD.collectAsMap()
```

```
{'a': 1.5, 'b': 1.0, 'c': 6.5}
```

```
sumCount.collect()
```

```
[('a', (3, 2)), ('b', (2, 2)), ('c', (13, 2))]
```

```
sumCount.collectAsMap()
```

```
{'a': (3, 2), 'b': (2, 2), 'c': (13, 2)}
```

```
# This form does not work in Python 3: RDD.map() passes each (key, value) pair
# to the function as ONE tuple argument, so a two-parameter lambda raises TypeError.
# Working alternatives: sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
# or sumCount.mapValues(lambda xy: xy[0] / xy[1])
sumCount = pairRDD.combineByKey((lambda x: (x, 1)),
                                (lambda x, y: (x[0] + y, x[1] + 1)),
                                (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()
```

```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 86.0 failed 1 times, most recent failure: Lost task 0.0 in stage 86.0 (TID 83, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/cool/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/cool/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/cool/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/cool/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
TypeError: <lambda>() missing 1 required positional argument: 'xy'
	...
```
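The TypeError can be reproduced without Spark: Python's built-in map(), like RDD.map(), calls the function with one argument per element, and each element here is a (key, value) tuple. A minimal sketch using the `sumCount` values printed earlier as plain data; the function names are hypothetical.

```python
sum_count = [("a", (3, 2)), ("b", (2, 2)), ("c", (13, 2))]

def average_broken(key, xy):
    # Expects two arguments, but map() passes a single (key, value) tuple
    return key, xy[0] / xy[1]

def average(kv):
    # Takes the pair as one argument and unpacks it inside the body
    key, (total, count) = kv
    return key, total / count

error_message = None
try:
    dict(map(average_broken, sum_count))
except TypeError as err:
    error_message = str(err)
    print(error_message)  # average_broken() missing 1 required positional argument: 'xy'

print(dict(map(average, sum_count)))  # {'a': 1.5, 'b': 1.0, 'c': 6.5}
```

Python 3 removed tuple unpacking in function parameters (PEP 3113), so the unpacking has to happen inside the function body, or mapValues() can be used so that only the value is passed.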
