## 检查SparkContext类是否正常

In [1]:
# Display the SparkContext object to confirm that it exists in this session.
sc

In [72]:
# How to create `sc` yourself when running as a standalone script.
# Disabled with `if False` because this notebook uses an existing `sc`.
if False:
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setMaster('local').setAppName('MyApp')
    sc=SparkContext(conf=conf) 

## 创建弹性分布式数据集（RDD）

In [2]:
# Create an RDD by parallelizing an in-memory collection (here a list).
lines = sc.parallelize(['pandas','i like pandas'])

In [3]:
# Create an RDD from external data: a text file read line by line.
lines = sc.textFile('./spark/README.md')

## RDD的转化操作transform

转化操作是惰性求值，每次操作对象是RDD的某个元素

In [4]:
# Action: return the number of elements in the RDD.
lines.count()

105

In [5]:
# Action: return the first element of the RDD.
lines.first()

'# Apache Spark'

In [11]:
# Transformation (lazy): keep only the lines that contain 'Python'.
pythonLines = lines.filter(lambda s:'Python' in s)

In [12]:
# Action: evaluates the filter and returns the first matching line.
pythonLines.first()

'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

In [13]:
# Mark the RDD for caching so that later actions can reuse it instead of
# recomputing the filter each time.
pythonLines.persist()

PythonRDD[7] at RDD at PythonRDD.scala:53

In [14]:
# Two actions on the persisted RDD: element count and first element.
print(pythonLines.count(),pythonLines.first())

3 high-level APIs in Scala, Java, Python, and R, and an optimized engine that


In [16]:
# Transformation (lazy): keep only the lines that contain 'Java'.
javaLines = lines.filter(lambda s: 'Java' in s)

In [17]:
# union concatenates both RDDs without removing duplicates, so a line that
# contains both words ends up in the result twice.
PJlines = pythonLines.union(javaLines)

In [18]:
# Better: one filter pass instead of two filters plus a union. Unlike union,
# a line that contains both words is kept only once.
PJlines = lines.filter(lambda s: 'Java' in s or 'Python' in s)

## RDD的行动操作action

注意：传给RDD操作的函数应避免引用某个对象的成员或字段（例如 `self.field`），否则Spark会把整个对象一起序列化并发送到工作节点；可以先把需要的字段赋给局部变量，再在函数中使用该局部变量。

In [19]:
# Action: take(10) returns at most the first 10 elements to the driver as a
# list; print them one per line.
first_ten = PJlines.take(10)
for text in first_ten:
    print(text)

high-level APIs in Scala, Java, Python, and R, and an optimized engine that
## Interactive Python Shell
Alternatively, if you prefer Python, you can use the Python shell:


## 常见转化/行动操作

In [21]:
# map: apply a function to every element (here: square each number), then
# collect the results to the driver and print them.
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda value: value ** 2).collect()
for square in squared:
    print(square)

1
4
9
16


In [24]:
# flatMap: turn each element into several outputs and flatten them into a
# single RDD (here: split every line into its space-separated words).
lines = sc.parallelize(['hello world', 'hw'])
words = lines.flatMap(lambda s: s.split(' '))
# No stray words.first() here: its result was discarded, so it only ran an
# extra job.
for word in words.collect():
    print(word)

hello
world
hw


#伪集合操作  
RDD.distinct() ## 网络数据混洗，开销很大  
RDD.union() ##可能包含重复数据  
RDD.intersection()  
RDD.subtract()  
RDD.cartesian()

In [62]:
# Action: reduce combines the elements pairwise with the given function
# (here: sum of all elements).
nums = sc.parallelize([1,2,3,4])
nums.reduce(lambda x,y:x+y)

10

In [39]:
# Action: aggregate -- build (sum, count) in one pass; mean = sum / count.
# seqOp folds a single element into a partition-local (sum, count) accumulator.
seqOp = (lambda acc, num: (acc[0] + num, acc[1] + 1))
# combOp merges two (sum, count) accumulators field by field, so the result
# keeps the (sum, count) layout no matter how many merges take place.
combOp = (lambda par1, par2: (par1[0] + par2[0], par1[1] + par2[1]))
nums.aggregate((0, 0), seqOp=seqOp, combOp=combOp)

(10, 4)

In [58]:
# Take the maximum inside each partition, then string-concatenate the
# per-partition results to make the role of zeroValue visible.
import numpy as np
l1 = [1,2,3,4,5,6,7,8,9]
rdd1 = sc.parallelize(l1,2)
# The result below shows that zeroValue is used as the starting value by both
# seqOp (within each partition) and combOp (when merging partition results).
rdd1.aggregate(zeroValue=0,seqOp=max,combOp=lambda a,b:(str(a)+str(b))) 

'049'

In [69]:
# Other commonly used actions:
# nums.count()
# nums.collect()
# nums.take(3)
# nums.top(3)
# nums.takeSample(False,2)
# foreach runs the function on every element and returns nothing; the value
# of np.sin(x) is discarded here.
nums.foreach(lambda x:np.sin(x))

In [70]:
# Action: return all elements to the driver as a list.
nums.collect()

[1, 2, 3, 4]

## 持久化

In [73]:
# Mark the RDD for caching so that the following actions can reuse it.
nums.persist()

ParallelCollectionRDD[29] at parallelize at PythonRDD.scala:195

In [74]:
# Two actions on the persisted RDD. Only the last expression of a cell is
# displayed, so the count itself is not shown.
nums.count()
# takeSample(False, 2): sample 2 elements without replacement.
nums.takeSample(False,2)

[1, 4]

In [75]:
# Undo the persist: mark the RDD as no longer cached.
nums.unpersist()

ParallelCollectionRDD[29] at parallelize at PythonRDD.scala:195

## 键值对 pair RDD操作

In [3]:
# Create pair RDDs.
# Key each line by its first space-separated word.
pairs = lines.map(lambda x: (x.split(' ')[0], x))
# A list is used instead of a set literal: a set has no defined iteration
# order, so the element order of the resulting RDD would not be guaranteed.
exa = sc.parallelize([(1, 2), (3, 4), (3, 6)])

In [79]:
# keys(): RDD of the keys only. collect() already returns a list, so no
# extra copy is needed.
exa.keys().collect()

[1, 3, 3]

In [8]:
# values(): RDD of the values only. collect() already returns a list, so no
# extra copy is needed.
pairs.values().collect()

['pandas', 'i like pandas']

### 转化操作

In [13]:
# reduceByKey: merge the values of each key with the given function
# (here: sum per key). collect() already returns a list.
exa.reduceByKey(lambda x, y: x + y).values().collect()

[2, 10]

In [20]:
# groupByKey: gather all values of each key into one iterable.
# The two-way unpacking relies on exa having exactly two distinct keys.
a,b=exa.groupByKey().values().collect()

In [25]:
# Materialize the grouped values of each key as plain lists. Only the last
# expression of a cell is displayed.
list(a)
list(b)

[4, 6]

In [26]:
# mapValues: apply the function to the value of each pair; keys are unchanged.
a = exa.mapValues(lambda x: x + 1)
a.values().collect()

[3, 5, 7]

In [27]:
# sortByKey: sort the pairs by key (here in descending order).
a = exa.sortByKey(ascending=False)
a.keys().collect()

[3, 3, 1]

In [28]:
# Second pair RDD, used as the right-hand side of the operations below.
exa2 = sc.parallelize([(3,90)])

In [31]:
# subtractByKey: drop every pair whose key also occurs in exa2.
a = exa.subtractByKey(exa2)
a.keys().collect()

[1]

In [32]:
# join: inner join -- only keys present in both RDDs are kept.
a = exa.join(exa2)
a.keys().collect()

[3, 3]

In [36]:
# rightOuterJoin / leftOuterJoin
# rightOuterJoin keeps every key of the right-hand RDD (exa2).
a = exa.rightOuterJoin(exa2)
a.values().collect()

[(4, 90), (6, 90)]

In [34]:
# Values of the current join result; collect() already returns a list.
a.values().collect()

[(4, 90), (6, 90)]

In [35]:
# leftOuterJoin keeps every key of the left-hand RDD (exa); keys missing
# from exa2 get None on the right side.
a = exa.leftOuterJoin(exa2)
a.values().collect()

[(2, None), (4, 90), (6, 90)]

In [38]:
# The generic RDD functions also work on pair RDDs: each element is a
# (key, value) tuple, so x[1] is the value.
a=exa.filter(lambda x:x[1]>5)
a.values().collect()

[6]

In [45]:
# Aggregation: turn each value into (value, 1), then add the tuples per key.
# The result is (sum, count) for each key; the mean would be sum / count.
a=exa.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))  # (sum, count) per key
a.values().collect()

[(2, 1), (10, 2)]

In [60]:
# Count how often each character occurs: pair every character with 1, then
# sum the ones per key with reduceByKey.
a = sc.parallelize(list('abcdabcdaaabbc'))
b = a.map(lambda ch: (ch, 1)).reduceByKey(lambda left, right: left + right)
for char, total in b.collect():
    print(char, total)

a 5
b 4
c 3
d 2


In [61]:
a.countByValue()  # shorter way to get the same per-character counts, returned as a dict

defaultdict(int, {'a': 5, 'b': 4, 'c': 3, 'd': 2})

In [91]:
# combineByKey: per-key average. The three functions are explained at
# https://blog.csdn.net/jiangpeng59/article/details/52538254
exa2 = sc.parallelize([
    ("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0),
    ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0),
])
sumCount = exa2.combineByKey(
    # createCombiner: first score seen for a key -> (sum, count)
    lambda first: (first, 1),
    # mergeValue: fold another score into an existing (sum, count)
    lambda acc, score: (acc[0] + score, acc[1] + 1),
    # mergeCombiners: merge two (sum, count) accumulators
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)
a = sumCount.mapValues(lambda total: total[0] / total[1])

In [95]:
# Action: return the (key, average) pairs to the driver.
a.collect()

[('Fred', 91.33333333333333), ('Wilma', 95.33333333333333)]

### 数据分组

In [96]:
# groupByKey groups the values of already-paired (key, value) data by key.
# (groupBy, by contrast, takes a function whose return value is used as the
# grouping key.)
if False:
    from functools import reduce

    # Folding the grouped values of each key with func ...
    rdd.groupByKey().mapValues(lambda v: reduce(func, v))
    # ... is equivalent to reduceByKey, which takes the two-argument func
    # directly.
    rdd.reduceByKey(func)

In [97]:
# cogroup


### 连接

In [102]:
# Input pair RDDs for the join examples. Lists are used instead of set
# literals because a set has no defined iteration order.
exa = sc.parallelize([(1, 2), (3, 4), (3, 6)])
exa2 = sc.parallelize([(1, 20), (3, 40), (3, 60), (4, 70)])

In [99]:
# leftOuterJoin: every key of exa appears in the result.
exa.leftOuterJoin(exa2).collect()

[(1, (2, 20)), (3, (4, 40)), (3, (4, 60)), (3, (6, 40)), (3, (6, 60))]

In [103]:
# rightOuterJoin: every key of exa2 appears in the result; keys missing from
# exa get None on the left side.
exa.rightOuterJoin(exa2).collect()

[(1, (2, 20)),
 (3, (4, 40)),
 (3, (4, 60)),
 (3, (6, 40)),
 (3, (6, 60)),
 (4, (None, 70))]

### 排序

In [112]:
exa = sc.parallelize([(11, 3), (21, 4), (9, 8)])
# Sort in ascending string order by converting every key to str first.
exa.map(lambda x: (str(x[0]), x[1])).sortByKey().collect()
# Alternative that orders by the string form but keeps the original int keys:
# exa.sortByKey(ascending=True, keyfunc=str).collect()

### 行动操作

所有RDD的操作可用在pair RDD上，此外还有特有函数：

In [121]:
# Action: number of elements per key, returned to the driver as a dict.
exa.countByKey()

defaultdict(int, {11: 1, 21: 1, 9: 1})

In [122]:
# Action: return the (key, value) pairs to the driver as a dict.
exa.collectAsMap()

{11: 3, 21: 4, 9: 8}

In [123]:
# Action: return the list of values stored under the given key.
exa.lookup(21)

[4]