In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
findspark.init()

In [2]:
conf = SparkConf().setAppName('heima').setMaster('local[*]')
sc = SparkContext(conf=conf)

# Transformation转换算子:返回RDD的算子

In [3]:
"""
f:(T)->U 表示该算子接收一个参数的传入，传入参数类型不限，返回一个返回值，返回类型不限；
T和U 为泛型的代表，表示任意类型；
f:(A)->A 表示该算子接收一个参数传入，返回一个返回值，与传入参数类型一致；
"""

'\nf:(T)->U 表示该算子接收一个参数的传入，传入参数类型不限，返回一个返回值，返回类型不限；\nT和U 为泛型的代表，表示任意类型；\nf:(A)->A 表示该算子接收一个参数传入，返回一个返回值，与传入参数类型一致；\n'

In [4]:
# map算子: f:(T)->U
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
rdd.map(lambda x: x * 10).collect()

[10, 20, 30, 40, 50, 60]

In [7]:
# flatmap算子: 比map多了一个解除嵌套的功能   f:(T)->U
def function_1(string):
    str_list = string.split(" ")
    temp_list = []
    for item in str_list:
        if item == "a":
            temp_list.append(1)
        elif item == "b":
            temp_list.append(2)
        elif item == "c":
            temp_list.append(3)
        else:
            temp_list.append(4)
    return temp_list
    
rdd = sc.parallelize(["a b c", "a c e", "e c a"])
rdd.flatMap(function_1).collect() # lambda x: x.split(" ")

[1, 2, 3, 1, 3, 4, 4, 3, 1]

In [8]:
# reduceByKey(func)算子:f(V)->V针对KV型RDD,自动按照key分组,然后根据逻辑完成组内聚合
# reduceByKey会先进行合并,再shuffle,再合并,因此比先groupByKey再map会效率高很多;
# f:(V)->V
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3), ('a', 5)])
rdd.reduceByKey(lambda a, b: a + b).collect()

[('b', 5), ('a', 7)]

In [9]:
# mapValues算子:针对二元元组RDD，对其内部的Value进行map操作
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3), ('a', 5)])
rdd.mapValues(lambda x: x*10).collect()  # 自动取的是value

[('a', 10), ('a', 10), ('b', 20), ('b', 30), ('a', 50)]

In [11]:
# groupBy(func)算子:f(T)->K 返回的是KV型，其中key是函数func的返回值
# KV型中返回迭代器, value中保留的是原格式(看示例)
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3), ('a', 5)])
def function_2(x):
    if x[1] // 2 == 0:
        return 0
    else:
        return 1
    
rdd.groupBy(function_2).mapValues(lambda x: list(x)).collect()

[(0, [('a', 1), ('a', 1)]), (1, [('b', 2), ('b', 3), ('a', 5)])]

In [13]:
# filter(func)算子:f(T)->bool对数据进行过滤
def function_3(x):
    if x % 2 == 0:
        return True
    else:
        return False

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd.filter(function_3).collect()

[2, 4, 6]

In [14]:
# distinct算子: 对RDD数据进行去重,返回新的RDD,结果是无序的
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 'a', 'a', 'b'])
rdd.distinct().collect()

[1, 2, 3, 'b', 'a']

In [16]:
# union算子: 将两个RDD合并为同一个RDD返回,不去重,内容类型不同依旧可以合并
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3), ('a', 5)])
rdd1.union(rdd2).collect()

[1, 2, 3, 4, ('a', 1), ('a', 1), ('b', 2), ('b', 3), ('a', 5)]

In [18]:
# join、leftOuterJoin、rightOuterJoin算子:只能用于二元元组
rdd1 = sc.parallelize([('jack', 12), ('json', 11), ('mark', 13), ('Jim', 12)])
rdd2 = sc.parallelize([('jack', 'male'), ('mark', 'male'), ('Rose', 'female')])
rdd1.leftOuterJoin(rdd2).collect()  # join   leftOuterJoin    rightOuterJoin

[('json', (11, None)),
 ('Jim', (12, None)),
 ('mark', (13, 'male')),
 ('jack', (12, 'male'))]

In [19]:
# intersection算子:求两个RDD的交集，返回新的RDD
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
rdd2 = sc.parallelize([('a', 1), 3, 4, 5, 6])
rdd1.intersection(rdd2).collect()

[('a', 1)]

In [20]:
# glom 算子:将RDD加上嵌套,按照分区进行嵌套
rdd = sc.parallelize([1,2,3,4,5,6], 2)
rdd.glom().collect()

[[1, 2, 3], [4, 5, 6]]

In [22]:
# groupByKey算子:对数据按照指定的规则进行分组,针对KV型数据自动按照K分组
# 返回的value内容是迭代器,与groupBy一样
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3), ('a', 5)])
rdd.groupByKey().mapValues(list).collect()

[('b', [2, 3]), ('a', [1, 1, 5])]

In [25]:
# sortBy(func, ascending, numPartition)算子:对RDD数据进行排序,排序依据基于自己指定的函数
# 注意numPartition只能确保分区内有序,结果可能整体不有序,可设置为1即全局
rdd = sc.parallelize([('a', 1), ('a', 7), ('b', 2), ('b', 3), ('a', 5)], 2)
rdd.sortBy(lambda x: x[1], numPartitions=2).collect()

[('a', 1), ('b', 2), ('b', 3), ('a', 5), ('a', 7)]

In [28]:
# sortByKey(ascending, numPartition, keyfunc)算子:针对KV型RDD,按照Key进行排序
# keyfunc需要显示传入,只影响排序的结果,不影响排序后原内容
def function_4(x):
    return x.lower()

rdd = sc.parallelize([('a', 1), ('B', 1), ('c', 2), ('e', 5), ('D', 3)])
rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=function_4).collect()

[('a', 1), ('B', 1), ('c', 2), ('D', 3), ('e', 5)]

## Transformation下：分区操作算子

In [61]:
# mapPartitions算子:,一次性批量处理分区数据,一次IO操作一整个分区
# 一个分区作为一个迭代对象传出
# 以List打包发给func,处理完以后以List返回
def function_5(items):
    result = []
    for item in items:
        result.append(item * 10)
    return result

rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3], 2)
rdd.mapPartitions(function_5).collect()

[30, 10, 20, 70, 100, 90, 20, 30]

In [66]:
# foreachPartition算子：其特点同mapPartitions
def function_6(items):
    result = []
    for item in items:
        result.append(item * 10)
    print(result)
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3], 2)
rdd.foreachPartition(function_6) # [30, 10, 20, 70],[100, 90, 20, 30]

In [68]:
# patitionBy(参数1：分区数, func：分区规则)算子
# 编号从0开始到N-1结束
def function_7(x):
    if x == "hadoop":
        return 0
    elif x == "spark":
        return 1
    else:
        return 2
    
rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('python', 1), ('c++', 1)], 2)
rdd.partitionBy(3, function_7).glom().collect()

[[('hadoop', 1)], [('spark', 1)], [('python', 1), ('c++', 1)]]

In [69]:
# repartition(N)算子：对RDD进行重新分区，实际上调用了coalesce
# 一般情况下除了全局排序设置为1分区外,多数时候都交给spark自动设置分区数量
# 增加分区风险大于减少分区，增加分区导致shuffle增加，性能下降
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3], 2)
rdd.repartition(3).glom().collect()
# coalesce(N, shuffle=True)增加分区需要，明确参数shuffle

[[], [3, 1, 2, 7], [10, 9, 2, 3]]

# Action动作算子:不返回RDD的算子或None

In [31]:
# countByKey算子:统计Key出现的次数
rdd = sc.parallelize([('a', 1), ('a', 7), ('b', 2), ('b', 3), ('a', 5)], 2)
rdd.countByKey()

defaultdict(int, {'a': 3, 'b': 2})

In [32]:
# collect算子:将RDD各个分区的数据都拉取到Driver,在使用之前要判断内存占用可能
rdd = sc.parallelize([1, 2, 3])
rdd.collect()

[1, 2, 3]

In [33]:
# reduce(func)算子:func(T, T)->T对RDD数据按照传入的逻辑进行聚合
rdd = sc.parallelize(range(1, 10))
print(rdd.reduce(lambda a, b: a + b))

45


In [35]:
# fold(init_value, func)算子:接收一个传入逻辑,进行带有初始值的聚合
# 会作用在每个分区,会额外带上每个分区的初始值聚合
rdd = sc.parallelize(range(1, 10), 3)
rdd.fold(10, lambda a, b: a + b)

85

In [36]:
# first算子
rdd = sc.parallelize([3, 1, 2])
rdd.first()

3

In [37]:
# take算子：返回RDD的前N个元素
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3])
rdd.take(3)

[3, 1, 2]

In [38]:
# top算子：对RDD数据集进行降序排序,取前N个返回
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3])
rdd.top(3)

[10, 9, 7]

In [39]:
# count算子：计算RDD有多少条数据,返回一个数字
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3])
rdd.count()

8

In [41]:
# takeSample(参数1:可放回,参数2:个数,参数3:随机数种子)算子:随机抽样RDD的数据
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3])
rdd.takeSample(False, 10)

[3, 1, 2, 9, 10, 7, 3, 2]

In [43]:
# takeOrdered(参数1:个数,func):功能对RDD进行排序取前N个，比Top好的点在于，可升序可降序
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3])
rdd.takeOrdered(10, lambda x: -x)

[10, 9, 7, 3, 3, 2, 2, 1]

In [65]:
# foreach算子:对RDD的每一个元素执行提供的操作逻辑,没有返回值;
# 经由excutor直接输出，与collect等算子不一样，不与Driver汇报
# “不生成新的RDD”，可以进行数据库插入 (saveAsTextFile一样)

rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3])
rdd.foreach(lambda x: print(x * 10))

In [58]:
# saveAsTextFile算子：将RDD的数据写入文本文件中
# 支持本地写出，hdfs等文件系统
rdd = sc.parallelize([3, 1, 2, 7, 10, 9, 2, 3], 2)
rdd.saveAsTextFile('./output2')