PySpark을 로컬머신에 설치하고 노트북을 사용하기 보다는 머신러닝 관련 다양한 라이브러리가 이미 설치되었고 좋은 하드웨어를 제공해주는 Google Colab을 통해 실습을 진행한다.

이를 위해 pyspark과 Py4J 패키지를 설치한다. Py4J 패키지는 파이썬 프로그램이 자바가상머신상의 오브젝트들을 접근할 수 있게 해준다. Local Standalone Spark을 사용한다.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 38 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 76.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=2392d90e20895ac9d78a594990f67f922714e5c922fbdc1cbcbc00f7a4d42ef9
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [2]:
!ls -tl

total 4
drwxr-xr-x 1 root root 4096 Mar 23 14:22 sample_data


In [4]:
!ls -tl sample_data

total 55504
-rw-r--r-- 1 root root   301141 Mar 23 14:22 california_housing_test.csv
-rw-r--r-- 1 root root  1706430 Mar 23 14:22 california_housing_train.csv
-rw-r--r-- 1 root root 18289443 Mar 23 14:22 mnist_test.csv
-rw-r--r-- 1 root root 36523880 Mar 23 14:22 mnist_train_small.csv
-rwxr-xr-x 1 root root     1697 Jan  1  2000 anscombe.json
-rwxr-xr-x 1 root root      930 Jan  1  2000 README.md


**Spark Session:** SparkSession은 Spark 2.0부터 엔트리 포인트로 사용된다. 그 이전에는 SparkContext가 사용되었다. SparkSession을 이용해 RDD, 데이터 프레임등을 만든다. SparkSession은 SparkSession.builder를 호출하여 생성하며 다양한 함수들을 통해 세부 설정이 가능하다

In [8]:
from pyspark.sql import SparkSession

# master( ): 인자로 사용하고 싶은 Spark cluster Host name을 지정. "local[*]" : Local Standalone spark cluster를 쓸 것이며, [*] : cpu를 전부 쓰겠다는 것.
spark = SparkSession.builder \
    .master("local[*]")   \
    .appName('PySpark_Tutorial') \
    .getOrCreate()

In [9]:
spark

**Python 객체를 RDD로 변환해보기**

**1> Python 리스트 생성**

In [10]:
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]

In [11]:
for n in name_list_json:
  print(n)

{"name": "keeyong"}
{"name": "benjamin"}
{"name": "claire"}


In [12]:
import json

for n in name_list_json:
  jn = json.loads(n)
  print(jn["name"])

keeyong
benjamin
claire


**2> 파이썬 리스트를 RDD로 변환. RDD로 변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨 (파티션). 또한 Lazy Execution이 된다는 점 기억**

In [13]:
# Lazy Execution이기 때문에, parallelize를 한 순간 클러스터 상에 올라가 RDD가 되는 것은 아니고, 이후 rdd가 호출되어 동작이 실행될 때 진행된다
rdd = spark.sparkContext.parallelize(name_list_json)

In [14]:
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [15]:
rdd.count() # parallelize 실행 시점

3

In [17]:
parsed_rdd = rdd.map(lambda el:json.loads(el)) # functional programming 적용 가능. JSON > Python Dictionary로 mapping한 형태가 된다

In [18]:
parsed_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

In [19]:
parsed_rdd.collect()

[{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]

In [20]:
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"]) 

In [21]:
parsed_name_rdd.collect()

['keeyong', 'benjamin', 'claire']

**파이썬 리스트를 데이터프레임으로 변환하기**

In [22]:
from pyspark.sql.types import StringType

df = spark.createDataFrame(name_list_json, StringType())

In [23]:
df.count()

3

In [24]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [25]:
df.select('*').collect()

[Row(value='{"name": "keeyong"}'),
 Row(value='{"name": "benjamin"}'),
 Row(value='{"name": "claire"}')]

In [26]:
df.select('value').collect()

[Row(value='{"name": "keeyong"}'),
 Row(value='{"name": "benjamin"}'),
 Row(value='{"name": "claire"}')]

In [28]:
from pyspark.sql import Row

row = Row("name") # Or some other column name
df_name = parsed_name_rdd.map(row).toDF() # RDD to DataFrame. Row()를 통해 Column에 name을 가진 객체를 선택하고 이를 인자로 사용(RDD는 field name이 없어 이와 같이 선택하기 어려우므로. 그래서 앞서 RDD에서는 lambda로 선택함)

In [29]:
df_name.printSchema()

root
 |-- name: string (nullable = true)



In [30]:
df_name.select('name').collect()

[Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]