### 0. Programming environment

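The steps below set up a single-machine Spark practice environment on macOS.
Note: a practice environment needs neither a separate Hadoop installation nor a separate Scala installation.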


1. Install Java 8

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Note: avoid installing other JDK versions, since they may be incompatible with Spark.


2. Download Spark and unpack it

http://spark.apache.org/downloads.html

Unpack it to the following path:

/Users/liangyun/ProgramFiles/spark-2.4.3-bin-hadoop2.7


3. Configure the Spark environment

vim ~/.bashrc

Add the following two lines:

export SPARK_HOME=/Users/liangyun/ProgramFiles/spark-2.4.3-bin-hadoop2.7

export PATH=$PATH:$SPARK_HOME/bin


4. Configure Jupyter support

If Jupyter is not installed yet, you can get it by installing Anaconda.

pip install toree

jupyter toree install --spark_home=your-spark-home


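### 1. Running Spark

Spark can be run in the following main ways.

1. Use spark-shell to enter the interactive Spark environment, which uses Scala.

2. Use spark-submit to submit a Spark application for batch processing.
This accepts a jar built from Scala or Java code, or a Python script.

3. Use pyspark to enter the interactive PySpark environment, which uses Python.
Jupyter or IPython can be set as the interactive front end.

4. Run interactively in a Zeppelin notebook.
Zeppelin is Apache's counterpart to the Jupyter notebook.

5. Install the Apache Toree Scala kernel,
which lets you run spark-shell style code inside Jupyter.

Two commonly used options can be added when launching spark-shell.

The first, --master, selects local mode or the cluster manager to run against.

The second, --jars, lists the jar files the job depends on.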

In [None]:
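# Local mode; without --master, spark-shell defaults to local[*] (all logical CPU cores)
spark-shell

# Local mode using all cores, with code.jar added to the classpath
spark-shell  --master local[*] --jars code.jar

# Local mode using 4 cores
spark-shell  --master local[4]

# Standalone mode: connect to the cluster at the given master URL and port
spark-shell  --master spark://master:7077

# YARN client mode: the driver runs locally, which makes logs easy to read; recommended for debugging.
# Since Spark 2.0 the form "--master yarn-client" is deprecated in favor of the one below.
spark-shell  --master yarn --deploy-mode client

# YARN cluster mode: the driver runs inside the cluster, so the local machine carries little compute or network load; recommended for batch jobs.
# Interactive shells cannot run in cluster deploy mode, so this mode is used with spark-submit only.
spark-submit  --master yarn --deploy-mode cluster <application-jar-or-py> [args]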



In [None]:
# Submit a job written in Scala
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode cluster \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 1 \
 --queue thequeue \
 examples/jars/spark-examples*.jar 10 

# Submit a job written in Python
spark-submit --master yarn \
--executor-memory 6G \
--driver-memory 6G \
--deploy-mode cluster \
--num-executors 600 \
--conf spark.yarn.maxAppAttempts=1 \
--executor-cores 1 \
--conf spark.default.parallelism=2000 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
test.py 



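### 1. Creating RDDs

There are two main ways to create an RDD. The first is textFile, which loads data from the local file system or a cluster file system.

The second is parallelize, which turns a data structure held in the driver into a distributed RDD.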

In [10]:
//Load data from the local file system; the second argument is the minimum number of partitions
val file = "file:///Users/liangyun/CodeFiles/scala_tutorial/data.txt"
val rdd = sc.textFile(file,3)
rdd.collect.foreach(println)

1
2
3
4
5


file = file:///Users/liangyun/CodeFiles/scala_tutorial/data.txt
rdd = file:///Users/liangyun/CodeFiles/scala_tutorial/data.txt MapPartitionsRDD[10] at textFile at <console>:31


file:///Users/liangyun/CodeFiles/scala_tutorial/data.txt MapPartitionsRDD[10] at textFile at <console>:31

In [None]:
//Load data from a cluster file system
//val file = "hdfs://localhost:9000/user/hadoop/data.txt"
//The hdfs://localhost:9000 prefix can be omitted when HDFS is the configured default file system
//val rdd = sc.textFile(file,3)
//rdd.collect.foreach(println)

In [11]:
//Parallelize a data structure held in the driver; the second argument sets the number of partitions
val rdd = sc.parallelize(1 to 100,2)

rdd = ParallelCollectionRDD[11] at parallelize at <console>:28


ParallelCollectionRDD[11] at parallelize at <console>:28

### 2. Common actions

An action triggers the computation defined by the RDD's dependency graph.

**collect**

In [45]:
val rdd = sc.parallelize(1 to 20,5) 

rdd = ParallelCollectionRDD[19] at parallelize at <console>:27


ParallelCollectionRDD[19] at parallelize at <console>:27

In [19]:
//collect gathers all the data on the driver; with large data this risks running out of memory
val all_data = rdd.collect()

all_data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)


Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

**take**

In [20]:
//take brings the first n elements to the driver; safer than collect
val part_data = rdd.take(4)

part_data = Array(1, 2, 3, 4)


Array(1, 2, 3, 4)

**takeSample**

In [50]:
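//takeSample returns a random sample of fixed size to the driver
//Arguments: withReplacement (whether an element may be drawn more than once), the number of samples, and the random seed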
val sample_data = rdd.takeSample(false,5,0)

sample_data = Array(17, 13, 6, 1, 7)


Array(17, 13, 6, 1, 7)

**first**

In [22]:
//first returns the first element
val first_data = rdd.first

first_data = 1


1

**count**

In [23]:
//count returns the number of elements
val data_count = rdd.count

data_count = 20


20

**reduce**

In [31]:
//reduce combines the elements with a binary function, which should be associative and commutative
rdd.reduce(_+_)

210

**foreach**

In [44]:
//foreach runs an operation on every element and does not produce a new RDD
//See the shared variables section for more on accumulators

val accum = sc.longAccumulator("sumAccum")

rdd.foreach(x => accum.add(x))

print(accum)

LongAccumulator(id: 553, name: Some(sumAccum), value: 210)

accum = LongAccumulator(id: 553, name: Some(sumAccum), value: 210)


LongAccumulator(id: 553, name: Some(sumAccum), value: 210)

**countByKey**

In [27]:
//countByKey counts the elements of a pair RDD per key and returns the result to the driver as a Map

val pairRdd = sc.parallelize(Array((1,1),(1,4),(3,9),(2,16))) 
pairRdd.countByKey

pairRdd = ParallelCollectionRDD[15] at parallelize at <console>:31


Map(1 -> 2, 2 -> 1, 3 -> 1)

**saveAsTextFile and saveAsObjectFile**

In [60]:
//saveAsTextFile writes the RDD as text files, one part file per partition, into the given local directory
rdd.saveAsTextFile("file:///Users/liangyun/CodeFiles/scala_tutorial/rdddata")

In [64]:
//Read it back; the elements come back as strings
val data = sc.textFile("file:///Users/liangyun/CodeFiles/scala_tutorial/rdddata")
data.collect

data = file:///Users/liangyun/CodeFiles/scala_tutorial/rdddata MapPartitionsRDD[26] at textFile at <console>:30


Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

In [67]:
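//saveAsObjectFile writes the RDD as files of serialized objects; pair RDDs also have saveAsSequenceFile
//saveAsObjectFile returns Unit, so there is nothing to assign
rdd.saveAsObjectFile("file:///Users/liangyun/CodeFiles/scala_tutorial/rdddata3")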

### 3. Common transformations

Transformations are lazy: each one only records the dependency between the new RDD and its parent RDD, and it is computed only when an action needs its result.

**map**

In [85]:
//map applies a function to every element
val rdd = sc.parallelize(1 to 10,3)
rdd.collect

rdd = ParallelCollectionRDD[56] at parallelize at <console>:30


Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

In [86]:
rdd.map(_+1).collect

Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

**filter**

In [87]:
//filter keeps only the elements that satisfy a predicate
val rdd = sc.parallelize(1 to 10,3)
rdd.filter(_>5).collect

rdd = ParallelCollectionRDD[58] at parallelize at <console>:30


Array(6, 7, 8, 9, 10)

**flatMap**

In [80]:
//flatMap maps each element to a collection and then flattens the results; compare with plain map first
val rdd = sc.parallelize(Array("hello world","hello China"))
rdd.map(_.split(" ")).collect

rdd = ParallelCollectionRDD[51] at parallelize at <console>:30


Array(Array(hello, world), Array(hello, China))

In [81]:
rdd.flatMap(_.split(" ")).collect

Array(hello, world, hello, China)

**sample**

In [98]:
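//sample draws from each partition with the given fraction; you can choose sampling with or without replacement and set the random seed
//fraction is an expected proportion, so the size of the sample is not exact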
val rdd = sc.parallelize(1 to 12,1)
rdd.sample(false,0.5,0).collect

rdd = ParallelCollectionRDD[78] at parallelize at <console>:30


Array(1, 2, 4, 8, 9, 10, 11)
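
The result above has 7 elements rather than exactly 6 because sampling without replacement keeps each element independently with probability `fraction`. The following is a minimal plain-Scala sketch of that idea, not Spark's implementation: `bernoulliSampleModel` is a hypothetical helper, and it uses `scala.util.Random`, so the elements it picks for a given seed will differ from the ones Spark picks.

```scala
import scala.util.Random

// Plain-Scala model (no Spark needed) of sampling without replacement:
// every element is kept independently with probability `fraction`.
// bernoulliSampleModel is a hypothetical helper, not a Spark API.
def bernoulliSampleModel[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  data.filter(_ => rng.nextDouble() < fraction)
}

// Sample sizes for a few seeds: they vary around 12 * 0.5 = 6.
val sizes = (0L until 5L).map(seed => bernoulliSampleModel(1 to 12, 0.5, seed).size)
println(sizes)
```

When an exact sample size is needed, the takeSample action shown earlier is the better fit.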

**distinct**

In [99]:
//distinct removes duplicates
val rdd = sc.parallelize(Array(1,1,2,2,3,3,4,5))
rdd.distinct.collect

rdd = ParallelCollectionRDD[80] at parallelize at <console>:29


Array(4, 1, 5, 2, 3)

**subtract**

In [121]:
//subtract returns the elements that are in the first RDD but not in the second
val a = sc.parallelize(1 to 10)
val b = sc.parallelize(5 to 15)
a.subtract(b).collect

a = ParallelCollectionRDD[120] at parallelize at <console>:31
b = ParallelCollectionRDD[121] at parallelize at <console>:32


Array(4, 1, 2, 3)

**union and intersection**

In [100]:
//union concatenates the two RDDs and keeps duplicates; intersection returns the common elements without duplicates
val a = sc.parallelize(1 to 5)
val b = sc.parallelize(3 to 8)
a.union(b).collect

a = ParallelCollectionRDD[84] at parallelize at <console>:27
b = ParallelCollectionRDD[85] at parallelize at <console>:28


Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8)

In [101]:
a.intersection(b).collect

Array(4, 5, 3)

**cartesian**

In [102]:
//cartesian computes the Cartesian product

val boys = sc.parallelize(Array("LiLei","Tom"))
val girls = sc.parallelize(Array("HanMeiMei","Lily"))
boys.cartesian(girls).collect

boys = ParallelCollectionRDD[93] at parallelize at <console>:29
girls = ParallelCollectionRDD[94] at parallelize at <console>:30


Array((LiLei,HanMeiMei), (LiLei,Lily), (Tom,HanMeiMei), (Tom,Lily))

**pipe**

In [103]:
//pipe runs an external program on the elements of each partition to produce a new RDD, similar to Hadoop Streaming
val rdd = sc.parallelize(Array("hello","world","hi","apache"))

rdd = ParallelCollectionRDD[96] at parallelize at <console>:28


ParallelCollectionRDD[96] at parallelize at <console>:28

In [117]:
//getlen.py
/*
import sys
for line in sys.stdin:
    print(str(len(line.rstrip())))

*/

1

In [114]:
val rdd2 = rdd.pipe("python getlen.py")
rdd2.collect

rdd2 = PipedRDD[107] at pipe at <console>:29


Array(5, 5, 2, 6)

**sortBy**

In [140]:
//sortBy sorts by the value of a key function
//Here the elements are sorted by the third field of each tuple
val rdd = sc.parallelize(Array((1,2,3),(3,2,2),(4,1,1)))
rdd.sortBy(_._3).collect

rdd = ParallelCollectionRDD[182] at parallelize at <console>:31


Array((4,1,1), (3,2,2), (1,2,3))

### 4. Common pair RDD transformations

A pair RDD is an RDD whose elements are of type Tuple2; the first element of each tuple is treated as the key and the second as the value.

**reduceByKey**

In [124]:
//reduceByKey merges the values of each key with a binary function
val rdd = sc.parallelize(Array(("hello",1),("world",2),("hello",3),("world",5)))
rdd.reduceByKey(_+_).collect

rdd = ParallelCollectionRDD[129] at parallelize at <console>:29


Array((hello,4), (world,7))

**groupByKey**

In [127]:
//groupByKey collects the values of each key into an Iterable
val rdd = sc.parallelize(Array(("hello",1),("world",2),("hello",3),("world",5)))
rdd.groupByKey().collect

rdd = ParallelCollectionRDD[135] at parallelize at <console>:30


Array((hello,CompactBuffer(1, 3)), (world,CompactBuffer(2, 5)))

**sortByKey**

In [132]:
//sortByKey sorts by key; the argument is `ascending`, so false sorts in descending order
val rdd = sc.parallelize(Array(("hello",1),("world",2),("hello",3),("world",5)))
rdd.sortByKey(false).collect

rdd = ParallelCollectionRDD[146] at parallelize at <console>:30


Array((world,2), (world,5), (hello,1), (hello,3))

**join**

In [144]:
//join is an inner join on the key
val age = sc.parallelize(Array(("LiLei",18),("HanMeiMei",16),("Jim",20)))
val gender = sc.parallelize(Array(("LiLei","male"),("HanMeiMei","female"),("Lucy","female")))
age.join(gender).collect


age = ParallelCollectionRDD[195] at parallelize at <console>:31
gender = ParallelCollectionRDD[196] at parallelize at <console>:32


Array((HanMeiMei,(16,female)), (LiLei,(18,male)))

**leftOuterJoin and rightOuterJoin**

In [145]:
//leftOuterJoin and rightOuterJoin are the left and right outer joins; the side that may be missing is wrapped in Option

val age = sc.parallelize(Array(("LiLei",18),("HanMeiMei",16),("Jim",20)))
val gender = sc.parallelize(Array(("LiLei","male"),("HanMeiMei","female"),("Lucy","female")))
age.leftOuterJoin(gender).collect

age = ParallelCollectionRDD[200] at parallelize at <console>:32
gender = ParallelCollectionRDD[201] at parallelize at <console>:33


Array((HanMeiMei,(16,Some(female))), (Jim,(20,None)), (LiLei,(18,Some(male))))

In [146]:
age.rightOuterJoin(gender).collect

Array((HanMeiMei,(Some(16),female)), (Lucy,(None,female)), (LiLei,(Some(18),male)))
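
The join results above can be reproduced on plain Scala collections, which makes the role of `Option` easier to see. This is only a sketch of the semantics, not of how Spark executes a join; `leftOuterJoinModel` is a hypothetical helper name.

```scala
// Plain-Scala model (no Spark needed) of leftOuterJoin on key-value pairs.
// leftOuterJoinModel is a hypothetical helper, not a Spark API.
def leftOuterJoinModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
  left.flatMap { case (k, v) =>
    val matches = right.collect { case (rk, w) if rk == k => w }
    if (matches.isEmpty) Seq((k, (v, Option.empty[W])))   // no partner: keep the left row with None
    else matches.map(w => (k, (v, Option(w))))            // one output row per matching right value
  }

val age = Seq(("LiLei", 18), ("HanMeiMei", 16), ("Jim", 20))
val gender = Seq(("LiLei", "male"), ("HanMeiMei", "female"), ("Lucy", "female"))

val leftJoined = leftOuterJoinModel(age, gender)
// The inner join is the left outer join restricted to rows that found a partner.
val innerJoined = leftJoined.collect { case (k, (v, Some(w))) => (k, (v, w)) }

println(leftJoined)
println(innerJoined)
```

A right outer join is the same model with the two inputs swapped, so the `Option` ends up on the left value instead.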

**cogroup**

In [150]:
//cogroup groups both inputs by key and, for each key, returns the pair of value collections from x and from y

val x = sc.parallelize(Array(("a",1),("b",2),("a",3)))
val y = sc.parallelize(Array(("a",2),("b",3),("b",5)))

x.cogroup(y).collect

x = ParallelCollectionRDD[218] at parallelize at <console>:32
y = ParallelCollectionRDD[219] at parallelize at <console>:33


Array((a,(CompactBuffer(1, 3),CompactBuffer(2))), (b,(CompactBuffer(2),CompactBuffer(3, 5))))
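
To make the shape of the cogroup result concrete, here is a minimal sketch on plain Scala collections. It models the semantics only; `cogroupModel` is a hypothetical helper name, and Spark's result uses CompactBuffer where this sketch uses Seq.

```scala
// Plain-Scala model (no Spark needed) of cogroup: for every key that occurs in
// either input, collect the values from the left and from the right input.
// cogroupModel is a hypothetical helper, not a Spark API.
def cogroupModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  val keys = (left.map(_._1) ++ right.map(_._1)).distinct
  keys.map { k =>
    val vs = left.filter(_._1 == k).map(_._2)
    val ws = right.filter(_._1 == k).map(_._2)
    (k, (vs, ws))
  }.toMap
}

val x = Seq(("a", 1), ("b", 2), ("a", 3))
val y = Seq(("a", 2), ("b", 3), ("b", 5))
val grouped = cogroupModel(x, y)
println(grouped)
```

A key that appears in only one input still shows up in the result, paired with an empty collection for the other side.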

**subtractByKey**

In [157]:
//subtractByKey removes from x the elements whose key also appears in y

val x = sc.parallelize(Array(("a",1),("b",2),("c",3)))
val y = sc.parallelize(Array(("a",2),("b",(1,2))))

x.subtractByKey(y).collect

x = ParallelCollectionRDD[237] at parallelize at <console>:32
y = ParallelCollectionRDD[238] at parallelize at <console>:33


Array((c,3))

**foldByKey**

In [171]:
//foldByKey works like reduceByKey, but takes an initial (zero) value
val x = sc.parallelize(Array(("a",1),("b",2),("a",3),("b",5)),1)
x.foldByKey(1)(_*_).collect

x = ParallelCollectionRDD[251] at parallelize at <console>:30


Array((a,3), (b,10))
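
The example above uses 1 as the initial value, which is the identity for multiplication. With any other initial value the result depends on how the data is partitioned, because the initial value is folded in once per key in every partition that holds that key. The sketch below models this on plain Scala collections; `foldByKeyModel` is a hypothetical helper and the partition layouts are assumptions chosen for illustration.

```scala
// Plain-Scala model (no Spark needed): fold each key within each partition
// starting from `zero`, then merge the per-partition results with the same function.
// foldByKeyModel is a hypothetical helper, not a Spark API.
def foldByKeyModel[K, V](partitions: Seq[Seq[(K, V)]], zero: V)(f: (V, V) => V): Map[K, V] = {
  val perPartition: Seq[Map[K, V]] = partitions.map { part =>
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(f) }
  }
  perPartition.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}

val data = Seq(("a", 1), ("b", 2), ("a", 3), ("b", 5))

val identityZero  = foldByKeyModel(Seq(data), 1)(_ * _)                            // a -> 3,  b -> 10
val onePartition  = foldByKeyModel(Seq(data), 2)(_ * _)                            // a -> 6,  b -> 20
val twoPartitions = foldByKeyModel(Seq(data.take(2), data.drop(2)), 2)(_ * _)      // a -> 12, b -> 40
println(Seq(identityZero, onePartition, twoPartitions))
```

For predictable results, choose an initial value that is neutral for the function, such as 0 for addition or 1 for multiplication.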

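### 5. Persistence

If an RDD is reused as an intermediate result by several jobs, caching it in memory can speed up the computation considerably.

Marking an RDD with cache does not cache it immediately; it is cached the first time it is actually computed.

persist lets you specify the storage level explicitly.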


In [192]:
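//cache keeps the RDD in memory, using storage level MEMORY_ONLY.
//MEMORY_ONLY means partitions that do not fit in memory are not stored and are recomputed when next needed.
//The sum of 1 to 1000000 overflows Int, so convert to Long before reducing.
val a = sc.parallelize(1 to 1000000,5)
a.cache
val sum_a = a.map(_.toLong).reduce(_+_)
val cnt_a = a.count
val mean_a = sum_a.toDouble/cnt_a

a = ParallelCollectionRDD[270] at parallelize at <console>:31
sum_a = 500000500000
cnt_a = 1000000
mean_a = 500000.5


500000.5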
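
The sum of the integers 1 to 1000000 is 500000500000, which is larger than the maximum Int value of 2147483647, so adding the elements as Int silently wraps around. The plain-Scala check below needs no Spark and shows both the wrapped value and the correct one; the same arithmetic applies to `reduce(_+_)` on an RDD of Int.

```scala
// Plain Scala, no Spark needed.
val intSum: Int = (1 to 1000000).reduce(_ + _)                    // 32-bit arithmetic wraps around
val longSum: Long = (1 to 1000000).map(_.toLong).reduce(_ + _)    // 64-bit arithmetic is wide enough
val mean: Double = longSum.toDouble / 1000000

println(intSum)   // 1784293664
println(longSum)  // 500000500000
println(mean)     // 500000.5
```

Converting to Long (or Double) before reducing, and dividing as Double, gives the expected mean of 500000.5.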

In [208]:
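//persist without arguments also uses MEMORY_ONLY for an RDD; pass a StorageLevel to choose another level.
//MEMORY_AND_DISK means partitions that do not fit in memory are stored on disk instead.
//unpersist releases the cached data.
import org.apache.spark.storage.StorageLevel
val a = sc.parallelize(1 to 1000000,5)
a.persist(StorageLevel.MEMORY_AND_DISK)
val sum_a = a.map(_.toLong).reduce(_+_)
val cnt_a = a.count
val mean_a = sum_a.toDouble/cnt_a
a.unpersist()



a = ParallelCollectionRDD[272] at parallelize at <console>:35
sum_a = 500000500000
cnt_a = 1000000
mean_a = 500000.5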


ParallelCollectionRDD[272] at parallelize at <console>:35

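### 6. Shared variables

When Spark runs a function on many nodes of a cluster, by default it works on separate copies of the objects the function references, shipped along with the tasks.

Sometimes, however, a variable needs to be shared across nodes, or between the nodes and the driver.

Spark provides two kinds of shared variables: broadcast variables and accumulators.

A broadcast variable is immutable and shares read-only data among tasks on different nodes.

It is cached once on each machine rather than copied for every task, which reduces data transfer.

An accumulator shares a variable between the nodes and the driver, and only supports counting or summing.

Its value can be read only on the driver; tasks on the nodes can only add to it.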

In [215]:
//Broadcast variable: immutable and readable on every node

val broads = sc.broadcast(100)

val rdd = sc.parallelize(1 to 10)
rdd.map(_+broads.value).collect

broads.value

broads = Broadcast(178)
rdd = ParallelCollectionRDD[281] at parallelize at <console>:35


100

In [309]:
//Accumulator: readable only on the driver; other nodes can only add to it

val total = sc.longAccumulator("total")
val rdd = sc.parallelize(1 to 10,3)

rdd.foreach(x => {total.add(x)})
total.value

total = LongAccumulator(id: 5615, name: Some(total), value: 55)
rdd = ParallelCollectionRDD[434] at parallelize at <console>:42


55

In [228]:
//Two accumulators, one for the sum and one for the count, give the mean
val rdd = sc.parallelize(for(i<-1 to 100) yield 0.1*i )
val total = sc.doubleAccumulator("sum")
val count = sc.longAccumulator("count")


rdd.foreach(x => {total.add(x)
                  count.add(1)})
total.value/count.value

rdd = ParallelCollectionRDD[288] at parallelize at <console>:34
total = DoubleAccumulator(id: 4335, name: Some(sum), value: 505.0)
count = LongAccumulator(id: 4336, name: Some(count), value: 100)


5.05

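### 7. Partition operations

Partition operations cover operations that change the partitioning and transformations that work on whole partitions.

coalesce: shuffle is optional. With the default shuffle = false it is a narrow dependency and cannot increase the number of partitions. repartition is implemented by calling it.

repartition: shuffles elements evenly across partitions regardless of key, so equal keys do not necessarily end up in the same partition.

partitionBy: shuffles by key using a partitioner, so equal keys go to the same partition.

HashPartitioner: the default partitioner. It partitions by the hash of the key, so equal keys go to the same partition; it is efficient, but the key must not be an Array.

RangePartitioner: mainly used by sorting operations. Besides sending equal keys to the same partition, it also sends neighboring keys to the same partition; keys must be orderable.

TaskContext: TaskContext.get.partitionId returns the id of the current partition.

mapPartitions: processes one partition's data as a batch. It suits batch-wise work such as inserting rows into a table, where one database connection per batch instead of one per element greatly reduces connection overhead.

mapPartitionsWithIndex: like mapPartitions, but also passes the partition index; the function receives (i, Iterator).

foreachPartition: like foreach, but receives one partition's data as a batch at a time.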



**coalesce**

In [272]:
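//coalesce defaults to shuffle = false, so it can only decrease the number of partitions
//To increase the number of partitions, set shuffle = true
//parallelize and many other operations accept the number of partitions as an argument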
val a = sc.parallelize(1 to 10,3)  
println(a.getNumPartitions)
a.mapPartitions(iter => Iterator(iter.toArray)).collect


3


a = ParallelCollectionRDD[359] at parallelize at <console>:34


Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

In [273]:
val b = a.coalesce(2) 
b.mapPartitions(iter => Iterator(iter.toArray)).collect()

b = CoalescedRDD[361] at coalesce at <console>:32


Array(Array(1, 2, 3), Array(4, 5, 6, 7, 8, 9, 10))

**repartition**

In [275]:
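//repartition shuffles elements regardless of key, so equal keys may land in different partitions; it can increase the number of partitions
//repartition is implemented as coalesce with shuffle = true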
val a = sc.parallelize(1 to 10,3)  
val c = a.repartition(7) 
c.mapPartitions(iter => Iterator(iter.toArray)).collect()

a = ParallelCollectionRDD[368] at parallelize at <console>:36
c = MapPartitionsRDD[372] at repartition at <console>:37


Array(Array(2), Array(3), Array(4), Array(5, 7), Array(6, 8), Array(9), Array(1, 10))

In [283]:
//repartition ignores keys: equal keys do not necessarily share a partition
val a = sc.parallelize(Array(("a",1),("a",1),("a",2),("b",2),("c",3)))  
val c = a.repartition(2)
c.mapPartitions(iter => Iterator(iter.toArray)).collect()

a = ParallelCollectionRDD[401] at parallelize at <console>:37
c = MapPartitionsRDD[405] at repartition at <console>:38


Array(Array((a,1), (a,2), (b,2)), Array((a,1), (c,3)))

**partitionBy**

In [281]:
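//partitionBy takes a partitioner such as HashPartitioner or RangePartitioner.
//HashPartitioner is the default partitioner and partitions by the hash of the key.
//With HashPartitioner equal keys go to the same partition; it is efficient, but the key must not be an Array.
//RangePartitioner is typically used for sorting.
//With RangePartitioner equal keys go to the same partition and neighboring keys do too; keys must be orderable.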

import org.apache.spark.{HashPartitioner,RangePartitioner}
val a = sc.parallelize(Array(("a",1),("a",1),("a",2),("b",2),("c",3)))  
val c = a.partitionBy(new HashPartitioner(2))
c.mapPartitions(iter => Iterator(iter.toArray)).collect()


a = ParallelCollectionRDD[392] at parallelize at <console>:42
c = ShuffledRDD[393] at partitionBy at <console>:43


Array(Array((b,2)), Array((a,1), (a,1), (a,2), (c,3)))
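
The placement above, with "b" in partition 0 and "a" and "c" in partition 1, follows from hashing the keys. The sketch below models hash partitioning in plain Scala as the key's hashCode modulo the number of partitions, kept non-negative; `hashPartitionModel` is a hypothetical helper written for illustration, not Spark's own class.

```scala
// Plain-Scala model (no Spark needed) of hash partitioning:
// partition index = key.hashCode modulo numPartitions, kept non-negative.
// hashPartitionModel is a hypothetical helper, not a Spark API.
def hashPartitionModel(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// "a", "b", "c" have hash codes 97, 98, 99.
val keys = Seq("a", "b", "c")
val assignment = keys.map(k => (k, hashPartitionModel(k, 2))).toMap
println(assignment) // a -> 1, b -> 0, c -> 1
```

This also suggests why Array keys are a problem: a JVM array's hashCode is based on object identity rather than contents, so two arrays with equal elements can be sent to different partitions.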

**mapPartitions**

In [293]:
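//mapPartitions applies a function to each partition as a whole
//It processes one partition's data as a batch, which suits batch-wise work such as writing to a database, where it greatly reduces the number of connections.
//The function receives an Iterator over the partition's data and must also return an Iterator
//The example below shows the contents of each partition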
val a = sc.parallelize(1 to 10,2)
a.mapPartitions(iter => Iterator(iter.toArray)).collect()

a = ParallelCollectionRDD[421] at parallelize at <console>:42


Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))
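
The partition contents printed in this section (for example three partitions holding 3, 3 and 4 elements for `1 to 10`) are consistent with cutting the collection into contiguous index ranges. The sketch below is a plain-Scala model that reproduces those outputs; `sliceModel` is a hypothetical helper, and treating this formula as the way parallelize slices its input is an assumption based on the outputs shown here.

```scala
// Plain-Scala model (no Spark needed): slice i covers the index range
// [i * n / numSlices, (i + 1) * n / numSlices), where n is the collection length.
// sliceModel is a hypothetical helper, not a Spark API.
def sliceModel[T](data: IndexedSeq[T], numSlices: Int): Seq[Seq[T]] =
  (0 until numSlices).map { i =>
    val start = ((i.toLong * data.length) / numSlices).toInt
    val end = (((i + 1).toLong * data.length) / numSlices).toInt
    data.slice(start, end)
  }

val parts3 = sliceModel(1 to 10, 3)   // (1,2,3), (4,5,6), (7,8,9,10)
val parts2 = sliceModel(1 to 10, 2)   // (1,2,3,4,5), (6,7,8,9,10)
println(parts3.map(_.toList))
println(parts2.map(_.toList))
```

Because the slices are contiguous and nearly equal in size, the last partition absorbs the remainder when the length is not divisible by the number of partitions.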

**mapPartitionsWithIndex**

In [298]:
//mapPartitionsWithIndex passes two arguments: the partition id and an Iterator over that partition's data
//Here each partition yields one string of the form "partitionId|sum of its elements"
val a = sc.parallelize(1 to 10,2)
val b = a.mapPartitionsWithIndex{
        (pid,iter) => Iterator(s"$pid|${iter.sum}")
      }
b.collect

a = ParallelCollectionRDD[427] at parallelize at <console>:40
b = MapPartitionsRDD[428] at mapPartitionsWithIndex at <console>:41


Array(0|15, 1|40)

In [287]:
//TaskContext gives the partition id of each element as it is processed
import org.apache.spark.TaskContext

val a = sc.parallelize(1 to 10,3)
val c = a.map((TaskContext.get.partitionId,_))
c.collect

a = ParallelCollectionRDD[411] at parallelize at <console>:38
c = MapPartitionsRDD[412] at map at <console>:39


Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9), (2,10))

**foreachPartition**

In [307]:
//foreachPartition applies an operation to each partition as a whole
//Example: sum the maxima of all partitions
val total = sc.doubleAccumulator("total")

val a = sc.parallelize(1 to 100,3)

a.foreachPartition(iter =>{
    val m = iter.reduce((x,y) => if(x>y) x else y)
    total.add(m)
})

total.value

total = DoubleAccumulator(id: 5563, name: Some(total), value: 199.0)
a = ParallelCollectionRDD[432] at parallelize at <console>:47


199.0

**aggregate**

In [310]:
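//aggregate first folds each partition with one function, then merges the per-partition results with a second function.
//Example: compute the sum and the count of the elements
//Three arguments: the initial value, the per-partition function, and the merge function.

val rdd = sc.parallelize(1 to 10,3)
rdd.aggregate((0,0))((t,x) => (t._1+x,t._2+1),
                     (p,q) => (p._1+q._1,p._2+q._2))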

(55,10)
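
The two-stage structure of aggregate can be traced on plain Scala collections. The sketch below is a model under stated assumptions, not Spark's code: `aggregateModel` is a hypothetical helper and the three partitions are an assumed layout for `1 to 10`.

```scala
// Plain-Scala model (no Spark needed) of aggregate(zero)(seqOp, combOp).
// aggregateModel is a hypothetical helper, not a Spark API.
def aggregateModel[T, U](partitions: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  val partials = partitions.map(_.foldLeft(zero)(seqOp)) // one partial result per partition
  partials.foldLeft(zero)(combOp)                        // partial results merged into one
}

val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10))
val (sum, count) = aggregateModel(partitions, (0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),      // within a partition: add the element, bump the count
  (p, q) => (p._1 + q._1, p._2 + q._2)       // across partitions: add sums and counts
)
println(sum.toDouble / count) // 5.5
```

The per-partition results here are (6,3), (15,3) and (34,4), which merge to (55,10). Note that the initial value enters every partition and the final merge, so it should be neutral for both functions.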

**aggregateByKey**

In [None]:
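//aggregateByKey works like aggregate, but separately for each key
//Arguments: the initial value, the within-partition merge function, and the cross-partition merge function
//Example: the maximum value for each key

val a = sc.parallelize(Array(("a",1),("b",1),("c",2),("a",2),("b",3)),3)

val b = a.aggregateByKey(0)((x,y)=>if(x>y) x else y,(x,y)=>if(x>y) x else y)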

b.collect
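
The cell above was not run, so no output is recorded. As a rough guide to what it should return, the following plain-Scala sketch models aggregateByKey with the same data and functions; `aggregateByKeyModel` is a hypothetical helper and the partition layout is an assumption (the result of a per-key maximum does not depend on it).

```scala
// Plain-Scala model (no Spark needed) of aggregateByKey(zero)(seqOp, combOp).
// aggregateByKeyModel is a hypothetical helper, not a Spark API.
def aggregateByKeyModel[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
  // Stage 1: within each partition, fold the values of every key starting from zero.
  val perPartition = partitions.map { part =>
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(seqOp) }
  }
  // Stage 2: merge the per-partition results of each key.
  perPartition.flatten.groupBy(_._1).map { case (k, us) => k -> us.map(_._2).reduce(combOp) }
}

// Assumed layout of the five pairs over three partitions.
val parts = Seq(Seq(("a", 1)), Seq(("b", 1), ("c", 2)), Seq(("a", 2), ("b", 3)))
val maxPerKey = aggregateByKeyModel(parts, 0)(
  (acc, v) => if (acc > v) acc else v,
  (x, y) => if (x > y) x else y
)
println(maxPerKey) // a -> 2, b -> 3, c -> 2
```

The initial value 0 works here only because all values are positive; for data that may be negative, `Int.MinValue` would be the neutral choice for a maximum.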