# PySpark Tutorial

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 36 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=e24e4191d08ef2f607e9e1fe11fc49fb43f560f3c2d21c0ffdbf2b269b285c64
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


py4j 패키지는 python 프로그램이 자바가상머신의 오브젝트들에 접근할 수 있도록 해준다.

In [2]:
!ls -tl

total 4
drwxr-xr-x 1 root root 4096 Mar  9 14:48 sample_data


In [3]:
!ls -tl sample_data

total 55504
-rw-r--r-- 1 root root 18289443 Mar  9 14:48 mnist_test.csv
-rw-r--r-- 1 root root 36523880 Mar  9 14:48 mnist_train_small.csv
-rw-r--r-- 1 root root   301141 Mar  9 14:48 california_housing_test.csv
-rw-r--r-- 1 root root  1706430 Mar  9 14:48 california_housing_train.csv
-rwxr-xr-x 1 root root     1697 Jan  1  2000 anscombe.json
-rwxr-xr-x 1 root root      930 Jan  1  2000 README.md


**Spark Session**
- SparkSession은 Spark 2.0부터 엔트리 포인트로 사용된다.  
- 그 이전에는 SparkContext가 사용되었다.  
- SparkSession을 이용해 RDD, 데이터프레임 등을 만든다.  
- SparkSession은 SparkSession.builder를 호출하여 생성하며 다양한 세부 설정이 가능 

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('PySpark_Tutorial').getOrCreate()

In [10]:
spark

## Python 객체를 RDD로 변환

### 1. Python 리스트 생성

In [19]:
name_list_json = ['{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}']

In [20]:
for n in name_list_json:
  print(n)

{"name": "keeyong"}
{"name": "benjamin"}
{"name": "claire"}


In [23]:
import json

for n in name_list_json:
  jn = json.loads(n)
  print(jn['name'])

keeyong
benjamin
claire


### 2. Python 리스트를 RDD로 변환
- RDD로 변환되는 순간 Spark Cluster의 서버들에 데이터가 나눠 저장된다.(파티션)
- 또한 **Lazy Execution**이 된다.

In [24]:
rdd = spark.sparkContext.parallelize(name_list_json)

**Lazy Execution**
- 바로 Spark Cluster에 올라가서 RDD가 되는것은 아님
- RDD를 가지고 의미있는 일을 하는 순간 위 코드가 실행이 된다.

In [25]:
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [26]:
rdd.count() # 여기서 실제 parallelize가 실행

3

In [27]:
parsed_rdd = rdd.map(lambda el:json.loads(el))

In [29]:
parsed_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

In [30]:
parsed_rdd.collect()

[{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]

In [32]:
parsed_name_rdd = rdd.map(lambda el:json.loads(el)['name'])

In [34]:
parsed_name_rdd.collect()

['keeyong', 'benjamin', 'claire']

### 3. Python 리스트를 데이터프레임으로 변환

In [35]:
from pyspark.sql.types import StringType

df = spark.createDataFrame(name_list_json, StringType()) # 데이터의 타입을 명시해 주어야 한다.

In [36]:
df.count()

3

In [37]:
df.printSchema()

root
 |-- value: string (nullable = true)



필드명을 따로 설정해 주지 않아 필드 이름이 value가 된다.

In [41]:
df.select('*').collect()

[Row(value='{"name": "keeyong"}'),
 Row(value='{"name": "benjamin"}'),
 Row(value='{"name": "claire"}')]

In [42]:
df.select('value').collect()

[Row(value='{"name": "keeyong"}'),
 Row(value='{"name": "benjamin"}'),
 Row(value='{"name": "claire"}')]

In [43]:
from pyspark.sql import Row

row = Row("name")
df_name = parsed_name_rdd.map(row).toDF()

In [45]:
df_name.printSchema()

root
 |-- name: string (nullable = true)



In [44]:
df_name.select('name').collect()

[Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]