# 2강. Basic Spark

### 다양한 종류의 transformation과 action 함수들을 실행해보자

In [8]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


In [9]:
import findspark; findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="myAppName")

In [10]:
print(sc.parallelize(['hello world']).collect())

['hello world']


In [3]:
#np array 함수 0-9를 만든 후 출력 해보자
import numpy as np
foo_np = np.arange(10)
print (foo_np)

[0 1 2 3 4 5 6 7 8 9]


<div class="alert alert-warning"/>
- spark.sparkContext.parallelize(c, n)<br>
 + 주어진 데이터 c를 n개의 rdd로 조각내어 spark에 전달 시키는 명령<br>
- rdd.collect()<br>
 + 주어진 rdd를 하나의 결과로 모으는 명령<br>
- rdd.take(n)<br>
 + 주어진 rdd에서 n개의 데이터를 모으는 명령<br>

In [4]:
#rdd에 collect함수를 실행하면 action 후 모든 정보를 메모리로 반환시킴 (결과가 대용량 자료일 경우 추천하지 않음)
rdd = sc.parallelize(foo_np)
print (rdd.collect(), rdd.take(3))
rdd = sc.parallelize(foo_np, 10)
print (rdd.collect(), rdd.take(3))
print (type(rdd))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] [0, 1, 2]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] [0, 1, 2]
<class 'pyspark.rdd.RDD'>


<div class="alert alert-warning"/>
- rdd.map(func)<br>
 + 주어진 rdd를 순회하면서 func를 통하여 변경된 새로운 rdd를 만드는 명령<br><br>
- rdd.filter(func)<br>
 + 주어진 rdd를 순회하면서 func의 동작이 True인 row를 새로운 rdd로 만드는 명령<br><br>
- rdd.first()<br>
 + take(1)과 동일하나 결과가 row의 배열이 아님<br><br>
- rdd.count()<br>
 + 주어진 rdd의 row 개수를 측정함<br>

In [5]:
#rdd에서 first 동작을 수행하면 action이 일어나 결과 중 제일 처음 record를 반환함
rdd.first()

0

In [6]:
#rdd는 map 함수를 주어진 함수에 대하여 전달 할 수 있음
#x의 원소가 +5가 되는 rdd를 실행한 뒤 collect를 이용하여 출력해보자
rdd.map(lambda x: x + 5).collect()

[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

In [7]:
#filter에 짝수일 경우 참이 되는 function을 전달하여 collect로 출력하여 보자
rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6, 8]

In [8]:
#rdd.count를 이용하여 짝수개였던 원소가 몇개였는지 출력하여 보자
rdd.count()

10

<div class="alert alert-warning"/>
- sample(withReplacement, fraction, seed)<br>
 + 주어진 rdd를 순회하면서 복원/비복원 추출을 주어진 비율만큼 주어진 random seed를 사용하여 추출<br><br>


In [9]:
#값이 0-99인 데이터를 rdd로 변경해보자
rdd = sc.parallelize(range(100))

#rdd.sample은 원소의 중복을 허용하면서 지정된 비율의 데이터를 표본으로 생성함
#sampling은 랜덤함수의 결과로 반환되기 때문에 동일한 명령어여도 값이 다를 수 있음
sampleA = rdd.sample(True, 0.5)
sampleB = rdd.sample(True, 0.5)
print ("sampleA:", sampleA.count())
print ("sampleB:", sampleB.count())

#비율이 바뀌면 반환값도 바뀌게 됨
sampleC = rdd.sample(True, 0.3)
sampleD = rdd.sample(True, 0.3)
print ("sampleC:", sampleC.count())
print ("sampleD:", sampleD.count())

sampleA: 69
sampleB: 45
sampleC: 31
sampleD: 24


In [10]:
#sampleA와 sampleB 결과가 다른 것을 take를 통하여 확인할 수 있음
print (sampleA.take(3))
print ('-------------')
print (sampleB.take(3))

[0, 0, 0]
-------------
[1, 2, 2]


In [11]:
#랜덤 함수의 seed를 고정하여 동알한 랜덤값이 발생하게 고정 할 수 있음
sampleE = rdd.sample(True, 0.3, 1)
sampleF = rdd.sample(True, 0.3, 1)
print ("sampleE:", sampleE.count())
print ("sampleF:", sampleF.count())
print (sampleE.take(3))
print ('-------------')
print (sampleF.take(3))

sampleE: 38
sampleF: 38
[1, 3, 5]
-------------
[1, 3, 5]


<div class="alert alert-warning"/>
- union(rdd)<br>
 + 주어진 rdd와 다른 rdd를 결합함<br><br>
- intersection(rdd)<br>
 + 주어진 rdd와 다른 rdd의 공통적인 row를 추출함<br><br>
- distinct([numPartitions])<br>
 + 주어진 rdd에서 공통적인 row를 제거함<br><br>
- cartesian(rdd)<br>
 + 주어진 2개의 rdd 값을 순회하며서 값들의 조합을 생성함<br><br>
- subtract(rdd)<br>
 + 주어진 2개의 rdd에서 2번째 rdd의 원소들을 제거한 값을 추출함

In [12]:
#합집합을 수행하여보자
a = sc.parallelize([1, 1, 2, 3, 4])
b = sc.parallelize([1, 1, 2, 3, 4])

ab = a.union(b)
print(ab.collect())

[1, 1, 2, 3, 4, 1, 1, 2, 3, 4]


In [13]:
#교집합을 수행하여보자
a = sc.parallelize([1, 1, 2, 3, 4, 5, 6, 7])
b = sc.parallelize([1, 1, 2, 3, 4])

print(a.intersection(b).collect())

[1, 2, 3, 4]


In [14]:
a = sc.parallelize([1, 2, 2, 3, 3, 4, 2, 4, 1, 5])
b = sc.parallelize([1, 2, 2, 3, 3, 4, 1, 5, 2, 6])
print(a.intersection(b).collect())

[1, 2, 3, 4, 5]


In [15]:
a = sc.parallelize([(1, 2), (2, 3), (3, 4), (2, 4), (1, 5)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4), (1, 5), (2, 6)])
print(a.intersection(b).collect())

[(2, 3), (1, 5), (1, 2), (3, 4)]


In [16]:
#중복 제거를 수행한뒤 개수를 비교하여 보자
print (a.collect())
print (a.distinct().collect())

[(1, 2), (2, 3), (3, 4), (2, 4), (1, 5)]
[(1, 5), (1, 2), (2, 4), (2, 3), (3, 4)]


In [17]:
a = sc.parallelize([1, 2, 1, 3, 1, 2, 2, 4, 1, 5])
a.distinct().collect()

[1, 2, 3, 4, 5]

In [18]:
a = sc.parallelize([(1, 2), (1, 3), (1, 2), (2, 4), (1, 5)])
a.distinct().collect()

[(1, 5), (1, 2), (1, 3), (2, 4)]

In [19]:
#1-3의 값을 가지는 rdd와 a,b,c 를 가지는 rdd를 생성하여 cartesian product를 수행한 뒤 collect로 출력해보자
a = sc.parallelize([1, 2, 3])
b = sc.parallelize(['a', 'b', 'c'])
a.cartesian(b).collect()

[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c')]

In [20]:
#1-5의 값을 가지는 rdd와 0, 2, 3의 값을 가지는 rdd를 이용하여 subtract를 수행한 뒤 collect로 출력해보자
a = sc.parallelize([1, 2, 3, 4, 5])
b = sc.parallelize([0, 2, 3])
a.subtract(b).collect()

[1, 4, 5]

<div class="alert alert-warning"/>
- zip(withReplacement, fraction, seed)<br>
 + 서로 다른 2개의 rdd를 인덱스에 맞춰 key, value로 묶어주는 동작<br><br>
- groupBy(func)<br>
 + rdd의 값을 순회하면서 func가 반환시키는 기준의 데이터로 값들을 묶어주는 동작<br><br>
- sortBy(keyFunc, ascending, numPartitions)<br>
 + rdd의 값을 keyFunc의 반환 값 기준으로 ascending의 order로 정렬함 <br>

In [21]:
#1-3의 값을 가지는 rdd와 a,b,c 를 가지는 rdd를 생성하여 zip을 수행한 뒤 collect로 출력해보자
a = sc.parallelize(["a", "b", "c"])
b = sc.parallelize([1, 2, 3])
a.zip(b).collect()

[('a', 1), ('b', 2), ('c', 3)]

In [22]:
#1-8의 값을 가지는 rdd에 짝수를 판별하는 func를 이용하여 groupby를 수행한 뒤 값을 출력 해보자
rdd = sc.parallelize([1,2,3,4,5,6,7,8])
grouped = rdd.groupBy(lambda x:x%2==0)
print (type(grouped), grouped.collect())

#grouped의 각 그룹의 결과를 출력하여보자
a, b = grouped.collect()
print (type(a), a[0], list(a[1]))
print (type(b), b[0], list(b[1]))

<class 'pyspark.rdd.PipelinedRDD'> [(False, <pyspark.resultiterable.ResultIterable object at 0x7f98e4223c50>), (True, <pyspark.resultiterable.ResultIterable object at 0x7f98e4223f90>)]
<class 'tuple'> False [1, 3, 5, 7]
<class 'tuple'> True [2, 4, 6, 8]


In [23]:
#1-9의 값을 가진 rdd를 sortBy를 이용하여 데이터를 정렬하여 보자
#숫자 타입의 값인 경우 x => -x로 바꿔주는 방식으로 오름차순으로 변경 가능
rdd = sc.parallelize(range(10))
print ( rdd.sortBy(lambda x:x).collect())
print ( rdd.sortBy(lambda x:-x).collect())
print ( rdd.sortBy(lambda x:x, False).collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


<div class="alert alert-warning"/>
- cogroup(rdd)<br>
 + 서로 다른 rdd를 key 기준으로 value를 묶음<br><br>
- join(rdd)<br>
 + 서로 다른 rdd에서 공통의 key 기준으로 value를 묶음<br><br>
- leftOuterJoin(rdd)<br>
 + 왼쪽 rdd 기준으로 value 를 묶음<br><br>
- rightOuterJoin(rdd)<br>
 + 오른쪽 rdd 기준으로 value 를 묶음<br>

In [24]:
#key가 존재하는 연산일 경우 cogroup을 사용하여 편리하게 같은 key의 값을 group으로 묶을 수 있다
rdd1 = sc.parallelize([(1,1),(1,2),(2,3),(3,4)])
rdd2 = sc.parallelize([(1,'a'),(2,'b'),(4,'c'),(1,'d')])
cogrouped = rdd1.cogroup(rdd2)
cogrouped.collect()

[(1,
  (<pyspark.resultiterable.ResultIterable at 0x7f98e4268cd0>,
   <pyspark.resultiterable.ResultIterable at 0x7f98e4268f90>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x7f98e4268dd0>,
   <pyspark.resultiterable.ResultIterable at 0x7f98e4268bd0>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x7f98e4268390>,
   <pyspark.resultiterable.ResultIterable at 0x7f98e4268b10>)),
 (4,
  (<pyspark.resultiterable.ResultIterable at 0x7f98e4268fd0>,
   <pyspark.resultiterable.ResultIterable at 0x7f98e423add0>))]

In [25]:
a, b, c, d = cogrouped.collect()
print (a[0], list(a[1][0]), list(a[1][1]))
print (b[0], list(b[1][0]), list(b[1][1]))
print (c[0], list(c[1][0]), list(c[1][1]))
print (d[0], list(d[1][0]), list(d[1][1]))

1 [1, 2] ['a', 'd']
2 [3] ['b']
3 [4] []
4 [] ['c']


In [26]:
#key가 존재하는 연산일 경우 join을 사용하여 공통 key를 가지고 있는 데이터를 묶어볼 수 있다
#공통 key가 존재하지 않는다면 해당 데이터는 제거 됨
rdd1.join(rdd2).collect()

[(1, (1, 'a')), (1, (1, 'd')), (1, (2, 'a')), (1, (2, 'd')), (2, (3, 'b'))]

In [27]:
#공통 key가 없더라도 좌측 기준의 rdd의 key를 기준으로 우측에 rdd가 존재하지 않을 경우 leftOuterJoin를 이용하여 데이터를 묶을 수 있다
rdd1.leftOuterJoin(rdd2).collect()

[(1, (1, 'a')),
 (1, (1, 'd')),
 (1, (2, 'a')),
 (1, (2, 'd')),
 (2, (3, 'b')),
 (3, (4, None))]

In [28]:
#leftOuterJoin과 반대로 rightOuterJoin으로 우측기준으로 묶을 수도 있다
rdd1.rightOuterJoin(rdd2).collect()

[(1, (1, 'a')),
 (1, (1, 'd')),
 (1, (2, 'a')),
 (1, (2, 'd')),
 (2, (3, 'b')),
 (4, (None, 'c'))]

<div class="alert alert-warning"/>
- pipe(cmd)<br>
 + rdd를 순회하면서 cmd 명령의 결과를 전달함<br><br>
- reparition(num)<br>
 + 주어진 rdd를 주어진 num 만큼의 파티션으로 변경<br><br>
- coalesce(num)<br>
 + 주어진 rdd를 주어진 num으로 현재 파티션 개수보다 줄임<br><br>
- partitionBy(num, partitionFunc)<br>
 + 주어진 rdd를 주어진 num의 파티션 만큼 쪼개고 기준은 partitionFunc으로 잡음<br>

In [29]:
#linux환경이거나 gow를 깔았다면 유용한 명령어를 spark안에서 수행시킬 수 있음 | 로 연속된 동작은 지원하지 않음
rdd = sc.parallelize(str(i) for i in range(20))
rdd.pipe('grep [015]').collect()

['0', '1', '5', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']

In [30]:
#병렬 처리 상황 (분산환경, 멀티프로세스 환경)에서 파티션이 많을 경우 executor가 많아져 동시 수행을 할 수 있음
rdd = sc.parallelize(range(100))
print (rdd.getNumPartitions())
rdd = sc.parallelize(range(100), 100)
print (rdd.getNumPartitions())

8
100


In [31]:
#repartition의 경우 주어진 rdd의 파티션 개수를 늘이거나 줄일 수 있음
rdd = sc.parallelize(range(100))
rdd = rdd.repartition(5)
print (rdd.getNumPartitions())
rdd = rdd.repartition(10)
print (rdd.getNumPartitions())

5
10


In [32]:
#coalesce의 경우 주어진 rdd의 파티션 개수를 줄일 수는 있으나 늘릴 수는 없음
rdd = sc.parallelize(range(100), 100)
rdd = rdd.coalesce(200)
print (rdd.getNumPartitions())
rdd = rdd.coalesce(10)
print (rdd.getNumPartitions())

100
10


In [33]:
#partitionBy의 경우 repartiion과 동일한 연산을 수행하며, partition을 수행할 기준이 되는 func를 넘길 수 있음 (대용량 분산처리에서 간혹  쓰임)
rdd = sc.parallelize(range(100), 100)
rdd = rdd.partitionBy(3)
print (rdd.getNumPartitions())
rdd = rdd.partitionBy(10)
print (rdd.getNumPartitions())

3
10


<div class="alert alert-warning"/>
- reduce(func)<br>
 + 주어진 rdd를 순회하면서 기존 값과 현재값의 연산을 func를 통하여 수행 및 반복<br>
 + 교환법칙과 결합법칙이 적용되는 데이터 타입에만 적용 가능<br><br>
- fold(initValue, func)<br>
 + 주어진 rdd를 순회하면서 기존 값과 현재값의 연산을 func를 통하여 수행 및 반복, 단 초기값 설정 가능<br>
 + 교환법칙과 결합법칙이 적용되는 데이터 타입에만 적용 가능<br><br>

In [34]:
#reduce를 이용하여 값을 합쳐보자
rdd = sc.parallelize(range(10))
print(rdd.getNumPartitions())
print(rdd.reduce(lambda a,b:a+b))
print()

#파티션이 3개로 주어졌을때와 비교하여 보자
rdd = sc.parallelize(range(10), 3)
print(rdd.getNumPartitions())
print(rdd.reduce(lambda a,b:a+b))

8
45

3
45


In [35]:
#fold의 경우로 값을 합쳐보자
rdd = sc.parallelize(range(10))
print( rdd.fold(0, lambda a,b:a+b), rdd.getNumPartitions())
print( rdd.fold(1, lambda a,b:a+b), rdd.getNumPartitions() )
print()

#fold의 경우 파티션이 나눠진 경우 값이 다른 것을 확인 할 수 있다
rdd = sc.parallelize(range(10), 3)
print ( rdd.fold(0, lambda a,b:a+b), rdd.getNumPartitions())
print ( rdd.fold(1, lambda a,b:a+b), rdd.getNumPartitions() )

45 8
54 8

45 3
49 3


<div class="alert alert-warning"/>
- foreach(func)<br>
 + rdd의 값을 순회하면서 func를 실행<br><br>
- sum()<br>
 + rdd의 값을 sum 한뒤 반환<br><br>
- min()/max()/mean()<br>
 + rdd의 값들 중 가장 크거나 작은 값, 평균값을 반환<br>

In [36]:
#foreach로 원소를 출력하여 보자
rdd = sc.parallelize(range(1,10), 3)
rdd.foreach(lambda x: print(x))
rdd.foreach(print)

In [37]:
#rdd에 기본 집계 함수를 사용할 수 있음
rdd = sc.parallelize(range(1,10), 3)
print(rdd.sum())
print(rdd.min(), rdd.max(), rdd.mean())

45
1 9 5.0


## test yourself

* 타이타닉 데이터의 모든 column을 '구별'하여 RDD로 로드하여보자
 * textFile를 이용하여 로드하시오. #spark SQL이 아닌 spark core의 기능으로만 구현

In [38]:
rdd = sc.textFile('data/titanic.csv')
rdd.take(5)

['Survived,Pclass,Name,Sex,Age,Siblings/Spouses Aboard,Parents/Children Aboard,Fare',
 '0,3,Mr. Owen Harris Braund,male,22,1,0,7.25',
 '1,1,Mrs. John Bradley (Florence Briggs Thayer) Cumings,female,38,1,0,71.2833',
 '1,3,Miss. Laina Heikkinen,female,26,0,0,7.925',
 '1,1,Mrs. Jacques Heath (Lily May Peel) Futrelle,female,35,1,0,53.1']

* 주어진 타이타닉 사고 데이터를 이용하여 남성/여성의 총 인원수를 집계해보시오

* 주어진 타이타닉 데이터에서 생존 남성의 평균 연령, 생존 여성의 평균 연령을 산출해보시오.

* 타이타닉 사고 데이터에서 남성의 생존률과 여성의 생존률을 Pclass 대비로 측정해 보시오