### shuffling & partitioning

### ```shuffling```

shuffling 은 데이터를 한 노드에서 다른 노드로 옮길 때 발생하고, 성능을 저하시킨다. 

코드로 한 번 알아보자.

In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local')
sc = SparkContext(conf = conf)

In [2]:
rdd = sc.parallelize([('math', 7), ('math', 9), ('science', 5), ('english', 5), ('math', 2), ('science', 9), ('english', 15), ('science', 5)], 3)

In [3]:
rdd.groupByKey().collect()

[('english', <pyspark.resultiterable.ResultIterable at 0x265f57102e0>),
 ('math', <pyspark.resultiterable.ResultIterable at 0x265f57103a0>),
 ('science', <pyspark.resultiterable.ResultIterable at 0x265f5710430>)]

위와 같은 RDD 가 있고, 저 RDD 를 groupbykey method 로 정리한다.

groupbykey 는 키 값을 기준으로 value 들을 모아주는 method 인데,

위의 코드와 같이 key 들이 각각의 파티션에 나눠져 있어 

key 값 별로 value 를 정리하기 위해 연산량이 많아지게 된다.

각각의 파티션에 나눠져 있다는 말은 서로 다른 worker nodes 에 저장돼있다는 말과 같고

서로 다른 worker nodes 의 통신이 많아진다는 뜻과 같다.

이 때 네트워크 연산이 많아져 성능이 저하되게 된다.

그래서 shuffling 을 일으키는 method 들을 정리하는 것도 좋지만, ```손이 먼저 움직이기 보단 머리로 충분히 생각하는 것이 필요할 것 같다.```

마지막으로 shuffling 이 일어나는 이유에 대해 정리해보면

데이터를 새롭게 파티셔닝하는 과정에서 발생한다.

```ps. 예시로 groupbykey 를 쓴 것이지, reducebykey 나 다른 transformation method 들에서도 발생한다.```

In [4]:
y = rdd.groupByKey()

y.collect()

[('english', <pyspark.resultiterable.ResultIterable at 0x265e9de9670>),
 ('math', <pyspark.resultiterable.ResultIterable at 0x265f5710b50>),
 ('science', <pyspark.resultiterable.ResultIterable at 0x265f5710be0>)]

In [32]:
lis = [(k, list(v)) for (k, v) in y.collect()]

lis

[('english', [5, 15]), ('math', [7, 9, 2]), ('science', [5, 9, 5])]

먼저 groupbykey 를 하게되면 여러 노드에서 english, math, science 에 해당하는 key 값들을 각 파티션들에 모아놓게 된다. (이미 연산량이 많음)

In [31]:
z = y.mapValues(lambda x: sum(x))

z.collect()

[('english', 20), ('math', 18), ('science', 19)]

그 후에

english 5
english 15

math 7
math 9
math 2

science 5
science 9
science 5 

<img src = 'https://drive.google.com/uc?id=1YAN6QSDGqdKe9h7-gwksDQAKdkL5LmKi'>

를 다시 합쳐서 

[('english', 20), ('math', 18), ('science', 19)] 이 결과를 만들어내게 된다.

불필요한 연산이 너무 많아진다.

결국 value 에 해당하는 값들을 다 더할 일이였으면,

애초에 한 파티션에서 같은 key 값을 가지는 value 는 미리 모아서 더해놓는 것이 연산량을 줄이는 방법일 것이다.

그 해답이 reducebykey 이다.

In [36]:
rdd.reduceByKey(lambda x, y: x + y).collect()

[('english', 20), ('math', 18), ('science', 19)]

('math', 7), ('math', 9), 

('science', 5), ('english', 5), ('math', 2), 

('science', 9), ('english', 15), ('science', 5)

이렇게 3 개의 파티션으로 나눠져있을 때

math 끼리 미리 연산하고

science 끼리 미리 연산한 다음 

다른 worker node 의 파티션과 통신해서 연산량을 줄일 수 있는 코드가 reducebykey 이다.

<img src = 'https://drive.google.com/uc?id=1oUdLxfalmY7D5buIpsd5BdG9KGO8dPLk'>

위의 groupbykey + reduce 와 비교해보면 연산되는 양이 reducebykey 가 더 적다. 

shuffle 을 최소화하면 성능향상이 몇 배로 올라가게 된다.

파티션을 조정하고 캐싱하는 것으로도 피할 수가 있다는 것을 알았으면 좋겠다.

위의 rdd 는 파티션이 3 개로 지정돼있지만 

rdd.repartition(4)

rdd.cache()

이런 식으로 파티셔닝하고 캐싱하는 것이 성능향상에 더 도움을 줄 수 있다.

### ```partitioning```

key-value rdd 에서 partitioning 은 최대한 균일하게 데이터를 분할하고 

쿼리가 같이되는 데이터들은 무조건 붙여주어 탐색을 빠르게 하는 것을 목적으로 한다.

spark 사용 시 잊지말아야할 점에 대해 말했던 적이 있다.

데이터들은 분산돼 저장되며, 같은 연산도 여러번 한다고 여러차례 말했었다.

말 그대로 RDD 는 partitioning 처리 후 여러 worker node 로 분산되어 저장된다.

그래서 앞으로 이루어질 연산들을 수월하게 하기 위해 partitioning 에 신경을 써주어야 한다.

spark 에서 partitioning 은 프로그래밍에서 '어떤 자료구조를 선택할까' 와 같은 맥락이기에 많은 신경을 써야한다고 생각한다.

### ```hash partitioning```

hash function 을 이용해서 데이터들을 여러 파티션에 균일하게 분배하는 방식이다.

일반적으로 hash function 을 사용하는 이유 중 다양한 입력에 대해 고르게 분포하는 장점을 살렸다 생각한다.

spark 는 해시충돌을 최소화 하기위해서 고품질 해시함수를 사용, 해시 버킷, 리파티셔닝을 사용한다.

해시 함수로는 MD5, SHA-1, SHA-256 등을 쓰고, 해시 버킷은 해시값을 기반으로 파티션을 구성하는 것이다.

```반례```

만약 짝수의 key 만이 존재하는 RDD 를 2개의 파티션으로 나누고 해시 함수가 x%2 라면 


p1 = [2,4,6,8 ...]

p2 = []


이런 일이 벌어질 수도 있다.

이런 이유로 spark 의 해시함수는 고품질 해시함수들을 사용한다.

### ```range partitioning```

순서가 있는 정렬된 partitioning 이다.

쿼리 패턴이 순서가 있고 정렬된 상태면 range partitioning 을 떠올려보자. ex: 날짜

### ```memory & disk partitioning```

disk -> prtitionBy()

memory -> repartition(), coalesce()


이런 것도 있다.

### disk

In [37]:
pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
pairs.collect()

[(1, 1), (2, 2), (3, 3), (4, 4), (2, 2), (4, 4), (1, 1)]

In [41]:
pairs.partitionBy(2).cache()

MapPartitionsRDD[59] at mapPartitions at PythonRDD.scala:145

partitionBy 를 써서 파티션을 만든 후에는 케싱을 꼭 해주도록 하자.

연산이 불릴 때 마다 파티셔닝을 반복해서 셔플링이 반복적으로 일어나게된다. .cache(), .persist()

In [38]:
pairs.partitionBy(2).glom().collect()

[[(2, 2), (4, 4), (2, 2), (4, 4)], [(1, 1), (3, 3), (1, 1)]]

partitionBy(2) method 를 써서 2개의 파티션으로 나뉜 것을 볼 수 있다. 

.glom 은 어떻게 파티셔닝이 된건지를 보여주는 method 이다.

partitionBy(partition 개수, 새로운 hash function 지정)

pairs.partitionBy(2, lambda x: x%2).glom().collect() 이런 식으로도 사용 가능하다. 

이미 고품질 해시함수 sha256 등을 사용하고 있지만, 혹시 커스텀을 할 수도 있으니 적어보았다.

### memory

repartition, coalesce 는 파티션을 조절하는데 사용한다.

shuffling 을 동반하는 resource 가 많이 드는 method 이다.

repartition 은 말 그대로 파티셔닝을 몇 개로 할지 다시 정하는 method 이고

coalesce 는 파티셔닝의 개수를 줄이는 method 이다.

`additional `

map, flatmap 두 transform 은 key 의 변형을 주어 탐색의 성능을 저하시킬 수 있으니 주의하자.

key 를 수정할 것이 아니라면 mapValues, flatmapValues 를 사용하자. 

key 를 수정하게 되는 일이 많지는 않을 것 같다. 여러 사람들이 싫어할 것임.