# RDD
<br>

RDD(Resilient Distributed Dataset)는 Apache Spark에서 데이터를 표현하는 기본 구조이다.
Spark에서는 RDD, Databrame, DataSet 세 가지 데이터구조를 제공
RDD는 데이터가 비구조적인 경우 사용하기 적합하다

Spark의 RDD, DataFrame 모두 immutable이라 일단 생성되고 나면 원본을 수정할 수 없다.

해당 Jupyter Notebook에서는 아래의 내용을 다루려고 한다.

## 1. RDD 생성과 기본 동작
- parallelize(list) : 배멸에서 읽어서 RDD를 생성
- take(출력개수) || collect() : RDD를 출력
- collect() : 모든 파티션의 데이터를 수집하여 로컬의 리스트로 반환하는 액션(Action) 함수

## 2. RDD 읽기 및 구성 
- Partition : Spark의 논리적인 데이터 분할, 즉 일정한 크기로 잘라놓은 데이터 뭉치
- 파일에서 RDD로 읽기, CSV에서 RDD로 읽기

## 3. Map-Reduce
- map() : 각 데이터 요소에 함수 적용
- reduce() : 각 데이터에 대해 함수를 반복적으로 적용하여 결과 값을 만들어 냄

## 4. 데이터 조작과 필터링
- filter() : 데이터 선별
- flat : 2차원 -> 1차원으로 변환
- foreach() : 각 데이터 요소에 함수 적용(반환값이 없음)

## 5. 통계
- 통계함수

<br><hr><br>

In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .getOrCreate()

23/11/27 05:24:58 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/27 05:24:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/27 05:24:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
myList=[1,2,3,4,5,6,7]

#####  parallelize(list) : 배멸에서 읽어서 RDD를 생성

In [3]:
myRdd1 = spark.sparkContext.parallelize(myList) #RDD
type(myRdd1)

pyspark.rdd.RDD

##### take(출력개수) || collect() : RDD를 출력

In [4]:
myRdd1.take(3) #List

                                                                                

[1, 2, 3]

##### collect() : 모든 파티션의 데이터를 수집하여 로컬의 리스트로 반환하는 액션(Action) 함수

In [5]:
spark.sparkContext.parallelize([0, 2, 3, 4, 6], 2).collect() #List

[0, 2, 3, 4, 6]

##### Partition

파티션이란 논리적인 데이터 분할, 즉 일정한 크기로 잘라놓은 데이터 뭉치이다.

따라서 RDD는 파티션으로 구성되어 있다고 할 수 있다. 파티션의 수는 원하는 숫자만큼 구성할 수 있다. 데이터를 분할해 놓으면 동시에 여러 파티션을 나누어 여러 노드/스레드에서 병렬처리가 가능해진다.

In [6]:
rdd = spark.sparkContext.parallelize([0, 2, 3, 4, 6], 3) # 3개의 파티션으로 분리.

##### glom() : 파티션을 유지함

In [7]:
rdd.glom().collect()

[[0], [2, 3], [4, 6]]

## RDD 읽기

### 파일에서 RDD 읽어오기

In [8]:
%%writefile data/ds_spark_wiki.txt
Wikipedia
Apache Spark is an open source cluster computing framework.
아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.
Apache Spark Apache Spark Apache Spark Apache Spark
아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크
Originally developed at the University of California, Berkeley's AMPLab,
the Spark codebase was later donated to the Apache Software Foundation,
which has maintained it since.
Spark provides an interface for programming entire clusters with
implicit data parallelism and fault-tolerance.

Overwriting data/ds_spark_wiki.txt


In [10]:
import os
myRdd2=spark.sparkContext\
    .textFile(os.path.join("data","ds_spark_wiki.txt"))

In [11]:
myRdd2.first() #첫 데이터만 조회 -> RDD는 줄바꿈을 기준으로 Row 단위로 만든다.
# myRdd2.collect()

'Wikipedia'

### CSV에서 RDD 읽어오기

In [12]:
%%writefile ./data/ds_spark_2cols.csv
35, 2
40, 27
12, 38
15, 31
21, 1
14, 19
46, 1
10, 34
28, 3
48, 1
16, 2
30, 3
32, 2
48, 1
31, 2
22, 1
12, 3
39, 29
19, 37
25, 2

Overwriting ./data/ds_spark_2cols.csv


In [13]:
myRdd4 = spark.sparkContext\
    .textFile(os.path.join("data","ds_spark_2cols.csv"))

In [14]:
myList=myRdd4.take(5)

print(myList)

['35, 2', '40, 27', '12, 38', '15, 31', '21, 1']


In [15]:
myRdd4.map(lambda x: x.split(",")).collect()

#Lambda식으로 ,를 기준으로 쪼갬

[['35', ' 2'],
 ['40', ' 27'],
 ['12', ' 38'],
 ['15', ' 31'],
 ['21', ' 1'],
 ['14', ' 19'],
 ['46', ' 1'],
 ['10', ' 34'],
 ['28', ' 3'],
 ['48', ' 1'],
 ['16', ' 2'],
 ['30', ' 3'],
 ['32', ' 2'],
 ['48', ' 1'],
 ['31', ' 2'],
 ['22', ' 1'],
 ['12', ' 3'],
 ['39', ' 29'],
 ['19', ' 37'],
 ['25', ' 2']]

In [16]:
myRdd4.map(lambda x: x.split(","))\
    .map(lambda x: int(x[0])+int(x[1]))\
    .collect()
 #Lambda식으로 리스트의 1,2번째 합

[37,
 67,
 50,
 46,
 22,
 33,
 47,
 44,
 31,
 49,
 18,
 33,
 34,
 49,
 33,
 23,
 15,
 68,
 56,
 27]

### 문제: 파일에서 RDD 생성
다음 링크에서 파일을 읽어서 RDD를 생성하고, 5줄을 화면출력하세요.

경기도 의정부시 인구현황 (파일명: 경기도 의정부시_인구현황_20230731)
https://www.data.go.kr/data/15009613/fileData.do

제주특별자치도 서귀포시 내 연도별 65세이상 인구수 및 고령화비율, 노령화지수 현황 (파일명: 제주특별자치도 서귀포시_고령화비율및노령화지수현황_20200623)
https://www.data.go.kr/data/15051545/fileData.do

파일을 읽을 경우, 문자가 한글인지, 영어인지 어떻게 인코딩되었는지 주의해야 한다. 결과가 깨져보인다면, 왜 그런지 이유를 적어보자.

In [17]:
popRdd = spark.sparkContext\
    .textFile(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"), use_unicode=True)

In [18]:
popRdd.take(5) # 화면에 출력하면 한글이 깨져있다. use_unicode=True설정을 주었는데도 그렇다. 다운로드 받으면서 한글이 깨져 있기 때문에 그렇다. 다운로드 받은 파일을 수정해서 출력하면 된다.

['�������,�α���(��),�α���(��),�α���(��),������(��),������(��),������(��),����,�����,������α�,���������,�����μ���,�μ���ȭ��ȣ,�����ͱ�������',
 '������1��,37557 ,19039 ,18518 ,8.08,4.1,3.98,102.81,22514 ,1.67,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30',
 '������2��,29729 ,14817 ,14912 ,6.4,3.19,3.21,99.36,16007 ,1.86,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30',
 'ȣ��1��,34658 ,16762 ,17896 ,7.46,3.61,3.85,93.66,15296 ,2.27,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30',
 'ȣ��2��,33400 ,16088 ,17312 ,7.19,3.46,3.72,92.93,13422 ,2.49,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30']

In [19]:
agedRdd = spark.sparkContext\
    .textFile(os.path.join("data","제주특별자치도 서귀포시_고령화비율및노령화지수현황_20230324.csv"), use_unicode=True)

In [20]:
for i in agedRdd.take(5)[1:]:
    j=i.split(",")
    j.append(int(j[2])-int(j[3])-int(j[4]))
    print(j)

['2008', '12', '153120', '22241', '26792', '14.52', '83.01', '2023-03-24', 104087]
['2009', '12', '152285', '23031', '25504', '15.12', '90.3', '2023-03-24', 103750]
['2010', '12', '153716', '23990', '24633', '15.6', '97.38', '2023-03-24', 105093]
['2011', '12', '153366', '24839', '23686', '16.2', '104.86', '2023-03-24', 104841]


##### binaryFiles
binaryFiles()는 이진파일을 읽는 함수이다. 인코딩된 cp949, euc-kr 등으로 디코딩하여 한글을 불러오는것을 시도한다.

In [21]:
popRddBin = spark.sparkContext.binaryFiles(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"))

In [22]:
_my = popRddBin.map(lambda x :x[1].decode('euc-kr'))

In [23]:
_my.take(1) # RDD binaryFiles로 읽으니, 파일의 전체내용을 하나의 값으로 읽을 뿐만 아니라, 2차원 배열로 읽어도 행렬의 구조가 없어서 이해하기 어렵다.

['행정기관,인구수(계),인구수(남),인구수(여),구성비(계),구성비(남),구성비(여),성비,세대수,세대당인구,관리기관명,관리부서명,부서전화번호,데이터기준일자\r\n의정부1동,37557 ,19039 ,18518 ,8.08,4.1,3.98,102.81,22514 ,1.67,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n의정부2동,29729 ,14817 ,14912 ,6.4,3.19,3.21,99.36,16007 ,1.86,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n호원1동,34658 ,16762 ,17896 ,7.46,3.61,3.85,93.66,15296 ,2.27,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n호원2동,33400 ,16088 ,17312 ,7.19,3.46,3.72,92.93,13422 ,2.49,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n장암동,19197 ,9156 ,10041 ,4.13,1.97,2.16,91.19,8324 ,2.31,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n신곡1동,39848 ,19475 ,20373 ,8.57,4.19,4.38,95.59,17028 ,2.34,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n신곡2동,45593 ,22018 ,23575 ,9.81,4.74,5.07,93.4,19005 ,2.4,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n송산1동,56595 ,28012 ,28583 ,12.18,6.03,6.15,98,23887 ,2.37,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n송산2동,32054 ,15897 ,16157 ,6.9,3.42,3.48,98.39,13103 ,2.45,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\

In [24]:
popList = _my.map(lambda x: x.split()).take(3)
print("---00: ", popList[0][0])
print("---01: ", popList[0][1])

---00:  행정기관,인구수(계),인구수(남),인구수(여),구성비(계),구성비(남),구성비(여),성비,세대수,세대당인구,관리기관명,관리부서명,부서전화번호,데이터기준일자
---01:  의정부1동,37557


### 구조화된 데이터 -> Dataframe

In [25]:
popDf = spark\
            .read.option("charset", "euc-kr")\
            .option("header", "true")\
            .csv(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"))

popDf.show(5)

+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
| 행정기관|인구수(계)|인구수(남)|인구수(여)|구성비(계)|구성비(남)|구성비(여)|  성비|세대수|세대당인구|       관리기관명|관리부서명|부서전화번호|데이터기준일자|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
|의정부1동|    37557 |    19039 |    18518 |      8.08|       4.1|      3.98|102.81|22514 |      1.67|경기도 의정부시청|민원여권과|031-828-2466|    2023-06-30|
|의정부2동|    29729 |    14817 |    14912 |       6.4|      3.19|      3.21| 99.36|16007 |      1.86|경기도 의정부시청|민원여권과|031-828-2466|    2023-06-30|
|  호원1동|    34658 |    16762 |    17896 |      7.46|      3.61|      3.85| 93.66|15296 |      2.27|경기도 의정부시청|민원여권과|031-828-2466|    2023-06-30|
|  호원2동|    33400 |    16088 |    17312 |      7.19|      3.46|      3.72| 92.93|13422 |      2.49|경기도 의정부시청|민원여권과|031-828-2466|    2023-06-30|
|   장암동

# spark-submit 실행 

위 프로그램을 .py로 저장하고, spark-submit 명령으로 배치 실행을 할 수 있다.(클러스터로 실행)

In [26]:
%%writefile src/ds3_popCsvRead.py
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import os
import pyspark

def doIt():
    print ("---------RESULT-----------")
    popDf = spark\
                .read.option("charset", "euc-kr")\
                .option("header", "true")\
                .csv(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"))
    popDf.show(5)
    agedDf = spark\
                .read.option("charset", "euc-kr")\
                .option("header", "true")\
                .csv(os.path.join("data","제주특별자치도 서귀포시_고령화비율및노령화지수현황_20230324.csv"))
    agedDf.show(5)

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
    myConf=pyspark.SparkConf()
    spark = pyspark.sql.SparkSession.builder\
        .master("local")\
        .appName("myApp")\
        .config(conf=myConf)\
        .getOrCreate()
    doIt()
    spark.stop()

Overwriting src/ds3_popCsvRead.py


In [27]:
!spark-submit src/ds3_popCsvRead.py

23/11/27 05:26:07 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/27 05:26:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/27 05:26:08 INFO SparkContext: Running Spark version 3.4.1
23/11/27 05:26:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/27 05:26:08 INFO ResourceUtils: No custom resources configured for spark.driver.
23/11/27 05:26:08 INFO SparkContext: Submitted application: myApp
23/11/27 05:26:08 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/11/27 05:26:08 INFO ResourceProfile: Limiting resource is cpu
23/11/

23/11/27 05:26:12 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 12.1 KiB, free 433.9 MiB)
23/11/27 05:26:12 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 6.4 KiB, free 433.9 MiB)
23/11/27 05:26:12 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.30.1.29:59406 (size: 6.4 KiB, free: 434.3 MiB)
23/11/27 05:26:12 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1535
23/11/27 05:26:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[13] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
23/11/27 05:26:12 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
23/11/27 05:26:12 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (172.30.1.29, executor driver, partition 0, PROCESS_LOCAL, 7991 bytes) 
23/11/27 05:26:12 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
23/1

23/11/27 05:26:12 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/11/27 05:26:12 INFO DAGScheduler: Got job 3 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/11/27 05:26:12 INFO DAGScheduler: Final stage: ResultStage 3 (showString at NativeMethodAccessorImpl.java:0)
23/11/27 05:26:12 INFO DAGScheduler: Parents of final stage: List()
23/11/27 05:26:12 INFO DAGScheduler: Missing parents: List()
23/11/27 05:26:12 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[27] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
23/11/27 05:26:12 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 11.4 KiB, free 434.2 MiB)
23/11/27 05:26:12 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 6.1 KiB, free 434.2 MiB)
23/11/27 05:26:12 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 172.30.1.29:59406 (size: 6.1 KiB, free: 434.4 Mi

In [28]:
!which python

/Users/sojaehwi/opt/anaconda3/bin/python


In [29]:
!python src/ds3_popCsvRead.py

23/11/27 05:26:17 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/27 05:26:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/27 05:26:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/27 05:26:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
---------RESULT-----------
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
| 행정기관|인구수(계)|인구수(남)|인구수(여)|구성비(계)|구성비(남)|구성비(여)|  성비|세대수|세대당인구|       관리기관명|관리부서명|부서전화번호|데이터기준일자|
+---------+----------+----------+----------+----------+----------+----------+------+------+--

# Map-Reduce

### 1. Python으로 따라해보기

### 1-1. map() : 각 데이터 요소에 함수를 적용해서 list를 반환

In [30]:
def c2f(c):
    return (float(9)/5)*c + 32

f= map(c2f, [39.2, 36.5, 37.3, 37.8])
print (list(f))

[102.56, 97.7, 99.14, 100.03999999999999]


In [31]:
x = lambda x, y : x + y

print(x(1, 2))

3


In [32]:
# lambda func
celsius = [10, 20, 30, 40]
f=map(lambda c:(float(9)/5)*c + 32, celsius)
print(f, list(f))

<map object at 0x7fc548f56250> [50.0, 68.0, 86.0, 104.0]


In [33]:
list(map(lambda x:x.split(), "Hello World"))

[['H'], ['e'], ['l'], ['l'], ['o'], [], ['W'], ['o'], ['r'], ['l'], ['d']]

### 1-2. filter(): 데이터를 선별한다.

In [34]:
fib = [0,1,1,2,3,5,8,13,21,34,55]
result = filter(lambda x: x % 2, fib)
print (list(result))

[1, 1, 3, 5, 13, 21, 55]


### 1-3. reduce() : 데이터에 대해 함수를 반복적으로 적용하여 결과 값을 만들게 된다

reduce() 역시 함수와 데이터 2개의 인자를 받는다.

아래 예는 1부터 101까지 두 수 x,y 인자를 반복해서 더한다는 것이다. x는 부분합계로 y를 계속 저장해 나가는 역할을 하며, 최종 합계에 이르게 된다.

In [35]:
from functools import reduce
reduce(lambda x, y: x + y, range(1,101))

5050

### 2. RDD 사용하기

### 2-1. map() : 각 데이터 요소에 함수 적용

In [36]:
nRdd = spark.sparkContext.parallelize([1, 2, 3, 4])
squared = nRdd.map(lambda x: x * x)

print (squared)

PythonRDD[37] at RDD at PythonRDD.scala:53


In [37]:
squared.collect()   #변환의 실제 경과 보기

[1, 4, 9, 16]

#### 문자열을 정수로 반환하기

In [38]:
myRdd4.take(5)

['35, 2', '40, 27', '12, 38', '15, 31', '21, 1']

In [39]:
myRdd5 = myRdd4.map(lambda x : x.split(','))
myRdd5.take(5)

[['35', ' 2'], ['40', ' 27'], ['12', ' 38'], ['15', ' 31'], ['21', ' 1']]

In [40]:
myRdd6 = myRdd5.map(lambda x: [int(i) for i in x])
myRdd6.take(5)

[[35, 2], [40, 27], [12, 38], [15, 31], [21, 1]]

#### 단어 분리하기

In [41]:
myRdd2=spark.sparkContext\
    .textFile(os.path.join("data","ds_spark_wiki.txt"))

In [42]:
sentences=myRdd2.map(lambda x:x.split()) #공백을 기준으로 분할

In [43]:
sentences.count()

10

In [44]:
sentences.take(3)

[['Wikipedia'],
 ['Apache',
  'Spark',
  'is',
  'an',
  'open',
  'source',
  'cluster',
  'computing',
  'framework.'],
 ['아파치', '스파크는', '오픈', '소스', '클러스터', '컴퓨팅', '프레임워크이다.']]

In [45]:
for line in sentences.collect():
    for word in line:
        print (word, end=" ")
    print ("\n-----")

Wikipedia 
-----
Apache Spark is an open source cluster computing framework. 
-----
아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다. 
-----
Apache Spark Apache Spark Apache Spark Apache Spark 
-----
아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크 
-----
Originally developed at the University of California, Berkeley's AMPLab, 
-----
the Spark codebase was later donated to the Apache Software Foundation, 
-----
which has maintained it since. 
-----
Spark provides an interface for programming entire clusters with 
-----
implicit data parallelism and fault-tolerance. 
-----


In [46]:
myRdd2.map(lambda s:len(s)).collect() #한 줄 Length

[9, 59, 32, 51, 31, 72, 71, 30, 64, 46]

In [47]:
's'.upper() #대소문자 변환

'S'

### 2-2. reduce() : 각 데이터 요소에 함수 적용

In [48]:
myRdd100 = spark.sparkContext.parallelize(range(1,101))

In [49]:
myRdd100.reduce(lambda subtotal, x: subtotal + x)

5050

### Fold
reduce와 유사하나 파티션별로 초기값을 할당함(많이 안씀)

In [50]:
spark.sparkContext.parallelize(range(1,11),2).glom().collect()

[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]

In [51]:
# 파티션 수 2 * 초기값 10 + (초기값 10 + 55) = 85
spark.sparkContext.parallelize(range(1,11),2).fold(10, lambda subtotal, x: subtotal + x)

85

### 통계함수

In [52]:
print ("sum: ", myRdd100.sum())
print ("min: ", myRdd100.min())
print ("max: ", myRdd100.max())
print ("count: ", myRdd100.count())
print ("standard deviation:", myRdd100.stdev())
print ("variance: ", myRdd100.variance())

sum:  5050
min:  1
max:  100
count:  100
standard deviation: 28.86607004772212
variance:  833.25


### filter

In [53]:
myRdd2.collect()

['Wikipedia',
 'Apache Spark is an open source cluster computing framework.',
 '아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.',
 'Apache Spark Apache Spark Apache Spark Apache Spark',
 '아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크',
 "Originally developed at the University of California, Berkeley's AMPLab,",
 'the Spark codebase was later donated to the Apache Software Foundation,',
 'which has maintained it since.',
 'Spark provides an interface for programming entire clusters with',
 'implicit data parallelism and fault-tolerance.']

In [54]:
myRdd_spark=myRdd2.filter(lambda line: "Spark" in line)
print ("How many lines having 'Spark': ", myRdd_spark.count())

How many lines having 'Spark':  4


In [55]:
myRdd_unicode = myRdd2.filter(lambda line: u"스파크" in line) #u는 유니코드. pyhton3부터는 안해도된다.
print (myRdd_unicode.first())

아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.


### flat : 2차원 -> 1차원으로 변환

In [56]:
# 문장 안에 stopwords를 포함한 경우는 제거되지 않는다. 따라서 flatMap()을 하고 단어에 대해 불용어를 제거해야 한다. 
# 불용어는 단어빈도를 계산하면서 제거하고 싶은 단어를 말한다. 불용어는 빈도를 세어도 의미가 없는 대명사 (이, 그, 저...) 또는 한 글자 단어 (등...)이 
# 될 수 있다. 
# 한글은 유니코드로 처리해야 한다. 영어는 대소문자를 모두 처리하기 위해 여기서는 소문자로 만들어 처리한다.


stopwords = ['is','am','are','the','for','a', 'an', 'at']
myRdd_stop = myRdd2.flatMap(lambda x:x.split())\
                    .filter(lambda x: x not in stopwords)
                            
                            
for words in myRdd_stop.collect():
    print (words, end=' ')

Wikipedia Apache Spark open source cluster computing framework. 아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다. Apache Spark Apache Spark Apache Spark Apache Spark 아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크 Originally developed University of California, Berkeley's AMPLab, Spark codebase was later donated to Apache Software Foundation, which has maintained it since. Spark provides interface programming entire clusters with implicit data parallelism and fault-tolerance. 

### foreach() : map()과 달리 반환값이 없다

In [57]:
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: x + 1)

In [58]:
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).map(lambda x: x + 1).collect()

[2, 3, 4, 5, 6]

In [59]:
def f(x): 
    print(x)
    
    
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).foreach(f)

1
2
3
4
5


### Pipeline Chaining
Function을 연이어서 적용하는 방식

In [60]:
wordsLength = myRdd_stop\
    .map(len)\
    .collect()


print (wordsLength)

[9, 6, 5, 4, 6, 7, 9, 10, 3, 4, 2, 2, 4, 3, 8, 6, 5, 6, 5, 6, 5, 6, 5, 3, 3, 3, 3, 3, 3, 3, 3, 10, 9, 10, 2, 11, 10, 7, 5, 8, 3, 5, 7, 2, 6, 8, 11, 5, 3, 10, 2, 6, 5, 8, 9, 11, 6, 8, 4, 8, 4, 11, 3, 16]
