# Spark ML

- Covers the DataFrame-based ML package (`pyspark.ml`), not the RDD-based MLlib
- Versions used: Spark 2.2+, Python 3.5+

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import StorageLevel

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Spark ML") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("INFO")

### Dataset - Kaggle Titanic

In [3]:
df = spark.read.csv("../dataset/train.csv", header=True, inferSchema=True).cache()
df.createOrReplaceTempView("train")
df.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

---
## EDA: Spark SQL + Zeppelin

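- EDA runs quickly because the queries are executed in a distributed fashion
- With a `Zeppelin` environment set up, query results can be viewed as graphs right away
- Inspect the state of the data with `printSchema()`, `describe()`, `isNull()`, and `select()`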

In [4]:
from pyspark.sql import Row
from pyspark.sql.functions import col, sum, when  # note: this sum shadows Python's built-in sum

In [5]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
df.describe(['age']).show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               714|
|   mean| 29.69911764705882|
| stddev|14.526497332334035|
|    min|              0.42|
|    max|              80.0|
+-------+------------------+



In [7]:
# Count the nulls in each column
df.select(*(
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns)).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [8]:
query = """
SELECT Embarked, count(PassengerId) as count
FROM train
WHERE Survived = 1
GROUP BY Embarked
"""

spark.sql(query).show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   30|
|    null|    2|
|       C|   93|
|       S|  217|
+--------+-----+



In [9]:
query = """
SELECT PClass, count(PassengerId) as count
FROM train
WHERE Survived = 1
GROUP BY PClass
"""

spark.sql(query).show()

+------+-----+
|PClass|count|
+------+-----+
|     1|  136|
|     3|  119|
|     2|   87|
+------+-----+



---
## Preprocessing: Spark DataFrame Function + UDF

#### Missing Value

- See `pyspark.sql.DataFrameNaFunctions`
- Spark ML's `Imputer` can also handle missing values (and plugs into a Pipeline)
- http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions

---
#### Feature Engineering

- Write a udf to preprocess the data into whatever form you need
- For approxQuantile, correlation, covariance, stratified sampling, and similar needs, see `pyspark.sql.DataFrameStatFunctions`
- http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

In [11]:
# Drop the Cabin column (mostly null), then drop the rows whose Age is null
df = df.drop('cabin')
before = df.select('age').where('age is null').count()
print("Before: {}".format(before))

test = df.na.drop(subset=["age"])
after = test.select('age').where('age is null').count()
print("After {}".format(after))

Before: 177
After 0


In [12]:
# Fill the nulls in a column with the column mean
avg_age = df.where('age is not null').groupBy().avg('age').collect()[0][0]
df = df.na.fill({'age': avg_age})
df.select('age').show(5)

+----+
| age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
+----+
only showing top 5 rows



In [13]:
# Stratified sampling by label (one sampling fraction per class)
sample_df = df.sampleBy('survived', fractions={0: 0.1, 1: 0.5}, seed=0)
print("Before:")
df.groupBy('survived').count().show()
print("After:")
sample_df.groupBy('survived').count().show()

Before:
+--------+-----+
|survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

After:
+--------+-----+
|survived|count|
+--------+-----+
|       1|  168|
|       0|   57|
+--------+-----+



In [14]:
# Add the length of the passenger's name as a new feature
str_length = udf(lambda x: len(x), IntegerType())
df = df.withColumn('len_name', str_length(df['name']))
df.select('name', 'len_name').show(5)

+--------------------+--------+
|                name|len_name|
+--------------------+--------+
|Braund, Mr. Owen ...|      23|
|Cumings, Mrs. Joh...|      51|
|Heikkinen, Miss. ...|      22|
|Futrelle, Mrs. Ja...|      44|
|Allen, Mr. Willia...|      24|
+--------------------+--------+
only showing top 5 rows



In [15]:
# Preprocess a categorical feature with a udf
# Spark ML's StringIndexer serves the same purpose, but it assigns indices by label frequency, so the index values differ

def embarked_to_int(embarked):
    if embarked == 'C': return 1
    elif embarked == 'Q': return 2
    elif embarked == 'S': return 3    
    else: return 0

embarked_to_int = udf(embarked_to_int, IntegerType())
df = df.withColumn('embarked_ix', embarked_to_int(df['embarked']))
df.select('embarked', 'embarked_ix').show(5)

+--------+-----------+
|embarked|embarked_ix|
+--------+-----------+
|       S|          3|
|       C|          1|
|       S|          3|
|       S|          3|
|       S|          3|
+--------+-----------+
only showing top 5 rows



In [16]:
# Use the when-otherwise clause from the Spark SQL functions
# to preprocess a categorical feature
df.select('sex', 
    when(df['sex'] == 'male', 0).otherwise(1).alias('sex_ix')).show(5)

+------+------+
|   sex|sex_ix|
+------+------+
|  male|     0|
|female|     1|
|female|     1|
|female|     1|
|  male|     0|
+------+------+
only showing top 5 rows



---
## Extracting, transforming and selecting features
http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature

#### Extraction

- Package for extracting features from raw data
- `TF-IDF`, `Word2Vec`, `CountVectorizer`, `FeatureHasher`

---
#### Transformation

- Package for transforming features (scaling, converting)
- `Tokenizer`, `StopWordsRemover`, `n-gram`, `PCA`, `StringIndexer`, `OneHotEncoder`
- `StandardScaler`, `MinMaxScaler` 등

---
#### Selection

- Package that supports feature selection (useful when there are a very large number of features)
- `VectorSlicer`, `RFormula`, `ChiSqSelector`

In [17]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [18]:
# Preprocess a categorical feature with StringIndexer
df = StringIndexer(inputCol='Sex', outputCol='sex_ix').fit(df).transform(df)
df.select('Sex', 'sex_ix').show(5)

+------+------+
|   Sex|sex_ix|
+------+------+
|  male|   0.0|
|female|   1.0|
|female|   1.0|
|female|   1.0|
|  male|   0.0|
+------+------+
only showing top 5 rows



In [19]:
# Convert the features into a single vector column with VectorAssembler
inputCols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'embarked_ix', 'sex_ix', 'len_name']
assembler = VectorAssembler(inputCols=inputCols, outputCol='features')
train = assembler.transform(df).select('PassengerId', col('Survived').alias('label'), 'features')
train.show(5)

+-----------+-----+--------------------+
|PassengerId|label|            features|
+-----------+-----+--------------------+
|          1|    0|[3.0,22.0,1.0,0.0...|
|          2|    1|[1.0,38.0,1.0,0.0...|
|          3|    1|[3.0,26.0,0.0,0.0...|
|          4|    1|[1.0,35.0,1.0,0.0...|
|          5|    0|[3.0,35.0,0.0,0.0...|
+-----------+-----+--------------------+
only showing top 5 rows



---
## Model
- http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.classification
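- Most algorithms are trained in a distributed fashion via **data parallelism**
- Since Spark 2.3, **model parallelism** is also supported (several candidate models can be trained concurrently during tuning)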

#### Classification, Regression

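- Tree models: `DecisionTree`, `RandomForest`, `GBTClassifier`
- SVM: `LinearSVC` (binary only; wrap it in `OneVsRest` for multiclass problems)
- `MultilayerPerceptronClassifier`: feedforward neural network with a softmax output layer; without hidden layers it reduces to a softmax model
- `LinearRegression`, `AFTSurvivalRegression`, `NaiveBayes`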

---
#### Clustering

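- Supports a variety of clustering algorithms
- `KMeans`, `LDA`, `GMM` (`GaussianMixture`)
- Previously, models were evaluated by SSE through the `computeCost` function
- Since version 2.3, `ClusteringEvaluator` is available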

---
#### Recommendation

- Supports the CF-based `Alternating Least Squares (ALS)` recommendation algorithm
- See the paper "Large-Scale Parallel Collaborative Filtering for the Netflix Prize"
- See also **Apache PredictionIO**, built for easy integration into production (MLlib)

In [20]:
from pyspark.ml.classification import RandomForestClassifier

In [21]:
# RandomForestClassifier example
# Split the dataset row-wise into training and test sets
splits = train.randomSplit([0.8, 0.2])
train = splits[0].cache()
test = splits[1].cache()

# cacheNodeIds: caches the node ID for each instance; a performance tip when trees get deep
model = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    cacheNodeIds=True)

predict = model.fit(train).transform(test)
predict.show(5)

+-----------+-----+--------------------+--------------------+--------------------+----------+
|PassengerId|label|            features|       rawPrediction|         probability|prediction|
+-----------+-----+--------------------+--------------------+--------------------+----------+
|         27|    0|[3.0,29.699117647...|[16.4071621231905...|[0.82035810615952...|       0.0|
|         34|    0|[2.0,66.0,0.0,0.0...|[17.1275400788451...|[0.85637700394225...|       0.0|
|         44|    1|[2.0,3.0,1.0,2.0,...|[0.35197754315401...|[0.01759887715770...|       1.0|
|         49|    0|[3.0,29.699117647...|[14.2343850263881...|[0.71171925131940...|       0.0|
|         50|    0|[3.0,18.0,1.0,0.0...|[7.40981147634526...|[0.37049057381726...|       1.0|
+-----------+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



---
## Evaluation
http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.evaluation

- Package for evaluating models; check which metrics each evaluator supports
- BinaryClassificationEvaluator: `areaUnderROC`, `areaUnderPR`
- MulticlassClassificationEvaluator: `f1`, `weightedPrecision`, `weightedRecall`, `accuracy`
- RegressionEvaluator: `rmse`, `mse`, `r2`, `mae`
- ClusteringEvaluator: newly added in version 2.3; `silhouette` is available as a metric
- A few things, such as `confusionMatrix()`, still exist only in Spark MLlib

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction", 
    labelCol="label", 
    metricName="accuracy")

evaluator.evaluate(predict)

0.8469387755102041

---
## Tuning: model selection and hyperparameter tuning
- http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.tuning
- Trains repeatedly over each combination of the specified parameters
- Originally only `data parallelism` was supported, but `model parallelism` is also supported since version 2.3
- Set the `parallelism` parameter on CrossValidator and TrainValidationSplit

#### ParamGridBuilder

- Builder for tuning parameters automatically (grid search)
- The parameters of each model are defined through the `spark.ml.param` module

---
#### CrossValidator

- Plain k-fold cross-validation (see Wikipedia)
- Trains once per fold, for the specified number of folds

---
#### TrainValidationSplit (Experimental)

- Splits the data into training and validation sets by the specified ratio
- Finishes sooner than CrossValidator, but the results may be unreliable when the training data is small

In [23]:
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [24]:
# Modeling
model = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    cacheNodeIds=True)

# Parameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(model.numTrees, [500, 700]) \
    .addGrid(model.maxDepth, [5, 7]) \
    .addGrid(model.impurity, ["gini"]) \
    .addGrid(model.maxBins, [31]) \
    .addGrid(model.subsamplingRate, [0.7]) \
    .build()

# Evaluator: accuracy
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction", 
    labelCol="label", 
    metricName="accuracy")

# train:validation = 7:3
tvs = TrainValidationSplit(
    estimator=model,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.7)

tvsModel = tvs.fit(train)
predict = tvsModel.transform(test)
evaluator.evaluate(predict)

0.8622448979591837

In [25]:
train.unpersist()
test.unpersist()
df.unpersist()

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Embarked: string, len_name: int, embarked_ix: int, sex_ix: double]

In [30]:
spark.stop()