## Spark MLlib Hands-On
Using a dataset that contains balance, income, and student status, let's run a binary classification with MLlib.


**This notebook walks through the full workflow: preprocessing, model training and evaluation, and hyperparameter tuning.**

[Reference](https://github.com/songhunhwa/MachineLearning_Pyspark)

## Dataset Information
- Description: a dataset containing the target variable (default) and the features
- Rows: 10000
- Columns (type as loaded by Spark): default (string, Yes/No) / student (string, Yes/No) / balance (double) / income (double)
- Task: binary classification
- Target variable: default (skewed, i.e. the classes are imbalanced)
- Features: student, balance, income

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=07d9a85b93cd8e1528c8d4a0561cc526f78661bc1caea8d38802869ac970bd00
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [6]:
# import modules
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, stddev_samp
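from pyspark import SparkContext, SQLContext
# Start a Spark context and wrap it in a SQLContext for DataFrame I/O
# (SQLContext is deprecated in Spark 3.x in favor of SparkSession, but still works)
sc = SparkContext()
sqlContext = SQLContext(sc)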



> Let's load the file.

In [8]:
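# read the CSV with a header row and inferred column types;
# 'csv' is the built-in reader (the legacy name 'com.databricks.spark.csv' resolves to it)
df = sqlContext.read.format('csv')\
                    .options(header='true', inferSchema='true')\
                    .load('/content/sample_data/Default.csv')\
                    .drop("_c0")\
                    .cache()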

In [27]:
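# this cell was executed (In [27]) after the scaling step, so the output already shows the *Scaled columns
df.show(5)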

+-------+-------+-----------+-----------+------------------+------------------+
|default|student|    balance|     income|      incomeScaled|     balanceScaled|
+-------+-------+-----------+-----------+------------------+------------------+
|     No|     No|729.5264952|44361.62507|3.3262970676634867|1.5081742710178534|
|     No|    Yes|817.1804066| 12106.1347|0.9077350139857981|1.6893841034192338|
|     No|     No|1073.549164|31767.13895|2.3819447770614217|2.2193837214557224|
|     No|     No|529.2506047|35704.49394|2.6771731943459827|1.0941372934102322|
|     No|     No|785.6558829|38463.49588|2.8840470419162356|1.6242124121054071|
+-------+-------+-----------+-----------+------------------+------------------+
only showing top 5 rows



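### Preprocessing
- Convert the student column into a feature with one-hot encoding
- Turn default into the label column
- Scale balance and income by dividing each by its sample standard deviation (no mean centering)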
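
To make the first two bullets concrete, here is a minimal plain-Python sketch of what the indexing and encoding stages compute. It assumes the defaults as I understand them: `StringIndexer` orders categories by descending frequency (ties alphabetically), and `OneHotEncoder` drops the slot for the last category. The helper names `string_index` and `one_hot_drop_last` and the toy values are hypothetical, not part of Spark or the dataset.

```python
from collections import Counter


def string_index(values):
    """Map each category to a float index, most frequent category first
    (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: float(i) for i, category in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode an index, omitting the slot for the last category."""
    vec = [0.0] * (num_categories - 1)
    if int(index) < num_categories - 1:
        vec[int(index)] = 1.0
    return vec


# Toy values for illustration only, not the real dataset
student = ["No", "Yes", "No", "No", "No"]
mapping = string_index(student)
encoded = [one_hot_drop_last(mapping[s], len(mapping)) for s in student]

print(mapping)
print(encoded)
```

With two categories the encoded vector has a single slot, which is consistent with the `(1,[0],[1.0])` sparse vectors that appear for `student = No` in the prediction output further down.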

In [25]:
# index the categorical "student" column (Yes/No) as numeric values
strIdx = StringIndexer(inputCol = "student", outputCol = "studentIdx")

In [10]:
# one-hot encoding
encode = OneHotEncoder(inputCol = "studentIdx", outputCol = "studentclassVec")

In [11]:
# index the target column "default" (Yes/No) as the numeric "label" column
label_StrIdx = StringIndexer(inputCol = "default", outputCol = "label")

In [26]:
# set stages for pipeline
stages = [strIdx, encode, label_StrIdx]
stages

[StringIndexer_6ca4ea3d071b,
 OneHotEncoder_7f54480ac0a1,
 StringIndexer_97e1dc36172a]

In [13]:
# numeric columns: divide each by its sample standard deviation (one aggregation job per column)
numCols = ['income', 'balance']
for c in numCols:
    df = df.withColumn(c + "Scaled", col(c)/df.agg(stddev_samp(c)).first()[0])
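
The scaling above divides each value by the column's sample standard deviation without subtracting the mean. A small plain-Python sketch of the same arithmetic follows, using the five income values from the `df.show(5)` output. The helper name `scale_by_sample_std` is hypothetical, and because the standard deviation here comes from only five rows, the results will not match the `incomeScaled` column, which uses all rows.

```python
from statistics import mean, stdev


def scale_by_sample_std(values):
    """Divide every value by the sample standard deviation (n - 1 denominator)."""
    s = stdev(values)
    return [v / s for v in values]


# First five income values shown by df.show(5); the std is computed on these five only
income = [44361.62507, 12106.1347, 31767.13895, 35704.49394, 38463.49588]
income_scaled = scale_by_sample_std(income)

print(income_scaled)
print("sample std after scaling:", stdev(income_scaled))
print("mean after scaling:", mean(income_scaled))
```

After scaling, the sample standard deviation is 1 while the mean stays well above 0, which is why the scaled columns in the notebook contain only non-negative values.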

> Now assemble the features and append the assembler to the stages.

In [14]:
# set inputs and append it to the stage
inputs = ["studentclassVec", "incomeScaled", "balanceScaled"]
assembler = VectorAssembler(inputCols = inputs, outputCol = "features")
stages += [assembler]
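
Conceptually, the assembler concatenates the listed input columns, in order, into one feature vector per row. The plain-Python sketch below illustrates this with the values from the first row of the prediction output later in this notebook. The `assemble` helper is hypothetical and ignores Spark's sparse vector representation.

```python
def assemble(*columns):
    """Concatenate scalar and vector-valued columns into one flat feature list."""
    features = []
    for value in columns:
        if isinstance(value, (list, tuple)):
            features.extend(float(x) for x in value)
        else:
            features.append(float(value))
    return features


# Values copied from the first row of the prediction output in this notebook
row = {
    "studentclassVec": [1.0],
    "incomeScaled": 1.2448139729585133,
    "balanceScaled": 0.0,
}
inputs = ["studentclassVec", "incomeScaled", "balanceScaled"]
features = assemble(*(row[name] for name in inputs))

print(features)
```

The result has three entries, matching the `[1.0,1.2448139729...` prefix of the `features` column for that row.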

> Build the pipeline from these stages and fit it to obtain `pipelineModel`.

In [28]:
# create pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)
dataset

DataFrame[default: string, student: string, balance: double, income: double, incomeScaled: double, balanceScaled: double, studentIdx: double, studentclassVec: vector, label: double]

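> The dataset is now ready via the Pipeline! Note that the schema printed above has no `features` column: judging by the cell numbers, the pipeline cell (In [28]) was re-run after `stages` was reset in In [26] without the assembler. Run the cells top to bottom to get the `features` column that training needs.
- Split into train/test sets
- Train a binary classifier with logistic regression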

In [16]:
# 70/30 train/test split (fixed seed), then define the logistic regression estimator
(train, test) = dataset.randomSplit([0.7, 0.3], seed = 14)
lr = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter=10)

In [17]:
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
predictions.show()

+-------+-------+-------+-----------+------------------+-------------+----------+---------------+-----+--------------------+--------------------+--------------------+----------+
|default|student|balance|     income|      incomeScaled|balanceScaled|studentIdx|studentclassVec|label|            features|       rawPrediction|         probability|prediction|
+-------+-------+-------+-----------+------------------+-------------+----------+---------------+-----+--------------------+--------------------+--------------------+----------+
|     No|     No|    0.0|16601.63528|1.2448139729585133|          0.0|       0.0|  (1,[0],[1.0])|  0.0|[1.0,1.2448139729...|[10.6128038095014...|[0.99997540158156...|       0.0|
|     No|     No|    0.0|16834.80271|1.2622971949428254|          0.0|       0.0|  (1,[0],[1.0])|  0.0|[1.0,1.2622971949...|[10.6130159125976...|[0.99997540679828...|       0.0|
|     No|     No|    0.0|17059.36832| 1.279135440360174|          0.0|       0.0|  (1,[0],[1.0])|  0.0|[1.0,1.

> The rightmost column holds the predictions. Training appears to have run as expected.
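
The `rawPrediction` and `probability` columns in the output above are related through the logistic function. As a sanity check, the sketch below applies a sigmoid to the first `rawPrediction` entry of the first two rows and compares it with the first `probability` entry. It assumes that, for binary logistic regression, the first entry of each column refers to class 0. The printed digits in the notebook are truncated, so the comparison is approximate.

```python
import math


def sigmoid(z):
    """Logistic function mapping a raw margin to a probability."""
    return 1.0 / (1.0 + math.exp(-z))


# Truncated values copied from the first two rows of predictions.show()
raw_class0 = [10.6128038095014, 10.6130159125976]
shown_prob_class0 = [0.99997540158156, 0.99997540679828]

recomputed = [sigmoid(raw) for raw in raw_class0]
for raw, shown, calc in zip(raw_class0, shown_prob_class0, recomputed):
    print(f"raw={raw:.10f} shown={shown:.12f} recomputed={calc:.12f}")
```

Both rows have a class-0 probability above 0.5, so the `prediction` column shows `0.0` for them.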

Finally, we will:
- Evaluate the model
- Tune hyperparameters with a grid search

In [18]:
# evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions) # AUC

0.9549682684102574
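
The value above is the area under the ROC curve, which is the default metric of `BinaryClassificationEvaluator`. One way to read it: the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes AUC that way on made-up toy scores. It is an illustration of the metric's meaning, not Spark's implementation, and the helper name `auc` is hypothetical.

```python
def auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly;
    tied scores count as half a correct pair."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up toy data, not taken from the notebook
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
toy_auc = auc(toy_labels, toy_scores)
print(toy_auc)
```

Because AUC depends only on how examples are ranked, it is less sensitive to the skewed target than plain accuracy would be.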

In [19]:
# grid search for parameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [20]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train)
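
It helps to know how much work this cell triggers. The plain-Python sketch below enumerates the same grid with `itertools.product` and counts the model fits, assuming one fit per parameter combination per fold plus one final refit of the best combination on the full training set. The variable names are hypothetical.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder cell above
grid_values = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}

# One dict per combination, analogous to one parameter map in the grid
param_maps = [dict(zip(grid_values, combo)) for combo in product(*grid_values.values())]

num_folds = 5
cv_fits = len(param_maps) * num_folds   # fits performed during cross-validation
total_fits = cv_fits + 1                # plus the refit of the best combination

print(len(param_maps), cv_fits, total_fits)
```

With three values for each of three parameters, the grid grows multiplicatively, so adding one more parameter with three values would triple the fitting time.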

In [21]:
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

0.9521045136279103