学习 Spark 的 python api

In [1]:
from pyspark import SparkContext
import os
os.environ["SPARK_HOME"] = "/home/hadoop/spark-2.0.1-bin-hadoop2.7"   #KeyError: 'SPARK_HOME'
sc=SparkContext(appName='Test')


#conf = (SparkConf().setMaster('local').setAppName('a'))



In [2]:
# list 转化为 RDD
rdd = sc.parallelize([1,2,3,4,5])

In [3]:
# filter 过滤操作
filterRdd = rdd.filter(lambda x:x>=4)
filterRdd.collect()

[4, 5]

In [4]:
# map 对 rdd 每一个元素进行操作
mapRDD = rdd.map(lambda x:x*2)
mapRDD.collect()

[2, 4, 6, 8, 10]

In [5]:
# union 操作,将两个 RDD 合并为一个 RDD
unionRDD = mapRDD.union(rdd)
unionRDD.collect()

[2, 4, 6, 8, 10, 1, 2, 3, 4, 5]

In [9]:
#  count 结果统计, take 收集部分元素 属于行动操作
print unionRDD.count()
unionRDD.take(3)

10


[2, 4, 6]

In [10]:
# flatMap:对每个输入元素生成多个输出元素
lines = sc.parallelize(["hello world","hi"])
words = lines.flatMap(lambda line:line.split(" "))
words.collect()

['hello', 'world', 'hi']

In [14]:
xx = sc.parallelize([[1,2,3],[4,5,6]])
xx.flatMap(lambda x:map(lambda y:y,x)).collect()  # map 对子元素 [1,2,3],[4,5,6] 进行操作

[1, 2, 3, 4, 5, 6]

In [23]:
#  伪集合操作
rdd1 = sc.parallelize(['coffee','coffee','panda','monkey','tea'])
rdd2 = sc.parallelize(['coffee','monkey','kitty'])
rdd1.distinct().collect() # 生成只包含不同元素的新的 RDD

['tea', 'panda', 'monkey', 'coffee']

In [25]:
rdd1.union(rdd2).collect()  # 包含两个 RDD 所有元素,并不会去重

['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty']

In [24]:
rdd1.intersection(rdd2).collect()  # 包含两个 RDD 都有的元素

['coffee', 'monkey']

In [26]:
rdd1.subtract(rdd2).collect() # 存在第一个 RDD 中,而不在第二个 RDD 中的所有元素

['tea', 'panda']

##  行动操作

In [28]:
rdd.reduce(lambda x,y:x+y)  # 接受 RDD 两个元素类型数据并返回同一个类型的新元素

15

In [30]:
rdd.fold(0,lambda x,y:x+y)  # fold 与 reduce 一样,只是需要指定一个初始值

15

[Spark算子：RDD行动Action操作](http://lxw1234.com/archives/2015/07/394.htm)
**aggregate()**先对每个分区调用第一个lambda reduce,合并,再调用第二个 reduce

In [39]:
p = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
p.aggregate((0),
           (lambda x,y:x+y),
           (lambda a,b:a+b))

55

In [40]:
#  取平均值

sumCount = p.aggregate((0,0),
                      (lambda x,y:(x[0]+y,x[1]+1)),  # 第一个参数 x,是 aggregate 传入的参数;第二个参数 y 是 p 的元素,返回一个元组
                      (lambda a,b:(a[0]+b[0],a[1]+b[1]))) # a,b 是每个分区返回的元组

print sumCount

(55, 10)


In [41]:
# 各元素在 RDD 中出现的次数
rdd.countByValue()

defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})

##   键值对操作

In [43]:
#  创建 pair RDD,把一个普通的 RDD 转为 pair RDD,调用 map() 返回键值对
pairs = lines.map(lambda x:(x.split(" ")[0],x)) # 以第一个单词为键创建一个 pair RDD
pairs.collect()

[('hello', 'hello world'), ('hi', 'hi')]

In [47]:
lines.flatMap(lambda line:line.split(" ")). \   # 先把单词分出来
             map(lambda x:(x,1)).collect()      # 统计

[('hello', 1), ('world', 1), ('hi', 1)]

In [50]:
#  reduceByKey 合并具有相同键的值,也就是对相同的 key 进行 reduce
rdd = sc.parallelize([(1,2),(3,4),(3,6)])
rdd.reduceByKey(lambda x,y:x+y).collect()

[(1, 2), (3, 10)]

In [54]:
# groupByKey 对具有相同键的值进行分组,对相同 key 进行 group
rdd.groupByKey().collect()
'''
{(1,[2]),(3,[4,6])}
'''

'\n{(1,[2]),(3,[4,6])}\n'

In [56]:
# 对 pair RDD 每个值进行操作,函数不改变键
rdd.mapValues(lambda x:x+1).collect()

[(1, 3), (3, 5), (3, 7)]

In [58]:
# flatMapValues() 对每个值应用一个返回迭代器的函数,然后对返回的每个元素和对应原键生成一个键值对记录
rdd.flatMapValues(lambda x:range(x,6)).collect()

[(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]

In [72]:
# filter 对第二个元素进行筛选
print rdd.collect()
result = rdd.filter(lambda x:x[1]>2)
print result.collect()

[(1, 2), (3, 4), (3, 6)]
[(3, 4), (3, 6)]


In [None]:
#  combineByKey



In [None]:
# 数据分组,查看顾客所有订单;groupByKey() 使用 RDD 中的键来对数据进行分组
# 对于一个由类型 K 的键和类型 V 的值组成的 RDD，所得到的结果 RDD 类型会是 [K, Iterable[V]






##  Pair RDD 的行动操作

In [77]:
print rdd.collect()
print rdd.countByKey()  # 对每个键对应的元素分别计数

[(1, 2), (3, 4), (3, 6)]
defaultdict(<type 'int'>, {1: 1, 3: 2})
