# Spark RDD
-  스파크는 주 언어가 Scala임.
- 기본적으로 클러스터 환경에서 동작하는 프로그램이기 때문에 AWS, Microsoft Azure 혹은 GCP(Google Cloud Platform) 등의 클라우드 환경에서 주로 사용.
- 스파크의 경우 파이썬을 지원하기 때문에 몇 가지 기본 개념은 실습이 가능.

##  탄력적 분산 데이터셋 (RDD: Resilient Distributed Dataset)
- 의미 :  스파크에서 사용하는 기본 추상개념으로 클러스터의 머신(노드)의 여러 메모리에 분산하여 저장할 수 있는 데이터의 집합.
- 스파크는 RDD(Resilient Distributed Dataset)를 구현하기 위한 프로그램.
- RDD를 스파크라는 프로그램을 통해 실행 시킴으로써 메모리 기반의 대량의 데이터 연산이 가능하게 되었고 이는 하둡보다 100배는 빠른 연산을 가능하게 해 주었음.
- 스파크는 하드디스크에서 파일을 읽어온 뒤 연산 단계에는 데이터를 메모리에 저장하자는 아이디어를 생각함.
- 메모리에 적재하기 좋은 새로운 형태의 추상화 작업(abstraction)이 필요하기 때문에 고안된 것이 바로 RDD(Resilient Distributed Dataset), "탄력적 분산 데이터 셋"임.
- 데이터의 유실을 막기 위해 메모리의 데이터를 읽기 전용(Read-Only, 변경 불가)로 만들고 데이터를 만드는 방법(계보, Lineage)을 기록하고 있다가 데이터가 유실되면 다시 만드는 방법을 사용함.

### RDD 특징
- In-Memory
- Fault Tolerance
- Immutable(Read-Only)
- Partition

- 각 파티션은 RDD의 전체 데이터 중 일부를 나타냄.
- 스파크는 데이터를 여러 대의 머신에 분할해서 저장하며, Chunk, 혹은 파티션으로 분할되어 저장함.
- 파티션을 RDD데이터의 부분을 표현하는 단위 정도로 이해하면 됨.

### RDD 생성
- 내부에서 만들어진 데이터 집합을 병렬화하는 방법: parallelize()함수 사용
- 외부의 파일을 로드하는 방법: .textFile() 함수 사용
- 이 작업은 실제로는 RDD의 lineage(계보)를 만드는데 지나지 않음.

### - RDD 동작
- Transformations
- Actions
- RDD는 immutable(불변)하다고 하기 때문에 연산 수행에 있어 기존의 방식과는 다르게 수행됨. 
- Transformations은 RDD에게 변형 방법(연산 로직, 계보, lineage)을 알려주고 새로운 RDD를 만들지만 실제 연산의 수행은 Actions을 통해 행해짐.
- Transformations를 통해 새로운 RDD를 만듦.
- actions은 결괏값을 보여주고 저장하는 역할을 하며 실제 Transformations 연산을 지시하기도 함.

## PySpark 설치

In [1]:
import pyspark
pyspark.__version__

'3.0.1'

## SparkContext를 통한 스파크 초기화
- 분산 환경에서 운영되는 스파크는 driver 프로그램을 구동시킬 때 SparkContext라는 특수 객체를 만들게 됨.
- 스파크는 이 SparkContext 객체를 통해 스파크의 모든 기능에 접근함.
- 이 객체는 스파크 프로그램당 한 번만 실행할 수 있고 사용 후에는 종료해야 함.
- SparkContext를 다른 말로 스파크의 "엔트리 포인트(entry point)"라고도 하고, SparkContext를 생성하는 것을 "스파크를 초기화한다(Initializing Spark)"라고 함.

### - PySpark에서
- 문법: pyspark.SparkContext()
- 스파크 기능의 기본 엔트리 포인트입니다.
- 스파크 클러스터와 연결을 나타내며 RDD를 만들고 브로드캐스트 하는데 사용될 수 있습니다.
- JVM 당 하나만 활성화해야 하며, 새로운 것을 만들기 전에는 활성을 중지해야 합니다.

In [2]:
from pyspark import SparkConf, SparkContext

#보통 변수명은  sc로 선언함.
sc = SparkContext()
sc

In [3]:
type(sc)

pyspark.context.SparkContext

In [4]:
#SparkContext 종료
sc.stop()

In [5]:
sc = SparkContext(master='local', appName='PySpark Basic')
sc

In [6]:
#생성한 SparkContext의 Configuration을 확인
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.name', 'PySpark Basic'),
 ('spark.app.id', 'local-1630645115691'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '44001'),
 ('spark.driver.host', 'w0ul9i4kxtvxj63dy3wsu0dmb-7c7b5bbc6c-57m57'),
 ('spark.ui.showConsoleProgress', 'true')]

In [7]:
sc.master

'local'

In [8]:
sc.appName

'PySpark Basic'

In [9]:
#종료
sc.stop()

In [10]:
#.setMaster(), .setAppName()을 이용해 어플리케이션의 이름과 Master의 URL을 설정
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')
sc = SparkContext(conf=conf)
sc

## RDD Creation 

In [11]:
#방금 전 만든 SparkContext()의 parallelize()함수를 이용해서 내부의 데이터 집합을 RDD로 만듦
rdd = sc.parallelize([1,2,3])
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [12]:
type(rdd)

pyspark.rdd.RDD

- 지금은 RDD를 생성했지만 RDD가 만들어지지는 않은 것.

In [13]:
#RDD 원소 반환
rdd.take(3)

[1, 2, 3]

In [14]:
#외부 파일 로드하여 RDD 만들기
import os

file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/test.txt'
with open(file_path, 'w') as f:
    for i in range(10):
        f.write(str(i)+'\n')
        
print('OK')

OK


In [15]:
rdd2 = sc.textFile(file_path)
print(rdd2)
print(type(rdd2))

/aiffel/aiffel/bigdata_ecosystem/test.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
<class 'pyspark.rdd.RDD'>


In [16]:
rdd2.take(3)
# spark가 .textFile()을 통해 얻어온 데이터 타입을 무조건 string으로 처리함.

['0', '1', '2']

## RDD Operation - Transformations

In [17]:
#map
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions 
print(y.collect())

['b', 'a', 'c', 'd']
[('b', 1), ('a', 1), ('c', 1), ('d', 1)]


In [18]:
nums = sc.parallelize([1, 2, 3])
squares = nums.map(lambda x: x*x)
print(squares.collect())

[1, 4, 9]


In [19]:
#filter
x = sc.parallelize([1,2,3,4,5])
y = x.filter(lambda x: x%2 == 0) 
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5]
[2, 4]


In [20]:
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x: x.upper())
A = capital.filter(lambda x: 'A' in x)
print(text.collect())
print(A.collect())

['a', 'b', 'c', 'd']
['A']


In [21]:
#flatmap
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*10, 30))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 10, 30, 2, 20, 30, 3, 30, 30]


In [22]:
wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
#공백은 제거
#단어를 공백기준으로 split.
result.collect()

['spark',
 'is',
 'funny',
 'it',
 'is',
 'beautiful',
 'and',
 'also',
 'it',
 'is',
 'implemented',
 'by',
 'python']

In [23]:
#응용 - csv 파일 읽기
import os
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_0.take(5)

['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone',
 '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n',
 '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n',
 '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y',
 '1,female,35.0,1,0,53.1,First,C,Southampton,n']

In [24]:
#비어있는 라인은 제외하고, delimeter인 ,로 line을 분리
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
csv_data_1.take(5)

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone'],
 ['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]

In [25]:
#칼럼 부분만 분리
columns = csv_data_1.take(1)
columns

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone']]

In [26]:
#칼럼 제외한 나머지 데이터만 분리
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())  # 첫 번째 컬럼이 숫자인 것만 필터링
csv_data_2.take(5)

[['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'],
 ['0',
  'male',
  '28.0',
  '0',
  '0',
  '8.4583',
  'Third',
  'unknown',
  'Queenstown',
  'y']]

In [27]:
#칼럼 기준으로 정리
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(5)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '35.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '53.1'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '0'),
  ('sex', 'male'),
  ('age', '28.0'),
  ('n_siblings_spouses', '0'),
  ('parch'

In [28]:
#csv를 DataFrame으로 읽어 들이는 방법
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark import SparkFiles

url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
sc.addFile(url)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)
df.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class|deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|0       |male  |22.0|1                 |0    |7.25   |Third|unknown|Southampton|n    |
|1       |female|38.0|1                 |0    |71.2833|First|C      |Cherbourg  |n    |
|1       |female|26.0|0                 |0    |7.925  |Third|unknown|Southampton|y    |
|1       |female|35.0|1                 |0    |53.1   |First|C      |Southampton|n    |
|0       |male  |28.0|0                 |0    |8.4583 |Third|unknown|Queenstown |y    |
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
only showing top 5 rows



In [29]:
#위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링. 
df2 = df[df['age']>40]
df2.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class |deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|0       |male  |66.0|0                 |0    |10.5   |Second|unknown|Southampton|y    |
|0       |male  |42.0|1                 |0    |52.0   |First |unknown|Southampton|n    |
|1       |female|49.0|1                 |0    |76.7292|First |D      |Cherbourg  |n    |
|0       |male  |65.0|0                 |1    |61.9792|First |B      |Cherbourg  |n    |
|0       |male  |45.0|1                 |0    |83.475 |First |C      |Southampton|n    |
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
only showing top 5 rows



## RDD Operation - Actions

In [30]:
#collect
nums = sc.parallelize(list(range(10)))
nums.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [31]:
#take
nums.take(3)

[0, 1, 2]

In [32]:
#count
nums.count()

10

In [33]:
#reduce
nums.reduce(lambda x, y: x + y)

45

In [36]:
#saveAsTextFile
#file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/file.txt'
#nums.saveAsTextFile(file_path)

#!ls -l ~/aiffel/bigdata_ecosystem

- file.txt 내용을 잘 보시면, 놀랍게도 디렉토리 타입으로 생성되어 있음.
- 이 디렉토리 안에 들어가 보면 part-00000 라는 이름의 텍스트 파일이 생성되어 있어서, 실제 우리가 기록하고 싶었던 내용은 이 파일 안에 있음.
- 스파크가 다룰 파일 사이즈는 하드디스크 하나에 다 담지 못할 만큼 큰 경우일 수도 있지만, 비록 작은 RDD지만 우리는 이미 sc.parallelize()를 통해 분산형 데이터로 생성했음을 잊으면 안됨.

In [37]:
# RDD 생성
rdd = sc.parallelize(range(1,100))

# RDD Transformation 
rdd2 = rdd.map(lambda x: 0.5*x - 10).filter(lambda x: x > 0)

# RDD Action 
rdd2.reduce(lambda x, y: x + y)

1580.0

## RDD Operation - MapReduce

### Word Counter 구현하기
- Map 함수를 구현할 때, 입력 스트링의 각 문자 x에 대해 x -> (x, 1) 형태의 tuple로 매핑.
- 반적인 reduce 함수보다는 reduceByKey 함수를 사용.

In [38]:
text = sc.parallelize('hello python')

# map 함수를 적용한 RDD 구하기
text_1 = text.filter(lambda x: x != " ")
text_2 = text_1.map(lambda x:(x, 1))

#reduceByKey 함수를 적용한 Word Counter 출력
word_count = text_2.reduceByKey(lambda accum, n: accum + n)  
word_count.collect()


[('h', 2),
 ('e', 1),
 ('l', 2),
 ('o', 2),
 ('p', 1),
 ('y', 1),
 ('t', 1),
 ('n', 1)]

### Titanic 데이터 분석하기
- map 함수를 이용해 모든 데이터를 (생존 여부, 연령)의 형태로 바꾸면 생존자, 사망자 각각의 연령의 총합을 쉽게 구할 수 있음.

In [39]:
# 이전 스텝에서 CSV 파일을 로딩했던 내역입니다. 
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
columns = csv_data_1.take(1)
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])

csv_data_3.take(3)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')]]

In [40]:
#생존자와 사망자의 연령 총합 구하기
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1]))   # (생존여부, 연령)
age_sum_data = csv_data_4.reduceByKey(lambda accum, age: float(accum) + float(age))  
age_sum = age_sum_data.collect()

#생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count)) 
survived_count = survived_data.collect()

age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)

생존자 평균 연령: 29.110411522633743
사망자 평균 연령: 29.9609375
