# Introducing ML package of PySpark

## Predict chances of infant survival with ML

### Load the data

First, we load the data.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

In [3]:
import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv('births_transformed.csv.gz', 
                        header=True, 
                        schema=schema)

### Create transformers

데이터셋으로 모델을 생성하기 전에 데이터를 다소 변형해야 한다. 확률 모델은 숫자 데이터만을 받아들이기 때문에 BIRTH_PLACE 변수를 인코딩해야 한다.
BIRTH_PLACE 칼럼을 인코딩하기 위해 oneHotEncoder 함수를 사용할 것이다. 그러나 이 함수는 StringType 칼럼을 허용하지 않는다. 이 함수는 숫자 타입만을 다루기 때문에 우선 칼럼을 IntegerType으로 캐스팅한다. 

In [4]:
import pyspark.ml.feature as ft

births = births \
    .withColumn(       'BIRTH_PLACE_INT', 
                births['BIRTH_PLACE'] \
                    .cast(typ.IntegerType()))

Having done this, we can now create our first `Transformer`.

In [5]:
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

모든 피처가 수집된 하나의 칼럼을 만들어보자. VectorAssembler함수를 사용한다. 

In [6]:
featuresCreator = ft.VectorAssembler(
    inputCols=[
        col[0] 
        for col 
        in labels[2:]] + \
    [encoder.getOutputCol()], 
    outputCol='features'
)

VectorAssembler 객체에 전달된 inputCols 파라미터는 outputCol을 형성하기 위해 합쳐질 모든 칼럼을 포함하는 리스트다. **inputCols파라미터의 값을 변경하고자 할 때는 inputCols 파라미터의 값을 직접 바꿀 것이 아니라 인코더 객체의 output 칼럼명을 바꿔야 한다.** getOutputCol() 함수를 통해 인코더 객체의 출력을 얻는 것을 생각하자.

### Create an estimator

Logistic Regression model을 사용한다.

In [7]:
import pyspark.ml.classification as cl

Once loaded, let's create the model.

In [8]:
logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')

타킷 칼럼이 'label'을 갖고 있다면 labelCol 파라미터를 명시하지 않아도 된다.   
featuresCreator의 출력이 'features'라고 명시돼 있지 않으면 featuresCol 파라미터를 featuresCreator 객체의 getOutputCol() 함수를 사용해 명시해야 한다. 

### Create a pipeline

All that is left now is to create a `Pipeline` and fit the model. First, let's load the `Pipeline` from the package.

In [10]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
        encoder, 
        featuresCreator, 
        logistic
    ])

encoder -> featuresCreator -> logistic

### Fit the model

모델을 학습하기 전에 데이터셋을 학습 데이터셋과 테스트 데이터셋으로 나눠야 한다.
Conventiently, `DataFrame` API has the `.randomSplit(...)` method.

In [13]:
births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

첫 번째 파라미터는 데이터셋을 나눌 비율을 나타내는 리스트다.   
0.7은 births_train, 0.3은 births_test의 비율을 의미한다. seed 파라미터는 랜덤 숫자를 생성하기 위한 랜덤 시드다.

Now run our `pipeline` and estimate our model.

In [14]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

Here's what the `test_model` looks like.

In [15]:
test_model.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=16, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=206, MOTHER_WEIGHT_GAIN=26, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 16.0, 6: 63.0, 7: 180.0, 8: 206.0, 9: 26.0, 16: 1.0}), rawPrediction=DenseVector([-0.3229, 0.3229]), probability=DenseVector([0.42, 0.58]), prediction=1.0)]

모든 칼럼을 Transformer와 Estimator로부터 얻을 수 있다. 로지스틱 회귀 모델은 칼럼 몇 개를 출력한다.   
rawPrediction은 피처와 베타 계수의 선형 결합 값이고, probability가 최종 예측값이다. 

### Model performance

Obviously, we would like to now test how well our model did.   
분류 모델과 회귀 모델에 대한 여러 가지 평가 함수들을 ML 패키지의 .evaluation 섹션에 갖고 있다.

In [17]:
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

0.7343101403374708
0.7169195458786022


ROC 커브 밑의 넓이가 73%이고 PR밑의 넓이가 71%이므로 꽤 괜찮다고 할 수 있지만 그렇게 좋은 모델은 아니다.   
다른 피처를 가지고도 이정도의 성능을 낼 수 있지만, 여기서의 목적은 좋은 모델을 생성하는 법을 배우는 것이 아니므로 생략한다.   

로지스틱 회귀 모형에서 모형의 성능과 예측력을 비교하는데 흔히 사용되는 것이 receiver operating characteristic curve 혹은 ROC curve 입니다.   
이 커브 아래의 면적인 AUC (Area under the curve)이 1에 가까울수록 로지스틱 회귀 모형이 정확히 분류를 한 것으로 해석할 수 있습니다.   


### Saving the model

PySpark allows you to save the `Pipeline` definition for later use.   
파이프라인을 다음과 같이 저장한다.

In [18]:
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use straight away to `.fit(...)` and predict.    
다음에 이용할 때는 pipeline모델의 load()함수를 사용하여 평가된 모델을 로드한다.

In [19]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train)\
    .transform(births_test)\
    .take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=16, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=206, MOTHER_WEIGHT_GAIN=26, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 16.0, 6: 63.0, 7: 180.0, 8: 206.0, 9: 26.0, 16: 1.0}), rawPrediction=DenseVector([-0.3229, 0.3229]), probability=DenseVector([0.42, 0.58]), prediction=1.0)]

다음과 같이 학습된 모델을 저장할수도 있다. 그럴 경우, Pipeline객체를 저장하지 말고 PipelineModel객체를 저장하면 된다.

In [20]:
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)

## Parameter hyper-tuning

맨 처음 만든 모델이 최고의 모델인 경우는 없다.   
파라미터 하이퍼튜닝은 모델에 대한 최고의 파라미터를 찾는 과정이다.   
예를 들어, 로지스틱 회귀 모델을 제대로 측정하기 위해 필요한 최대 반복 횟수나 결정 트리의 최대 깊이가 그것이다.   
모델의 최고 파라미터를 찾는 과정인 그리드 탐색과 학습-테스트셋 나누기를 다룬다.   

### Grid search

그리드 탐색은 이미 정해진 파라미터 리스트를 모두 테스트해 최고의 모델을 찾는 알고리즘이다.   
주의할 점은 최적화하고 싶은 파라미터를 너무 많이 설정하거나 각 파라미터에 대해 너무 많은 값들을 설정하면, 최선의 모델을 찾는 데에 드는 시간이 매우 급격하게 증가한다.   
특별히 주의하지 않으면 손댈 수 없을만큼 급격하게 시간이 증가해버린다.

이제 파라미터에 대해 tuning을 해보자.

Load the `.tuning` part of the package.

In [21]:
import pyspark.ml.tuning as tune

다음에는 모델과 테스트할 파라미터 값의 리스트를 명시한다.

In [22]:
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT')

grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,  
             [2, 10, 50]) \
    .addGrid(logistic.regParam, 
             [0.01, 0.05, 0.3]) \
    .build()

우선 최적화하고 싶은 모델을 명시한다.   
다음으로 어떤 파라미터를 최적화할지 정하고, 최적화 테스트를 진행할 값들을 정한다.   
tuning 서브 패키지로부터 ParamGridBuilder 객체를 사용할 것이며, addGrid() 함수를 사용해 그리드에 파라미터 값을 지속적으로 추가해줄 것이다.   
첫 번째 파라미터는 최적화하고자 하는 모델의 파라미터 객체다.   
두 번째 파라미터는 테스트할 파라미터의 값 리스트다.   
ParamGridBuilder에 있는 build() 함수를 호출해 그리드를 빌드한다.

이제 모델을 비교할 방법을 알아보자.

In [23]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

검증 작업을 하는 로직을 다음과 같이 작성한다. 

In [24]:
cv = tune.CrossValidator(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)

모델은 그리드의 값에 대해 루프를 돌면서 evaluator을 이용해 모델의 성능을 평가한다.

birth_train과 birth_test 데이터셋에 인코딩되지 않은 BIRTHS_PLACE가 있기 때문에 이 데이터셋을 바로 사용할 수는 없다.   
따라서 다음과 같이 Transformation pipeline을 만든다.

In [25]:
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)

Having done this, we are ready to find the optimal combination of parameters for our model.

In [26]:
cvModel = cv.fit(data_transformer.transform(births_train))

cvModel은 **학습된 모델을 리턴받는다.** 이제 이 모델이 이전의 모델과 비교해 더 잘 동작하는지 확인이 가능하다.

In [27]:
data_train = data_transformer \
    .transform(births_test)
results = cvModel.transform(data_train)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

0.7353349101843498
0.7193573299878625


어떤 파라미터가 가장 좋은 모델을 생성하는지는 다음과 같이 확인할 수 있다.

In [28]:
results = [
    (
        [
            {key.name: paramValue} 
            for key, paramValue 
            in zip(
                params.keys(), 
                params.values())
        ], metric
    ) 
    for params, metric 
    in zip(
        cvModel.getEstimatorParamMaps(), 
        cvModel.avgMetrics
    )
]

sorted(results, 
       key=lambda el: el[1], 
       reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.741015932023901)

### Train-Validation splitting (학습/검증 데이터셋 쪼개기)

최선의 모델을 선택하기 위해 TrainValidationSplit 모델을 이용해 입력 데이터셋을 '학습 데이터셋'과 '검증 서브 데이터셋' 두개로 나눈다.

가장 좋은 다섯 개의 피처를 선택하기 위해 ChiSqSelector을 사용할 것이다. 이로 인해 모델의 복잡도를 제한할 수 있다. 

In [29]:
selector = ft.ChiSqSelector(
    numTopFeatures=5, 
    featuresCol=featuresCreator.getOutputCol(), 
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

numTopFeatures는 리턴할 피처의 개수를 명시한다. featuresCreator의 getOutputCol()을 호출할 수 있도록 featuresCreator 이후에 selector을 정의한다. 

다음은 전에 다뤄봤던 LogisticRegression Estimator와 pipeline을 생성하는 코드이다.

In [30]:
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures'
)

pipeline = Pipeline(stages=[encoder,featuresCreator,selector])
data_transformer = pipeline.fit(births_train)

`TrainValidationSplit` 객체는 `CrossValidator` 모델과 같은 방법으로 생성된다.

In [31]:
tvs = tune.TrainValidationSplit(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)

이전과 같이 데이터셋을 이용해 모델을 학습시키고 결과를 계산한다. 

In [32]:
tvsModel = tvs.fit(
    data_transformer \
        .transform(births_train)
)

data_train = data_transformer \
    .transform(births_test)
results = tvsModel.transform(data_train)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

0.6067232391648452
0.5823462144916927


더 적은 피처를 사용한 모델이 더 많은 피처를 사용한 모델보다 더 안좋게 동작한다. 궁극적으로 보면, 성능과 시간 비용 간의 Tradeoff다. 