# 2. RDD(Transform)

### 2.1.3 RDD 생성
 - 1. 드라이버 프로그램의 Collection 객체를 ㅇ용.
  - python의 경우 LIST를 활용.
 - 2. 파일과 같은 외부 데이터를 이용. 

In [15]:
rdd1 = sc.parallelize(["a","b","c"])
rdd1.collect()

['a', 'b', 'c']

In [19]:
rdd1 = sc.textFile("../README.md")
rdd1.take(10)

['#pySpark ', '', '## packpub.com ']

### 2.1.4 RDD 기본 액션
 - Transformation & Action으로 나눌 수 있다.
 - 두 연산을 구분하는 기준이 바로 연산의 수행결과가 RDD 인지 아닌지.
  - RDD로 나오면 Transformation 아니면 Action이다.

### 2.1.4.1 collect 
 - 모든 원소를 모아서 배열로 돌려준다 

In [20]:
rdd = sc.parallelize(range(1,10))
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

### 2.1.4.2 Count 
 - 전체 요소의 개수를 반환 

In [21]:
rdd.count()

9

### 2.1.5 Transformation 
 - 기존 RDD를 활용하여 새로운 RDD를 생성하는 연산. 
 - 각 요소의 타입을 문자열에서 숫자로 바꾸거나, 불필요한 요소를 제외하거나 기존 요소의 값에 특정 값을 더하는 등의 작업이 모두 포함 된다. 
 - 맵(map) 연산 : 요소 간의 사상을 정의한 함수를 RDD에 속하는 모든 요소에 적용하여 새로운 RDD 생성
 - 그룹화 연산 : 특정 조건에 따라 요소를 그룹화 하거나 특정함수를 적용.
 - 집합 연산 : RDD에 포함된 요소를 하나의 집합으로 간주할 때 서로 다른 RDD간에 합, 교집합을 계산
 - 파티션 연산 : 파티션 개수를 조정
 - 필터와 정렬 연산 : 특정 조건을 만족하는 요소만 선택하거나 각 요소를 정해진 기준에 따라 정렬. 

### 2.1.5.1 Map
 - 함수를 RDD에 속하는 모든 요소에 적용한 뒤 그 결과로 구성된 새로운 RDD를 생성. 

In [22]:
rdd1 = sc.parallelize(range(1,10))
rdd2 = rdd1.map(lambda v:v+1)
print(rdd2.collect())

[2, 3, 4, 5, 6, 7, 8, 9, 10]


### 2.1.5.2 flatMap
 - map()과 비슷한 동작을 한다. 
 - flat한 형태로 모든 값들을 하나로 쭉 이어서 시퀀스형태로 출력한다. 

In [23]:
rdd1 = sc.parallelize(["apple,oragne","grape,apple,mango","blueberry,totamo,orange"])

In [24]:
print(rdd1.map(lambda v:v.split(",")).collect())

[['apple', 'oragne'], ['grape', 'apple', 'mango'], ['blueberry', 'totamo', 'orange']]


In [25]:
rdd2 = rdd1.flatMap(lambda v:v.split(","))
print(rdd2.collect())

['apple', 'oragne', 'grape', 'apple', 'mango', 'blueberry', 'totamo', 'orange']


### 2.1.5.3 mapPartitions
 - map(), flatMap()의 경우 RDD의 각 요소를 하나씩 처리 
 - mapPartitions는 파티션 단위로 처리 : 인자로 전달받은 함수를 파티션 단위로 적용, 그 결과로 구성된 새로운 RDD를 생성 

In [96]:
def increase(numbers):
    print("DB 연결")
    return(i+1 for i in numbers)

 - 여기서 DB연결이라는 표현이 안나오는 이유는 Jupyter notebook이라서. 
 - 해당 부분이 4번 DB 연결이라고 나온다. 즉, 4개의 파티션으로 구성 되어있다. 

In [97]:
rdd1 = sc.parallelize(range(1,10))
rdd2 = rdd1.mapPartitions(increase)
print(rdd2.collect())

[2, 3, 4, 5, 6, 7, 8, 9, 10]


In [98]:
rdd1.glom().collect()

[[1, 2], [3, 4], [5, 6], [7, 8, 9]]

In [99]:
print("%s 개의 파티션으루 구성 " % len(rdd1.glom().collect()))

4 개의 파티션으루 구성 


In [100]:
print("partition size : %d" % rdd1.getNumPartitions())

partition size : 4


### 2.1.5.4 mapParitionWithIndex (Python은 없다)

### 2.1.5.5 mapValues
 - RDD요소가 Key-Value 형태로 구성되어있을 때사용. 키를 기준으로 작은 그룹들을 만들고 해당 그룹들에 속한 값을 대상으로 연산을 수행(합계, 평균 등). 

In [33]:
rdd1 = sc.parallelize(["a","b","c"]).map(lambda v:(v,1))
rdd1.collect()

[('a', 1), ('b', 1), ('c', 1)]

In [34]:
rdd2 = rdd1.mapValues(lambda i:i+1)
rdd2.collect()

[('a', 2), ('b', 2), ('c', 2)]

### 2.1.5.6 flatMapValues(Python은 없다)

## 그룹과 관련된 연산 

### 2.1.5.7 zip

In [35]:
rdd1 = sc.parallelize(["a","b","c"])
rdd2 = sc.parallelize(range(1,4))
result = rdd1.zip(rdd2)
print(result.collect())

[('a', 1), ('b', 2), ('c', 3)]


### 2.1.5.8 zipPartitions(python은 없다)
 - 파티션별로 zip 연산을 수행하고 특정 함수를 적용해 그결과로 구성된 새로운 RDD를 생성

### 2.1.5.9 groupBy

In [38]:
rdd1 = sc.parallelize(range(1,11))
rdd2 = rdd1.groupBy(lambda v: "even" if  v % 2 ==0 else "odd")
rdd2.collect()

[('even', <pyspark.resultiterable.ResultIterable at 0x7f176c5516d8>),
 ('odd', <pyspark.resultiterable.ResultIterable at 0x7f176c551860>)]

In [40]:
for x in rdd2.collect():
    print(x[0], list(x[1]))

even [2, 4, 6, 8, 10]
odd [1, 3, 5, 7, 9]


### 2.1.5.10 groupByKey 
 - Key별로 Groupby 를 수행. 

In [61]:
rdd1 = sc.parallelize(["a","b","c","b","c"]).map(lambda v: (v,1))
rdd2 = rdd1.groupByKey()
for x in rdd2.collect():
    print(x[0],list(x[1]))

b [1, 1]
c [1, 1]
a [1]


In [62]:
rdd1.reduceByKey(lambda v1, v2: v1+v2).collect()

[('b', 2), ('c', 2), ('a', 1)]

### 2.1.5.11 cogroup 
 - Key-Value 일 경우 사용 가능. 
 - 같은 키를 갖는 요소를 찾아서 같은 요소를 시퀀스로 구성된 튜플로 만들고 새로운 RDD로 생성. 

In [45]:
rdd1 = sc.parallelize([("k1","v1"),("k2","v2"),("k1","v3")])
rdd2 = sc.parallelize([("k1","v4")])
result = rdd1.cogroup(rdd2)
for x in result.collect():
    print(x[0],list(x[1][0]),list(x[1][1]))

k1 ['v1', 'v3'] ['v4']
k2 ['v2'] []


## 집합과 관련된 연산들 

### 2.1.5.12 distinct

In [46]:
rdd1 = sc.parallelize([1,2,3,1,2,3,1,2,3])
result = rdd1.distinct()
print(result.collect())

[1, 2, 3]


### 2.1.5.13 cartesian 
 - 카테시안 곱을 구하고 새로운 RDD로 생성.

In [47]:
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize(["a","b","c"])
result = rdd1.cartesian(rdd2)
print(result.collect())

[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]


### 2.1.5.14 subtract

In [48]:
rdd1 = sc.parallelize(["a","b","c","d","e"])
rdd2 = sc.parallelize(["d","e"])
result = rdd1.subtract(rdd2)
print(result.collect())

['a', 'b', 'c']


### 2.1.5.15 union 

In [49]:
rdd1 = sc.parallelize(["a","b","c"])
rdd2 = sc.parallelize(["d","e","f"])
result = rdd1.union(rdd2)
print(result.collect())

['a', 'b', 'c', 'd', 'e', 'f']


### 2.1.5.16 intersection

In [50]:
rdd1 = sc.parallelize(["a","a","b","c"])
rdd2 = sc.parallelize(["a","a","c","c"])
result = rdd1.intersection(rdd2)
print(result.collect())

['a', 'c']


### 2.1.5.17 Join (inner join)

In [52]:
rdd1 = sc.parallelize(["a","b","c","d","e"]).map(lambda v:(v,1))
rdd2 = sc.parallelize(["b","c"]).map(lambda v:(v,2))
result = rdd1.join(rdd2)
print(result.collect())

[('b', (1, 2)), ('c', (1, 2))]


### 2.1.5.18 leftOuterJoin. rightOuterJoin

In [56]:
rdd1 = sc.parallelize(["a","b","c"]).map(lambda x: (x,1))
rdd2 = sc.parallelize(["b","c"]).map(lambda v:(v,2))
result1 = rdd1.leftOuterJoin(rdd2)
result2 = rdd1.rightOuterJoin(rdd2)
print("Left : %s" % result1.collect())
print("Right : %s" % result2.collect())

Left : [('a', (1, None)), ('b', (1, 2)), ('c', (1, 2))]
Right : [('b', (1, 2)), ('c', (1, 2))]


### 2.1.5.19 subtractByKey

In [57]:
rdd1 = sc.parallelize(["a","b"]).map(lambda v:(v,1))
rdd2 = sc.parallelize(["b"]).map(lambda v:(v,1))
result = rdd1.subtractByKey(rdd2)
print("subtract : %s" % result.collect())

subtract : [('a', 1)]


## [집계와 연관된 연산들]

### 2.1.5.20 reduceByKey
 - RDD의 구성요소가 키와 값의 쌍으로 구성된 경우 사용.
 - 같은 키를 가진 값들을 하나로 병합해 Key-Value로 구성. 
 - 병학을 위해 두 개의 값을 하나로 합치는 함수를 인자로 전달받는다 
  - 이때 함수가 수행하는 연산은 경합법칙과 쇼괗넙ㅂ칙이 성립됨을 보장해야한다. 
  - 여러 파티션으로 분산돼 항상 같은 순서로 연산이 수행됨을 보장 할 수 없다 

In [58]:
rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,1))
print(rdd.collect())
result = rdd.reduceByKey(lambda v1, v2 : v1 + v2)
print(result.collect())

[('a', 1), ('b', 1), ('b', 1)]
[('b', 2), ('a', 1)]


### 2.1.5.21 foldByKey
 - 전반적인 동작은 reduceByKey와 비슷하게 동작하여 RDD를 생성한다.
 - 병한 연산의 초기값을 메서드의 인자로 전달해서 병합 시 사용할 수 있다는 점에서 차이가 있다. 
 - 예)
  - 병학에 사용하는 함수가 두 개의 정숫값을 더하는 함수였다면 0을 사용하고 두 문자열을 연결해서 새로운 문자열을 만드는 함수 였다면 공백 문자 ""을 초깃값으로 사용할 수 있다. 

In [64]:
rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,1))
result = rdd.foldByKey(0, lambda v1,v2 : v1 + v2)
print(result.collect())

[('b', 2), ('a', 1)]


In [68]:
rdd = sc.parallelize([("a1","v1"),("b1","v2"),("b1","v3")])
result = rdd.foldByKey("@", lambda v1,v2 : v1 + v2) # 내가 원하는 구분자를 줄수 있다. 
print(result.collect())

[('a1', '@v1'), ('b1', '@v2@v3')]


### 2.1.5.22 combineByKey 
 - 위의 두 함수와 같은 기능을 수행하지만, 수행과정에서 값의 타입을 바뀔 수 있다는 점의 차이가 있다. 
 - 가장 많이 등장하는 평균의 예제이다. 

In [86]:
class Record:
    def __init__(self, amount, number=1):
        self.amount = amount
        self.number = number
        
    def addAmt(self, amount):
        return Record(self.amount + amount, self.number + 1)
    
    def __add__(self, other):
        amount = self.amount + other.amount
        number = self.number + other.number 
        return Record(amount, number)
        
    def __str__(self):
        return "avg:" + str(self.amount / self.number)

    def __repr__(self):
        return 'Record(%r, %r)' % (self.amount, self.number)

In [87]:
# combineBy
def createCombiner(v):
    return Record(v)


# combineBy
def mergeValue(c, v):
    return c.addAmt(v)


# combineBy
def mergeCombiners(c1, c2):
    return c1 + c2


In [88]:
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
result = rdd.combineByKey(lambda v: createCombiner(v), lambda c, v: mergeValue(c, v),
                                  lambda c1, c2: mergeCombiners(c1, c2))

In [90]:
#print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])

## pipe 및 파티션과 관련된 연산 

### 2.1.5.24 pipe
 - 데이터를 처리하는 과정에서 외부 프로세스를 활용할 수 있다. 
 - 세 개의 숫자로 구성된 문자열을 리눅스의 cut 유틸리티를 이용해 분리한 뒤 첫번째와 세번째 숫자를 뽑아내는 예제입니다. 

In [91]:
rdd = sc.parallelize(["1,2,3","4,5,6","7,8,9"])
result = rdd.pipe("cut -f 1,3 -d ,")
print(result.collect())

['1,3', '4,6', '7,9']


### 2.1.5.25 coalesce와 repartition 
 - RDD를 생성한 뒤 filter()연산을 비롯한 다양한 트랜스 포메이션을 수행하다보면 최초에 설정된 파티션 개수가 적합하지 않은 경우가 발생한다. 
 - 위의 함수를 통해 조정이 가능하다. 
 - repartition 은 늘리고 줄이는 것이 가능.
 - coalesce는 줄이는 것만 가능. 
 - 두 개의 성능 차이가 있다. 
  - repartition은 셔플을 기반으로 동작, coalesce는 강제로 셔플을 수행하지 않을 경우 사용하지 않는다. 
  - 늘릴때는 repartition, 줄일때는 coalesce를 이용하는 것이 좋다. 

In [92]:
rdd1 = sc.parallelize(list(range(1,11)),10)

In [94]:
rdd2 = rdd1.coalesce(5)
rdd3 = rdd2.repartition(110)

In [95]:
print("partition size : %d" % rdd1.getNumPartitions())
print("partition size : %d" % rdd2.getNumPartitions())
print("partition size : %d" % rdd3.getNumPartitions())

partition size : 10
partition size : 5
partition size : 110


### 2.1.5.26 repartitionAndSortWithinPartitions
 - 모든 데이터를 특정 기준에 따라 여러 개의 파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이 결과로 새로운 RDD를 생성. 
 - Key - Value 형태로 존재해야하며,각 데이터가 어떤 파티션에 속할지 결정하기 위한 파티셔너를 설정해야한다. 
 - 10개의 무작위 숫자를 위의 함수를 이용하여 3개의 파티션으로 분리해보는 예제. 

In [105]:
import random

In [108]:
data = [random.randrange(1,100) for i in range(0,10)]
data

[86, 61, 70, 78, 4, 88, 37, 85, 74, 18]

In [109]:
rdd1 = sc.parallelize(data).map(lambda v:(v,"-"))
print(rdd1.collect())

[(86, '-'), (61, '-'), (70, '-'), (78, '-'), (4, '-'), (88, '-'), (37, '-'), (85, '-'), (74, '-'), (18, '-')]


In [111]:
rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x:x)

In [127]:
for x in range(1,len(rdd2.glom().collect()) + 1):
    print("{number} 번째 partition : {lists}".format(number=x, lists=rdd2.glom().collect()[x-1]))

1 번째 partition : [(18, '-'), (78, '-')]
2 번째 partition : [(4, '-'), (37, '-'), (61, '-'), (70, '-'), (85, '-'), (88, '-')]
3 번째 partition : [(74, '-'), (86, '-')]


### 2.1.5.27 partitionBy 
 - Key - Value 구성
 - org.apache.spark.Partitioner 클래스의 인스턴스를 인자로 전달. 
 - Partitioner는 각 요소의 키를 특정 파티션에 할당하는 역할을 수행. 

In [128]:
rdd1 = sc.parallelize([("apple",1),("mouse",1),("monitor",5)],5)
rdd2 = rdd1.partitionBy(3)
print("rdd1 : %d, rdd2 : %d" %(rdd1.getNumPartitions(), rdd2.getNumPartitions()))

rdd1 : 5, rdd2 : 3


## 필터와 정렬 연산 

### 2.1.5.28 filer 

In [129]:
rdd1 = sc.parallelize(range(1,6))
rdd2 = rdd1.filter(lambda v: v > 2)
print(rdd2.collect())

[3, 4, 5]


### 2.1.5.29 sortByKey
 - Key를 기준으로 정렬. 
 - 소팅이 완료된 후에는 파티션 내부의 요소는 소팅 순서상 인접한 요소로 재구성. 

In [130]:
rdd = sc.parallelize([("q",1),("z",1),("a",1)])
result = rdd.sortByKey()
print(result.collect())

[('a', 1), ('q', 1), ('z', 1)]


### 2.1.5.30 keys, values
 - RDD의 구성요소가 Key-Value 쌍으로 구성된 경우에만 사용 가능. 

In [134]:
print(rdd.keys().collect())
print(rdd.values().collect())

['q', 'z', 'a']
[1, 1, 1]


### 2.1.5.31 sample
 - 샘픔을 추출해 새로운 RDD로 생성. 
 - sample(withReplacement:Boolean, fraction:Double, seed:Long=Utils.random.nextLong)
  - withReplacement : 복원추출 수행 여부 
  - fraction : 복원 추출일때와 비복원 추출일때가 달라진다. 
  - 복원 : 각 요소가 나타나는 횟수에 대한 기대값, 즉 각 요소의 평균 발생 횟수를 나타낸다. 반드시 0 이상
  - 비복원 : 각 요소가 샘플에 포함될 확률을 의미하며, 0과 1사이 값으로 지정. 
 - sample은 크기를 지정해놓고 추출하는 것이 아님을 알고 있어야 한다.(pySpark 수정 해야될듯.)
  - 정확한 크기를 정해놓고 샘플을 추출하고자 한다면 다음에 나올 takeSample을 이용해야한다. 
 - seed : 일정한 값이 나올 수 있도록 조정하는 (제어의 목적) 값. 

In [135]:
rdd = sc.parallelize(range(1,101))
result1 = rdd.sample(False,0.5,100)
result2 = rdd.sample(True,1.5,100)
print(result1.take(5))
print(result2.take(5))

[2, 5, 6, 10, 12]
[1, 2, 2, 3, 4]
