In [1]:
from pyspark import SparkContext, SparkConf

# local - 本地运行
conf = SparkConf().setMaster("local").setAppName("myApp")
sc = SparkContext(conf=conf)

In [2]:
# 键值对rdd(pairRDD)
# 键值型rdd适合进行聚合运算
# ("ip", 访问次数）
new_pair_rdd = sc.parallelize([("192.168.3.11", 32), ("192.168.3.12", 50),
                               ("192.168.3.11", 30), ("192.168.3.12", 21),
                               ("192.168.3.12", 15), ("192.168.3.11", 63),
                               ("192.168.3.11", 33), ("192.168.3.11", 32)])

In [3]:
# 所有键
new_pair_rdd.keys().collect()

['192.168.3.11',
 '192.168.3.12',
 '192.168.3.11',
 '192.168.3.12',
 '192.168.3.12',
 '192.168.3.11',
 '192.168.3.11',
 '192.168.3.11']

In [4]:
# 所有值
new_pair_rdd.values().collect()

[32, 50, 30, 21, 15, 63, 33, 32]

In [5]:
# 统计元素个数
new_pair_rdd.count()

8

In [6]:
# 按键名统计元素个数
new_pair_rdd.countByKey()

defaultdict(int, {'192.168.3.11': 5, '192.168.3.12': 3})

In [7]:
# 按照值统计元素个数，加上键的区别
new_pair_rdd.countByValue()

defaultdict(int,
            {('192.168.3.11', 32): 2,
             ('192.168.3.12', 50): 1,
             ('192.168.3.11', 30): 1,
             ('192.168.3.12', 21): 1,
             ('192.168.3.12', 15): 1,
             ('192.168.3.11', 63): 1,
             ('192.168.3.11', 33): 1})

In [8]:
# 根据键值reduce
rdd = new_pair_rdd.reduceByKey(lambda x, y: x + y)
rdd.collect()

[('192.168.3.11', 190), ('192.168.3.12', 86)]

In [9]:
# 根据键进行分组
rdd = new_pair_rdd.groupByKey()
rdd.collect()

[('192.168.3.11', <pyspark.resultiterable.ResultIterable at 0x21a378308c8>),
 ('192.168.3.12', <pyspark.resultiterable.ResultIterable at 0x21a37830808>)]

In [10]:
# 对每个值进行mapValues
# 对分组后的rdd进行mapValues
rdd.mapValues(len).collect()

[('192.168.3.11', 5), ('192.168.3.12', 3)]

In [11]:
rdd.mapValues(list).collect()

[('192.168.3.11', [32, 30, 63, 33, 32]), ('192.168.3.12', [50, 21, 15])]

In [12]:
rdd.mapValues(sum).collect()

[('192.168.3.11', 190), ('192.168.3.12', 86)]

In [13]:
# ############################################
# new_pair_rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
# combineByKey过程中，会遍历每一个元素
# createCombiner - 遍历过程中，当第一次遍历到某个键时,使用createCombiner创建一个这个键的初始值(在每个分区)
# mergeValue - 将新值添加到对应的key元素值中
# mergeCombiners - 将两个组合成一个
# combineByKey可以把rdd的value整合成不同类型的value，比如原来的是[("a",1),("a",2)]可以整合成[("a",3)]
# 也可以整合成[("a",[1,2])]
# ###############################################

In [14]:
new_pair_rdd = sc.parallelize([
    ("a", [1, 2, 3, 4]),
    ("b", [10, 20, 30, 40]),
    ("a", [6, 7, 8, 9]),
    ("b", [60, 70, 80, 90]),
])

In [15]:
# No.1 对每个键的每一个元素首先求和，然后所有的和合并成一个列表
# 目标 [("a",[10, 30]), ("b", [100,300])]
# 遇到的第一个新元素
def create_list(first_item):
    print('first=', first_item)
    return [sum(first_item)]

In [16]:
# 将后面遇到的相同的key元素添加到对应的列表
def append(last_result, current_item):
    print('args:', last_result, current_item)
    last_result.append(sum(current_item))
    return last_result

In [17]:
# 每个分区最后合并成一个新的列表，只有一个区不会执行
def extend(last_partitioner, current_partitioner):
    print('partitioner:', last_partitioner, current_partitioner)
    last_partitioner.extend(current_partitioner)
    return last_partitioner

In [18]:
rdd1 = new_pair_rdd.combineByKey(create_list, append, extend)

In [19]:
rdd1.collect()

[('a', [10, 30]), ('b', [100, 300])]

In [20]:
# No.2 每个键求一个总和
# 目标 [("a",40), ("b", 400)]
def create_init(first_item):
    return sum(first_item)


def accumulation(last_result, current_item):
    return last_result + sum(current_item)


def reduce_partitioner(last_partitioner, current_partitioner):
    return last_partitioner + current_partitioner

In [21]:
rdd2 = new_pair_rdd.combineByKey(create_init, accumulation, reduce_partitioner)

In [22]:
rdd2.collect()

[('a', 40), ('b', 400)]

In [23]:
# 统计各地总销量
# 地区1
# 商品名  2018年  2019年  2020年
# mac    1000    1200   1000
# iphone 3000    3300   1800

# 地区2
# 商品名  2018年  2019年  2020年
# mac    1100    1300   1200
# iphone 3200    3100   2000

# 地区3
# 商品名  2018年  2019年  2020年
# mac    1500    1800   1300
# iphone 3100    3200   2200

In [24]:
sale_rdd = sc.parallelize([
    ("mac", [1000, 1200, 1000]),
    ("iphone", [3000, 3300, 1800]),
    ("mac", [1100, 1300, 1200]),
    ("iphone", [3200, 3100, 2000]),
    ("mac", [1500, 1800, 1300]),
    ("iphone", [3100, 3200, 2200]),
])

In [25]:
sale_count_1 = sale_rdd.combineByKey(create_list, append, extend)

In [26]:
sale_count_1.collect()

[('mac', [3200, 3600, 4600]), ('iphone', [8100, 8300, 8500])]

In [27]:
sale_count_2 = sale_rdd.combineByKey(create_init, accumulation,
                                     reduce_partitioner)

In [28]:
sale_count_2.collect()

[('mac', 11400), ('iphone', 24900)]

In [29]:
sale_count_3 = sale_rdd.combineByKey(create_list, append, reduce_partitioner)

In [30]:
sale_count_3.collect()

[('mac', [3200, 3600, 4600]), ('iphone', [8100, 8300, 8500])]

In [37]:
def init_item(first_item):
    return tuple(first_item)


def add_by_year(last_result, current_item):
    lst = [last_result[i] + current_item[i] for i in range(len(last_result))]
    print(lst)
    return lst


def reduce_partitioner(last_partitioner, current_partitioner):
    return last_partitioner + current_partitioner

In [38]:
sale_count_4 = sale_rdd.combineByKey(init_item, add_by_year, reduce_partitioner)

In [39]:
sale_count_4.collect()

[('mac', [3600, 4300, 3500]), ('iphone', [9300, 9600, 6000])]