In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('appName3').setMaster('local')
conf.set("spark.executor.memory",'4G')
conf.set("spark.driver.memory",'4G')
conf.set("spark.cores.max",'4')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# ML Packages of Pyspark
스파크 2.0부터 ML 패키지는 데이터프레임에 대해 작동

In [37]:
import pyspark.sql.types as typ
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType())
]

schema = typ.StructType([typ.StructField(e[0], e[1], False) for e in labels])

In [38]:
births = spark.read.csv('births_transformed.csv.gz', header=True, schema=schema)

In [39]:
# BIRTH_PLACE 컬럼 인코딩 작업
births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'].cast(typ.IntegerType()))

In [40]:
import pyspark.ml.feature as ft
encoder = ft.OneHotEncoder(inputCol = 'BIRTH_PLACE_INT', outputCol='BIRTH_PALCE_VEC')

In [41]:
featuresCreator = ft.VectorAssembler(inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
                                    outputCol='features')

In [42]:
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter=10, regParam=0.01,
                                labelCol='INFANT_ALIVE_AT_REPORT')

In [43]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

In [44]:
births_train, births_test = births.randomSplit([0.7,0.3], seed=55)

In [45]:
model = pipeline.fit(births_train)

In [46]:
test_model = model.transform(births_test)

In [48]:
test_model.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=62, MOTHER_PRE_WEIGHT=145, MOTHER_DELIVERY_WEIGHT=152, MOTHER_WEIGHT_GAIN=7, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, INFANT_WEIGHT_GRAMS=0, BIRTH_PLACE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 62.0, 7: 145.0, 8: 152.0, 9: 7.0, 16: 1.0}), rawPrediction=DenseVector([0.9458, -0.9458]), probability=DenseVector([0.7203, 0.2797]), prediction=0.0)]

In [49]:
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')

In [68]:
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

0.7434490669849834
0.7201551931757337


In [52]:
pipelinePath = './infant_oneHotEncoder_logistic_pipeline'
pipeline.write().overwrite().save(pipelinePath)

In [56]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=62, MOTHER_PRE_WEIGHT=145, MOTHER_DELIVERY_WEIGHT=152, MOTHER_WEIGHT_GAIN=7, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, INFANT_WEIGHT_GRAMS=0, BIRTH_PLACE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 62.0, 7: 145.0, 8: 152.0, 9: 7.0, 16: 1.0}), rawPrediction=DenseVector([0.9458, -0.9458]), probability=DenseVector([0.7203, 0.2797]), prediction=0.0)]

In [57]:
from pyspark.ml import PipelineModel
modelPath = './infant_oneHotEncoder_logistic_pipelineModel'
model.write().overwrite().save(modelPath)

In [58]:
loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)
test_loadedModel

DataFrame[INFANT_ALIVE_AT_REPORT: int, BIRTH_PLACE: string, MOTHER_AGE_YEARS: int, FATHER_COMBINED_AGE: int, CIG_BEFORE: int, CIG_1_TRI: int, CIG_2_TRI: int, CIG_3_TRI: int, MOTHER_HEIGHT_IN: int, MOTHER_PRE_WEIGHT: int, MOTHER_DELIVERY_WEIGHT: int, MOTHER_WEIGHT_GAIN: int, DIABETES_PRE: int, DIABETES_GEST: int, HYP_TENS_PRE: int, HYP_TENS_GEST: int, INFANT_WEIGHT_GRAMS: int, BIRTH_PLACE_INT: int, BIRTH_PALCE_VEC: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

# Parameter Hyper-tuning
Grid Search

In [59]:
import pyspark.ml.tuning as tune

In [60]:
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')

In [61]:
grid = tune.ParamGridBuilder().addGrid(logistic.maxIter,[2,10,50]
                              ).addGrid(logistic.regParam, [0.01,0.05,0.3]
                               ).build()

In [62]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')

In [63]:
cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)

In [64]:
pipeline = Pipeline(stages=[encoder, featuresCreator])
data_transformer = pipeline.fit(births_train)

In [65]:
cvModel = cv.fit(data_transformer.transform(births_train))

In [66]:
data_train = data_transformer.transform(births_test)
results = cvModel.transform(data_train)

In [67]:
results

DataFrame[INFANT_ALIVE_AT_REPORT: int, BIRTH_PLACE: string, MOTHER_AGE_YEARS: int, FATHER_COMBINED_AGE: int, CIG_BEFORE: int, CIG_1_TRI: int, CIG_2_TRI: int, CIG_3_TRI: int, MOTHER_HEIGHT_IN: int, MOTHER_PRE_WEIGHT: int, MOTHER_DELIVERY_WEIGHT: int, MOTHER_WEIGHT_GAIN: int, DIABETES_PRE: int, DIABETES_GEST: int, HYP_TENS_PRE: int, HYP_TENS_GEST: int, INFANT_WEIGHT_GRAMS: int, BIRTH_PLACE_INT: int, BIRTH_PALCE_VEC: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [69]:
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.7441328157027877
0.7212737531230229


In [73]:
results = [([{key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())], 
            metric) for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)]

In [74]:
sorted(results, key=lambda el: el[1], reverse=True)

[([{'maxIter': 50}, {'regParam': 0.01}], 0.7370975222895131),
 ([{'maxIter': 50}, {'regParam': 0.05}], 0.7319692995491217),
 ([{'maxIter': 10}, {'regParam': 0.01}], 0.7318310325069776),
 ([{'maxIter': 10}, {'regParam': 0.05}], 0.7283675067258283),
 ([{'maxIter': 10}, {'regParam': 0.3}], 0.7215628129975982),
 ([{'maxIter': 50}, {'regParam': 0.3}], 0.7183894610136353),
 ([{'maxIter': 2}, {'regParam': 0.3}], 0.6958649711790963),
 ([{'maxIter': 2}, {'regParam': 0.05}], 0.6951038391305423),
 ([{'maxIter': 2}, {'regParam': 0.01}], 0.6949743233092479)]

In [75]:
selector = ft.ChiSqSelector(numTopFeatures=5,
                           featuresCol=featuresCreator.getOutputCol(),
                           outputCol='selectedFeatures',
                           labelCol='INFANT_ALIVE_AT_REPORT')

In [76]:
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',
                                featuresCol='selectedFeatures')

In [77]:
pipeline = Pipeline(stages=[encoder, featuresCreator, selector])

In [78]:
data_transformer = pipeline.fit(births_train)

In [79]:
tvs = tune.TrainValidationSplit(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)

In [80]:
tvsModel = tvs.fit(data_transformer.transform(births_train))

In [81]:
data_train = data_transformer.transform(births_test)
results = tvsModel.transform(data_train)

In [82]:
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.735357446903953
0.7148733525428738


In [84]:
# 표준화 작업
import numpy as np
x = np.arange(0,100)
x = x / 100.0 * np.pi * 4
y = x * np.sin(x / 1.764) + 20.1234

In [90]:
schema = typ.StructType([
    typ.StructField('continuous_var', typ.DoubleType(), False)
])
data = spark.createDataFrame([[float(e),] for e in y], schema=schema)

In [92]:
data.show()

+------------------+
|    continuous_var|
+------------------+
|           20.1234|
|20.132344452369832|
|20.159087064491775|
|20.203356291885854|
| 20.26470185735763|
|20.342498180090526|
|  20.4359491438498|
|20.544094172020312|
|20.665815568330437|
|20.799847073505322|
|  20.9447835797997|
| 21.09909193743627|
|21.261122779470593|
| 21.42912328456607|
| 21.60125079063745|
|21.775587166351258|
|21.950153842094366|
|22.122927397273514|
|22.291855596719525|
|22.454873765567744|
+------------------+
only showing top 20 rows



In [94]:
vectorizer = ft.VectorAssembler(inputCols=['continuous_var'], outputCol='continuous_vec')

In [95]:
normalizer = ft.StandardScaler(inputCol=vectorizer.getOutputCol(),
                              outputCol='normalized',
                              withMean=True,
                              withStd=True)

In [96]:
pipeline = Pipeline(stages=[vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)

In [97]:
data_standardized.show()

+------------------+--------------------+--------------------+
|    continuous_var|      continuous_vec|          normalized|
+------------------+--------------------+--------------------+
|           20.1234|           [20.1234]|[0.2342913955450239]|
|20.132344452369832|[20.132344452369832]|[0.23630959828688...|
|20.159087064491775|[20.159087064491775]|[0.24234373105178...|
|20.203356291885854|[20.203356291885854]|[0.2523325232564438]|
| 20.26470185735763| [20.26470185735763]|[0.2661743755372571]|
|20.342498180090526|[20.342498180090526]|[0.28372813348174...|
|  20.4359491438498|  [20.4359491438498]|[0.3048141635135427]|
|20.544094172020312|[20.544094172020312]|[0.32921572364798...|
|20.665815568330437|[20.665815568330437]|[0.3566806198337408]|
|20.799847073505322|[20.799847073505322]|[0.38692313665363...|
|  20.9447835797997|  [20.9447835797997]|[0.41962622928625...|
| 21.09909193743627| [21.09909193743627]|[0.45444396184237...|
|21.261122779470593|[21.261122779470593]|[0.49100417549