### RDD 并行度和分区

默认情况下，Spark 可以将一个作业切分多个任务后，发送给 Executor 节点并行计算，而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住，这里的并行执行的任务数量，并不是指的切分任务的数量，不要混淆了。

In [20]:
rdd3 = sc.parallelize(range(0, 6, 2), 5)
rdd3.collect()

print("Number of Partitions: "+str(rdd3.getNumPartitions()))
print("Action: First element: "+str(rdd3.first()))

Number of Partitions: 5
Action: First element: 0


这里使我们后续做优化的要去调优的一个部分，所以关于怎么去设置并行和分区，后面在说原理的时候再来看。

### RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型

In [21]:
# 1. value 类型 
# 将处理的数据逐条进行映射转换，这里的转换可以是类型的转换，也可以是值的转换。
rdd4 = sc.parallelize(["b", "a", "c"])
sorted(rdd4.map(lambda x: (x, 1)).collect())

[('a', 1), ('b', 1), ('c', 1)]

In [22]:
# 2. value 类型
# 将待处理的数据以分区为单位发送到计算节点进行处理，这里的处理是指可以进行任意的处理，哪怕是过滤数据。
def f(iterator): 
    yield sum(iterator)
    
rdd5 = sc.parallelize([1, 2, 3, 4], 2)
rdd5.mapPartitions(f).collect()

[3, 7]

**思考一个问题:map 和 mapPartitions 的区别?**

- 数据处理角度
    Map 算子是分区内一个数据一个数据的执行，类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
    
- 功能的角度
    Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。 MapPartitions 算子需要传递一个迭代器，返回一个迭代器，没有要求的元素的个数保持不变， 所以可以增加或减少数据
    
- 性能的角度
    Map 算子因为类似于串行操作，所以性能比较低，而是 mapPartitions 算子类似于批处理，所以性能较高。但是 mapPartitions 算子会长时间占用内存，那么这样会导致内存可能不够用，出现内存溢出的错误。所以在内存有限的情况下，不推荐使用。使用 map 操作。

In [23]:
# 3. value 类型
# 将待处理的数据以分区为单位发送到计算节点进行处理，这里的处理是指可以进行任意的处理，哪怕是过滤数据，在处理时同时可以获取当前分区索引。
def f(splitIndex, iterator): 
    yield splitIndex

rdd6 = sc.parallelize([1, 2, 3, 4], 4)
rdd6.mapPartitionsWithIndex(f).collect()

[0, 1, 2, 3]

In [24]:
# 4. value 类型
# 将处理的数据进行扁平化后再进行映射处理，所以算子也称之为扁平映射
rdd7 = sc.parallelize([2, 3, 4])
print(rdd7.collect())
print(sorted(rdd7.flatMap(lambda x: range(1, x)).collect()))
print(sorted(rdd7.flatMap(lambda x: [(x, x), (x, x)]).collect()))

[2, 3, 4]
[1, 1, 1, 2, 2, 3]
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]


In [25]:
# 5. value 类型
# 将同一个分区的数据直接转换为相同类型的内存数组进行处理，分区不变
rdd8 = sc.parallelize([1, 2, 3, 4], 2)
sorted(rdd8.glom().collect())

[[1, 2], [3, 4]]

In [26]:
# 6. value 类型
# 将数据根据指定的规则进行分组, 分区默认不变，但是数据会被打乱重新组合，我们将这样的操作称之为 shuffle。极限情况下，数据可能被分在同一个分区中
rdd9 = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd9.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x, y) in result])

[(0, [2, 8]), (1, [1, 1, 3, 5])]

In [27]:
# 7. value 类型
# 将数据根据指定的规则进行筛选过滤，符合规则的数据保留，不符合规则的数据丢弃。
# 当数据进行筛选过滤后，分区不变，但是分区内的数据可能不均衡，生产环境下，可能会出现数据倾斜。
rdd10 = sc.parallelize([1, 2, 3, 4, 5])
rdd10.filter(lambda x: x % 2 == 0).collect()

[2, 4]

```sample(withReplacement, fraction, seed=None)```


Return a sampled subset of this RDD.

Parameters
- withReplacement – can elements be sampled multiple times (replaced when sampled out)

- fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0

- seed – seed for the random number generator


In [28]:
# 8. value 类型
# 根据指定的规则从数据集中抽取数据
rdd11 = sc.parallelize(range(100), 4)
rdd11.sample(False, 0.1, 81).count()

11

In [29]:
# 9. value 类型 
# 将数据集中重复的数据去重
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())

[1, 2, 3]

In [30]:
# 10. value 类型
# 根据数据量缩减分区，用于大数据集过滤后，提高小数据集的执行效率
# 当 spark 程序中，存在过多的小任务的时候，可以通过 coalesce 方法，收缩合并分区，减少分区的个数，减小任务调度成本
print(sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect())
print(sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect())

[[1], [2, 3], [4, 5]]
[[1, 2, 3, 4, 5]]


In [31]:
# 11. value 类型
# 该操作内部其实执行的是 coalesce 操作，参数 shuffle 的默认值为 true。
# 无论是将分区数多的 RDD 转换为分区数少的 RDD，还是将分区数少的 RDD 转换为分区数多的 RDD，
# repartition 操作都可以完成，因为无论如何都会经 shuffle 过程。
rdd12 = sc.parallelize([1,2,3,4,5,6,7], 4)
print(sorted(rdd.glom().collect()))
print(len(rdd12.repartition(2).glom().collect()))
print(len(rdd12.repartition(10).glom().collect()))

NameError: name 'rdd' is not defined

In [None]:
# 12. value 类型
# 该操作用于排序数据。在排序之前，可以将数据通过 f 函数进行处理，
# 之后按照 f 函数处理 的结果进行排序，默认为升序排列。
# 排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())

In [None]:
# 1. 双 value 类型
# 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
rdd13 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd14 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd13.intersection(rdd14).collect()

In [None]:
# 2. 双 value 类型
# 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
rdd15 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd16 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd15.union(rdd16).collect()

In [None]:
# 3. 双 value 类型
# 以一个 RDD 元素为主，去除两个 RDD 中重复元素，将其他元素保留下来。求差集
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtract(y).collect())

In [None]:
# 4. 双 value 类型
# 将两个 RDD 中的元素，以键值对的形式进行合并。
# 其中，键值对中的 Key 为第 1 个 RDD 中的元素，Value 为第 2 个 RDD 中的相同位置的元素。
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()

In [None]:
# 1. Key-Value 类型
# 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
sets = pairs.partitionBy(2).glom().collect()
len(set(sets[0]).intersection(set(sets[1])))

In [None]:
# 2. Key-Value 类型
# 可以将数据按照相同的 Key 对 Value 进行聚合
from operator import add


rdd17 = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd17.reduceByKey(add).collect())

In [None]:
# 3. Key-Value 类型
# 将数据源的数据根据 key 对 value 进行分组
rdd18 = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd18.groupByKey().mapValues(len).collect()))
print(sorted(rdd18.groupByKey().mapValues(list).collect()))

**reduceByKey 和 groupByKey 的区别?**

- 从 shuffle 的角度:
    
    reduceByKey 和 groupByKey 都存在 shuffle 的操作，但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能，这样会减少落盘的 数据量，而 groupByKey 只是进行分组，不存在数据量减少的问题，reduceByKey 性能比较高。

- 从功能的角度:
    
    reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组，不能聚合，所以在分组聚合的场合下，推荐使用 reduceByKey，如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

In [None]:
# 4. Key-Value 类型
# 将数据根据不同的规则进行分区内计算和分区间计算
# aggregateByKey

In [None]:
# 5. Key-Value 类型
# 当分区内计算规则和分区间计算规则相同时，aggregateByKey 就可以简化为 foldByKey
from operator import add


rdd19 = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd19.foldByKey(0, add).collect())

```combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)```

Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

- createCombiner, which turns a V into a C (e.g., creates a one-element list)

- mergeValue, to merge a V into a C (e.g., adds it to the end of a list)

- mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.

> Note V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

In [None]:
# 6. Key-Value 类型
# 最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。
# 类似于 aggregate()，combineByKey()允许用户返回值的类型与输入不一致。
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a

sorted(x.combineByKey(to_list, append, extend).collect())

**reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?**

- reduceByKey: 相同 key 的第一个数据不进行任何计算，分区内和分区间计算规则相同 
- FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算，分区内和分区间计算规则相同
- AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算，分区内和分区间计算规 则可以不相同 
- CombineByKey:当计算时，发现数据结构不满足要求时，可以让第一个数据转换结构。分区内和分区间计算规则不相同。

In [None]:
# 7. Key-Value 类型
# 在一个(K,V)的 RDD 上调用，K 必须实现 Ordered 接口(特质)，返回一个按照 key 进行排序的
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey().first())
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
print(sc.parallelize(tmp).sortByKey(True, 2).collect())
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
print(sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect())

In [None]:
# 8. Key-Value 类型
# 在类型为(K,V)和(K,W)的 RDD 上调用，返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W)) 的 RDD
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

In [None]:
# 9. Key-Value 类型
# 类似于 SQL 语句的左外连接
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.leftOuterJoin(y).collect())

In [None]:
# 10. Key-Value 类型
# 在类型为 (K,V) 和 (K,W) 的 RDD 上调用，返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]

In [None]:
# 1
# 聚集 RDD 中的所有元素，先聚合分区内数据，再聚合分区间数据
from operator import add


print(sc.parallelize([1, 2, 3, 4, 5]).reduce(add))
print(sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add))

In [None]:
# 2 
# 在驱动程序中，以数组 Array 的形式返回数据集的所有元素
# collect

In [None]:
# 3
# 返回 RDD 中元素的个数
sc.parallelize([2, 3, 4]).count()

In [None]:
# 4
# 返回 RDD 中的第一个元素
sc.parallelize([2, 3, 4]).first()

In [None]:
# 5
# 返回一个由 RDD 的前 n 个元素组成的数组
print(sc.parallelize([2, 3, 4, 5, 6]).cache().take(2))
print(sc.parallelize([2, 3, 4, 5, 6]).take(10))
print(sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3))

In [None]:
# 6 
# 返回该 RDD 排序后的前 n 个元素组成的数组
print(sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6))
print(sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x))

In [None]:
# 7 
# 分区的数据通过初始值和分区内的数据进行聚合，然后再和初始值进行分区间的数据聚合
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp))
print(sc.parallelize([]).aggregate((0, 0), seqOp, combOp))

In [None]:
# 8
# 折叠操作，aggregate 的简化版操作
from operator import add

sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)

In [None]:
# 9
# 统计每种 key 的个数
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())

**save 系列**

```saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)```
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys/values are converted for output using either user specified converters or, by default, “org.apache.spark.api.python.JavaToWritableConverter”.

Parameters
- conf – Hadoop job configuration, passed in as a dict

- keyConverter – (None by default)

- valueConverter – (None by default)


```saveAsHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)```
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Key and value types will be inferred if not specified. Keys and values are converted for output using either user specified converters or “org.apache.spark.api.python.JavaToWritableConverter”. The conf is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

Parameters
- path – path to Hadoop file

- outputFormatClass – fully qualified classname of Hadoop OutputFormat (e.g. “org.apache.hadoop.mapred.SequenceFileOutputFormat”)

- keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.IntWritable”, None by default)

- valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.Text”, None by default)

- keyConverter – (None by default)

- valueConverter – (None by default)

- conf – (None by default)

- compressionCodecClass – (None by default)


```saveAsNewAPIHadoopDataset(conf, keyConverter=None, valueConverter=None)```
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are converted for output using either user specified converters or, by default, “org.apache.spark.api.python.JavaToWritableConverter”.

Parameters
- conf – Hadoop job configuration, passed in as a dict

- keyConverter – (None by default)

- valueConverter – (None by default)

```saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)```
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types will be inferred if not specified. Keys and values are converted for output using either user specified converters or “org.apache.spark.api.python.JavaToWritableConverter”. The conf is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

Parameters
- path – path to Hadoop file

- outputFormatClass – fully qualified classname of Hadoop OutputFormat (e.g. “org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat”)

- keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.IntWritable”, None by default)

- valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.Text”, None by default)

- keyConverter – (None by default)

- valueConverter – (None by default)

- conf – Hadoop job configuration, passed in as a dict (None by default)

```saveAsPickleFile(path, batchSize=10)```
Save this RDD as a SequenceFile of serialized objects. The serializer used is pyspark.serializers.PickleSerializer, default batch size is 10.


```saveAsSequenceFile(path, compressionCodecClass=None)```
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the “org.apache.hadoop.io.Writable” types that we convert from the RDD’s key and value types. The mechanism is as follows:

Pyrolite is used to convert pickled Python RDD into RDD of Java objects.

Keys and values of this Java RDD are converted to Writables and written out.

Parameters
- path – path to sequence file

- compressionCodecClass – (None by default)

```saveAsTextFile(path, compressionCodecClass=None)```
Save this RDD as a text file, using string representations of elements.

Parameters
- path – path to text file

- compressionCodecClass – (None by default) string i.e. “org.apache.hadoop.io.compress.GzipCodec”

In [None]:
# 11
# 分布式遍历 RDD 中的每一个元素，调用指定函数
def f(x):
    print(x)
    
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

## RDD 的依赖关系

### RDD 血缘关系

RDD 只支持粗粒度转换，即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来，以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为，当该 RDD 的部分分区数据丢失时，它可以根据这些信息来重新运算和恢复丢失的 数据分区。

### RDD 依赖关系

这里所谓的依赖关系，其实就是两个相邻 RDD 之间的关系。

### 窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用，窄依赖我们形象的比喻为独生子女。

### 宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖，会 引起 Shuffle，总结:宽依赖我们形象的比喻为多生。


### RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形，该图形具有方向， 不会闭环。例如，DAG 记录了 RDD 的转换过程和任务的阶段。

<div align="center"><img src="asset/1613809284051.jpg"/></div>

### RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

- Application:初始化一个SparkContext即生成一个Application;
- Job:一个Action算子就会生成一个Job;
- Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;
- Task:一个Stage阶段中，最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

<div align="center"><img src="asset/1613809448691.jpg"/></div>

## RDD 的持久化

### RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存，默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存，而是触发后面的 action 算子时，该 RDD 将会被缓存在计算节点的内存中，并供后面重用。

<div align="center"><img src="asset/1613809558769.jpg"/></div>

缓存有可能丢失，或者存储于内存的数据由于内存不足而被删除，RDD 的缓存容错机 制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换，丢失的数据会被重算，由于 RDD 的各个 Partition 是相对独立的，因此只需要计算丢失的部分即可，并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是，在实际使用的时候，如果想重用数据，仍然建议调用 persist 或 cache。

### RDD CheckPoint 检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘，由于血缘依赖过长会造成容错成本过高，这样就不如在中间阶段做检查点容错，如果检查点之后有节点出现问题，可以从检查点开始重做血缘，减少了开销。

对 RDD 进行 checkpoint 操作并不会马上被执行，必须执行 Action 操作才能触发。

### 缓存和检查点区别

- Cache 缓存只是将数据保存起来，不切断血缘依赖。Checkpoint 检查点切断血缘依赖。 
- Cache 缓存的数据通常存储在磁盘、内存等地方，可靠性低。Checkpoint 的数据通常存 储在 HDFS 等容错、高可用的文件系统，可靠性高。
- 建议对 checkpoint() 的 RDD 使用 Cache 缓存，这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可，否则需要再从头计算一次 RDD。

## RDD 分区器

Spark 目前支持 Hash 分区和 Range 分区，和用户自定义分区。Hash 分区为当前的默认
分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分 区，进而决定了 Reduce 的个数。

- 只有 Key-Value 类型的RDD才有分区器，非Key-Value类型的RDD分区的值是None 
- 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1)，决定这个值是属于那个分区的。

1) Hash 分区:对于给定的 key，计算其 hashCode,并除以分区个数取余
2) Range 分区:将一定范围内的数据映射到一个分区中，尽量保证每个分区数据均匀，而且分区间有序

## 累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量，在 Executor 端的每个 Task 都会得到这个变量的一份新的副本，每个 task 更新这些副本的值后， 传回 Driver 端进行 merge。

## 广播变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值，以供一个或多个 Spark 操作使用。比如，如果你的应用需要向所有节点发送一个较大的只读查询表， 广播变量用起来都很顺手。在多个并行操作中使用同一个变量，但是 Spark 会为每个任务分别发送。

# Spark 企业实战

hive 中创建用户播放视频表。

```sql
create database cilicili;
create table cilicili.user_play 
(
    id INT, 
    user_id INT, 
    content_id INT, 
    play_time FLOAT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
```

安装 datax，并增加 job 如下：

```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "user_id",
                            "content_id",
                            "play_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.56.101:3306/my_project"
                                ],
                                "table": [
                                    "user_play"
                                ]
                            }
                        ],
                        "password": "bigdata123",
                        "username": "bigdata",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "user_id",
                                "type": "INT"
                            },
                            {
                                "name": "content_id",
                                "type": "INT"
                            },
                            {
                                "name": "play_time",
                                "type": "FLOAT"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://bigdata-node1:9820",
                        "fieldDelimiter": "\t",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/hive/warehouse/cilicili.db/user_play",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
```
配置文件中 `"path": "/hive/warehouse/cilicili.db/user_play"` 地址，就是我们使用 hive 创建的表的文件存储地址。

确保使用的 mysql 版本和 jdbc mysql 的驱动版本一致。

```bash
mv datax/plugin/reader/mysqlreader/libs/mysql-connector-java-5.1.34.jar plugin/reader/mysqlreader/libs/mysql-connector-java-8.x.x.jar
```

```bash
bin/datax.py job/mysql2hive.json
```

In [1]:
# 为了能够让 python 找到 pyspark，使用 findspark
import findspark
findspark.init()

# 为了使用 RDDs，创建 SparkSession
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# 创建 SparkConf 和 SparkSession
conf=SparkConf()\
        .setMaster('local[*]')\
        .setAppName("Spark Read Hive")\
        .setExecutorEnv("spark.executor.memory","4g")\
        .setExecutorEnv("spark.driver.memory","4g")

spark=SparkSession.builder\
        .config(conf=conf)\
        .enableHiveSupport()\
        .getOrCreate()

# 获取 SparkContext
sc=spark.sparkContext

In [2]:
user_play_rdd = sc.textFile('/hive/warehouse/cilicili.db/user_play')
user_play_rdd.take(10)

['1\t40\t15094\t44.0',
 '2\t40\t12044\t29.0',
 '3\t40\t8419\t52.0',
 '4\t40\t7769\t25.0',
 '5\t40\t17174\t49.0',
 '6\t40\t7735\t60.0',
 '7\t40\t9266\t35.0',
 '8\t40\t4862\t16.0',
 '9\t40\t10480\t57.0',
 '10\t40\t10919\t21.0']

In [3]:
# 先把它切断
user_play_rdd = user_play_rdd.map(lambda satir: satir.split("\t"))
user_play_rdd.take(10)

[['1', '40', '15094', '44.0'],
 ['2', '40', '12044', '29.0'],
 ['3', '40', '8419', '52.0'],
 ['4', '40', '7769', '25.0'],
 ['5', '40', '17174', '49.0'],
 ['6', '40', '7735', '60.0'],
 ['7', '40', '9266', '35.0'],
 ['8', '40', '4862', '16.0'],
 ['9', '40', '10480', '57.0'],
 ['10', '40', '10919', '21.0']]

In [4]:
from pyspark.sql.types import IntegerType


df = spark.createDataFrame(user_play_rdd, ['Id', 'UserId', 'ContentId', 'Duration'])
df = df.withColumn("Duration", df["Duration"].cast(IntegerType()))
df.take(10)

# video_play_df = df.groupBy('ContentId').agg({"duration": "sum", "Duration": 'count'})
# video_play_df = video_play_df.withColumnRenamed('sum(duration)', 'play_duration').withColumnRenamed('count(Duration)', 'play_times')
# video_play_df = video_play_df.collect()

[Row(Id='1', UserId='40', ContentId='15094', Duration=44),
 Row(Id='2', UserId='40', ContentId='12044', Duration=29),
 Row(Id='3', UserId='40', ContentId='8419', Duration=52),
 Row(Id='4', UserId='40', ContentId='7769', Duration=25),
 Row(Id='5', UserId='40', ContentId='17174', Duration=49),
 Row(Id='6', UserId='40', ContentId='7735', Duration=60),
 Row(Id='7', UserId='40', ContentId='9266', Duration=35),
 Row(Id='8', UserId='40', ContentId='4862', Duration=16),
 Row(Id='9', UserId='40', ContentId='10480', Duration=57),
 Row(Id='10', UserId='40', ContentId='10919', Duration=21)]

In [5]:
# 分组获取播放时长
play_duration_df = df.groupBy('ContentId').sum('Duration').withColumnRenamed('sum(Duration)', 'play_duration')
play_duration_df = play_duration_df.orderBy(play_duration_df.play_duration.desc()).collect()

In [6]:
# 分组获取播放次数
play_times_df = df.groupBy('ContentId').count().withColumnRenamed('count', 'play_times')
play_times_df = play_times_df.orderBy(play_times_df.play_times.desc()).collect()

In [7]:
sc.stop()

为了能够让 python 使用 happy base 连接 hbase，需要启动 hbase 的 thrift 服务。

```bash
nohup hbase thrift start-port:9090 &
```


### HBase

![](asset/hbase-logo.png)

定义：
    A column-oriented DBMS (or columnar database management system) 
    

**row-oriented DBMS**

![1615518246367.jpg](asset/1615518246367.jpg)



**row store**

```
1, Paul Walker, US, 231, Gallardo, 
2, Vin Diesel, Brazil, 520, Mustang
```


**column store**

```
1,2, Paul Walker, Vin Diesel, US, Brazil, 231, 520, Gallardo, Mustang
```

- 随机读性能大幅提升
- 水平扩展性更好


**Hbase的优点：**

- 列的可以动态增加，并且列为空就不存储数据,节省存储空间.
- Hbase自动切分数据，使得数据存储自动具有水平scalability.
- Hbase可以提供高并发读写操作的支持


**Hbase的缺点：**

- 不能支持条件查询，只支持按照Row key来查询.
- 不适合于大范围扫描查询
- 不支持事务
- 不直接支持 SQL 的语句查询




**HBase的应用场景**

hbase适合大量随机写入和顺序读取的应用场景，不适合大量随机读的应用场景。

- 写密集型应用，每天写入量巨大，而相对读数量较小的应用，比如IM的历史消息，游戏的日志、推荐结果等等
- 不需要复杂查询条件来查询数据的应用，HBase只支持基于rowkey的查询，对于HBase来说，单条记录或者小范围的查询是可以接受的，大范围的查询由于分布式的原因，可能在性能上有点影响，而对于像SQL的join等查询，HBase无法支持。
- 对性能和可靠性要求非常高的应用，由于HBase本身没有单点故障，可用性非常高。
- 数据量较大，而且增长量无法预估的应用，HBase支持在线扩展，即使在一段时间内数据量呈井喷式增长，也可以通过HBase横向扩展来满足功能。


![HBase-Table-HBase-Architecture-Edureka.png](asset/HBase-Table-HBase-Architecture-Edureka.png)


- Tables: Data is stored in a table format in HBase. But here tables are in column-oriented format.
- Row Key: Row keys are used to search records which make searches fast. You would be curious to know how? I will explain it in the architecture part moving ahead in this blog. 
- Column Families: Various columns are combined in a column family. These column families are stored together which makes the searching process faster because data belonging to same column family can be accessed together in a single seek.
- Column Qualifiers: Each column’s name is known as its column qualifier.
- Cell: Data is stored in cells. The data is dumped into cells which are specifically identified by rowkey and column qualifiers.
- Timestamp: Timestamp is a combination of date and time. Whenever data is stored, it is stored with its timestamp. This makes easy to search for a particular version of data.

In [17]:
import happybase

connection = happybase.Connection('192.168.56.101')

In [9]:
# connection.delete_table('video', disable=True)

In [13]:
connection.create_table(
    'video',
    {'video': dict(max_versions=10),
     'play': dict(max_versions=10)}
)

In [15]:
play_duration_df[:10]

[Row(ContentId='4012', play_duration=602),
 Row(ContentId='10702', play_duration=446),
 Row(ContentId='5773', play_duration=426),
 Row(ContentId='17308', play_duration=426),
 Row(ContentId='13307', play_duration=420),
 Row(ContentId='10680', play_duration=408),
 Row(ContentId='6896', play_duration=396),
 Row(ContentId='9453', play_duration=394),
 Row(ContentId='15493', play_duration=392),
 Row(ContentId='10649', play_duration=390)]

In [18]:
video_talbe = connection.table('video')

with video_talbe.batch() as b:
    def save_duration_to_hbase(idx, row):
        video_talbe.put(f'{idx:0>6d}' + '-play-duration' , {b'video:content_id': str(row.ContentId), 
                                                            b'play:duration': str(row.play_duration)})
    for idx, row in enumerate(play_duration_df[:100]):
        save_duration_to_hbase(idx, row)

In [23]:
video_talbe = connection.table('video')

with video_talbe.batch() as b:
    def save_times_to_hbase(idx, row):
        video_talbe.put(f'{idx:0>6d}' + '-play-times', {b'video:content_id': str(row.ContentId),
                                                        b'play:times': str(row.play_times)})
    for idx, row in enumerate(play_times_df[:100]):
        save_times_to_hbase(idx, row)

In [25]:
# 查看数据
video_talbe = connection.table('video')
for i in video_talbe.scan(row_start=b'000005', row_stop=b'000030'):
    print(i)

(b'000005-play-duration', {b'play:duration': b'408', b'video:content_id': b'10680'})
(b'000005-play-times', {b'play:times': b'11', b'video:content_id': b'4638'})
(b'000006-play-duration', {b'play:duration': b'396', b'video:content_id': b'6896'})
(b'000006-play-times', {b'play:times': b'11', b'video:content_id': b'6896'})
(b'000007-play-duration', {b'play:duration': b'394', b'video:content_id': b'9453'})
(b'000007-play-times', {b'play:times': b'11', b'video:content_id': b'9114'})
(b'000008-play-duration', {b'play:duration': b'392', b'video:content_id': b'15493'})
(b'000008-play-times', {b'play:times': b'11', b'video:content_id': b'12009'})
(b'000009-play-duration', {b'play:duration': b'390', b'video:content_id': b'10649'})
(b'000009-play-times', {b'play:times': b'11', b'video:content_id': b'4550'})
(b'000010-play-duration', {b'play:duration': b'388', b'video:content_id': b'12618'})
(b'000010-play-times', {b'play:times': b'11', b'video:content_id': b'16039'})
(b'000011-play-duration', {b