In [None]:
# [+] PySpark 임포트 및 설정
from pyspark import SparkConf, SparkContext
conf = SparkContext()
sc = SparkConf(conf=conf)

### groupBy(): 입력 함수를 기준으로 그룹핑

In [32]:
# [+] 리스트로부터 rdd 생성 [1, 1, 2, 3, 5, 8]
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])

In [33]:
# [+] groupBy()를 이용한 홀수/짝수 그룹핑 -> groups
groups = rdd.groupby(lambda x: x % 2)

In [34]:
# [+] 그룹핑 결과를 리스트로 반환 -> res
res = groups.collect()

In [35]:
# [+] 그룹핑 결과(res) 출력
for k, v in res:
	print(k, list(v))

1 [1, 1, 3, 5]
0 [2, 8]


### groupByKey(): Key 값을 기준으로 그룹핑

In [38]:
# [+] 리스트로부터 key-value rdd 생성 [('a', 1), ('b', 1), ('a', 1)]
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])

[('a', 1), ('b', 1), ('a', 1)]

In [39]:
# [+] groupByKey()를 이용한 그룹핑 및 리스트 반환 -> groups
groups = rdd.groupByKey()

In [40]:
# [+] 그룹핑 결과를 리스트로 반환 -> res
res = groups.collect()

In [41]:
# [+]그룹핑 결과(res) 출력
for k, v in res:
	print(k, list(v))

a [1, 1]
b [1]


In [42]:
# len(): Python 객체의 길이를 리턴
value = [1, 1]
len(value)

2

In [44]:
# mapValues(), len()을 이용한 값 빈도 계산
counts = groups.mapValues(len)

In [45]:
counts.collect()

[('a', 2), ('b', 1)]

### groupBy() 예제
1. 리스트로부터 RDD 객체 생성
    + 리스트: ['C', 'C++', 'Python', 'Java', 'C#']  
2. 원소의 첫 번째 문자 값을 기준으로 그룹핑
    + 기대 결과:  
        J ['Java']  
        C ['C', 'C++', 'C#']  
        P ['Python']  
3. 그룹핑 결과를 리스트 객체로 출력

In [60]:
# [+] 리스트로부터 RDD 객체 생성
rdd = sc.parallelize(['C', 'C++', 'Python', 'Java', 'C#'])

In [61]:
# [+] 원소별 첫 번째 값(x[0])을 기준으로 그룹핑 -> groups
groups = rdd.groupBy(lambda x: x[0])

In [62]:
# [+] 그룹핑 결과를 리스트로 반환 -> res
res = groups.collect()

In [63]:
# [+] 그룹핑 결과(res) 출력
for k, v in res:
    print(k, list(v))

C ['C', 'C++', 'C#']
P ['Python']
J ['Java']


### groupByKey() 예제
1. list 객체로부터 3개의 파티션을 갖는 Key-Value RDD 생성하기
    + getNumPatitions(): RDD 객체의 파티션 수 출력
2. Key 별로 그룹핑
3. 그룹핑 결과 리스트로 반환
4. 리스트 출력

In [102]:
# list 객체로부터 3개의 파티션을 갖는 Key-Value RDD 생성하기

rdd = sc.parallelize([  # 과목 별 점수 데이터
    ('Math', 7), ('Math', 2), ('English', 7),
    ('Science', 7), ('English', 4), ('English', 9),
    ('Math', 8), ('Math', 3), ('English', 4),
    ('Science', 6), ('Science', 9), ('Science', 5)
], 3) # 파티션 수: 3

In [117]:
# getNumPartitions(): RDD 객체의 파티션 수를 확인하는 액션 메서드
rdd.getNumPartitions()

3

In [104]:
# [+] Key 별로 그룹핑 -> groups
groups = rdd.groupByKey()

In [105]:
# [+] 그룹핑 결과 리스트로 반환 -> res
res = groups.collect()

In [106]:
# [+] 그룹핑 결과(res) 출력
for k, v in res:
    print(k, list(v))

Science [7, 6, 9, 5]
Math [7, 2, 8, 3]
English [7, 4, 9, 4]


In [114]:
# key별 그룹핑을 2개의 파티션으로 분리하여 수행 -> groups
groups = rdd.groupByKey(2)

In [116]:
# [+] 파티션 수 출력
groups.getNumPartitions()

2

In [109]:
# 파티션별 RDD 값들을 리스트로 반환
res = groups.glom().collect()

In [110]:
res

[[('English', <pyspark.resultiterable.ResultIterable at 0x24ea37c7430>),
  ('Science', <pyspark.resultiterable.ResultIterable at 0x24ea37cfee0>)],
 [('Math', <pyspark.resultiterable.ResultIterable at 0x24ea37cfa30>)]]

In [112]:
# [+] 0번째 파티션에 대한 그룹핑 결과(res)를 출력
for k, v in res[0]:
    print(k, list(v))

English [7, 4, 9, 4]
Science [7, 6, 9, 5]


In [113]:
# [+] 1번째 파티션에 대한 그룹핑 결과(res)를 출력
for k, v in res[1]:
    print(k, list(v))

Math [7, 2, 8, 3]


### reduce() vs reduceByKey()
+ reduce(): 입력 함수를 기준으로 집계를 수행하는 액션 메서드
+ reduceByKey(): Key를 기준으로 그룹핑 및 집계를 수행하는 변환 메서드

In [1]:
# [+] reduce() 를 이용한 총합 구하기
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda x, y: x + y)

NameError: name 'sc' is not defined

In [123]:
# [+] 리스트로부터 K-V RDD 생성 -> rdd
rdd = sc.parallelize([('a', 1), ('a', 4), ('a', 3), ('b', 2), ('b', 9)])

In [124]:
# [+] reduceByKey()를 이용한 Key별 총합 계산 및 리스트로 반환
rdd.reduceByKey(lambda x, y : x + y).collect()

[('a', 2), ('b', 1)]

In [125]:
# 과목 별 점수 데이터
rdd = sc.parallelize([
    ('Math', 7), ('Math', 2), ('English', 7),
    ('Science', 7), ('English', 4), ('English', 9),
    ('Math', 8), ('Math', 3), ('English', 4),
    ('Science', 6), ('Science', 9), ('Science', 5)
], 3)

In [126]:
# [+] reduceByKey() 를 이용한 과목 별 점수 총합 구하기
rdd.reduceByKey(lambda x, y : x + y).collect()

[('Science', 27), ('Math', 20), ('English', 24)]

In [2]:
# [++] 과목별 점수 평균 구하기
# map()함수를 쓰게되면 key, value로 묶이기 때문에 무조건 mapValues()를 사용해야 함.
rdd.reduceByKey(lambda x, y : x + y).mapValues().collect()

### mapValues() vs map()
+ mapValues(): value에만 함수를 적용

In [129]:
# 리스트로부터 K-V RDD 객체 생성
rdd = sc.parallelize([
    ('a', ['apple', 'banana', 'lemon']),
    ('b', ['grapes'])
])

[('a', 3), ('b', 1)]

In [130]:
# mapValues(), len()를 이용하여 키 별 갯수 세기
rdd.mapValues(lambda x, y: (x, len(y)))

[('a', 3), ('b', 1)]

In [128]:
# map(): key, value 에 함수를 적용
rdd.map(lambda)

[2, 2]

### countByKey()
Key별 값 개수를 계산 및 딕셔너리로 반환하는 액션 메서드

In [140]:
# countByKey()를 이용하여 Key별 값 개수 계산 및 반환 -> res
rdd = sc.parallelize([
    ('a', 1), ('b', 1), ('a', 1)
])

res = rdd.countByKey() # countByKey()에 collect 붙이지 않아도 됨 => action 이기 때문.

In [141]:
# countByKey() 결과 출력
print(res)

defaultdict(<class 'int'>, {'a': 2, 'b': 1})


### keys()
key 목록을 RDD로 생성하는 변환 메서드

In [143]:
# 과목 별 점수 데이터
rdd = sc.parallelize([
    ('Math', 7), ('Math', 2), ('English', 7),
    ('Science', 7), ('English', 4), ('English', 9),
    ('Math', 8), ('Math', 3), ('English', 4),
    ('Science', 6), ('Science', 9), ('Science', 5)
], 3)

['Math',
 'Math',
 'English',
 'Science',
 'English',
 'English',
 'Math',
 'Math',
 'English',
 'Science',
 'Science',
 'Science']

In [144]:
# RDD 키 목록 생성 및 출력
# rdd.keys() # 새로운 rdd를 만든 것.(transformation이다.) 그러므로 collect():action 메서드를 사용해야 출력이 된다.
rdd.keys().collect()

['Math',
 'Math',
 'English',
 'Science',
 'English',
 'English',
 'Math',
 'Math',
 'English',
 'Science',
 'Science',
 'Science']

In [145]:
# [+] 유니크한 키 값 출력
rdd.keys().distinct().collect()

['Science', 'Math', 'English']

In [146]:
# [+] 유니크한 키 개수 출력
rdd.keys().distinct().count()

['Science', 'Math', 'English']

### Join 연산
+ join(): 내부조인
+ leftOuterJoin(): 외부조인(첫 번째 RDD 중심)
+ rightOuterJoin(): 외부조인(두 번째 RDD 중심)

In [147]:
# K-V RDD 객체 두 개 생성 
rdd1 = sc.parallelize([("a", 1), ("b", 4)])
rdd2 = sc.parallelize([("a", 2)])

In [148]:
# Join 연산
# narrow, wide transformation => join은 wide transformation이라 performance가 떨어질 수 있다.
rdd1.join(rdd2).collect()

[('a', (1, 2))]

In [149]:
# Left Outer Join 연산
rdd1.leftOuterJoin(rdd2).collect()

[('b', (4, None)), ('a', (1, 2))]

In [150]:
# Right Outer Join 연산
rdd1.rightOuterJoin(rdd2).collect()

[('a', (1, 2))]