<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#combineByKey" data-toc-modified-id="combineByKey-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>combineByKey</a></span><ul class="toc-item"><li><span><a href="#데이터-생성" data-toc-modified-id="데이터-생성-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Creating the data</a></span></li><li><span><a href="#partition이-1개일-경우-combiner,-mergeValues만-작동" data-toc-modified-id="partition이-1개일-경우-combiner,-mergeValues만-작동-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>With one partition, only createCombiner and mergeValue run</a></span></li><li><span><a href="#partition이-복수면-mergeCombiner가-작동" data-toc-modified-id="partition이-복수면-mergeCombiner가-작동-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>With multiple partitions, mergeCombiners also runs</a></span><ul class="toc-item"><li><span><a href="#앞서-사용한-기호를-연산자로-변경해-실행" data-toc-modified-id="앞서-사용한-기호를-연산자로-변경해-실행-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span>Replacing the marker symbols with arithmetic</a></span></li></ul></li></ul></li><li><span><a href="#다른-데이터" data-toc-modified-id="다른-데이터-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Other data</a></span></li></ul></div>

In [1]:
import pyspark

myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

### combineByKey
Compute the sum and the count for each key, producing pairs of the form (key, (sum, count)).
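As a rough mental model, the three functions passed to `combineByKey` can be imitated in plain Python. This is only a sketch of the semantics, not Spark's implementation: the helper name `simulate_combine_by_key` and the sample partitions are made up for illustration, and the merge order across partitions is assumed to follow list order.

```python
def simulate_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of combineByKey over a list of partitions (not Spark code)."""
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key not in combiners:
                # First time this key appears in this partition.
                combiners[key] = create_combiner(value)
            else:
                # The key already has a combiner in this partition.
                combiners[key] = merge_value(combiners[key], value)
        per_partition.append(combiners)

    # Merge the per-partition combiners key by key.
    merged = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# Hypothetical input: two partitions of (key, value) pairs.
partitions = [[("a", 1), ("a", 2)], [("a", 3), ("b", 4)]]
result = simulate_combine_by_key(
    partitions,
    lambda v: (v, 1),                               # start a (sum, count) pair
    lambda c, v: (c[0] + v, c[1] + 1),              # fold a value into the pair
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),  # merge two pairs
)
print(result)  # {'a': (6, 3), 'b': (4, 1)}
```

With a single partition the final merge loop never meets a key twice, so `merge_combiners` is never called.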
#### Creating the data

In [2]:
_testList=[("key1",1),("key1",3),("key2",2),("key1",2),("key2",4),
           ("key1",5),("key2",6),
           ("key1",7),("key1",8),("key2",9),("key2",3)]

#### With one partition, only createCombiner and mergeValue run

In [3]:
_testRdd=spark.sparkContext.parallelize(_testList)

In [4]:
_testRdd.getNumPartitions()

1

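- The first value for key1 is 1, so `createCombiner` turns it into `1*` (the `*` marker). Each later value is appended by `mergeValue` (the `#` marker), which yields `1*#3#2#5#7#8`.
- For key2 the result is `2*#4#6#9#3`.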

In [5]:
_testRdd.combineByKey(lambda v : str(v)+"*", lambda c, v : c+"#"+str(v), lambda c1, c2 : c1+'&'+c2).collect()

[('key1', '1*#3#2#5#7#8'), ('key2', '2*#4#6#9#3')]

#### With multiple partitions, mergeCombiners also runs
Here the number of partitions is set to 2.

In [6]:
_testRdd=spark.sparkContext.parallelize(_testList, 2)

partitions = _testRdd.glom().collect()
for num, partition in enumerate(partitions):
    print(f'Partitions {num} -> {partition}')

Partitions 0 -> [('key1', 1), ('key1', 3), ('key2', 2), ('key1', 2), ('key2', 4)]
Partitions 1 -> [('key1', 5), ('key2', 6), ('key1', 7), ('key1', 8), ('key2', 9), ('key2', 3)]


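Because the data is split across partitions, values are first combined within each partition, and the per-partition results are then merged (the `&` marker).

- key1: `1,3,2` are combined in partition 0 and `5,7,8` in partition 1, giving `1*#3#2&5*#7#8`.
- key2: `2,4` are combined in partition 0 and `6,9,3` in partition 1, giving `2*#4&6*#9#3`.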
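The per-partition step described above can be traced in plain Python to make the intermediate strings visible. This is a sketch rather than Spark code: the helper `combine_within` is hypothetical, the partition contents are copied from the `glom()` output, and the partition 0 result is assumed to be merged first.

```python
# Partition contents as printed by glom() above.
partition_0 = [("key1", 1), ("key1", 3), ("key2", 2), ("key1", 2), ("key2", 4)]
partition_1 = [("key1", 5), ("key2", 6), ("key1", 7), ("key1", 8), ("key2", 9), ("key2", 3)]


def combine_within(partition):
    """Apply the createCombiner / mergeValue lambdas to one partition (plain-Python model)."""
    combiners = {}
    for key, value in partition:
        if key not in combiners:
            combiners[key] = str(value) + "*"                   # createCombiner
        else:
            combiners[key] = combiners[key] + "#" + str(value)  # mergeValue
    return combiners


local_0 = combine_within(partition_0)
local_1 = combine_within(partition_1)
print(local_0)  # {'key1': '1*#3#2', 'key2': '2*#4'}
print(local_1)  # {'key1': '5*#7#8', 'key2': '6*#9#3'}

# mergeCombiners joins the two per-partition strings for each key.
merged = {key: local_0[key] + "&" + local_1[key] for key in local_0}
print(merged)   # {'key1': '1*#3#2&5*#7#8', 'key2': '2*#4&6*#9#3'}
```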

In [7]:
_testRdd.combineByKey(lambda v : str(v)+"*", lambda c, v : c+"#"+str(v), lambda c1, c2 : c1+'&'+c2).collect()

[('key1', '1*#3#2&5*#7#8'), ('key2', '2*#4&6*#9#3')]

##### Replacing the marker symbols with arithmetic

In [8]:
_testRdd.combineByKey(lambda value: (value,1),
                     lambda x,value: (x[0]+value, x[1]+1),
                     lambda x,y: (x[0]+y[0], x[1]+y[1])) \
        .collect()

[('key1', (26, 6)), ('key2', (24, 5))]

- Computing the average

Use `combineByKey()` to get the sum and count for each key, then divide the sum by the count.

In [9]:
_testCbkRdd=_testRdd.combineByKey(lambda value: (value,1),
                     lambda x,value: (x[0]+value, x[1]+1),                      
                     lambda x,y: (x[0]+y[0], x[1]+y[1]))

In [10]:
averageByKey = _testCbkRdd.map(lambda x:(x[0],x[1][0]/x[1][1]))
averageByKey.collectAsMap()

{'key1': 4.333333333333333, 'key2': 4.8}
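One way to see why the combiner carries a (sum, count) pair instead of a running mean is to compare the two approaches on key2, whose values are split unevenly across the two partitions. This is a plain-Python sketch, with the values copied from the `glom()` output earlier in this notebook and a small hypothetical `mean` helper.

```python
# key2 values per partition, copied from the glom() output.
key2_partition_0 = [2, 4]
key2_partition_1 = [6, 9, 3]


def mean(values):
    return sum(values) / len(values)


# Naive approach: average the per-partition means.
mean_of_means = mean([mean(key2_partition_0), mean(key2_partition_1)])

# (sum, count) approach: add sums and counts, divide once at the end.
total = sum(key2_partition_0) + sum(key2_partition_1)
count = len(key2_partition_0) + len(key2_partition_1)
true_mean = total / count

print(mean_of_means)  # 4.5
print(true_mean)      # 4.8
```

The two numbers differ because the partitions hold different numbers of key2 values; for key1 each partition holds three values, so both approaches would happen to agree.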

### Other data

In [11]:
marks = spark.sparkContext.parallelize([('kim',86),('lim',87),('kim',75),
                                      ('kim',91),('lim',78),('lim',92),
                                      ('lim',79),('lee',99)])

In [12]:
marksByKey = marks.combineByKey(lambda value: (value,1),
                             lambda x,value: (x[0]+value, x[1]+1),
                             lambda x,y: (x[0]+y[0], x[1]+y[1]))

In [13]:
marksByKey.collect()

[('kim', (252, 3)), ('lim', (336, 4)), ('lee', (99, 1))]

In [14]:
heights = spark.sparkContext.parallelize([
        ('M',182.),('F',164.),('M',180.),('M',185.),('M',171.),('F',162.)
    ])

In [15]:
heightsByKey = heights.combineByKey(lambda value: (value,1),
                             lambda x,value: (x[0]+value, x[1]+1),
                             lambda x,y: (x[0]+y[0], x[1]+y[1]))

In [16]:
heightsByKey.collect()

[('M', (718.0, 4)), ('F', (326.0, 2))]

In [17]:
avgByKey = heightsByKey.map(lambda x: (x[0],x[1][0]/x[1][1]))

print(avgByKey.collectAsMap())

{'M': 179.5, 'F': 163.0}
