In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("operation_example ")
sc = SparkContext(conf=conf)

In [3]:
foods = sc.parallelize([
    "짜장면","짬뽕","마라탕",
    "떡볶이","쌀국수","짬뽕",
    "짜장면","짜장면","짬뽕",
    "마라탕","라면","라면",
    "우동","쌀국수"
    
])
foods.collect()

['짜장면',
 '짬뽕',
 '마라탕',
 '떡볶이',
 '쌀국수',
 '짬뽕',
 '짜장면',
 '짜장면',
 '짬뽕',
 '마라탕',
 '라면',
 '라면',
 '우동',
 '쌀국수']

### 그룹핑하는건 좋은데 후처리를 for 문으로 처리를 하면 효과가 좋지않다.

- 그룹핑 하는거 괜찮다. 하지만 후처리를 for문으로 하는 것은 효과가 좋지 않다.  
- `for문 쓰는순간 병렬처리 x`  
    - 파티션 내에서 변환수행하는 것이 아니라 드라이버 프로그램에서 하겠다.  
    - `즉, worker node 에서 일어나는 작업이 아니라  master node에서 작업을 하는거 -> 속도 느려진다.`  
- groupby 하고 key는 그대로 냅두고 value에 대한 처리를 따로 할 수 있도록 함수를 만들곤 한다.  
    - 일반적으로 groupby랑 같이 사용되는 mapValues이다.  
==> 값 처리를 모조리 워커에서 할수있게 해준다.  

In [4]:
foodsGroup = foods.groupBy(lambda x : x) 
foodsGroup

PythonRDD[5] at RDD at PythonRDD.scala:53

In [5]:
result = foodsGroup.collect()
result

[('짜장면', <pyspark.resultiterable.ResultIterable at 0x2b2bd06af70>),
 ('짬뽕', <pyspark.resultiterable.ResultIterable at 0x2b2bd42ca90>),
 ('마라탕', <pyspark.resultiterable.ResultIterable at 0x2b2bd42caf0>),
 ('떡볶이', <pyspark.resultiterable.ResultIterable at 0x2b2bd42cb50>),
 ('쌀국수', <pyspark.resultiterable.ResultIterable at 0x2b2bd42cbb0>),
 ('라면', <pyspark.resultiterable.ResultIterable at 0x2b2bd42cc10>),
 ('우동', <pyspark.resultiterable.ResultIterable at 0x2b2bd42cc70>)]

##### 비추천 방식

In [6]:
# (직렬처리) 추천하지 않는다.
for k,v in result:
    lst = (k,len(v))
    print(lst)

('짜장면', 3)
('짬뽕', 3)
('마라탕', 2)
('떡볶이', 1)
('쌀국수', 2)
('라면', 2)
('우동', 1)


##### 추천 방식

In [7]:
# (병렬처리)이 방법이 더 효율적이다.
foodsGroup.groupByKey().mapValues(len).collect()
# 함수 만들고 mapvalues로 돌리는 것이 훨씬 더 유리 => master가 일하지 않고 woker가 일해 (transformation 작업)

[('짜장면', 1),
 ('짬뽕', 1),
 ('마라탕', 1),
 ('떡볶이', 1),
 ('쌀국수', 1),
 ('라면', 1),
 ('우동', 1)]

In [8]:
foodsGroup.groupByKey().mapValues(len).collect()

[('짜장면', 1),
 ('짬뽕', 1),
 ('마라탕', 1),
 ('떡볶이', 1),
 ('쌀국수', 1),
 ('라면', 1),
 ('우동', 1)]

#### groupBy vs groupByKey
- groupBy
    - groupBy는 그룹핑할 때 키에 대한 정의를 개발자가 직접 해줘야 한다.
    - groupBy(lambda x: x[0])과 같이 직접
<br>
</br>
- groupByKey
    - groupByKey 는 k-v RDD를 사용할 때 key가 알아서 그룹핑의 기준이 된다.

In [11]:
# groupByKey의 예시
x = sc.parallelize([
    ("MATH", 7), ("MATH", 2), ("ENGLISH", 7),
    ("SCIENCE", 7), ("ENGLISH", 4), ("ENGLISH", 9),
    ("MATH", 8), ("MATH", 3), ("ENGLISH", 4),
    ("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5)
], 3)


In [12]:
# 기준을 정하지 않았지만 기준을 알아서 정의해준다.
y = x.groupByKey()
y.mapValues(list).collect() 

[('MATH', [7, 2, 8, 3]), ('ENGLISH', [7, 4, 9, 4]), ('SCIENCE', [7, 6, 9, 5])]

#### reduceByKey
- KeyValueRDD.reduceByKey(task, numPartitions=None, partitionFuc=function portable_hash)
- 주어지는 key를 기준으로 Group을 만들고 합쳐준다.(task대로)
- Transformations 함수

In [13]:
# reduceBykey 예시
from operator import add

rdd = sc.parallelize([
    ("짜장면",15),
    ("짬뽕",10),
    ("짜장면",5)
])

rdd.reduceByKey(add).collect()

[('짜장면', 20), ('짬뽕', 10)]

- 개념적으로는 groupByKey + reduce 이다.
- groupbykey는 파티션이 나눠진 환경에서 reduce 진행.
- reduce groupbykey 순으로 하는 것이 좋다.

#### mapValues
- KeyValueRDD.mapValues(<task>)
- ``함수를 Value에만 적용합니다.``
    - 파티션과 키는 그 위치에 그대로 있다.
- Transformations 함수
- key는 그대로 파티션 변경 되지 않는다. values 대해서만 수정가능
- ``keyvalue RDD 에게는 mapValues가 좋다.(map보다는)``
    - 이유 : ``파티션을 그대로 유지하기 때문에 map보다 훨씬 더 빠르다.``

In [14]:
rdd = sc.parallelize([
    ("하의", ["청바지", "반바지", "치마"]),
    ("상의", ["니트", "반팔", "긴팔", "나시"])
])

rdd.mapValues(len).collect()

[('하의', 3), ('상의', 4)]

#### Keys()
- 모든 key를 가진 RDD를 생성한다.
- 파티션을 유지하거나 키가 굉장히 많은 경우가 있기 때문에 transformations
- `key의 개수`는 데이터의 개수를 의미한다.

In [16]:
# transformation
rdd.keys()

PythonRDD[29] at RDD at PythonRDD.scala:53

In [17]:
# collect() 추가로 action
rdd.keys().collect()

['하의', '상의']

In [21]:
x = sc.parallelize([
    ("MATH", 7), ("MATH", 2), ("ENGLISH", 7),
    ("SCIENCE", 7), ("ENGLISH", 4), ("ENGLISH", 9),
    ("MATH", 8), ("MATH", 3), ("ENGLISH", 4),
    ("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5)
], 3)

print(x.keys().count()) # RDD
# key의 개수는 데이터의 개수와 동일하다.
# 수학 과학 영어 3개가 아니다.

12


In [22]:
# key의 종류의 개수
print(x.keys().distinct().count())

3


#### countByKey

In [18]:
# 리스트가 하나씩
rdd.countByKey()

defaultdict(int, {'하의': 1, '상의': 1})

# Joins
- inner Join 서로간에 존재하는 키만 합쳐준다.
- outer Join 기준이 되는 한쪽에는 데이터가 있고, 다른 쪽에는 데이터가 없는 경우  
    -설정한 기준에 따라서 기준에 맞는 데이터가 항상 남아있는다.  
    -leftOuterJoin : 왼쪽에 있는 rdd가 기준이 된다.( 함수를 호출 하는 쪽이 기준이 된다.)  
    -rightOuterJoin : 오른쪽에 있는 rdd가 기준이 된다. ( 함수의 매개변수로 들어가는 쪽이 기준이 된다.)

In [23]:
rdd1 = sc.parallelize([
    ("foo", 1),
    ("goo", 2),
    ("hoo", 3)
])

rdd2 = sc.parallelize([
    ("foo", 1),
    ("goo", 2),
    ("goo", 10),
    ("moo", 6)
])

# join은 셔플링 현상일 일어날 수 밖에 없다.

In [24]:
# Inner Join
rdd1.join(rdd2).collect()
# goo의 경우 앞쪽은 Rdd1 뒷쪽은 rdd2

[('foo', (1, 1)), ('goo', (2, 2)), ('goo', (2, 10))]

In [25]:
# left Outer Join
rdd1.leftOuterJoin(rdd2).collect()
# left의 모든 것은 살아있다. 매칭되지 않는 moo는 삭제 

[('foo', (1, 1)), ('goo', (2, 2)), ('goo', (2, 10)), ('hoo', (3, None))]

In [26]:
# Right Outer Join
rdd1.rightOuterJoin(rdd2).collect()
# right의 모든 것은 살아있다.

[('foo', (1, 1)), ('moo', (None, 6)), ('goo', (2, 2)), ('goo', (2, 10))]

In [27]:
sc.stop()