In [1]:
# DataFrame -> RDD
spark.range(10).rdd

MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:0

In [4]:
spark.range(10).toDF("id").rdd.map(lambda row: row[0])

PythonRDD[16] at RDD at PythonRDD.scala:53

In [6]:
# rdd 호출후 toDF는 일반적이지 않음
spark.range(10).rdd.toDF()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [22]:
myCollection = "Spark The Definitive Guide: Big Data Processing Made Simple".split()
words = spark.sparkContext.parallelize(myCollection,2)  # 2 partition 

In [23]:
# 각 문장의 길이를 구한다 
# w_length =  words.map(lambda w : len(w))
# w_length.collect()

In [24]:
words.setName("myWords")
words.name()

'myWords'

In [30]:
words

myWords ParallelCollectionRDD[50] at readRDDFromFile at PythonRDD.scala:289

In [26]:
# RDD filterling 
def startsWithS(data):
    return data.startswith("S")
    

In [27]:
words.filter(lambda w : startsWithS(w)).collect()

['Spark', 'Simple']

In [29]:
words2 = words.map(lambda word: (word,word[0],word.startswith("S")))
words2

PythonRDD[52] at RDD at PythonRDD.scala:53

In [31]:
words2.filter(lambda record : record[2]).take(5)   # 5개 반환

[('Spark', 'S', True), ('Simple', 'S', True)]

In [33]:
words.flatMap(lambda word : list(word)).take(10)

['S', 'p', 'a', 'r', 'k', 'T', 'h', 'e', 'D', 'e']

In [34]:
list("Spark The" )

['S', 'p', 'a', 'r', 'k', ' ', 'T', 'h', 'e']

In [35]:
words.sortBy(lambda word: len(word)* -1).take(2)   # 단어의 길이를 계산 해서 음수로 변경... 정렬의 기본은 오름차순-->내림차순

['Definitive', 'Processing']

In [36]:
split_50 =  words.randomSplit([0.5,0.5]) # RDD  50% 두 부분으로 나눈다

In [37]:
# reduce  1 ~20 total summary   range(1,21)
spark.sparkContext.parallelize(range(1,21)).reduce(lambda x , y : x+y)

210

In [40]:
# 두 개의 단어중에서 길이가 긴 단어를 선택 reducder
def wordLengthReducder(leftWord, rightWord):
    return leftWord if len(leftWord) > len(rightWord) else rightWord
    
    # if leftWord> rightWord:
    #     return leftWord
    # else:
    #     return rightWord
    

In [41]:
words.reduce(wordLengthReducder)

'Processing'

In [43]:
# 파티션 개수 확인하기
words.getNumPartitions()

2

In [44]:
words.mapPartitions(lambda part : [1]).sum()

2

In [45]:
# 각 파티션의 인덱스와 해당 파티션에 속한 각 요소를 사용 새로운 값을 생성
def indexedFunc(partitionIndex, withinPartIterator):
    return [ f"partition:{partitionIndex} => {x}" for x in  withinPartIterator]

In [46]:
words.mapPartitionsWithIndex(indexedFunc).collect()

['partition:0 => Spark',
 'partition:0 => The',
 'partition:0 => Definitive',
 'partition:0 => Guide:',
 'partition:1 => Big',
 'partition:1 => Data',
 'partition:1 => Processing',
 'partition:1 => Made',
 'partition:1 => Simple']

In [47]:
# glom()  RDD 요소를 Partition별로 그룹화->리스트로 반환  RDD구조를 확인하거나 디버깅용도로
spark.sparkContext.parallelize(["Hello","Word"],2).glom().collect()

[['Hello'], ['Word']]

In [57]:
# word count
words = '''
 Prospects of talks between the government and the medical community over a prolonged walkout by junior doctors were raised as Yoon instructed officials to seek dialogue with doctors, with the walkout disrupting medical services at major hospitals for nearly five weeks.
"Related ministries have immediately launched working-level preparations to hold talks with the medical community," Cho told a government response meeting. "We will arrange a meeting between the government and the medical community as soon as possible."
'''

In [58]:
text = spark.sparkContext.parallelize([words])
text.getNumPartitions()

4

In [56]:
text.flatMap(lambda line:line.split()).take(5)

['Prospects', 'of', 'talks', 'between', 'the']

In [61]:
word_counts = text.flatMap(lambda line:line.split()).map(lambda word : (word, 1)).reduceByKey(lambda x, y : x+ y)
word_counts.collect()

[('of', 1),
 ('community', 2),
 ('walkout', 2),
 ('raised', 1),
 ('instructed', 1),
 ('have', 1),
 ('working-level', 1),
 ('Cho', 1),
 ('talks', 2),
 ('between', 2),
 ('the', 6),
 ('and', 2),
 ('over', 1),
 ('a', 3),
 ('by', 1),
 ('junior', 1),
 ('doctors', 1),
 ('to', 2),
 ('seek', 1),
 ('dialogue', 1),
 ('disrupting', 1),
 ('services', 1),
 ('nearly', 1),
 ('weeks.', 1),
 ('launched', 1),
 ('preparations', 1),
 ('hold', 1),
 ('"We', 1),
 ('possible."', 1),
 ('medical', 4),
 ('prolonged', 1),
 ('as', 3),
 ('Yoon', 1),
 ('officials', 1),
 ('at', 1),
 ('major', 1),
 ('hospitals', 1),
 ('"Related', 1),
 ('ministries', 1),
 ('told', 1),
 ('response', 1),
 ('meeting', 1),
 ('Prospects', 1),
 ('government', 3),
 ('were', 1),
 ('with', 3),
 ('doctors,', 1),
 ('for', 1),
 ('five', 1),
 ('immediately', 1),
 ('community,"', 1),
 ('meeting.', 1),
 ('will', 1),
 ('arrange', 1),
 ('soon', 1)]

In [63]:
# each multiple 2
numbers = spark.sparkContext.parallelize([1,2,3,4,5,6,7,8,9])
result =  numbers.map(lambda x: x*2)
result.collect()                                         

[2, 4, 6, 8, 10, 12, 14, 16, 18]

In [70]:
# 짝수만 필터링  filter
print( numbers.filter(lambda x : x%2 == 0).collect() )
# 모든요소의 합  reduce

temp = spark.sparkContext.parallelize(["abc","cccc","bbbbbb"])
# 각 요소의 길이  map
print( temp.map(lambda x : len(x)).collect() )         

# 정렬  sortBy
print( temp.sortBy( lambda x : x).collect())

[2, 4, 6, 8]
[3, 4, 6]
['abc', 'bbbbbb', 'cccc']


In [72]:
# RDD 두개를 분할해서 합치기
rdd1 = spark.sparkContext.parallelize([1,10,20])
rdd2 = spark.sparkContext.parallelize([10,100,200])
merged_rdd = rdd1.union(rdd2)
merged_rdd.collect()

[1, 10, 20, 10, 100, 200]

In [73]:
# CSV 파일 로드 RDD 로 처리 

In [82]:
read_csv_ = "/root/spark/bydata/by-day/2011-12-09.csv"

In [88]:
csv_lines =  spark.sparkContext.textFile(read_csv_)
header = csv_lines.first()
header

'InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country'

In [94]:
csv_lines.getNumPartitions()

2

In [92]:
# remove header
data = csv_lines.filter(lambda line : line != header)
data.take(5)

['581475,22596,CHRISTMAS STAR WISH LIST CHALKBOARD,36,2011-12-09 08:39:00,0.39,13069.0,United Kingdom',
 '581475,23235,STORAGE TIN VINTAGE LEAF,12,2011-12-09 08:39:00,1.25,13069.0,United Kingdom',
 '581475,23272,TREE T-LIGHT HOLDER WILLIE WINKIE,12,2011-12-09 08:39:00,0.39,13069.0,United Kingdom',
 '581475,23239,SET OF 4 KNICK KNACK TINS POPPIES,6,2011-12-09 08:39:00,1.65,13069.0,United Kingdom',
 '581475,21705,BAG 500g SWIRLY MARBLES,24,2011-12-09 08:39:00,0.39,13069.0,United Kingdom']

In [100]:
data.map(lambda x : x.split(",")).map(lambda x: (x[0],x[1],x[2]) ).take(5)

[('581475', '22596', 'CHRISTMAS STAR WISH LIST CHALKBOARD'),
 ('581475', '23235', 'STORAGE TIN VINTAGE LEAF'),
 ('581475', '23272', 'TREE T-LIGHT HOLDER WILLIE WINKIE'),
 ('581475', '23239', 'SET OF 4 KNICK KNACK TINS POPPIES'),
 ('581475', '21705', 'BAG 500g SWIRLY MARBLES')]

In [101]:
# 데이터 읽을때 빈 줄을 제거하고 읽기
csv_lines =  spark.sparkContext.textFile(read_csv_)
non_empty_lines = csv_lines.filter(lambda line : line.split() != "")
non_empty_lines.take(2)

['InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country',
 '581475,22596,CHRISTMAS STAR WISH LIST CHALKBOARD,36,2011-12-09 08:39:00,0.39,13069.0,United Kingdom']

In [102]:
# 중복된 행을 제거
data =  non_empty_lines.distinct()
data.take(2)

['581475,22596,CHRISTMAS STAR WISH LIST CHALKBOARD,36,2011-12-09 08:39:00,0.39,13069.0,United Kingdom',
 '581475,23239,SET OF 4 KNICK KNACK TINS POPPIES,6,2011-12-09 08:39:00,1.65,13069.0,United Kingdom']

In [104]:
# 특정폴더 하위에 있는 모든 csv파일 읽어서 RDD 로합치기  glob or os
# spark.sparContext.union

In [105]:
from glob import glob

In [114]:
filepaths = '/root/spark/bydata/by-day/*.csv'
files = glob(filepaths)

In [127]:
# 한개의 csv 파일을 읽어들여서 RDD로변경하는 함수
def read_csv_to_rdd(file_path,header=None):
    data = spark.sparkContext.textFile(file_path)
    if header == None:
        header = data.first()
        data = csv_lines.filter(lambda line : line != header)    
    return data    
# 합치는 함수
def merge_csv_files_folers(filepaths):
    csv_file_lists = glob(filepaths)
    return spark.sparkContext.union( [ read_csv_to_rdd(csvfile)  for csvfile in csv_file_lists ] )


In [117]:
filepaths = '/root/spark/bydata/by-day/'
all_csv_rdd = merge_csv_files_folers(filepaths)

In [119]:
all_csv_rdd.getNumPartitions()

610

In [125]:
len(all_csv_rdd.distinct().collect())
distinct_all_csv_rdd = all_csv_rdd.distinct()
distinct_all_csv_rdd.take(2)

                                                                                

['536392,22150,3 STRIPEY MICE FELTCRAFT,6,2010-12-01 10:29:00,1.95,13705.0,United Kingdom',
 '536569,21823,PAINTED METAL HEART WITH HOLLY BELL,4,2010-12-01 15:35:00,1.45,16274.0,United Kingdom']

In [124]:
len(all_csv_rdd.collect())

                                                                                

542214

In [131]:
# 파일이 있는 경로만 설정해도 파일 리딩이 가능
new_files = '/root/spark/bydata/by-day/'
new_read_csv = spark.sparkContext.textFile(new_files)
len(new_read_csv.collect() )

                                                                                

542214

### advenced RDD

In [133]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split()

In [140]:
words = spark.sparkContext.parallelize(myCollection,2)
words.take(2)

['Spark', 'The']

In [137]:
words.map(lambda w : (w.lower(),1)).collect()

[('spark', 1),
 ('the', 1),
 ('definitive', 1),
 ('guide', 1),
 (':', 1),
 ('big', 1),
 ('data', 1),
 ('processing', 1),
 ('made', 1),
 ('simple', 1)]

In [139]:
words.keyBy(lambda w : w.lower()[0]).collect()

[('s', 'Spark'),
 ('t', 'The'),
 ('d', 'Definitive'),
 ('g', 'Guide'),
 (':', ':'),
 ('b', 'Big'),
 ('d', 'Data'),
 ('p', 'Processing'),
 ('m', 'Made'),
 ('s', 'Simple')]

In [141]:
keyword = words.keyBy(lambda w : w.lower()[0])

In [142]:
keyword.mapValues(lambda w: w.upper()).collect()

[('s', 'SPARK'),
 ('t', 'THE'),
 ('d', 'DEFINITIVE'),
 ('g', 'GUIDE'),
 (':', ':'),
 ('b', 'BIG'),
 ('d', 'DATA'),
 ('p', 'PROCESSING'),
 ('m', 'MADE'),
 ('s', 'SIMPLE')]

In [143]:
keyword.flatMapValues(lambda w: w.upper()).collect()

[('s', 'S'),
 ('s', 'P'),
 ('s', 'A'),
 ('s', 'R'),
 ('s', 'K'),
 ('t', 'T'),
 ('t', 'H'),
 ('t', 'E'),
 ('d', 'D'),
 ('d', 'E'),
 ('d', 'F'),
 ('d', 'I'),
 ('d', 'N'),
 ('d', 'I'),
 ('d', 'T'),
 ('d', 'I'),
 ('d', 'V'),
 ('d', 'E'),
 ('g', 'G'),
 ('g', 'U'),
 ('g', 'I'),
 ('g', 'D'),
 ('g', 'E'),
 (':', ':'),
 ('b', 'B'),
 ('b', 'I'),
 ('b', 'G'),
 ('d', 'D'),
 ('d', 'A'),
 ('d', 'T'),
 ('d', 'A'),
 ('p', 'P'),
 ('p', 'R'),
 ('p', 'O'),
 ('p', 'C'),
 ('p', 'E'),
 ('p', 'S'),
 ('p', 'S'),
 ('p', 'I'),
 ('p', 'N'),
 ('p', 'G'),
 ('m', 'M'),
 ('m', 'A'),
 ('m', 'D'),
 ('m', 'E'),
 ('s', 'S'),
 ('s', 'I'),
 ('s', 'M'),
 ('s', 'P'),
 ('s', 'L'),
 ('s', 'E')]

In [145]:
keyword.take(2), keyword

([('s', 'Spark'), ('t', 'The')], PythonRDD[819] at RDD at PythonRDD.scala:53)

In [149]:
keyword.keys().collect(),  keyword.values().collect()

(['s', 't', 'd', 'g', ':', 'b', 'd', 'p', 'm', 's'],
 ['Spark',
  'The',
  'Definitive',
  'Guide',
  ':',
  'Big',
  'Data',
  'Processing',
  'Made',
  'Simple'])

In [None]:
# 각 단어에서 소문자로 변환된 문자들을 추출
# 중복된 문자를 제거
# 각 문자에서 무작위로 생성된 값을 가지는 샘플을 생성
# 각 단어의 첫 문자를 키로하고 단어를 값으로 하는 키 - 값  쌍을 생성
# sampleBykey() 함수를 사용해서 각 키에 대한 샘플을 추출

In [150]:
import random

In [158]:
distinctChar =  words.flatMap(lambda x : x.lower()).distinct().collect()

In [173]:
sampleMap = dict( map(lambda c : (c, random.random()),distinctChar ) )
words.map(lambda w:(w.lower()[0], w)).sampleByKey(True,sampleMap,6).collect()

[('t', 'The'), ('t', 'The'), ('g', 'Guide')]

In [174]:
# words  단어리스트를 소문자로 변환한후 각 문자를 추출
chars = words.flatMap(lambda w:w.lower())
chars.take(2)

['s', 'p']

In [181]:
# 각 문자를 (문자,1)형태로  키-값 변환
dic_chars =  chars.map(lambda w:(w,1))
dic_chars.take(2), dic_chars

([('s', 1), ('p', 1)], PythonRDD[869] at RDD at PythonRDD.scala:53)

In [180]:
def maxFunc(left,right):
    return max(left,right)
def addFunc(left,right):
    return left + right
nums = spark.sparkContext.parallelize(range(1,31),5)    

In [182]:
# dic_chars을 이용해서 각 문자의 출현 빈도를 계산
dic_chars.reduceByKey(lambda x,y : x+y).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [183]:
# num 에서 최대값과
max_value =  nums.reduce(maxFunc)
print(f'max value : {max_value}')
# num 에서 모든 값의 합
total_sum = nums.reduce(addFunc)
print(f'total_sum : {total_sum}')


max value : 30
total_sum : 465


In [184]:
dic_chars.countByKey()

defaultdict(int,
            {'s': 4,
             'p': 3,
             'a': 4,
             'r': 2,
             'k': 1,
             't': 3,
             'h': 1,
             'e': 7,
             'd': 4,
             'f': 1,
             'i': 7,
             'n': 2,
             'v': 1,
             'g': 3,
             'u': 1,
             ':': 1,
             'b': 1,
             'o': 1,
             'c': 1,
             'm': 2,
             'l': 1})

In [185]:
from functools import reduce
dic_chars.groupByKey().map(lambda row : (row[0], reduce(addFunc,row[1]))).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [188]:
nums.aggregate(0,maxFunc,addFunc)  # 초기 값은 0 이고 파티션별로 시퀀스연산(maxFunc)을 하고 그 결과를 병합연산(addFunc)
# 모든 파티션의 최대값의  합

90

In [190]:
# RDD의 요소를 병렬로 집계
depth = 3
nums.treeAggregate(0,maxFunc,addFunc,depth) 

90

In [None]:
# RDD 키별로 값을 집계하는 작업
dic_chars.aggregateByKey(0,addFunc,maxFunc).collect()

### CSV 파일 로드하고 전처리해서 parquet으로 저장하기
### mysql write

In [194]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

In [203]:
# read csv file
spark_df = spark.read.format('csv').option('header','true').option('inferSchema','true').load('doc_log.csv')
spark_df.show(2)

+----------+-------+---+--------------------+----------------+---------+
|actiontype|ismydoc|ext|           sessionid|documentposition| datetime|
+----------+-------+---+--------------------+----------------+---------+
|      OPEN|  false|PDF|9400fd2e43d7dc2d0...|    LOCALSTORAGE|2016.7.18|
|     CLOSE|  false|PDF|9400fd2e43d7dc2d0...|    LOCALSTORAGE|2016.7.18|
+----------+-------+---+--------------------+----------------+---------+
only showing top 2 rows



In [204]:
# 임시테이블로 등록  spark_df
spark_df.createOrReplaceTempView('spark_df')

In [208]:
# ismydoc = treu
df1 = spark.sql("select * from spark_df where ismydoc = true")
df1.show(5,False)

+----------+-------+---+--------------------------------+----------------+---------+
|actiontype|ismydoc|ext|sessionid                       |documentposition|datetime |
+----------+-------+---+--------------------------------+----------------+---------+
|OPEN      |true   |PDF|9400fd2e43d7dc2d054ca78806236ee1|MYPOLARISDRIVE  |2016.7.18|
|CLOSE     |true   |PDF|9400fd2e43d7dc2d054ca78806236ee1|MYPOLARISDRIVE  |2016.7.18|
|RESET     |true   |PDF|f191063c562691041dfa935ff0876975|OTHERAPP        |2016.7.6 |
|OPEN      |true   |PDF|3da5ab986c93803de1e25012d9972274|OTHERAPP        |2016.7.28|
|RESET     |true   |PDF|9e37751e132b5eb96e7d3fde7db132e3|OTHERAPP        |2016.7.19|
+----------+-------+---+--------------------------------+----------------+---------+
only showing top 5 rows



In [212]:
# sessionid,ext 컬럼중에서 ext = PDF 또는  ext == DOC인 데이터들중에서 중복데이터를 제거하고 캐쉬
df2 = spark.sql('select * from spark_df')
df2_pdf =  df2.select('sessionid','ext').filter("ext=='PDF' or ext=='DOC'").dropDuplicates().cache()
df2.count()

24/03/25 16:19:37 WARN CacheManager: Asked to cache already cached data.


301861

In [215]:
df2_pdf.show(2)



+--------------------+---+
|           sessionid|ext|
+--------------------+---+
|551de498388693734...|PDF|
|ffef6402dac05483f...|PDF|
+--------------------+---+
only showing top 2 rows



                                                                                

In [218]:
# sessionid 별로 datetime의 최소값
df2_min_date = df2.groupBy(['sessionid']).agg(min('datetime').alias('min_date'))
df2_min_date.show(2)

+--------------------+---------+
|           sessionid| min_date|
+--------------------+---------+
|00042bfc107cef995...| 2016.7.9|
|00050ec6afac496d0...|2016.7.14|
+--------------------+---------+
only showing top 2 rows



                                                                                

In [220]:
# df2_pdf 를 마스터로 해서 df2_min_date 병합
# df2_pdf 목록은 전부 나오게하고 매칭 안되는 데이터는 na 
df2_join = df2_pdf.join(df2_min_date,'sessionid','left')
df2_join.show(5)                        
                        



+--------------------+---+---------+
|           sessionid|ext| min_date|
+--------------------+---+---------+
|551de498388693734...|PDF| 2016.7.9|
|ffef6402dac05483f...|PDF|2016.7.12|
|635a5c8d3df7b0a40...|PDF|2016.7.15|
|c389b7b211b044b56...|PDF|2016.7.22|
|7fb01e6cc98ece873...|DOC| 2016.7.1|
+--------------------+---+---------+
only showing top 5 rows



                                                                                

In [226]:
# 결측치 조사
df2_join.select(sum(col('sessionid').isNull().cast('int') )).show()

+-------------------------------------+
|sum(CAST((sessionid IS NULL) AS INT))|
+-------------------------------------+
|                                    0|
+-------------------------------------+



                                                                                

In [238]:
# 모든 열에 대해서 결측치 여부 조사
result = []
for col_name in df2_join.columns:
    result.append(df2_join.select(sum(col(col_name).isNull().cast('int') ).alias(col_name)   ))
result

[DataFrame[sessionid: bigint],
 DataFrame[ext: bigint],
 DataFrame[min_date: bigint]]