In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext, SparkConf


# 创建RDD

## 1. 并行化创建

In [2]:
conf = SparkConf().setAppName('test').setMaster('local[*]')

In [3]:
conf.set('spark.default.parallesim', '50')

<pyspark.conf.SparkConf at 0x1309282fe20>

In [3]:
sc = SparkContext(conf = conf)

In [4]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])

In [5]:
# 默认根据CPU 分区
rdd.getNumPartitions()

16

In [6]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)

In [7]:
rdd.getNumPartitions()

3

In [27]:
rdd.glom().collect()

[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

In [8]:
# 发到driver  
# 分布式 转 本地集合
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

## 2. 读文件

In [9]:
file_rdd1 = sc.textFile('word.txt')

In [10]:
file_rdd1.getNumPartitions()

2

In [11]:
file_rdd1.collect()

['hadoop hadoop spark', 'hadoop hadoop spark', 'hadoop hadoop spark flink']

In [12]:
# 加最小分区数 spark 有自己的测速测试 给太大不理会 
file_rdd2 = sc.textFile('word.txt', 3)
print(file_rdd2.getNumPartitions())
file_rdd3 = sc.textFile('word.txt', 100)
print(file_rdd3.getNumPartitions())

3
65


# Operator

## Transformation

## 1. map

In [14]:
def add(data): return data*10

In [16]:
rdd.map(add).collect()

[10, 20, 30, 40, 50, 60, 70, 80, 90]

In [17]:
rdd.map(lambda data: data*10).collect( )

[10, 20, 30, 40, 50, 60, 70, 80, 90]

## 2.flatMap  => 先map 后解除嵌套

In [18]:
file_rdd1

word.txt MapPartitionsRDD[3] at textFile at <unknown>:0

In [20]:
rdd2 = file_rdd1.map(lambda line: line.split(' '))

In [21]:
rdd2.collect()

[['hadoop', 'hadoop', 'spark'],
 ['hadoop', 'hadoop', 'spark'],
 ['hadoop', 'hadoop', 'spark', 'flink']]

In [24]:
file_rdd1.flatMap(lambda line: line.split(' ')).collect()

['hadoop',
 'hadoop',
 'spark',
 'hadoop',
 'hadoop',
 'spark',
 'hadoop',
 'hadoop',
 'spark',
 'flink']

## 3.ReducebyKey  => 针对 Keyvalue rdd =》先分组再聚合

In [28]:
rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)])

In [29]:
rdd.reduceByKey(lambda a,b: a+b).collect()

[('b', 2), ('a', 3)]

## 3.mapValue  => 针对 Keyvalue rdd 中的 value =》先分组再聚合

In [30]:
rdd.map(lambda x:(x[0], x[1]*10)).collect( )

[('a', 10), ('a', 10), ('a', 10), ('b', 10), ('b', 10)]

In [32]:
rdd.mapValues(lambda value:value*10).collect()

[('a', 10), ('a', 10), ('a', 10), ('b', 10), ('b', 10)]

# eg:Word COunt

In [35]:
file_rdd1

word.txt MapPartitionsRDD[3] at textFile at <unknown>:0

In [42]:
# 取出所有 word
word_rdd = file_rdd1.flatMap(lambda line: line.split(' '))

In [38]:
word_rdd.collect()

['hadoop',
 'hadoop',
 'spark',
 'hadoop',
 'hadoop',
 'spark',
 'hadoop',
 'hadoop',
 'spark',
 'flink']

In [43]:
# key 是单词 value 是1
word_with_one_rdd = word_rdd.map(lambda word: (word,1 ))

In [44]:
# 相加
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b: a+b)

In [45]:
result_rdd.collect()

[('hadoop', 6), ('spark', 3), ('flink', 1)]

## 4.groupBy   和sql一样 =》 hash分组


In [48]:
rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)])

In [52]:
# 通过函数按照谁来分组 key 或 value =》 f返回谁即可
result = rdd.groupBy(lambda t:t[0])

In [54]:
result.collect()

[('b', <pyspark.resultiterable.ResultIterable at 0x1a71d7a1310>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x1a71d4112b0>)]

In [53]:
result.map(lambda t:(t[0], list(t[1]))).collect()

[('b', [('b', 1), ('b', 1)]), ('a', [('a', 1), ('a', 1), ('a', 1)])]

## 5.filter


In [58]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)

In [60]:
rdd.filter(lambda x : x%2 ==1).collect()

[1, 3, 5, 7, 9]

## 6. distinct

In [61]:
rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)])

In [62]:
rdd.distinct().collect()

[('a', 1), ('b', 1)]

In [64]:
sc.parallelize([1,2,3, 1, 3, 2]).distinct().collect()

[1, 2, 3]

## 7.Union => 不会和并  可以合并不同的东西

In [66]:
rdd.union(   sc.parallelize([1,2,3, 1, 3, 2])    ).collect()

[('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1), 1, 2, 3, 1, 3, 2]

## 8.Join

In [67]:
rdd1 = sc.parallelize([  (1001, '张三'), (1002, '李四')  , (1003, '王五')   ])

In [73]:
rdd2 = sc.parallelize(   [   (1001, '销售部'),  (1002, '科技部')] )

In [75]:
rdd1.join(rdd2).collect()

[(1001, ('张三', '销售部')), (1002, ('李四', '科技部'))]

In [76]:
rdd1.leftOuterJoin(rdd2).collect()

[(1001, ('张三', '销售部')), (1002, ('李四', '科技部')), (1003, ('王五', None))]

In [77]:
rdd1.rightOuterJoin(rdd2).collect()

[(1001, ('张三', '销售部')), (1002, ('李四', '科技部'))]

## 9.Intersection => union

In [78]:
rdd1 = sc.parallelize([('a', 1), ('a', 3)])

In [79]:
rdd2 = sc.parallelize([('a', 1), ('b', 3)])

In [80]:
rdd1.intersection(rdd2).collect()

[('a', 1)]

## 10. glom

In [83]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)

In [86]:
rdd.glom()

PythonRDD[113] at RDD at PythonRDD.scala:53

In [84]:
rdd.glom().flatMap( lambda x:x).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

## 10. groupByKey

In [87]:
rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)])

In [89]:
rdd.groupByKey().collect()

[('b', <pyspark.resultiterable.ResultIterable at 0x1a71e9dfe80>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x1a71e9dff40>)]

In [91]:
rdd.groupByKey().map(lambda x:(x[0], list(x[1]))).collect()

[('b', [1, 1]), ('a', [1, 1, 1])]

## 10. sortBy => 只是分区排序，全局排序就是 numPartitions =1

In [93]:
rdd = sc.parallelize([('x', 1), ('w', 2), ('c', 4), ('g', 123), ('b', 16)])

In [95]:
rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect()

[('x', 1), ('w', 2), ('c', 4), ('b', 16), ('g', 123)]

In [99]:
rdd.sortBy(lambda x:x[0], ascending=True, numPartitions=1).collect()

[('b', 16), ('c', 4), ('g', 123), ('w', 2), ('x', 1)]

## 11. sortByKey

In [102]:
rdd = sc.parallelize([('x', 1), ('w', 2), ('C', 4), ('g', 123), ('B', 16)])

In [103]:
rdd.sortByKey(ascending=True, numPartitions=3,     keyfunc=    lambda  key: str(key).lower()).collect()

[('B', 16), ('C', 4), ('g', 123), ('w', 2), ('x', 1)]

# Action

## 1. countByKey 

In [104]:
rdd = sc.textFile('word.txt')

In [106]:
rdd2 = rdd.flatMap(lambda x:x.split(' ')).map(lambda x: (x,1))

In [107]:
rdd2.countByKey()

defaultdict(int, {'hadoop': 6, 'spark': 3, 'flink': 1})

In [109]:
type(rdd2.countByKey())

collections.defaultdict

## 2.collect => 把各个分区的数据都拉到Driver

## 3.reduce

In [110]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)

In [111]:
rdd.reduce(lambda a,b: a+b)

45

## 4.fold => 带有初始值的reduce
分区内有
分区间也有

In [112]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)

In [115]:
rdd.glom().collect()

[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

In [117]:
rdd.fold(10, lambda a, b : a+b)

85

## 5. first

In [None]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)

In [119]:
rdd.first()

1

## 6. take 前n个

In [120]:
rdd.take(5)

[1, 2, 3, 4, 5]

## 7. top 降序排序的前几个

In [121]:
rdd= sc.parallelize([123,2123,1233,4,54,6,7,8,9], 3)

In [122]:
rdd.top(3)

[2123, 1233, 123]

## 8.count

In [123]:
rdd.count()

9

## 9.takeSample 抽样

In [128]:
# True 允许重复  =》 相同位置
rdd.takeSample(True, 14)

[9, 54, 8, 7, 123, 9, 8, 9, 123, 1233, 4, 8, 7, 1233]

In [129]:
rdd.takeSample(False, 14)

[9, 54, 1233, 4, 8, 7, 2123, 123, 6]

## 10. takeOrdered

In [130]:
rdd.takeOrdered(3)

[4, 6, 7]

In [131]:
rdd.takeOrdered(3, lambda x:-x)

[2123, 1233, 123]

## 11. foreach => 没有返回值的map => excutor 直接输出 (eg输入数据库)

In [132]:
rdd.foreach(lambda x:x*10)

## 12.saveAsTextFile

In [141]:
# rdd.saveAsTextFile("../All/PySpark/out1")

## 13. mapPartition =》 一整个分区。  map 是一次处理一条.  Cpu 一样 但是空间复杂度网络io更好

In [148]:
def process(iter):
    result = list()
    for it in iter:
        result.append(it*10)
        
    return result

In [149]:
rdd.mapPartitions(process).collect()

[1230, 21230, 12330, 40, 540, 60, 70, 80, 90]

## 14. foreachpartition

In [150]:
rdd.foreachPartition(process)

## 15. partitionBy => 自定义分区

In [155]:
def process(k):
    if k =='hadoop'or k =='hellow': return 0
    if k =='spark': return 1
    return 2

In [156]:
rdd= sc.parallelize([('hadoop', 1), ('hadoop', 1), ('spark', 1), ('flink', 1), ('hellow', 1)]) 

In [158]:
rdd.partitionBy(3, process).glom().collect()

[[('hadoop', 1), ('hadoop', 1), ('hellow', 1)], [('spark', 1)], [('flink', 1)]]

## 16.repartition coalesce

In [159]:
rdd.repartition(1).getNumPartitions()

1

In [160]:
rdd.repartition(5).getNumPartitions()

5

In [164]:
rdd.coalesce(1).getNumPartitions()

1

In [166]:
rdd.coalesce(5, shuffle=True).getNumPartitions()

5

# 面试


# 1. 功能 grouyByKey => 只分组； reduceByKey=> 分组加聚合

# 2. 性能 groupbykey 性能远低于 reducebykey

# reducebykey 先聚合 后 发送 后 最终聚合 => 网络io 走的少

In [5]:
rdd1 = sc.textFile('word.txt')

In [6]:
rdd2 = rdd1.flatMap( lambda x: x.split(' '))

In [7]:
rdd3 = rdd2.map( lambda x: (x, 1))

In [14]:
rdd3.cache()

PythonRDD[13] at RDD at PythonRDD.scala:53

In [8]:
rdd4 = rdd3.reduceByKey(lambda a,b : a+b)

In [9]:
rdd4.collect()

[('hadoop', 6), ('spark', 3), ('flink', 1)]

In [11]:
rdd5 = rdd3.groupByKey()

In [12]:
rdd6 = rdd5.mapValues(lambda x:sum(x))

In [13]:
rdd6.collect()

[('hadoop', 6), ('spark', 3), ('flink', 1)]

In [15]:
rdd3.unpersist()

PythonRDD[13] at RDD at PythonRDD.scala:53

In [18]:
from pyspark.storagelevel import StorageLevel

# eg 
# 网站日志


## 1. 关键词

In [30]:
import jieba

In [52]:
file_rdd = sc.textFile('SogouQ.txt')

In [53]:
split_rdd = file_rdd.map(lambda x: x.split('\t'))

In [54]:
split_rdd.persist(StorageLevel.DISK_ONLY)

PythonRDD[49] at RDD at PythonRDD.scala:53

In [55]:
split_rdd.takeSample(True, 3)[0]

['21:00:07', '05687877046704298', '传智播客', '5', '29', 'http://www.itcast.cn']

In [87]:
def context_jieba(data):
    seg = jieba.cut_for_search(data)
    l = list()
    for word in seg:
        l.append(word)
    return l

In [65]:
def filter_words(data):
    return data not in ['谷', '帮', '客']

In [69]:
def append_words(data):
    if data =='传智播': data ='传智播客'
    if data  =='院校' : data ='院校帮'
    if data  =='博学' : data ='博学谷'
    return (data, 1)

In [57]:
context_rdd = split_rdd.map(lambda x: x[2])

In [58]:
word_rdd = context_rdd.flatMap( context_jieba )

In [59]:
 word_rdd.collect()

['传智播',
 '客',
 '黑马',
 '程序',
 '程序员',
 '传智播',
 '客',
 '博学',
 '谷',
 'IDEA',
 '传智',
 '专修',
 '学院',
 'flume',
 'itcast',
 'bigdata',
 'IDEA',
 '酷丁鱼',
 'bigdata',
 'itcast',
 'flume',
 '数据',
 '仓库',
 '数据仓库',
 '传智汇',
 'itheima',
 'itcast',
 '传智汇',
 'hadoop',
 '博学',
 '谷',
 'hadoop',
 'spark',
 'itheima',
 'hadoop',
 '酷丁鱼',
 'spark',
 'java',
 'hadoop',
 'hadoop',
 'itcast',
 '酷丁鱼',
 'scala',
 '酷丁鱼',
 '酷丁鱼',
 '传智汇',
 'c',
 '语言',
 '传智汇',
 '院校',
 '帮',
 'itcast',
 '院校',
 '帮',
 '黑马',
 '程序',
 '程序员',
 '传智播',
 '客',
 'java',
 'itheima',
 'IT',
 '培训',
 '传智',
 '专修',
 '学院',
 '酷丁鱼',
 '传智播',
 '客',
 'c',
 '语言',
 'spark',
 '传智',
 '专修',
 '学院',
 'itheima',
 'itheima',
 'DataLake',
 '数据',
 '湖',
 'c',
 '语言',
 'itheima',
 'java',
 'hadoop',
 'itheima',
 'c',
 '语言',
 'IDEA',
 'itheima',
 '博学',
 '谷',
 'bigdata',
 'DataLake',
 'bigdata',
 'itheima',
 '传智汇',
 '院校',
 '帮',
 '传智播',
 '客',
 '黑马',
 '程序',
 '程序员',
 'itcast',
 'bigdata',
 '数据',
 '湖',
 'kafka',
 '院校',
 '帮',
 '黑马',
 '程序',
 '程序员',
 'java',
 'scala',
 'itcast',
 '黑马'

In [66]:
filter_word =  word_rdd.filter(filter_words)

In [70]:
fianl_word_rdd = filter_word.map(append_words)

In [72]:
fianl_word_rdd.reduceByKey(lambda a,b : a+b) . sortBy(lambda x:x[1], ascending=False , numPartitions=1).take(5)

[('scala', 2310),
 ('hadoop', 2268),
 ('博学谷', 2002),
 ('传智汇', 1918),
 ('itheima', 1680)]

## 2.用户和关键词组合

In [81]:
def extract_user_and_word(data):
    ## data=> (1, 'wo ai ni')
    user_id = data[0]
    content = data[1]
    
    words = context_jieba(content)
    return_list = list()
    for word in words:
        if filter_words(word):
            
            return_list.append((user_id + '_'+ append_words(word)[0], 1))
    return return_list 

In [82]:
user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))

In [83]:
user_content_rdd.take(1)

[('2982199073774412', '传智播客')]

In [84]:
user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)

In [85]:
user_word_with_one_rdd.take(1)

[('2982199073774412_传智播客', 1)]

In [86]:
user_word_with_one_rdd.reduceByKey(lambda a,b : a+b) . sortBy(lambda x:x[1], ascending=False , numPartitions=1).take(5)

[('6185822016522959_scala', 2016),
 ('41641664258866384_博学谷', 1372),
 ('44801909258572364_hadoop', 1260),
 ('7044693659960919_数据', 1120),
 ('7044693659960919_仓库', 1120)]

## 3 时间段

In [88]:
time_rdd = split_rdd.map(lambda x: x[0])

In [89]:
time_rdd.take(1)

['00:00:00']

In [93]:
hour_with_one_rdd = time_rdd.map(lambda x:  (    x.split(':')[0] , 1   ))

In [96]:
hour_with_one_rdd.reduceByKey(lambda a,b : a+b) . sortBy(lambda x:x[1], ascending=False , numPartitions=1).take(5)

[('20', 3479), ('23', 3087), ('21', 2989), ('22', 2499), ('01', 1365)]

# 广播变量

In [98]:
stu_info_list = [ (1, 'ZS', 11),
                    (2, 'WE', 13),
                    (3, 'rE', 11)]

In [107]:
# 1.本地广播变量
broadcast = sc.broadcast(stu_info_list)

In [108]:
score_info_rdd = sc.parallelize([(1,'语文',99),
(2,'语文',99),
(3,'语文',99)])

In [109]:
def map_func(data):
    s_id = data[0]
    
    # 从广播变量取出
    
    for stu_info in broadcast.value:
        stu_id = stu_info[0]
        
        if stu_id==s_id:
            name = stu_info[1]
            
    return (name, data[1], data[2])

In [110]:
score_info_rdd.map(map_func).collect()

[('ZS', '语文', 99), ('WE', '语文', 99), ('rE', '语文', 99)]

# 累加器

In [117]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9, 10], 2)

In [147]:
acmlt = sc.accumulator(0)

In [148]:
def map_func(data):
    global acmlt
    acmlt+=1


In [149]:
rdd2 = rdd.map(map_func)
rdd2.collect()
print(acmlt)
rdd3 = rdd2.map(lambda x:x)
rdd3.collect()
print(acmlt)

10
20


In [151]:
acmlt = sc.accumulator(0)

In [152]:
rdd2 = rdd.map(map_func)



rdd2.cache()


rdd2.collect()
print(acmlt)
rdd3 = rdd2.map(lambda x:x)
rdd3.collect()
print(acmlt)

10
10


# EG


In [160]:
import re

In [153]:
file_rdd = sc.textFile('accumulator_broadcast_data.txt')

In [155]:
char = [',', '.', '!', '#', '$', '%']

In [156]:
broadcast = sc.broadcast(char)

In [157]:
acmlt = sc.accumulator(0)

In [158]:
# python 有内容就是 TRrue。 None 就是false
lines_rdd = file_rdd.filter(lambda line: line.strip())

In [163]:
lines_rdd.collect()

['   hadoop spark # hadoop spark spark',
 'mapreduce ! spark spark hive !',
 'hive spark hadoop mapreduce spark %',
 '   spark hive sql sql spark hive , hive spark !',
 '!  hdfs hdfs mapreduce mapreduce spark hive',
 '  #']

In [159]:
data_rdd = lines_rdd.map(lambda line:line.strip())

In [164]:
data_rdd.collect()

['hadoop spark # hadoop spark spark',
 'mapreduce ! spark spark hive !',
 'hive spark hadoop mapreduce spark %',
 'spark hive sql sql spark hive , hive spark !',
 '!  hdfs hdfs mapreduce mapreduce spark hive',
 '#']

In [169]:
#                                         \s+  =》 不确定多少空格
words_rdd = data_rdd.flatMap(lambda line: re.split('\s+', line))

In [170]:
words_rdd.collect()

['hadoop',
 'spark',
 '#',
 'hadoop',
 'spark',
 'spark',
 'mapreduce',
 '!',
 'spark',
 'spark',
 'hive',
 '!',
 'hive',
 'spark',
 'hadoop',
 'mapreduce',
 'spark',
 '%',
 'spark',
 'hive',
 'sql',
 'sql',
 'spark',
 'hive',
 ',',
 'hive',
 'spark',
 '!',
 '!',
 'hdfs',
 'hdfs',
 'mapreduce',
 'mapreduce',
 'spark',
 'hive',
 '#']

In [171]:
def filter_func(data):
    global acmlt
    
    if data in char:
        acmlt +=1 
        return False
    else: return True

In [172]:
normal_words = words_rdd.filter(filter_func)

In [173]:
result_rdd = normal_words.map(lambda x:(x,1)).reduceByKey(lambda a, b: a+b)

In [174]:
result_rdd.collect()

[('hadoop', 3),
 ('hive', 6),
 ('hdfs', 2),
 ('spark', 11),
 ('mapreduce', 4),
 ('sql', 2)]

In [176]:
print('特殊字符', acmlt)

特殊字符 8
