In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

#### 2.1.5.17 join

두 RDD에서 서로 같은 키를 가지고 있는 요소를 모아 그룹을 형성. 이 결과로 구성된 새로운 RDD를 생성.

Tuple(키, Tuple(첫번째 RDD의 요소, 두번째 RDD의 요소)) 형태로 구성.

스칼라
<pre>
    val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e")).map((_, 1))
    val rdd2 = sc.parallelize(List("b", "c")).map((_, 2))
    val result = rdd1.join(rdd2)
    println(result.collect.mkString("\n"))
</pre>

자바
<code>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data1 = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("c", 1), new Tuple2("d", 1), new Tuple2("e", 1));
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data2 = Arrays.asList(new Tuple2("b", 2), new Tuple2("c", 2));
    
    JavaPairRDD&lt;String, Integer&gt; rdd1 = sc.parallelizePairs(data1);
    JavaPairRDD&lt;String, Integer&gt; rdd2 = sc.parallelizePairs(data2);
    
    JavaPairRDD&lt;String, Tuple2&lt;Integer, Integer&gt;&gt; result = rdd1.&lt;Integer&gt;join(rdd2);
    System.out.println(result.collect());
</code>

In [2]:
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c", "d", "e"]).map(lambda v: (v, 1), 1)
rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2), 1)
result = rdd1.join(rdd2)
print(result.collect())

[('c', (1, 2)), ('b', (1, 2))]


In [3]:
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c", "d", "e"]).map(lambda v: (v, 1))
rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
result = rdd2.join(rdd1)
print(result.collect())

[('c', (2, 1)), ('b', (2, 1))]


#### 2.1.5.18 leftOuterJoin, rightOuterJoin

왼쪽 외부 조인과 오른쪽 외부 조인을 수행하고, 그 결과로 구성된 새로운 RDD를 돌려줌.

catesian() 메서드의 실행 결과에는 나타나지 않았던 요소들이 포함.

스칼라
<pre>
    val rdd1 = sc.parallelize(List("a", "b", "c")).map((_, 1))
    val rdd2 = sc.parallelize(List("b", "c")).map((_, 2))
    val result1 = rdd1.leftOuterJoin(rdd2)
    val result2 = rdd1.rightOuterJoin(rdd2)
    println("Left: " + result1.collect.mkString("\t"))
    println("Right: " + result2.collect.mkString("\t"))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data1 = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", "1"), new Tuple2("c", "1"));
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data2 = Arrays.asList(new Tuple2("b", 2), new Tuple2("c", "2"));

    JavaPairRDD&lt;String, Integer&gt; rdd1 = sc.parallelizePairs(data1);
    JavaPairRDD&lt;String, Integer&gt; rdd2 = sc.parallelizePairs(data2);

    JavaPairRDD&lt;String, Tuple2&lt;Integer, Optional&lt;Integer&gt;&gt;&gt; result1 = rdd1.&lt;Integer&gt;leftOuterJoin(rdd2);
    JavaPairRDD&lt;String, Tuple2&lt;Optional&lt;Integer&gt;, Integer&gt;&gt; result2 = rdd1.&lt;Integer&gt;rightOuterJoin(rdd2);
    System.out.println("Left: " + result1.collect());
    System.out.println("Right: " + result2.collect());
</pre>

In [4]:
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c"]).map(lambda v: (v, 1))
rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
result1 = rdd1.leftOuterJoin(rdd2)
result2 = rdd1.rightOuterJoin(rdd2)
print("Left: %s" % result1.collect())
print("Right: %s" % result2.collect())

Left: [('a', (1, None)), ('c', (1, 2)), ('b', (1, 2))]
Right: [('c', (1, 2)), ('b', (1, 2))]


#### 2.1.5.19 subtractByKey

rdd1.subtractByKey(rdd2)는 rdd1의 요소 중에서 rdd2에 같은 키가 존재하는 요소를 제외한 나머지로 구성된 새로운 RDD를 돌려줌.

스칼라
<pre>
    val rdd1 = sc.parallelize(List("a", "b")).map((_, 1))
    val rdd2 = sc.parallelize(List("b")).map((_, 2))
    val result = rdd1.subtractByKey(rdd2)
    println(result.collect.mkString("\n"))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data1 = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1));
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data2 = Arrays.asList(new Tuple2("b", 2));
    
    JavaPairRDD&lt;String, Integer&gt; rdd1 = sc.parallelizePairs(data1);
    JavaPairRDD&lt;String, Integer&gt; rdd2 = sc.parallelizePairs(data2);
    
    JavaPairRDD&lt;String, Integer&gt; result = rdd1.subtractByKey(rdd2);
    System.out.println(result.collect());
</pre>

In [5]:
# 파이썬
rdd1 = sc.parallelize(["a", "b"]).map(lambda v: (v, 1))
rdd2 = sc.parallelize(["b"]).map(lambda v: (v, 2))
result = rdd1.subtractByKey(rdd2)
print(result.collect())

[('a', 1)]


#### 2.1.5.20 reduceByKey

같은 키를 가진 값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성.

두 개의 값을 하나로 합치는 함수를 인자로 전달 받으며 이 함수는 결합법칙과 교환법칙이 성립해야 함.

스칼라
<pre>
    val rdd = sc.parallelize(List("a", "b", "b")).map((_, 1))
    val result = rdd.reduceByKey(_ + _)
    println(result.collect.mkString(","))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("b", 1));

    JavaPairRDD&lt;String, Integer&gt; rdd = sc.parallelizePairs(data);

    // Java7
    JavaPairRDD&lt;String, Integer&gt; result = rdd.reduceByKey(new Function2&lt;Integer, Integer, Integer&gt;() {
      @Override
      public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
      }
    });

    // Java8 Lambda
    JavaPairRDD&lt;String, Integer&gt; result2 = rdd.reduceByKey((Integer v1, Integer v2) -&gt; v1 + v2);
    System.out.println(result.collect());
</pre>

In [6]:
# 파이썬
rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
result = rdd.reduceByKey(lambda v1, v2: v1 + v2)
print(result.collect())

[('b', 2), ('a', 1)]


* RDD의 트랜스포메이션 메서드에는 데이터 처리 과정에서 사용할 파티셔너와 파티션 개수를 지정할 수 있는 옵션 존재.

#### 2.1.5.21 foldByKey

reduceByKey와 유사하게 같은 키를 가진 값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성.

초기값을 메서드의 인자로 전달해서 병합 시 사용가능.

각 단위 병합 단계에서 결과에 영향이 없는 값을 초기값으로 사용하므로 이 함수는 교환법칙만 만족하면 사용가능.

스칼라
<pre>
    val rdd = sc.parallelize(List("a", "b", "b")).map((_, 1))
    val result = rdd.foldByKey(0)(_ + _)
    println(result.collect.mkString(","))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("b", 1));

    JavaPairRDD&lt;String, Integer&gt; rdd = sc.parallelizePairs(data);

    // Java7
    JavaPairRDD&lt;String, Integer&gt; result = rdd.foldByKey(0, new Function2&lt;Integer, Integer, Integer&gt;() {
      @Override
      public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
      }
    });

    // Java8 Lambda
    JavaPairRDD&lt;String, Integer&gt; result2 = rdd.foldByKey(0, (Integer v1, Integer v2) -&gt; v1 + v2);
    System.out.println(result.collect());
</pre>

In [7]:
# 파이썬
rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
result = rdd.foldByKey(0, lambda v1, v2: v1 + v2)
print(result.collect())

[('b', 2), ('a', 1)]


#### 2.1.5.22 combineByKey

같은 키를 가진 값들을 하나로 병합하는 기능을 수행. 병합을 수행하는 과정에서 값의 타입이 바뀔 수 있음.

기존 병합 함수의 스칼라 API
<pre>
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zerovalue: V)(func: (V, V) => V): RDD[(K, V)]
</pre>

combineByKey 스칼라 API, C는 클래스
<pre>
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V), mergeCombiners: (C, C) => C): RDD[(K, C)]
</pre>

인자로 지정된 3개의 함수 의미
* createCombiner() : 값을 병합하기 위한 콤바이너를 생성. 두 개의 값을 하나로 병합하는 객체. 각 키별로 생성됨.
* mergeValue() : 키에 대한 콤바이너가 이미 존재한다면 이 함수를 이용해 값을 기존 콤바이너에 병합.
* createCombiner(), mergeValue()는 파티션 단위로 수행
* mergeCombiners() : 병합이 끝난 콤바이너끼리 다시 병합을 수행해 최종 콤바이너를 생성. 최종 결과를 생성함.

스칼라
<pre>
case class Record(var amount: Long, var number: Long = 1) {
    def map(v: Long) = Record(v)
    def add(amount: Long): Record = {
        add(map(amount))
     }
    def add(other: Record): Record = {
        this.number += other.number
        this.amount += other.amount
        this
    }
    override def toString: String = s"avg:${amount / number}"
}
// combineByKey()를 이용한 평균값 계산
val data = Seq(("Math", 100L), ("Eng", 80L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L))
val rdd = sc.parallelize(data)
val createCombiner = (v: Long) => Record(v)
val mergeValue = (c: Record, v: Long) => c.add(v)
val mergeCombiners = (c1: Record, c2: Record) => c1.add(c2)
val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
println(result.collect.mkString(",\t"))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Long&gt;&gt; data = Arrays.asList(new Tuple2("Math", 100L), new Tuple2("Eng", 80L), new Tuple2("Math", 50L), new Tuple2("Eng", 70L), new Tuple2("Eng", 90L));

    JavaPairRDD&lt;String, Long&gt; rdd = sc.parallelizePairs(data);

    // Java7
    Function&lt;Long, Record&gt; createCombiner = new Function&lt;Long, Record&gt;() {
      @Override
      public Record call(Long v) throws Exception {
        return new Record(v);
      }
    };

    Function2&lt;Record, Long, Record&gt; mergeValue = new Function2&lt;Record, Long, Record&gt;() {
      @Override
      public Record call(Record record, Long v) throws Exception {
        return record.add(v);
      }
    };

    Function2&lt;Record, Record, Record&gt; mergeCombiners = new Function2&lt;Record, Record, Record&gt;() {
      @Override
      public Record call(Record r1, Record r2) throws Exception {
        return r1.add(r2);
      }
    };

    JavaPairRDD&lt;String, Record&gt; result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners);

    // Java8
    JavaPairRDD&lt;String, Record&gt; result2 = rdd.combineByKey((Long v) -&gt; new Record(v), (Record record, Long v) -&gt; record.add(v), (Record r1, Record r2) -&gt; r1.add(r2));

    System.out.println(result.collect());

</pre>

In [8]:
# 파이썬 - 다음 소스는 jupyter notebook에서 바로 실행 불가.
# https://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute
record_py = """class Record:
    def __init__(self, amount, number=1):
        self.amount = amount
        self.number = number
        
    def addAmt(self, amount):
        return Record(self.amount + amount, self.number + 1)
    
    def __add__(self, other):
        amount = self.amount + other.amount
        number = self.number + other.number 
        return Record(amount, number)
        
    def __str__(self):
        return "avg:" + str(self.amount / self.number)

    def __repr__(self):
        return 'Record(%r, %r)' % (self.amount, self.number)
"""

src = """import pyspark
from record import *

sc = pyspark.SparkContext()

def createCombiner(v):
    return Record(v)

def mergeValue(c, v):
    return c.addAmt(v)

def mergeCombiners(c1, c2):
    return c1 + c2

rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
result = rdd.combineByKey(lambda v: createCombiner(v), lambda c, v: mergeValue(c, v),
                                  lambda c1, c2: mergeCombiners(c1, c2))

print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])
"""

with open('record.py', 'w') as f:
    f.write(record_py)
with open('job.py', 'w') as f:
    f.write(src)

In [9]:
%%bash
$SPARK_HOME/bin/spark-submit --py-files record.py job.py 2>/dev/null

Math avg:75.0 Eng avg:80.0


In [10]:
import os
#os.remove('record.py')
os.remove('job.py')

#### 2.1.5.23 aggregateByKey

combineByKey()의 특수한 경우로 초기값을 생성하는 부분을 제외하면 동일한 동작을 수행.

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

seqOp은 mergeValue(), combOp은 mergeCombiner()와 서로 같은 역할을 하는 함수.

병합에 필요한 초기값을 알 기 위해 zeroValue라는 "값"을 사용함.

스칼라
<pre>
    val data = Seq(("Math", 100L), ("Eng", 80L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L))
    val rdd = sc.parallelize(data)
    val zero = Record(0, 0)
    val mergeValue = (c: Record, v: Long) => c.add(v)
    val mergeCombiners = (c1: Record, c2: Record) => c1.add(c2)
    val result = rdd.aggregateByKey(zero)(mergeValue, mergeCombiners)
    println(result.collect.mkString(",\t"))
</pre>

자바
<pre>
List&lt;Tuple2&lt;String, Long&gt;&gt; data = Arrays.asList(new Tuple2("Math", 100L), new Tuple2("Eng", 80L), new Tuple2("Math", 50L), new Tuple2("Eng", 70L), new Tuple2("Eng", 90L));

    JavaPairRDD&lt;String, Long&gt; rdd = sc.parallelizePairs(data);

    // Java7
    Record zero = new Record(0, 0);

    Function2&lt;Record, Long, Record&gt; mergeValue = new Function2&lt;Record, Long, Record&gt;() {
      @Override
      public Record call(Record record, Long v) throws Exception {
        return record.add(v);
      }
    };

    Function2&lt;Record, Record, Record&gt; mergeCombiners = new Function2&lt;Record, Record, Record&gt;() {
      @Override
      public Record call(Record r1, Record r2) throws Exception {
        return r1.add(r2);
      }
    };

    JavaPairRDD&lt;String, Record&gt; result = rdd.aggregateByKey(zero, mergeValue, mergeCombiners);

    // Java8
    JavaPairRDD&lt;String, Record&gt; result2 = rdd.aggregateByKey(zero, (Record record, Long v) -&gt; record.add(v), (Record r1, Record r2) -&gt; r1.add(r2));

    System.out.println(result.collect());
</pre>

In [11]:
# 파이썬 - 다음 소스는 jupyter notebook에서 바로 실행 불가.
src = """import pyspark
from record import *

sc = pyspark.SparkContext()

def mergeValue(c, v):
    return c.addAmt(v)

def mergeCombiners(c1, c2):
    return c1 + c2

rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
result = rdd.aggregateByKey(Record(0, 0), lambda c, v: mergeValue(c, v), lambda c1, c2: mergeCombiners(c1, c2))
print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])

"""

with open('job.py', 'w') as f:
    f.write(src)

In [12]:
%%bash
$SPARK_HOME/bin/spark-submit --py-files record.py job.py 2>/dev/null

Math avg:75.0 Eng avg:80.0


In [13]:
os.remove('record.py')
os.remove('job.py')

### [pipe 및 파티션 관련 연산]

#### 2.1.5.24 pipe

pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스 활용 가능.

cut 유틸리티를 이용해 문자열 분리 후 첫 번째와 세 번째 숫자를 뽑아내는 예제

스칼라
<pre>
    val rdd = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
    val result = rdd.pipe("cut -f 1,3 -d ,")
    println(result.collect.mkString(", "))
</pre>

자바
<pre>
    JavaRDD&lt;String&gt; rdd = sc.parallelize(Arrays.asList("1,2,3", "4,5,6", "7,8,9"));
    JavaRDD&lt;String&gt; result = rdd.pipe("cut -f 1,3 -d ,");
    System.out.println(result.collect());
</pre>

In [14]:
# 파이썬
rdd = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
result = rdd.pipe("cut -f 1,3 -d ,")
print(result.collect())

['1,3', '4,6', '7,9']


#### 2.1.5.25 coelesce와 repartition

repartition() : 파티션 수를 늘리거나 줄이는 것 모두 가능. 셔플을 기반으로 동작. 파티션을 늘릴 경우 주로 사용.
    
coalesce() : 줄이는 것만 가능. 셔플옵션 없으면 셔플 안함. 파티션을 줄일 경우 주로 사용.

스칼라
<pre>
    val rdd1 = sc.parallelize(1 to 1000000, 10)
    val rdd2 = rdd1.coalesce(5)
    val rdd3 = rdd2.repartition(10);
    println(s"partition size: ${rdd1.getNumPartitions}")
    println(s"partition size: ${rdd2.getNumPartitions}")
    println(s"partition size: ${rdd3.getNumPartitions}")
</pre>

자바
<pre>
    JavaRDD&lt;Integer&gt; rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0), 10);
    JavaRDD&lt;Integer&gt; rdd2 = rdd1.coalesce(5);
    JavaRDD&lt;Integer&gt; rdd3 = rdd2.coalesce(10);
    System.out.println("partition size:" + rdd1.getNumPartitions());
    System.out.println("partition size:" + rdd2.getNumPartitions());
    System.out.println("partition size:" + rdd3.getNumPartitions());
</pre>

In [15]:
# 파이썬
rdd1 = sc.parallelize(list(range(1, 11)), 10)
rdd2 = rdd1.coalesce(5)
rdd3 = rdd2.repartition(10)
print("partition size: %d" % rdd1.getNumPartitions())
print("partition size: %d" % rdd2.getNumPartitions())
print("partition size: %d" % rdd3.getNumPartitions())

partition size: 10
partition size: 5
partition size: 10


#### 2.1.5.26 repartitionAndSortWithinPartitions

RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의 파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이 결과로 새로운 RDD를 생성해 주는 메서드.

데이터가 키와 값 쌍으로 구성돼 있어야 함.

파티셔너 : 각 데이터의 키 값을 이용해 데이터가 속할 파티션을 결정. 이 때 키 값을 이용한 정렬도 함께 수행.

10개의 무작위 숫자를 3개의 파티션으로 분리해 보는 예제

스칼라
<pre>
    val r = scala.util.Random
    val data = for (i &lt;- 1 to 10) yield (r.nextInt(100), "-")
    val rdd1 = sc.parallelize(data)
    val rdd2 = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(3))
    rdd2.foreachPartition(it =&gt; {
      println("==========");
      it.foreach(v => println(v))
    })
</pre>

자바
<pre>
    List&lt;Integer&gt; data = fillToNRandom(10);
    JavaPairRDD&lt;Integer, String&gt; rdd1 = sc.parallelize(data).mapToPair((Integer v) -&gt; new Tuple2(v, "-"));
    JavaPairRDD&lt;Integer, String&gt; rdd2 = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(3));

    rdd2.foreachPartition(new VoidFunction&lt;Iterator&lt;Tuple2&lt;Integer, String&gt;&gt;&gt;() {
      @Override
      public void call(Iterator&lt;Tuple2&lt;Integer, String&gt;&gt; it) throws Exception {        System.out.println("==========");
        while (it.hasNext()) {
          System.out.println(it.next());
        }
      }
    });

</pre>

In [16]:
# 파이썬
import random
data = [random.randrange(1, 100) for i in range(0, 10)]
rdd1 = sc.parallelize(data).map(lambda v: (v, "-"))
rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x: x)
# 결과 검증
# rdd2.foreachPartition(lambda values: print(list(values)))
rdd2.collect()

[(3, '-'),
 (42, '-'),
 (96, '-'),
 (43, '-'),
 (64, '-'),
 (91, '-'),
 (97, '-'),
 (8, '-'),
 (32, '-'),
 (44, '-')]

foreachPartition() 메서드는 RDD의 파티션 단위로 특정 함수를 실행해 주는 메서드.

결과 : 각 기 값에 따라 파티션이 분리되고 동시에 키 값에 따라 정렬.

#### 2.1.5.27 partitionBy

RDD 구성요소가 키와 값의 쌍으로 구성된 경우 사용할 수 있는 메서드.

Partitioner 클래스의 인스턴스를 인자로 전달.

HashPartitioner, RangePartitioner 두 종류 존재. 파티션 생성 기준을 변경하고 싶으면 Partitioner 클래스를 상속.

스칼라
<pre>
    val rdd1 = sc.parallelize(List("apple", "mouse", "monitor"), 5).map { a => (a, a.length) }
    val rdd2 = rdd1.partitionBy(new HashPartitioner(3))
    println(s"rdd1:${rdd1.getNumPartitions}, rdd2:${rdd2.getNumPartitions}")
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data = Arrays.asList(new Tuple2("apple", 1), new Tuple2("mouse", 1), new Tuple2("monitor", 1));
    JavaPairRDD&lt;String, Integer&gt; rdd1 = sc.parallelizePairs(data, 5);
    JavaPairRDD&lt;String, Integer&gt; rdd2 = rdd1.partitionBy(new HashPartitioner(3));
    System.out.println("rdd1:" + rdd1.getNumPartitions() + ", rdd2:" + rdd2.getNumPartitions());
</pre>

In [17]:
# 파이썬
rdd1 = sc.parallelize([("apple", 1), ("mouse", 1), ("monitor", 1)], 5)
rdd2 = rdd1.partitionBy(3)
print("rdd1: %d, rdd2: %d" % (rdd1.getNumPartitions(), rdd2.getNumPartitions()))

rdd1: 5, rdd2: 3


### 필터와 정렬 연산

#### 2.1.5.28 filter

RDD의 원하는 요소만 남기고 원하지 않는 요소를 걸러내는 메서드.

참 거짓으로 가려내는 함수를 RDD의 각 요소에 적용해 결과가 참인 것만 남김.

스칼라
<pre>
    val rdd = sc.parallelize(1 to 5)
    val result = rdd.filter(_ > 2)
    println(result.collect.mkString(", "))
</pre>

자바
<pre>
    JavaRDD&lt;Integer&gt; rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaRDD&lt;Integer&gt; result = rdd.filter(new Function&lt;Integer, Boolean&gt;() {
      @Override
      public Boolean call(Integer v1) throws Exception {
        return v1 &gt; 2;
      }
    });
    System.out.println(result.collect());
</pre>

In [18]:
rdd1 = sc.parallelize(range(1, 5+1))
rdd2 = rdd1.filter(lambda i: i > 2)
print(rdd2.collect())

[3, 4, 5]


#### 2.1.5.29 sortByKey

sortByKey()는 키 값을 기준으로 요소를 정렬하는 연산.

키 값을 기준으로 정렬하기 때문에 모든 요소가 키와 값 형태도 구성돼 있어야 함.

스칼라
<pre>
    val rdd = sc.parallelize(List("q", "z", "a"))
    val result = rdd.map((_, 1)).sortByKey()
    println(result.collect.mkString(", "))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, Integer&gt;&gt; data = Arrays.asList(new Tuple2("q", 1), new Tuple2("z", 1), new Tuple2("a", 1));
    JavaPairRDD&lt;String, Integer&gt; rdd = sc.parallelizePairs(data);
    JavaPairRDD&lt;String, Integer&gt; result = rdd.sortByKey();
    System.out.println(result.collect());
</pre>

In [19]:
# 파이썬
rdd = sc.parallelize([("q", 1), ("z", 1), ("a", 1)])
result = rdd.sortByKey()
print(result.collect())

[('a', 1), ('q', 1), ('z', 1)]


#### 2.1.5.30 keys, values

키와 값 쌍으로 구성된 경우에 사용할 수 있는 메서드

keys() : 키에 해당하는 요소로 구성된 RDD를 생성
    
values() : 값에 해당하는 요소로 구성된 RDD를 리턴

스칼라
<pre>
    val rdd = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k3", "v3"))
    println(rdd.keys.collect.mkString(","))
    println(rdd.values.collect.mkString(","))
</pre>

자바
<pre>
    List&lt;Tuple2&lt;String, String&gt;&gt; data = Arrays.asList(new Tuple2("k1", "v1"), new Tuple2("k2", "v2"), new Tuple2("k3", "v3"));
    JavaPairRDD&lt;String, String&gt; rdd = sc.parallelizePairs(data);
    System.out.println(rdd.keys().collect());
    System.out.println(rdd.values().collect());
</pre>

In [20]:
# 파이썬
rdd = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k3", "v3")])
print(rdd.keys().collect())
print(rdd.values().collect())

['k1', 'k2', 'k3']
['v1', 'v2', 'v3']


#### 2.1.5.31 sample

샘플을 추출해 새로운 RDD를 생성.

sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

withReplacement : 복원 추출을 수행할지 여부를 정하는 것. True면 복원, False면 비복원.

fraction

복원 추출 : 각 요소의 평균 발생 회수. 반드시 0 이상이여야 함.

비복원 추출 : 각 요소가 샘플에 포함될 확률. 0과 1 사이의 값으로 지정.

seed : 반복 시행 시 결과가 바뀌지 않고 일정한 값이 나오도록 제어하는 목적.

스칼라
<pre>
    val rdd = sc.parallelize(1 to 100)
    val result1 = rdd.sample(false, 0.5)
    val result2 = rdd.sample(true, 1.5)
    println(result1.take(5).mkString(","))
    println(result2.take(5).mkString(","))
</pre>

자바
<pre>
    List&lt;Integer&gt; data = fillToN(100);
    JavaRDD&lt;Integer&gt; rdd = sc.parallelize(data);
    JavaRDD&lt;Integer&gt; result1 = rdd.sample(false, 0.5);
    JavaRDD&lt;Integer&gt; result2 = rdd.sample(true, 1.5);
    System.out.println(result1.take(5));
    System.out.println(result2.take(5));
</pre>

In [21]:
# 파이썬
rdd = sc.parallelize(range(1,100+1))
result1 = rdd.sample(False, 0.5)
result2 = rdd.sample(True, 1.5)
print(result1.take(5))
print(result2.take(5))

[4, 5, 8, 11, 12]
[1, 2, 4, 4, 4]


### 2.1.6 RDD 액션

결과값이 정수, 리스트, 맵 등 RDD가 아닌 타입인 경우

트렌스포메이션 : 결과값이 RDD인 메서드. 느긋한 평가(lazy evaluation, 지연계산) 방식을 채택.

액션 메서드를 여러번 호출하면 트렌스포메이션 메서드도 여러번 실행됨.