# 키/ 값 페어로 작업하기

1. 키/값 페어 RDD

2. 파티셔닝

## 배경


## 페어 RDD 생성

다음 포맷은 키/값 데이터를 그대로 pair RDD로 만들어 리턴함.
이를 위해 map() 함수가 쓰임.
이 예제는 README를 읽어 각 라인의 첫 단어를 키로 데이터를 만드는 코드임.
만들어진 데이터는 튜플로 기록 

In [17]:
from pyspark import SparkContext
# lines=sc.textFile("README.md")
lines= sc.parallelize(["holden likes coffee", "panda likes long strings and coffee"])
pairs = lines.map(lambda x: (x.split(" ")[0],x))

## 페어 RDD 트랜스포메이션

#### 예( {(1,2), (3,4), (3,6)})
|함수이름| 목적| 예| 결과|
|---
|reduceByKey(func)|동일 키에 대한 값을 합침|rdd.reduceByKey((x,y)=>x+y)| {(1,2),(3,10)}|
|groupByKey()|동일 키에 대한 값을 그룹화|rdd.groupBykey()|{(1,[2]),(3,[4,6])|
|combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)|다른 결과 타입으로 동일 키 값 합침|||
|mapValues(func)|키 변경 없이 각 값에 func 적용|rdd.mapValues(x=>x+1)|{(1,3),(3,5),(3,7)}|
|flatMapValues(func)|각 값에 반복자 함수 적용해서 리턴 값과 기존 키 페어를 만듦|rdd.flatMapValues(x=>(x to5))|{(1,2),(1,3),(1,4), (1,5),(3,4),(3,5)}| 
|keys()| 키값|rdd.key()|(1,3,3)|
|values()| 밸류값|rdd.values()|(2,4,6)|
|sortByKey()| 키로 정렬된 RDD|rdd.sortByKey()| {(1,2), (3,4), (3,6)}|


#### 두 페어 RDD (rdd={(1,2), (3,4), (3,6)}, other={(3,9)}
|함수이름| 목적| 예| 결과|
|---
|subtractByKey|다른쪽 RDD 키로 RDD데이터 삭제|rdd.subtractByKey(other)|{(1,2)}
|join|inner join 수행|rdd.join(other)|{(3,(4,9)),(3,(6,9))}|
|rightOuterJoin|오른쪽 RDD 키들을 대상으로 조인|rdd.rightOuterJoin(other)|{(3,(some(4),9)),(3,(some(6),9))}|
|leftOuterJoin|왼쪽 RDD 키들을 대상으로 조인|rdd.leftOuterJoin(other)|{1,(2,None), (3,(some(4),9)),(3,(some(6),9))}|
|cogroup|동일키의 양쪽 RDD를 그룹화|rdd.cogroup(other)| {(1,{[2],[]}, {(3,{[4],[6],[9]}| 

또한 기존 RDD 함수도 지원한다.

In [18]:
result = pairs.filter(lambda keyValue: len(keyValue[1])<20)
result.collect()

[('holden', 'holden likes coffee')]

### 집합연산

reduceByKey() : reduce와 유사. 각 키와 키에 대해 합쳐진 값으로 새로운 RDD를 리턴

foldByKey(): fold와 유사

In [48]:
rdd= sc.parallelize({("panda",0), ("pink",3),("pirate",3),("panda",1),("pink",4)})
#rdd=sc.textFile("README.md")
rdd.mapValues(lambda x: (x,1).reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])))

PythonRDD[96] at RDD at PythonRDD.scala:43

In [45]:
rdd=sc.textFile("README.md")
words=rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
result.collect()

[(u'', 67),
 (u'when', 1),
 (u'R,', 1),
 (u'including', 3),
 (u'computation', 1),
 (u'using:', 1),
 (u'guidance', 2),
 (u'Scala,', 1),
 (u'environment', 1),
 (u'only', 1),
 (u'rich', 1),
 (u'Apache', 1),
 (u'sc.parallelize(range(1000)).count()', 1),
 (u'Building', 1),
 (u'guide,', 1),
 (u'return', 2),
 (u'Please', 3),
 (u'Try', 1),
 (u'not', 1),
 (u'Spark', 13),
 (u'scala>', 1),
 (u'Note', 1),
 (u'cluster.', 1),
 (u'./bin/pyspark', 1),
 (u'params', 1),
 (u'through', 1),
 (u'GraphX', 1),
 (u'[run', 1),
 (u'abbreviated', 1),
 (u'[project', 2),
 (u'##', 8),
 (u'library', 1),
 (u'see', 1),
 (u'"local"', 1),
 (u'[Apache', 1),
 (u'will', 1),
 (u'#', 1),
 (u'processing,', 1),
 (u'for', 11),
 (u'[building', 1),
 (u'provides', 1),
 (u'print', 1),
 (u'supports', 2),
 (u'built,', 1),
 (u'[params]`.', 1),
 (u'available', 1),
 (u'run', 7),
 (u'tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).',
  1),
 (u'This', 2),
 (u'Hadoop,', 2),
 (u'Tests', 1),
 (u'example:', 1),

combineByKey(): 한 파티션 내의 데이터를 하나씩 읽어 새로운 데이터는 createCombiner()를 사용하여 해당 키의 어큐뮬레이터의 초기값을 만들고 기존 값이 있을 때는 mergeValue() 로 어큐뮬레이터의 현재 값에 새로운 값을 적용
각 파티션에서 같은 키에 대해 여러 어큐뮬레이터가 작동할 수 있음. 이 때는 mergeCombiner()를 써서 합쳐짐.

### 병렬화 수준 최적화
모든 RDD는 고정된 개수의 파티션을 가지고 있으며 이것이 연산이 처리될 때 동시 작업의 수준을 결정하게 된다. 이 때 특정 개수의 파티션을 사용하도록 요구할 수 있다.


In [None]:
data = [("a",3), ("b",4),("a",1)]
sc.parallelize(data.reduceByKey(lamda x, y: x+y,10))

repartition(): 파티셔닝 바꾸고 싶을 때(셔플링이 일어남)

coalesce(): 파티션 개수 줄이는 경우에 데이터 이동이 일어나지 않음

rdd.getNumPartitions(): 파티션 개수

### 데이터 그룹화
groupByKey(): [K,Iterable[v]]
cogroup(): 여러 RDD의 키를 공유해 그룹화 [K,(Iterable[v], Iterable[W])]

### 조인
내부 조인: 양쪽 RDD에 모두 존재하는 키만 결과가 됨.

leftOuterJoin(): 원본 RDD의 키값과 합침.

rightOuterJoin(): 키가 다른쪽 RDD에 존재해야 하며 원본 RDD에 옵션으로 표시


### 데이터 정렬

sortByKey(): ascending이라는 인자를 받음

In [51]:
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x:str(x))
rdd.collect()

[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]

## 페어 RDD 액션

countByKey(): 키 값 개수

colectAsMap(): 결과를 맵 형태로

lookup(key): 키에 대한 모든 값을 되돌려 줌

## 데이터 파티셔닝
키의 모음들이 같은 노드에 함께 모이는 것을 지원

### RDD의 파티셔너 정하기

### 파티셔닝이 도움이 되는 연산들
reduceByKey, combineByKey, lookup…
• 각키에 대한 연산이 단일장비에서 이루어짐

• cogorup, join …

• 최소 하나 이상의 RDD가 네트워크를 통해 전송될 필요가 없게 해줌

### 파티셔닝에 영향을 주는 연산들
• 데이터를 파티션하는 연산에 의해 만들어진 RDD는 자동으로 설정됨

   • ex) join: Hash 파티셔닝 됨

• 지정된 파티셔닝을 쓰는 것이 보장되지 못하는 연산에서는 설정 X

   • ex) map: 키의 변경 가능성이 존재


### 예제: 페이지 랭크

각 페이지의 랭크를 1.0으로 초기화

반복 주기마다 공헌치 {랭크(p) /이웃숫자(p)}를 이웃들에게 전송

각 페이지 랭크를 갱신 0.15 + 0.85 * 받은 공헌치

In [52]:
// 이웃 리스트는 스파크 오브젝트에 저장되어 있다고 가정
val links = sc.objectFile[(String, Seq[String])]("links")
              .partitionBy(new HashPartitioner(100))
              .persist()

        // 각 페이지의 기본 랭크를 1.0으로 초기화
var ranks = links.mapValues(v => 1.0)

// 알고리즘 10회 반복
for (i <- 0 until 10) {
    val contributions = links.join(ranks).flatMap {
        case (pageId, ( links , rank)) =>
            links .map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// 결과 저장
ranks.saveAsTextFile("ranks")

SyntaxError: invalid syntax (<ipython-input-52-43f70aa6fc27>, line 1)

### 사용자 지정 파티셔너
같은 도메인의 안의 링크를 동일한 파티션으로 지정

org.apache.spark.Partitioner 상속

• numPartitions: 파티션 수

• getPartiton: 키에 대한 파티션 ID 반환

• equlas: 두 RDD가 같은 방식으로 파티션 되었는지 검사

In [None]:
class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
        val domain = new Java.net.URL(key.toString).getHost()
        val code = (domain.hashCode % numPartitions)
        if (code < 0) {
            code + numPartitions // 음수인 경우 0 이상인 값으로
        } else {
          code
        }
    }
    // 자바의 equals 메소드, 스파크가 직접 만든 파티셔너 객체를 비교하는데 사용
    override def equals(other: Any): Boolean = other match {
        case dnp: DomainNamePartitioner =>
            dnp.numPartitions == numPartitions
        case _ =>
        false
    }
}

In [53]:
import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

rdd.partitionBy(20,hash_domain)

MapPartitionsRDD[113] at mapPartitions at PythonRDD.scala:374