In [4]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("Chapter 4. Spark API")
        .getOrCreate()
)

# 4장 스파크 API 깊이 파헤치기
## 4.1 Pair RDD 다루기

- 구매 횟수가 가장 많은 고객에게는 곰 인형을 보낸다.
- 바비 쇼핑몰 놀이 세트를 두 개 이상 구매하면 청구 금행을 5% 할인한다.
- 사전을 다섯 권 이상 구매한 고객에게는 칫솔을 보낸다.
- 가장 많은 금액을 지출한 고객에게는 커플 잠옷 세트를 보낸다.

### 4.1.2 Pair RDD

In [5]:
!tail -10 "../book-samples/ch04/ch04_data_transactions.txt"

2015-03-30#4:11 AM#27#59#4#7949.36
2015-03-30#2:17 PM#26#68#3#7071.04
2015-03-30#8:03 AM#56#56#5#102.47
2015-03-30#11:42 PM#75#73#10#8196.51
2015-03-30#1:24 AM#25#9#4#7848.42
2015-03-30#2:44 PM#91#12#7#5487.02
2015-03-30#5:04 AM#83#1#8#5208.21
2015-03-30#1:20 PM#51#69#10#1194.28
2015-03-30#9:53 PM#68#84#2#3886.99
2015-03-30#5:49 PM#18#31#4#4700.48

In [6]:
tranFile = spark.sparkContext.textFile("../book-samples/ch04/ch04_data_transactions.txt")
tranData = tranFile.map(lambda line: line.split("#"))
transByCust = tranData.map(lambda t: (int(t[2]), t))

#### 4.1.2.1 키 및 값 가져오기

In [7]:
# 고객 ID 고유 개수 확인
transByCust.keys().distinct().count()

100

In [8]:
# 동일한 방법
transByCust.map(lambda x: x[1][2]).distinct().count()

100

#### 4.1.2.2 키별 개수 세기

- 53번 고객이 구매 횟수가 가장 많음

In [9]:
# 키의 출현 횟수를 반환
# 키가 고객 ID이므로 고객 별 구매 횟수를 의미

transByCust.countByKey()[51]

18

In [10]:
# value를 기준으로 정렬

sort_orders = sorted(transByCust.countByKey().items(), key=lambda x: x[1], reverse=True)
sort_orders[:5]

[(53, 19), (51, 18), (56, 17), (2, 15), (76, 15)]

In [11]:
# 구매 기록을 저장
complTrans = [["2015-03-30", "11:59 PM", "53", "4", "1", "0.00"]]

#### 4.1.2.3 단일 키로 값 찾기

- 53번 고객의 모든 구매 기록을 가져옴

In [12]:
transByCust.lookup(53) # 결과값을 드라이버로 전송함

[['2015-03-30', '6:18 AM', '53', '42', '5', '2197.85'],
 ['2015-03-30', '4:42 AM', '53', '44', '6', '9182.08'],
 ['2015-03-30', '2:51 AM', '53', '59', '5', '3154.43'],
 ['2015-03-30', '5:57 PM', '53', '31', '5', '6649.27'],
 ['2015-03-30', '6:11 AM', '53', '33', '10', '2353.72'],
 ['2015-03-30', '9:46 PM', '53', '93', '1', '2889.03'],
 ['2015-03-30', '4:15 PM', '53', '72', '7', '9157.55'],
 ['2015-03-30', '2:42 PM', '53', '94', '1', '921.65'],
 ['2015-03-30', '8:30 AM', '53', '38', '5', '4000.92'],
 ['2015-03-30', '6:06 AM', '53', '12', '6', '2174.02'],
 ['2015-03-30', '3:44 AM', '53', '47', '1', '7556.32'],
 ['2015-03-30', '10:25 AM', '53', '30', '2', '5107.0'],
 ['2015-03-30', '1:48 AM', '53', '58', '4', '718.93'],
 ['2015-03-30', '9:31 AM', '53', '18', '4', '8214.79'],
 ['2015-03-30', '9:04 AM', '53', '68', '4', '9246.59'],
 ['2015-03-30', '1:51 AM', '53', '40', '1', '4095.5'],
 ['2015-03-30', '1:53 PM', '53', '85', '9', '1630.24'],
 ['2015-03-30', '6:51 PM', '53', '100', '1', '1694

In [13]:
# 리스트를 스트링으로 전환
for t in transByCust.lookup(53):
    print(", ".join(t))

2015-03-30, 6:18 AM, 53, 42, 5, 2197.85
2015-03-30, 4:42 AM, 53, 44, 6, 9182.08
2015-03-30, 2:51 AM, 53, 59, 5, 3154.43
2015-03-30, 5:57 PM, 53, 31, 5, 6649.27
2015-03-30, 6:11 AM, 53, 33, 10, 2353.72
2015-03-30, 9:46 PM, 53, 93, 1, 2889.03
2015-03-30, 4:15 PM, 53, 72, 7, 9157.55
2015-03-30, 2:42 PM, 53, 94, 1, 921.65
2015-03-30, 8:30 AM, 53, 38, 5, 4000.92
2015-03-30, 6:06 AM, 53, 12, 6, 2174.02
2015-03-30, 3:44 AM, 53, 47, 1, 7556.32
2015-03-30, 10:25 AM, 53, 30, 2, 5107.0
2015-03-30, 1:48 AM, 53, 58, 4, 718.93
2015-03-30, 9:31 AM, 53, 18, 4, 8214.79
2015-03-30, 9:04 AM, 53, 68, 4, 9246.59
2015-03-30, 1:51 AM, 53, 40, 1, 4095.5
2015-03-30, 1:53 PM, 53, 85, 9, 1630.24
2015-03-30, 6:51 PM, 53, 100, 1, 1694.52
2015-03-30, 7:39 PM, 53, 100, 8, 7885.35


#### 4.1.2.4  mapValues 변환 연산자로 Pari RDD 값 바꾸기

- 25번 상품을 2개 이상 구매하면 5% 할인
- 구매수량 (5번), 구매 금액 (6번)

In [14]:
def applyDiscount(tran):
    if(int(tran[3])==25 and float(tran[4])>1):
        tran[5] = str(float(tran[5])*0.95)
    return tran

transByCust = transByCust.mapValues(lambda t: applyDiscount(t))
transByCust.collect()[:5]

[(51, ['2015-03-30', '6:55 AM', '51', '68', '1', '9506.21']),
 (99, ['2015-03-30', '7:39 PM', '99', '86', '5', '4107.59']),
 (79, ['2015-03-30', '11:57 AM', '79', '58', '7', '2987.22']),
 (51, ['2015-03-30', '12:46 AM', '51', '50', '6', '7501.89']),
 (86, ['2015-03-30', '11:39 AM', '86', '24', '5', '8370.2'])]

#### 4.1.2.5 flatMapValues 변환 연산자로 키에 값을 추가

- 상품 81번을 다섯 권 이상 구매한 고객에세 사은품으로 칫솔을 보내야 함

In [15]:
def addToothbrush(tran):
    if(int(tran[3]) == 81 and int(tran[4])>=5):
        from copy import copy
        cloned = copy(tran)
        cloned[5] = "0.00"
        cloned[3] = "70"
        cloned[4] = "1"
        return [tran, cloned]
    else:
        return [tran]

transByCust = transByCust.flatMapValues(lambda t: addToothbrush(t))

In [16]:
transByCust.count() # 6건의 구매 이력이 추가됨

1006

#### 4.1.2.6 reduceByKey 변환 연산자로 키의 모든 값 병합


In [17]:
amounts = transByCust.mapValues(lambda t: float(t[5]))
amounts.reduceByKey(lambda p1, p2: p1 + p2).collect()[:5]

[(86, 36925.590000000004),
 (94, 51430.14000000001),
 (20, 32997.8),
 (38, 59110.46000000001),
 (46, 36687.53)]

#### 4.1.2.7 reduceByKey 대신 foldByKey 사용

In [18]:
amounts = transByCust.mapValues(lambda t: float(t[5]))
amounts.foldByKey(0, lambda p1, p2: p1 + p2).collect()[:5] # 0은 덧샘의 제로 밸류

[(86, 36925.590000000004),
 (94, 51430.14000000001),
 (20, 32997.8),
 (38, 59110.46000000001),
 (46, 36687.53)]

In [19]:
amounts.foldByKey(100000, lambda p1, p2: p1 + p2).collect()[:5] # 병렬로 실행되기 때문에 zeroValue가 여러번 쓰일 수 있음에 주의

[(86, 236925.59),
 (94, 251430.14),
 (20, 232997.8),
 (38, 259110.46),
 (46, 236687.52999999997)]

In [20]:
sort_orders = sorted(amounts.reduceByKey(lambda p1, p2: p1 + p2).collect(), key=lambda x: x[1], reverse=True)
sort_orders[:5] # 100049달러를 구매한 고객이 있음

[(76, 100049.0),
 (53, 88829.76),
 (56, 85906.94),
 (51, 83312.12),
 (31, 83202.61)]

In [23]:
# 구매 기록을 저장

complTrans += [["2015-03-30", "11:59 PM", "76", "63", "1", "0.00"]]
transByCust = transByCust.union(spark.sparkContext.parallelize(complTrans).map(lambda t: (int(t[2]), t)))
transByCust.map(lambda t: "#".join(t[1])).saveAsTextFile("ch04output-transByCust")

#### 4.1.2.8 aggregateByKey로 키의 모든 값 그루핑

- 3개의 인자
    - ZeroValue, 변환 함수(zerovalue와 변환 내용을 결합), 병합 함수(변환 내용끼리 결합)

In [24]:
prods = transByCust.aggregateByKey([], lambda prods, tran: prods + [tran[3]],
    lambda prods1, prods2: prods1 + prods2)
prods.collect()[:5]

[(54, ['78', '57', '51', '4', '98', '46', '47']),
 (72, ['13', '57', '9', '94', '46', '30', '26']),
 (90, ['34', '65', '66', '46', '99', '99', '28', '5']),
 (18, ['32', '12', '68', '62', '89', '87', '98', '42', '31']),
 (36, ['84', '86', '57', '91', '98'])]

## 4.2 데이터 파티셔닝을 이해하고 테이터 셔플링 최소화 
### 4.2.1 스파크의 데이터 Partitioner

#### 4.2.1.1 HashPartitioner
- 단순한 mod에 대입해 파티션 번호를 계산함
- 대규모 데이터셋을 상대적으로 적은 수의 파티션으로 나누면 대체로 데이터를 고르게 분산
- spark.default.parallelism 환경 매개변수 값을 기본 사용

#### 4.2.1.2 RangePartitioner
- 정렬된 RDD의 데이터를 거의 같은 범위 간격으로 분할할 수 있음
- 직접 사용할 일이 많지는 않음(?)

#### 4.2.1.3 Pair RDD의 사용자 정의 Paritioner
- mapValues, flatMapValues를 제외하고 모든 함수는 사용자 정의 파티셔너를 제공
    - 위 두 함수는 파티셔너를 보전함
- Pair RDD에만 쓸 수 있음
- 부모 RDD(현재 RDD를 만드는 데 사용한 RDD) 중 지정된 파티션 개수가 가장 큰 수를 사용하고 부모가 없는 경우는 spark.default.parallelism 설정에 의존함

```
# 동일한 결과
rdd.foldBykey(afunction, 100)
rdd.foldBykey(afunction, new HashPartitioner(100))
```

### 4.2.2 불필요한 셔플링 줄이기

#### 4.2.2.1 셔플링 발생 조건: Partitioner를 명시적으로 변경

In [25]:
# rdd.aggregateByKey(zeroValue, 100)(seqFunc, comboFunc).collect() # 파티션 개수를 변경
# rdd.aggregateByKey(zeroValue, new CustomPartitioner())(seqFunc, comboFunc).collect() # 사용자 정의 파티션 적용

#### 4.2.2.2 셔플링 발생 조건: Partitioner 제거
- map, flatMap은 RDD의 Partitioner를 제거하므로, 이후 다른 변환 연산자를 사용하면 셔플링이 발생함
- 셔플링이 발생하는 연산자
    - Pair RDD 변환: aggregateByKey, foldByKey, reduceByKey, groupBy, join, leftOuterJoin, rightOuterJoin, fullOuterJoin, subtractByKey
    - subtract, intersection, groupWith
    - sortByKey
    - partiionBy, coalesce

#### 4.2.2.3 외부 셔플링 서비스로 셔플링 최적화
- spark.shuffle.service.enabled = ture
- [Spark 모범 사례](https://docs.qubole.com/en/latest/user-guide/engines/spark/spark-best-practices.html)

#### 4.2.2.4 셔플링 관련 매개변수

- [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior)
    - spark.shuffle.manager (없음)
    - spark.shuffle.consolidateFiles (없음)
    - spark.shuffle.memoryFraction
        - spark.memory.fraction (변경)
        - default 0.6
    - spark.shuffle.spill.compress
        - default: true
    - spark.shuffle.compress
        - default: true
    - spark.shuffle.spill.batchSize (없음)
    - spark.shuffle.service.port
        - default: 7337

### 4.2.3 RDD 파티션 변경
- 작업 부하를 효율적으로 분산시키거나 메모리 문제를 방지하는 목적
- partitionBy, coalesce, repartition, repartitionAndSortWithinPartition

#### 4.2.3.1 partitionBy
- Pair RDD 에서만 사용 가능, 기존과 파티셔너와 다르면 새로운 RDD를 생성

#### 4.2.3.2 coalesce, repartition
- 파티션 개수를 줄이거나 늘릴 때 사용
- shuffle = True 인 경우 이전까지 기존 파티셔너를 사용하고 이후에는 변경된 파티셔너를 사용
- shuffle = False 인 경우 이전에도 변경된 파티셔너를 사용

### 4.2.4 파티션 단위로 데이터 매핑

#### 4.2.4.1 mapPartiions와 mapPartiionsWithIndex

- mapPartiions: 각 파티션의 모든 요소를 반복문으로 처리하고 새로운 RDD 파티션을 생성함
- mapPartiionsWithIndex: mapPartiions와 유사하지만 매핑 함수에 파티션 번호가 함께 전달됨

#### 4.2.4.2 파티션의 데이터를 수집하는 glom 연산자

- 각 파티션의 모든 요소를 배열 하나로 모으고, 이 배열들을 요소로 포함하는 새로운 RDD를 반환함
- RDD 파티션 갯수를 1개로 설정하여 손쉽게 단일 배열로 만들 수 있음
    - collection 과 차이점은(?)

In [27]:
import random
l = [random.randrange(100) for x in range(500)]
rdd = spark.sparkContext.parallelize(l, 30).glom()

30

In [36]:
list_sizes = [len(lst) for lst in rdd.collect()]
print(list_sizes)

[16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 32, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 20]


In [30]:
rdd.count()

30

## 4.3 데이터 조인, 정렬, 그루핑

- 어제 판매한 상품 이름과 각 상품별 매출액 합계
- 어제 판매하지 않은 상품 목록
- 전일 판매 실적 통계: 각 고객이 구입한 상품의 평균 가격, 최저 가격 및 최고 가격, 구매 금액 합계

### 4.3.1 데이터 조인

In [43]:
transByProd = tranData.map(
    lambda tran: (int(tran[3]), tran) # (상품 ID, 데이터 전체)
)
transByProd.take(10)

[(68, ['2015-03-30', '6:55 AM', '51', '68', '1', '9506.21']),
 (86, ['2015-03-30', '7:39 PM', '99', '86', '5', '4107.59']),
 (58, ['2015-03-30', '11:57 AM', '79', '58', '7', '2987.22']),
 (50, ['2015-03-30', '12:46 AM', '51', '50', '6', '7501.89']),
 (24, ['2015-03-30', '11:39 AM', '86', '24', '5', '8370.2']),
 (19, ['2015-03-30', '10:35 AM', '63', '19', '5', '1023.57']),
 (77, ['2015-03-30', '2:30 AM', '23', '77', '7', '5892.41']),
 (58, ['2015-03-30', '7:41 PM', '49', '58', '4', '9298.18']),
 (86, ['2015-03-30', '9:18 AM', '97', '86', '8', '9462.89']),
 (26, ['2015-03-30', '10:06 PM', '94', '26', '4', '4199.15'])]

In [47]:
totalsByProd = transByProd.mapValues(lambda t: float(t[5])).reduceByKey(lambda p1, p2: p1 + p2)
totalsByProd.take(10)

[(68, 62133.899999999994),
 (86, 86316.57),
 (58, 186539.8),
 (50, 61318.78),
 (24, 46973.67),
 (26, 60695.88),
 (18, 36973.270000000004),
 (6, 43252.97),
 (48, 50163.71),
 (8, 43761.869999999995)]

In [45]:
products = (
    spark.sparkContext.textFile("../book-samples/ch04/ch04_data_products.txt")
    .map(lambda line: line.split("#"))
    .map(lambda p: (int(p[0]), p))
)

products.take(10)

[(1, ['1', 'ROBITUSSIN PEAK COLD NIGHTTIME COLD PLUS FLU', '9721.89', '10']),
 (2, ['2', 'Mattel Little Mommy Doctor Doll', '6060.78', '6']),
 (3, ['3', 'Cute baby doll, battery', '1808.79', '2']),
 (4, ['4', 'Bear doll', '51.06', '6']),
 (5, ['5', 'LEGO Legends of Chima', '849.36', '6']),
 (6, ['6', 'LEGO Castle', '4777.51', '10']),
 (7, ['7', 'LEGO Mixels', '8720.91', '1']),
 (8, ['8', 'LEGO Star Wars', '7592.44', '4']),
 (9, ['9', 'LEGO Lord of the Rings', '851.67', '2']),
 (10, ['10', 'LEGO The Hobbit', '7314.55', '9'])]

#### 4.3.1.1 RDBMS와 유사한 조인 연산자

- 종류
    - join: inner 조인과 유사
    - leftOuterJoin
    - rightOuterJoin
    - fullOuterJoin  

In [49]:
# 제품별 판매액
totalsAndProds = totalsByProd.join(products)
totalsAndProds.take(10)

[(68, (62133.899999999994, ['68', 'Niacin', '6295.48', '1'])),
 (24, (46973.67, ['24', 'LEGO Pirates', '4150.34', '2'])),
 (48, (50163.71, ['48', 'LG LED TV 42LA6130', '6918.75', '8'])),
 (8, (43761.869999999995, ['8', 'LEGO Star Wars', '7592.44', '4'])),
 (64, (68147.48000000001, ['64', 'Disposable diapers', '3003.77', '4'])),
 (28,
  (82055.45999999999,
   ['28', 'Far Cry 4 Limited Edition for Xbox One', '711.88', '1'])),
 (4, (63520.21000000001, ['4', 'Bear doll', '51.06', '6'])),
 (32, (34104.55, ['32', 'Intel Core i7 3770K', '3132.7', '8'])),
 (44,
  (128743.52, ['44', 'SAMSUNG LED TV 39F5500, Full HD, USB', '2531.15', '1'])),
 (88, (46411.33, ['88', 'healthy accents sinus', '5247.51', '4']))]

In [75]:
# 어제 판매하지 않은 상품 목록 (여집합 활용)
totalsWithMissingProds = products.leftOuterJoin(totalsByProd)
missingProds = totalsWithMissingProds.filter(lambda x: x[1][1] is None).map(lambda x: x[1][0]) # 판매량이 없는 제품은 None
missingProds.foreach(lambda p: print(", ".join(p)))

In [76]:
missingProds.take(10)

[['20', 'LEGO Elves', '4589.79', '4'],
 ['3', 'Cute baby doll, battery', '1808.79', '2'],
 ['43', 'Tomb Raider PC', '2718.14', '1'],
 ['63', 'Pajamas', '8131.85', '3']]

#### 4.3.1.2 subtract, subtractByKey

- 첫 번째 RDD에서 두 번째 요소를 제거

In [77]:
missng_prods = products.subtractByKey(totalsAndProds)
missng_prods.take(10)

[(43, ['43', 'Tomb Raider PC', '2718.14', '1']),
 (20, ['20', 'LEGO Elves', '4589.79', '4']),
 (3, ['3', 'Cute baby doll, battery', '1808.79', '2']),
 (63, ['63', 'Pajamas', '8131.85', '3'])]

#### 4.3.1.3 cogroup


In [85]:
prodTotCogroup = totalsByProd.cogroup(products)

In [86]:
totalsAndProds = (
    prodTotCogroup
    .filter(lambda x: len(x[1][0].data)>0)
    .map(lambda x: (int(x[1][1].data[0][0]),(x[1][0].data[0], x[1][1].data[0])))
)
totalsAndProds.take(10)

[(68, (62133.899999999994, ['68', 'Niacin', '6295.48', '1'])),
 (24, (46973.67, ['24', 'LEGO Pirates', '4150.34', '2'])),
 (48, (50163.71, ['48', 'LG LED TV 42LA6130', '6918.75', '8'])),
 (8, (43761.869999999995, ['8', 'LEGO Star Wars', '7592.44', '4'])),
 (64, (68147.48000000001, ['64', 'Disposable diapers', '3003.77', '4'])),
 (28,
  (82055.45999999999,
   ['28', 'Far Cry 4 Limited Edition for Xbox One', '711.88', '1'])),
 (4, (63520.21000000001, ['4', 'Bear doll', '51.06', '6'])),
 (32, (34104.55, ['32', 'Intel Core i7 3770K', '3132.7', '8'])),
 (44,
  (128743.52, ['44', 'SAMSUNG LED TV 39F5500, Full HD, USB', '2531.15', '1'])),
 (88, (46411.33, ['88', 'healthy accents sinus', '5247.51', '4']))]

#### 4.3.1.4 intersection 변환 연산자 사용

- 양쪽 모두에 포함된 공통 요소, 즉 교집합을 새로운 RDD로 반환함

In [88]:
totalsByProd.map(lambda t: t[0]).intersection(products.map(lambda p: p[0])).take(10)

[68, 24, 48, 8, 64, 28, 4, 32, 44, 88]

#### 4.3.1.5 cartesian 변환 연산자로 RDD 두 개 결함

In [94]:
rdd1 = spark.sparkContext.parallelize([7,8,9])
rdd2 = spark.sparkContext.parallelize([1,2,3])

rdd1.cartesian(rdd2).collect() # 모든 컨비네이션 조합

[(7, 1), (7, 2), (7, 3), (8, 1), (8, 2), (8, 3), (9, 1), (9, 2), (9, 3)]

In [95]:
rdd1.cartesian(rdd2).filter(lambda el: el[0] % el[1] == 0).collect() # 나누어 나머지가 0인 경우

[(7, 1), (8, 1), (8, 2), (9, 1), (9, 3)]

#### 4.3.1.6 zip 변환 연산자로 RDD 조인

In [97]:
rdd1 = spark.sparkContext.parallelize([1,2,3])
rdd2 = spark.sparkContext.parallelize(["n4","n5","n6"])
rdd1.zip(rdd2).collect()

#zipPartitions is not implemented in Python yet(20/10/31)

[(1, 'n4'), (2, 'n5'), (3, 'n6')]

### 4.3.2 데이터 정렬

In [109]:
sortedProds = totalsAndProds.sortBy(lambda t: t[1][1][1]).collect()

In [111]:
sortedProds[:5] # 알파벳 순 정렬

[(90, (48601.89, ['90', 'AMBROSIA TRIFIDA POLLEN', '5887.49', '1'])),
 (94, (31049.07, ['94', 'ATOPALM MUSCLE AND JOINT', '1544.25', '7'])),
 (87, (26047.72, ['87', 'Acyclovir', '6252.58', '4'])),
 (79, (50917.700000000004, ['79', 'Alphanate', '4218.17', '4'])),
 (83, (31498.84, ['83', 'Ativan', '9511.99', '9']))]

- [repartitionAndSortWithinPartitions](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=repartitionandsortwithinpartitions#pyspark.RDD.repartitionAndSortWithinPartitions)
- [takeOrdered](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=takeordered#pyspark.RDD.takeOrdered)

### 4.3.3 데이터 그루핑

#### 4.3.3.1 groupByKey, groupBy 

- 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환함

```
# 동일한 결과
rdd.map(lambda x: (f(x), x)).groupByKey()
rdd.groupby(f)
```

#### 4.3.3.2 combineByKey 변환 연산자로 데이터 그루핑

In [130]:
def createComb(t):
    total = float(t[5])
    q = int(t[4])
    return (total/q, total/q, q, total)

def mergeVal(tup, t):
    (mn, mx, c, tot) = tup
    total = float(t[5])
    q = int(t[4])
    return (min(mn,total/q),max(mx,total/q),c+q,tot+total)

def mergeComb(tup1, tup2):
    (mn1,mx1,c1,tot1) = tup1 
    (mn2,mx2,c2,tot2) = tup2
    return (min(mn1,mn1),max(mx1,mx2),c1+c2,tot1+tot2)

In [131]:
avgByCust = transByCust.combineByKey(createComb, mergeVal, mergeComb)

In [147]:
avgByCust.collect()[:5]

[(54, (348.48375, 6138.63, 43, 36307.04)),
 (72, (354.04999999999995, 1420.4066666666668, 41, 32216.130000000005)),
 (90, (582.62, 2765.34, 41, 39947.380000000005)),
 (18, (153.4177777777778, 5558.75, 48, 35827.33)),
 (36, (143.6211111111111, 2778.0733333333333, 32, 25640.04))]

In [146]:
data = {k:v for (k, v) in avgByCust.collect()}
pd.DataFrame.from_dict(data, orient='index', columns=['min', 'max', 'cnt', 'total'])

Unnamed: 0,min,max,cnt,total
54,348.483750,6138.630000,43,36307.0400
72,354.050000,1420.406667,41,32216.1300
90,582.620000,2765.340000,41,39947.3800
18,153.417778,5558.750000,48,35827.3300
36,143.621111,2778.073333,32,25640.0400
...,...,...,...,...
35,305.350000,1657.540000,56,45141.2300
17,391.194286,1138.909083,88,58301.3545
71,300.728000,7993.740000,52,47184.5100
53,235.372000,7556.320000,87,88829.7600


## 4.4 RDD 의존 관계

### 4.4.1 RDD 의존 관계와 스파크 동작 메커니즘

- 스파크의 실행 모델은 방향성 비순환 그래프에 기반함
- 변환 연산자로 생성된 새 RDD가 이전 RDD에 의존하므로 간선 방향은 자식 RDD에서 부모 RDD로 향한다. 이러한 의존 관계를 RDD 계보라고 한다.
- 좁은 의존 관계와 넓은 의존 관계가 있으며 넓은 의존 관계는 셔플링을 동반한다.


In [150]:
import random
l = [random.randrange(10) for x in range(500)]
listrdd = spark.sparkContext.parallelize(l, 5)
pairs = listrdd.map(lambda x: (x, x*x))
reduced = pairs.reduceByKey(lambda v1, v2: v1+v2)
finalrdd = reduced.mapPartitions(lambda itr: ["K="+str(k)+",V="+str(v) for (k,v) in itr])
finalrdd.collect()

['K=5,V=1125',
 'K=0,V=0',
 'K=1,V=63',
 'K=6,V=1512',
 'K=2,V=204',
 'K=7,V=2597',
 'K=8,V=2752',
 'K=3,V=432',
 'K=4,V=928',
 'K=9,V=4374']

In [158]:
finalrdd.toDebugString()

b'(5) PythonRDD[300] at collect at <ipython-input-150-9b3006dfb8f3>:7 []\n |  MapPartitionsRDD[299] at mapPartitions at PythonRDD.scala:133 []\n |  ShuffledRDD[298] at partitionBy at <unknown>:0 []\n +-(5) PairwiseRDD[297] at reduceByKey at <ipython-input-150-9b3006dfb8f3>:5 []\n    |  PythonRDD[296] at reduceByKey at <ipython-input-150-9b3006dfb8f3>:5 []\n    |  ParallelCollectionRDD[295] at readRDDFromFile at PythonRDD.scala:262 []'


### 4.4.2 스파크의 스테이지와 태스크

- 스파크는 셔플링이 발생하는 지점을 기준으로 스파크 잡 하나를 여러 스테이지로 나눈다.
- 중간 스테이지 결과는 중간 파일의 형태로 실행자 머신의 로컬 디스크에 저장된다.

### 4.4.3 체크포인트로 RDD 계보 저장

- 장애 발생 이전에 저장된 스냅샷(체크포인팅)을 사용해 이 지점부터 나머지 계보를 다시 계산
- RDD의 checkpoint 메서드를 실행해 호출할 수 있음
- SparkContext.setCheckpointDir에 설정된 디텍터리로 저장

## 4.5 누적 변수와 공유 변수

- 누적 변수: 여러 실행자가 공유하는 변수로 값을 더하는 연산만 허용가능
- 공유 변수: 실행자가 수정할 수 없으며 오직 드라이버만 공유 변수를 생성하며, 실행자에서는 읽기만 가능
    - SparkContext.broadcast(value) 메서드로 생성
    - 공유값을 접근할 때에는 value메서드를 항상 사용해야 성능을 얻을 수 있음

In [167]:
#accumulators in Python cannot be named
acc = spark.sparkContext.accumulator(0)
l = spark.sparkContext.parallelize(range(1000000))
l.foreach(lambda x: acc.add(1))

In [168]:
acc.value

1000000

- spark.broadcast.compress = true
- spark.broadcast.blockSize = 4096
- spark.python.worker.reuse = true  # 워커를 재사용하지 않으면 각 태스크별로 공유 변수를 전송해야 함