# chapter 3, RDD

## RDD는?
- R(회복 가능한) D(분산) D(데이터셋)
- 강력한 점:
 - 알아서 분산/병렬 처리 연산을 해줌
 - 알아서 분산 저장소에 저장
 - 즉 분산처리를 쉽게하기 위해 데이터를 추상화한 형태

In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("3_rdd")
sc = SparkContext(conf = conf)

## transformation and action

**transformation**
- RDD ----(transformation) ---> RDD
- RDD에서 새로운 RDD (**important!:** 기존의 RDD를 바꾸는게 아님)
- Lazy calculation
    - spark는 transformation연산은 필요할 때에만 수행함
    - 좀 더 정확히 말하면, action이 수행될 때에 transformation도 같이 수행함
    - 그러니까  RDD.map( ... ).filter( ... ) 라고 써도 이 시점에서 계산하는게 아니라
    - 그냥 저 instruction을 기억해 두고 있다가 필요할 때에 한꺼번에 계산함.
    
**action**
- RDD ---(action)---> other types(e.g. int ...)
- Invoke calculation

## examples of transformation

__textFile__

In [19]:
lines = sc.textFile("data/3_rdd.txt") 
lines.take(5)

['995921일반외부장학금 받기 너무 힘들다5 47e2ab54 9분 전 0 28',
 '995917일반이종걸 원내대표 박근혜 대통령에게 탓통령...1 d896b15c 20분 전333',
 '995916일반연게에 가끔 평범 연돌이 무시하는 글 올...2 dc12bde5 20분 전362',
 '995914일반광역어그로8 3068ef1d 28분 전 0 63',
 '995913일반전현무 센빠이 평이 그닥인 듯.....? ㅠㅠ11 63de2cfe 29분 전 0 110']

__filter__

In [18]:
lawSchool = lines.filter( lambda line: "로스쿨" in line)
lawSchool.collect() # collect: show all

['995648일반로스쿨 진학 1학기에 상법듣게 생겼는데;;8 84afba02 21시간 전 0 173']

__map__

In [21]:
import re
skip_first_num = lines.map( lambda line: re.match(r"\d{6}(.*)",line).group(1))#정규식을 배웁시다 여러분
skip_first_num.take(5)# 995921 이거 잘라봄.

['일반외부장학금 받기 너무 힘들다5 47e2ab54 9분 전 0 28',
 '일반이종걸 원내대표 박근혜 대통령에게 탓통령...1 d896b15c 20분 전333',
 '일반연게에 가끔 평범 연돌이 무시하는 글 올...2 dc12bde5 20분 전362',
 '일반광역어그로8 3068ef1d 28분 전 0 63',
 '일반전현무 센빠이 평이 그닥인 듯.....? ㅠㅠ11 63de2cfe 29분 전 0 110']

__flatMap__

In [34]:
split_by_space_map = lines.map( lambda line: line.split(" "))
split_by_space_flatMap = lines.flatMap( lambda line: line.split(" "))

print("map")
print(split_by_space_map.take(2))
print("flatMap")#말 그대로 평평하게 펼쳐줌. inner list없음
print(split_by_space_flatMap.take(10))

map
[['995921일반외부장학금', '받기', '너무', '힘들다5', '47e2ab54', '9분', '전', '0', '28'], ['995917일반이종걸', '원내대표', '박근혜', '대통령에게', '탓통령...1', 'd896b15c', '20분', '전333']]
flatMap
['995921일반외부장학금', '받기', '너무', '힘들다5', '47e2ab54', '9분', '전', '0', '28', '995917일반이종걸']


__intersection__(비싼 operation)

In [28]:
set_a = sc.parallelize([1,2,3])
set_b = sc.parallelize([1,2,4])
set_a.intersection(set_b).collect()

[2, 1]

__distinct__(주의: 비싼 operation)

In [29]:
distinct_example = sc.parallelize([1,1,2,3,3,4])
distinct_example.distinct().collect()

[1, 2, 3, 4]

__subtract__(비쌈)

In [31]:
A = sc.parallelize([1,2,3])
B = sc.parallelize([1,2])
A.subtract(B).collect()

[3]

__cartesian (product)__(비쌈)

In [58]:
coordinate = sc.parallelize([1,2,3])
name = sc.parallelize(["철수","영희","안철수"])
coordinate.cartesian(name).collect()

[(1, '철수'),
 (1, '영희'),
 (1, '안철수'),
 (2, '철수'),
 (2, '영희'),
 (2, '안철수'),
 (3, '철수'),
 (3, '영희'),
 (3, '안철수')]

__sample__(샘플링)

RDD.sample(True(복원)/False(비복원),fraction)

In [102]:
A = sc.parallelize(range(20))
A.sample(False, 0.2).collect()

[12, 16, 18]

## example of actions

__count, countByValue__

In [36]:
lines.count()

157

In [55]:
sorted(lines.map(lambda line: re.match(r"\d{6}..(.*)",line).group(1))\
.flatMap(lambda line: line.split(" "))\
.countByValue().items(), key=lambda x:x[1], reverse=True)[:10]# 죄성합니다... 밑에 다시 쉬운 예제

[('전', 119),
 ('0', 116),
 ('한시간', 18),
 ('20시간', 15),
 ('2시간', 14),
 ('21시간', 13),
 ('5시간', 12),
 ('3시간', 10),
 ('6시간', 10),
 ('8시간', 9)]

In [56]:
A = sc.parallelize([1,1,2,3])
A.countByValue()

defaultdict(int, {1: 2, 2: 1, 3: 1})

__collect__ (이미 많이 씀, RDD에 있는 transformation을 모두 다 꺼내올 때에 씀)

__take, top__ (이름만으로 이미 유추됨? ㅇㅇ 몇개만 꺼내오겠다는거, take는 무작위로 top은 상위에 있는거부터)

In [103]:
A = sc.parallelize(["i","am","ronaldo"])
A.take(3) # 아직 분산하여 넣지 않았으므로 순서가 보존됨

['i', 'am', 'ronaldo']

__reduce__

In [68]:
A = sc.parallelize(["1","2","3"])
print(A.reduce(lambda x,y: x+y))
print(A.reduce(lambda x,y: y+x))
print(A.reduce(lambda currentItem,accumulatedValue: currentItem + accumulatedValue))
#이걸 다 이해해야 reduce를 이해한거!

123
321
123


__fold__ (reduce랑 거의 비슷한데 멱등원을 넣어줘야함. 멱등원이란 + 에서 0, x에서 1과 같이 값을 여려번 적용해도 원래 값이 바뀌지 않는 것)

- reduce vs fold
 - reduce는 계산 순서가 상관 없어야 함.(물론 위에서는 상관있는 것 처럼 보이지만)
     - 수학적으로 commutative monoid 여야 reduce 연산을 사용할 수 있다고 함.
     - 모노이드는 수학의 범주론에서 나오는 개념임.(즉 하스켈을 쓰지 않는 이상 몰라도 됨)
     - 교환법칙과 결합법칙이 다 만족하는 거라고 하는데 자세히는 모르겠음.
 - fold는 그럴 필요 없음.(더 상위 개념)
     - 즉 교환법칙이나 결합법칙이 성립할 필요가 음슴
     
자세한건 [여기](http://stackoverflow.com/questions/25158780/difference-between-reduce-and-foldleft-fold-in-functional-programming-particula) 참고해보셈

In [75]:
A = sc.parallelize([1,2,3])
A.fold(0, lambda x,y : x+y)

6

__aggregate__
- 이놈도 reduce랑 비슷한데 좀 개념이 더 애매함
- 이 놈은 reduce랑 다르게 두 개의 함수를 pass 해줘야함
    1. 노드 내에서 일어나는 계산을 정의하는 함수
    2. 노드와 노드를 합치는 계산을 정의하는 함수
- fold와 같이 멱등원도 넣어줘야함

In [76]:
A = sc.parallelize([1,2,3,4])
A.aggregate(0,
    lambda acc, val: acc + val,
    lambda acc1, acc2: acc1 + acc2
) 
# ㅋㅋㅋ 이짓을 왜하는지 모르겠음. 지금은 reduce랑 정확히 동일함.
# 하지만 뭐 책을 보면, aggregate는 input과 다른 타입의 output이 나올 수도 있다고 함.

10

In [80]:
A.aggregate( ("",0),
    (lambda acc, val : (acc[0]+str(val),acc[1]+val) ),
    (lambda acc1, acc2 : (acc1[0]+acc2[0], acc1[1]+acc2[1]))
)
# 즉, 이런식으로 장난질 칠 수 있나봄(좀 더 유연한가) 여튼 아직 왜 이게 있는지는 잘 모르겠음

('1234', 10)

## persist, cache
- 아까 spark는 lazy calculation을 한다고 했는데 대부분의 경우 효율적임
- 하지만 똑같은 RDD를 여러번 사용해야한다면?
    - lazy하므로 해당 RDD에 action이 취해질 때 마다 transformation chain을 매번 수행함
    - 예를들어 
        - line = lines.filter(lambda x: x > 2).map(lambda x: x + 2)
        - 저기서 line을 부르면 부를 때 마다 filter,map transformation 수행하고 그다음 연산을 함
        - 이럴땐 시간이 많이 걸림...
    - 이 때에 시간을 절약하기 위해 사용하는게 persist나 cache임.
- pyspark.StorageLevel로 어디에 저장할건지 설정할 수 있습니다. 이건 함 찾아보세요

In [89]:
B = A.filter(lambda x: x>1).persist() # 이 이후론 B계산결과는 저장되어있음
B.collect()

[2, 3, 4]

In [91]:
B.unpersist() #저장된거 삭제

PythonRDD[139] at RDD at PythonRDD.scala:48

*reference*
- learning spark(책)
- 하용호씨 slide share [링크](https://www.slideshare.net/yongho/rdd-paper-review)
- RDD 논문 [링크](http://www-bcf.usc.edu/~minlanyu/teach/csci599-fall12/papers/nsdi_spark.pdf)