In [5]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('RDD').setMaster('local[4]')

sc = SparkContext(conf=conf)

In [6]:
sc.master

'local[4]'

In [2]:
import pyspark

In [2]:
from functools import reduce

In [3]:
reduce?

[0;31mDocstring:[0m
reduce(function, sequence[, initial]) -> value

Apply a function of two arguments cumulatively to the items of a sequence,
from left to right, so as to reduce the sequence to a single value.
For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
of the sequence in the calculation, and serves as a default when the
sequence is empty.
[0;31mType:[0m      builtin_function_or_method


# SparkContext

SparkContext是任何spark功能的入口点.当我们运行任何Spark应用程序时，会启动一个驱动程序，它具有main函数，并且此处启动了SparkContext.然后，驱动程序在工作节点上的执行程序内运行操作.

SparkContext使用Py4J启动 JVM 并创建 JavaSparkContext .默认情况下，PySpark将SparkContext作为'sc'提供，因此创建新的SparkContext将不起作用.

In [3]:
SparkContext?

[0;31mInit signature:[0m
[0mSparkContext[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mmaster[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mappName[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0msparkHome[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpyFiles[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0menvironment[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mbatchSize[0m[0;34m=[0m[0;36m0[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mserializer[0m[0;34m=[0m[0mPickleSerializer[0m[0;34m([0m[0;34m)[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mconf[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mgateway[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mjsc[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mprofiler_cls[0m[0;34m=[0m[0;34m<[0m[0;32mclass[0m [0;34m'py

Master : 它是它所连接的群集的URL.

appName : 你的工作名称.

sparkHome :  Spark安装目录.

pyFiles : 要发送到集群并添加到PYTHONPATH的.zip或.py文件.

环境 : 工作节点环境变量.

batchSize : 表示为单个Java对象的Python对象数.设置1以禁用批处理，设置为0以根据对象大小自动选择批处理大小，或设置为-1以使用无限制的批处理大小.

序列化程序 :  RDD序列化器.

Conf :  L {SparkConf}的一个对象，用于设置所有Spark属性.

网关 : 使用现有网关和JVM，否则初始化新JVM.

JSC :  JavaSparkContext实例.

profiler_cls : 用于分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler).

In [5]:
log_file = "file:///usr/local/spark/README.md"
log_data = sc.textFile(log_file).cache()
log_data

file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
log_data.collect()[:3]

['# Apache Spark',
 '',
 'Spark is a fast and general cluster computing system for Big Data. It provides']

In [6]:
nums_A = log_data.filter(lambda s: 'a' in s).count()
nums_B = log_data.filter(lambda s: 'b' in s).count()

In [7]:
nums_A, nums_B

(61, 30)

提交任务
```
spark-submit first_app.py 
```

# RDD

RDD（Resilient Distributed Dataset）叫做`弹性分布式数据集`，是Spark中最基本的数据处理模型。代码中是一个抽象类，它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合


1. 弹性
- 存储的弹性：内存与磁盘的自动切换；
- 容错的弹性：数据丢失可以自动恢复；
- 计算的弹性：计算出错重试机制；
- 分片的弹性：可根据需要重新分片。

2. 分布式：数据存储在大数据集群不同节点上
3. 数据集：RDD封装了计算逻辑，并不保存数据
4. 数据抽象：RDD是一个抽象类，需要子类具体实现
1. 不可变：RDD封装了计算逻辑，是不可以改变的，想要改变，只能产生新的RDD，在新的RDD里面封装计算逻辑
1. 可分区、并行计算

要对这些RDD应用操作，有两种方法:

- 转型(transform) : 这些是操作，它们应用于RDD以创建新的RDD. Filter，groupBy和map是转换的例子.

- 动作(action) : 这些是应用于RDD的操作，它指示Spark执行计算并将结果发送回驱动程序.

In [9]:
pyspark.RDD?

[0;31mInit signature:[0m
[0mpyspark[0m[0;34m.[0m[0mRDD[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mjrdd[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mctx[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mjrdd_deserializer[0m[0;34m=[0m[0mAutoBatchedSerializer[0m[0;34m([0m[0mPickleSerializer[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be
operated on in parallel.
[0;31mFile:[0m           /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m           type
[0;31mSubclasses:[0m     PipelinedRDD


In [10]:
# 创建RDD单词，其中存储了一组提到的单词
words = sc.parallelize(
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words

ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:195

### 基本 转换 运算

In [14]:
counts = words.count()  # 返回RDD中的元素数
counts

8

In [15]:
coll = words.collect()  # 返回RDD中的所有元素
coll

['scala',
 'java',
 'hadoop',
 'spark',
 'akka',
 'spark vs hadoop',
 'pyspark',
 'pyspark and spark']

In [20]:
words.foreach?

[0;31mSignature:[0m [0mwords[0m[0;34m.[0m[0mforeach[0m[0;34m([0m[0mf[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Applies a function to all elements of this RDD.

>>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
[0;31mFile:[0m      /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m      method


In [32]:
# foreach(f) 仅返回满足foreach内函数条件的元素
def f(x): print(x)

words.foreach(f)

In [33]:
# filter(f) 返回一个包含元素的新RDD，它满足过滤器内部的功能
words.filter(lambda x: 'spark' in x).collect()

['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

In [34]:
# map(f，preservesPartitioning = False)
# 通过将函数应用于RDD中的每个元素来返回一个新的RDD
words.map(lambda x: x + '_001').collect()

['scala_001',
 'java_001',
 'hadoop_001',
 'spark_001',
 'akka_001',
 'spark vs hadoop_001',
 'pyspark_001',
 'pyspark and spark_001']

In [35]:
words.map(lambda x: 'spark' in x).collect()

[False, False, False, True, False, True, True, True]

In [37]:
# reduce(f)
# 执行指定的可交换和关联二进制运算后，返回RDD中的元素
nums = sc.parallelize(range(10))
nums.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [39]:
from operator import add
nums.reduce(add)

45

In [40]:
# distinct 运算
nums = sc.parallelize([1, 2, 3, 3, 4, 4, 5, 5, 5])
nums.distinct().collect()

[1, 2, 3, 4, 5]

In [42]:
num_list = nums.randomSplit([0.4, 0.6])

In [43]:
num_list[0].collect()

[1, 4, 5, 5]

In [44]:
num_list[1].collect()

[2, 3, 3, 4, 5]

In [11]:
# groupBy 
nums = sc.parallelize(range(10))
g_RDD = nums.groupBy(lambda x: 'even' if (x % 2 == 0) else "odd").collect()
g_RDD

[('even', <pyspark.resultiterable.ResultIterable at 0x7fb09a2517d0>),
 ('odd', <pyspark.resultiterable.ResultIterable at 0x7fb09a251810>)]

In [12]:
print(g_RDD[0][0], sorted(g_RDD[0][1]))  # 偶数list

even [0, 2, 4, 6, 8]


In [13]:
print(g_RDD[1][0], sorted(g_RDD[1][1])) 

odd [1, 3, 5, 7, 9]


In [52]:
# join(other, numPartitions = None)
# It returns RDD with a pair of elements with the matching keys and all the values for that particular key. 
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
joined.collect()

[('hadoop', (4, 5)), ('spark', (1, 2))]

In [14]:
# cache() 持久化
# Persist this RDD with the default storage level (MEMORY_ONLY)
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 


ParallelCollectionRDD[11] at parallelize at PythonRDD.scala:195

In [15]:
words.persist().is_cached 

True

### 多个RDD 运算

In [55]:
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize([2, 4])
rdd3 = sc.parallelize([4, 8])

In [56]:
# 并集
rdd1.union(rdd2).union(rdd3).collect()

[1, 2, 2, 4, 4, 8]

In [57]:
# 交集
rdd1.intersection(rdd2).collect()

[2]

In [58]:
# 差集
rdd1.subtract(rdd2).collect()

[1]

In [59]:
# 笛卡尔积
rdd1.cartesian(rdd2).collect()

[(1, 2), (1, 4), (2, 2), (2, 4)]

### 基本 动作 运算

In [66]:
nums = sc.parallelize([2, 1, 0, 4, 3, 5, 7, 6, 8])

In [67]:
nums.first()

2

In [68]:
nums.take(3)

[2, 1, 0]

In [69]:
nums.takeOrdered(3)

[0, 1, 2]

In [70]:
nums.takeOrdered(3, key=lambda x: -x)

[8, 7, 6]

In [71]:
# 一些统计功能
nums.stats()

(count: 9, mean: 4.0, stdev: 2.5819888974716116, max: 8.0, min: 0.0)

In [75]:
nums.min(), nums.max(), nums.stdev(), nums.mean(), nums.count(), nums.sum()

(0, 8, 2.5819888974716116, 4.0, 9, 36)

### RDD key-value

In [24]:
kvrdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])

In [6]:
kvrdd.keys().collect()

['a', 'b', 'c', 'd']

In [79]:
kvrdd.values().collect()

[1, 2, 3, 4]

In [80]:
kvrdd.collect()

[('a', 1), ('b', 2), ('c', 3), ('d', 4)]

In [82]:
kvrdd.filter(lambda x: x[0]>'b').collect()  # 针对key

[('c', 3), ('d', 4)]

In [81]:
kvrdd.filter(lambda x: x[1]>2).collect()  # 针对value

[('c', 3), ('d', 4)]

In [83]:
kvrdd.mapValues(lambda x: x**2).collect()   # 对组的value进行操作kv

[('a', 1), ('b', 4), ('c', 9), ('d', 16)]

In [7]:
kvrdd.sortByKey(ascending=False).collect()  # 按key 排序

[('d', 4), ('c', 3), ('b', 2), ('a', 1)]

In [26]:
kvrdd2 = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])


In [27]:
kvrdd2.reduceByKey(lambda x1, x2: x1 + x2).collect()  # 相同的key进行reduce运算

[('b', 7), ('a', 3)]

In [30]:
# groupByKey 结果 (b, iterable的result对象), (a, ), ...
kvrdd2.groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()

[('b', 7), ('a', 3)]

In [13]:
kvrdd3 = sc.parallelize([('a', '1'), ('a', '2'), ('b', '3'), ('b', '4')])
kvrdd2.join(kvrdd3).collect()

[('b', (3, '3')),
 ('b', (3, '4')),
 ('b', (4, '3')),
 ('b', (4, '4')),
 ('a', (1, '1')),
 ('a', (1, '2')),
 ('a', (2, '1')),
 ('a', (2, '2'))]

In [15]:
chr(ord('c'))

'c'

In [16]:
kvrdd4 = sc.parallelize((chr(ord('a') + i), i) for i in range(6))
kvrdd4.collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4), ('f', 5)]

In [17]:
kvrdd5 = sc.parallelize((chr(ord('a') + i), '{}'.format(i)) for i in range(10) if i % 2 ==0)
kvrdd5.collect()

[('a', '0'), ('c', '2'), ('e', '4'), ('g', '6'), ('i', '8')]

In [34]:
# join  inner
kvrdd4.join(kvrdd5).collect()

[('c', (2, '2')), ('a', (0, '0')), ('e', (4, '4'))]

In [35]:
# left join
kvrdd4.leftOuterJoin(kvrdd5).sortByKey().collect()

[('a', (0, '0')),
 ('b', (1, None)),
 ('c', (2, '2')),
 ('d', (3, None)),
 ('e', (4, '4')),
 ('f', (5, None))]

In [36]:
# right join
kvrdd4.rightOuterJoin(kvrdd5).sortByKey().collect()

[('a', (0, '0')),
 ('c', (2, '2')),
 ('e', (4, '4')),
 ('g', (None, '6')),
 ('i', (None, '8'))]

In [18]:
# full out join 
kvrdd4.fullOuterJoin(kvrdd5).sortByKey().collect()

[('a', (0, '0')),
 ('b', (1, None)),
 ('c', (2, '2')),
 ('d', (3, None)),
 ('e', (4, '4')),
 ('f', (5, None)),
 ('g', (None, '6')),
 ('i', (None, '8'))]

In [38]:
# 删除 相同key的数据
kvrdd4.subtractByKey(kvrdd5).collect()

[('b', 1), ('d', 3), ('f', 5)]

In [41]:
kvfirst= kvrdd4.first()
kvfirst

('a', 0)

In [43]:
kvfirst[0]

'a'

In [44]:
kvfirst[1]

0

In [40]:
kvrdd4.take(2)

[('a', 0), ('b', 1)]

In [45]:
kvrdd2.countByKey()

defaultdict(int, {'a': 2, 'b': 2})

In [46]:
kvrdd4.collectAsMap()  # 创建python 字典

{'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4, 'f': 5}

In [47]:
kvrdd4.lookup('a')  # 输入key值 寻找value

[0]

In [48]:
kvrdd2.lookup('a')

[1, 2]

 key表示图书名称，value表示某天图书销量，请计算每个键对应的平均值

In [32]:
# key表示图书名称，value表示某天图书销量，请计算每个键对应的平均值
rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
# (k, (销量, 1))) -> (k, (销量和, 数量和)))
rdd.mapValues(lambda x: (x, 1))\
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
    .mapValues(lambda x: x[0] / x[1]).collect()

[('hadoop', 5.0), ('spark', 4.0)]

## Broadcast & Accumulator

For parallel processing, Apache Spark uses shared variables. A copy of shared variable goes on each node of the cluster when the driver sends a task to the executor on the cluster, so that it can be used for performing tasks.

In [4]:
kvFruit = sc.parallelize([(1, "apple"), (2, 'orange'), (3, 'banana'), (4, 'grape')])

In [5]:
fruit_map = kvFruit.collectAsMap()
fruit_map

{1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}

In [6]:
fruit_ids = sc.parallelize([2, 4, 3, 1])
print("水果编号", fruit_ids.collect())

水果编号 [2, 4, 3, 1]


In [7]:
# 使用字典进行转换
fruit_names = fruit_ids.map(lambda x: fruit_map[x]).collect()
fruit_names  # 每执行一次转换都需要将二者传送到Worker node

['orange', 'grape', 'banana', 'apple']

Broadcast variables are used to save the copy of data across all nodes. 
This variable is cached on all the machines and not sent on machines with tasks.

In [8]:
bc_fruit = sc.broadcast(fruit_map)
bc_fruit

<pyspark.broadcast.Broadcast at 0x7f7a3da19d50>

In [9]:
# # Broadcast变量有一个名为value的属性，它存储数据并用于返回广播值
fruit_names = fruit_ids.map(lambda x: bc_fruit.value[x]).collect()
fruit_names

['orange', 'grape', 'banana', 'apple']

bc_fruit 会传送到Worker Node 机器, 并存储在内存中.

广播变量被创建后不能修改

Accumulator variables are used for aggregating the information through associative and commutative operations. 

For example, you can use an accumulator for a sum operation or counters (in MapReduce). 

In [10]:
num = sc.accumulator(10)
num

Accumulator<id=0, value=10>

In [11]:
def f(x):
    global num
    num += x
rdd = sc.parallelize([10, 20, 30, 40])
rdd.foreach(f)

In [12]:
num.value

110

In [13]:
# 可以使用add进行累加

num_float = sc.accumulator(0.0)  # double 类型
num_int = sc.accumulator(0)

In [14]:
rdd.foreach(lambda x: [num_float.add(x), num_int.add(x)])

In [15]:
num_float.value, num_int.value

(100.0, 100)

In [16]:
type(num_float.value), type(num_int.value)

(float, int)

## RDD 持久化
标记为持久化, 要等到遇到第一个行动操作触发真正计算以后，才会把计算结果进行持久化

将需要重复运算的RDD 存储在内存中, 提升计算效率
- RDD.presist(存储等级) -默认MEMORY_ONLY
- RDD.unpresist()

This can only be used to assign a new storage level if the RDD does not have a storage level set yet.

In [38]:
rdd.persist?

[0;31mSignature:[0m [0mrdd[0m[0;34m.[0m[0mpersist[0m[0;34m([0m[0mstorageLevel[0m[0;34m=[0m[0mStorageLevel[0m[0;34m([0m[0;32mFalse[0m[0;34m,[0m [0;32mTrue[0m[0;34m,[0m [0;32mFalse[0m[0;34m,[0m [0;32mFalse[0m[0;34m,[0m [0;36m1[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified defaults to (C{MEMORY_ONLY}).

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
True
[0;31mFile:[0m      /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m      method


In [39]:
pyspark.StorageLevel?

[0;31mInit signature:[0m
[0mpyspark[0m[0;34m.[0m[0mStorageLevel[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0museDisk[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0museMemory[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0museOffHeap[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mdeserialized[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mreplication[0m[0;34m=[0m[0;36m1[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple
nodes. Also contains static constants for some commonly used storage levels, MEMORY_ONLY.
Since the data is always serialized on the Python side, all the constants use the serialized
formats.
[0;31mFile:[0m           /usr/local/spark

In [17]:
rdd_memory = sc.parallelize([3, 1, 3, 4])
rdd_memory.persist()

ParallelCollectionRDD[7] at parallelize at PythonRDD.scala:195

In [18]:
rdd_memory.is_cached

True

In [19]:
rdd_memory.unpersist()
rdd_memory.is_cached

False

In [20]:
rdd_memory_disk  = sc.parallelize([1, 2, 3])
rdd_memory_disk.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
rdd_memory_disk.is_cached

True

In [21]:
rdd_memory_disk.getStorageLevel()

StorageLevel(True, True, False, False, 1)

## WordCount

In [22]:
log_file = sc.textFile("file:///usr/local/spark/README.md")
log_file

file:///usr/local/spark/README.md MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0

In [23]:
words_rdd = log_file.flatMap(lambda line: line.split(" "))

In [24]:
counts_rdd = words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda x1, x2: x1 + x2)

In [25]:
# result = counts_rdd.sortBy(lambda x: x[1], ascending=False)
result = counts_rdd.takeOrdered(10, key=lambda x: -x[1])
result

[('', 72),
 ('the', 23),
 ('to', 17),
 ('Spark', 15),
 ('for', 12),
 ('and', 10),
 ('a', 9),
 ('##', 9),
 ('is', 7),
 ('on', 7)]

In [28]:
counts_rdd.saveAsTextFile("data/output")

In [29]:
%cat data/output/part-00000

('#', 1)
('Apache', 1)
('Spark', 15)
('', 72)
('is', 7)
('a', 9)
('fast', 1)
('and', 10)
('general', 3)
('cluster', 2)
('computing', 1)
('system', 1)
('for', 12)
('Big', 1)
('Data.', 1)
('It', 2)
('provides', 1)
('high-level', 1)
('APIs', 1)
('in', 5)
('Scala,', 1)
('Java,', 1)
('Python,', 2)
('R,', 1)
('an', 4)
('optimized', 1)
('engine', 1)
('that', 2)
('supports', 2)
('computation', 1)
('graphs', 1)
('data', 1)
('analysis.', 1)
('also', 5)
('rich', 1)
('set', 2)
('of', 5)
('higher-level', 1)
('tools', 1)
('including', 4)
('SQL', 2)
('DataFrames,', 1)
('MLlib', 1)
('machine', 1)
('learning,', 1)
('GraphX', 1)
('graph', 1)
('processing,', 1)
('Streaming', 1)
('stream', 1)
('processing.', 1)
('<http://spark.apache.org/>', 1)
('##', 9)
('Online', 1)
('Documentation', 1)
('You', 3)
('can', 6)
('find', 1)
('the', 23)
('latest', 1)
('documentation,', 1)
('programming', 1)
('guide,', 1)
('on', 7)
('[project', 1)
('web', 1)
('page](http://spark.apache.org/documentation.html).', 1)
('This', 2

In [47]:
log_file.flatMap?

[0;31mSignature:[0m [0mlog_file[0m[0;34m.[0m[0mflatMap[0m[0;34m([0m[0mf[0m[0;34m,[0m [0mpreservesPartitioning[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
[0;31mFile:[0m      /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m      method


In [84]:
log_file.map?

[0;31mSignature:[0m [0mlog_file[0m[0;34m.[0m[0mmap[0m[0;34m([0m[0mf[0m[0;34m,[0m [0mpreservesPartitioning[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Return a new RDD by applying a function to each element of this RDD.

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
[0;31mFile:[0m      /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m      method


**flatMap 与 map的差异**

In [32]:
test_file = sc.textFile("test_file")

In [33]:
test_file.map(lambda line: line.split(" ")).collect()  # 具有分层结构

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/mnt/data1/workspace/data_analysis_mining/spark/tutorial/test_file
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [83]:
str_rdd = test_file.flatMap(lambda line: line.split(" "))  # flattening the results
str_rdd.collect()  

['apple',
 'orange',
 'apple',
 'maple',
 'banana',
 'orange',
 'water',
 'mountain',
 '',
 'first',
 'apple']

# 分区

RDD是弹性分布式数据集，通常RDD很大，会被分成很多个分区，分别保存在不同的节点上

1. 分区的作用

- 增加并行度
- 减少通信开销


2. RDD分区原则

RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心（core）数目


[0;31mSignature:[0m [0msc[0m[0;34m.[0m[0mparallelize[0m[0;34m([0m[0mc[0m[0;34m,[0m [0mnumSlices[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Distribute a local Python collection to form an RDD. Using xrange
is recommended if the input represents a range for performance.

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
[0;31mFile:[0m      /usr/local/spark/python/pyspark/context.py
[0;31mType:[0m      method


In [7]:
sc.textFile?  # minPartition 手动指定分区

[0;31mSignature:[0m [0msc[0m[0;34m.[0m[0mtextFile[0m[0;34m([0m[0mname[0m[0;34m,[0m [0mminPartitions[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0muse_unicode[0m[0;34m=[0m[0;32mTrue[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.

If use_unicode is False, the strings will be kept as `str` (encoding
as `utf-8`), which is faster and smaller than unicode. (Added in
Spark 1.2)

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']
[0;31mFile:[0m      /usr/local/spark/python/pyspark/context.py
[0;31mType:[0m      method


In [None]:
log_file = "file:///usr/local/spark/README.md"
log_data = sc.textFile(log_file, 2)

In [9]:
l = sc.parallelize(range(10), 2)

In [13]:
l.glom?

[0;31mSignature:[0m [0ml[0m[0;34m.[0m[0mglom[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Return an RDD created by coalescing all elements within each partition
into a list.

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
[0;31mFile:[0m      /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m      method


In [14]:
l.glom().count()

2

In [15]:
# 重新设置分区
new_l = l.repartition(1)
new_l.glom().count()

1

##  自定义分区

Spark提供了自带的HashPartitioner（哈希分区）与RangePartitioner（区域分区），能够满足大多数应用场景的需求。

```python
from pyspark import SparkConf, SparkContext


def MyPartitioner(key):
    print("MyPartitioner is running")
    print('The key is %d' % key)
    return key%10


def main():
    print("The main function is running")
    conf = SparkConf().setMaster("local").setAppName("MyApp")
    sc = SparkContext(conf = conf)
    data=sc.parallelize(range(10),5)
    data.map(lambda x:(x,1)) \
        .partitionBy(10,MyPartitioner) \
        .map(lambda x:x[0]) \
        .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")


if __name__ == '__main__':
  main()
```

In [17]:
def MyPartitioner(key):
    print("MyPartitioner is running")
    print('The key is %d' % key)
    return key%10

In [22]:
data=sc.parallelize(range(10),5)
#  # 只接受key-value形式
data.partitionBy?

[0;31mSignature:[0m
[0mdata[0m[0;34m.[0m[0mpartitionBy[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mnumPartitions[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpartitionFunc[0m[0;34m=[0m[0;34m<[0m[0mfunction[0m [0mportable_hash[0m [0mat[0m [0;36m0x7f7f22f9e3b0[0m[0;34m>[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Return a copy of the RDD partitioned using the specified partitioner.

>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
>>> len(set(sets[0]).intersection(set(sets[1])))
0
[0;31mFile:[0m      /usr/local/spark/python/pyspark/rdd.py
[0;31mType:[0m      method


In [23]:
data.map(lambda x: (x, 1))\
    .partitionBy(10, MyPartitioner)\
    .map(lambda x: x[0])\
    .saveAsTextFile('partition_test')

# 例子: 二次排序

In [3]:
%cat data/file4.txt

5 3
1 6
4 9
8 3
4 7
5 6
3 2


In [7]:
rdd_04 = sc.textFile("file:///mnt/data1/workspace/data_analysis_mining/Python+Spark2.0+Hadoop机器学习与大数据实战/spark_tutorial/data/file4.txt")
rdd_04.foreach(print)

In [16]:
res1 = rdd_04.filter(lambda line: len(line.strip()) > 0)

res2 = res1.map(
    lambda x: ((int(x.split(" ")[0]), int(x.split(" ")[1])),
               x)
)
res2.collect()

[((5, 3), '5 3'),
 ((1, 6), '1 6'),
 ((4, 9), '4 9'),
 ((8, 3), '8 3'),
 ((4, 7), '4 7'),
 ((5, 6), '5 6'),
 ((3, 2), '3 2')]

In [20]:
from operator import gt

class SecondarySort:
    def __init__(self, k):
        self.c1 = k[0]
        self.c2 = k[1]
    def __gt__(self, other):
        if other.c1 == self.c1:
            return gt(self.c2, other.c2)
        else:
            return gt(self.c1, other.c1)

In [21]:
res3 = res2.map(lambda x: (SecondarySort(x[0]), x[1]))
res4 = res3.sortByKey(False)
res5 = res4.map(lambda x: x[1])
res5.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 30, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 404, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 583, in dumps
    return pickle.dumps(obj, protocol)
_pickle.PicklingError: Can't pickle <class '__main__.SecondarySort'>: attribute lookup SecondarySort on __main__ failed

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 404, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 583, in dumps
    return pickle.dumps(obj, protocol)
_pickle.PicklingError: Can't pickle <class '__main__.SecondarySort'>: attribute lookup SecondarySort on __main__ failed

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
