## 3단계: 모델 만들기
2_feature_engineering.ipynb 주피터 노트북에서 만들었던 라벨된 피처 데이터를 S3에서 불러와 훈련 데이터와 테스트 데이터로 나눌 것입니다. 그리고 부품의 결함을 예측하기 위한 모델을 만들 것입니다. 

In [1]:
# 필요한 라이브러리 셋팅하기
import os
import boto3

# 파이프라인과 모델 생성을 위한 라이브러리
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# 데이터 처리를 위한 라이브러리
import pandas as pd
import numpy as np

spark = SparkSession.builder.getOrCreate()


### 피처 데이터 세트 가져오기
2_feature_engineering.ipynb 주피터 노트북에서 생성했던 피처 데이터를 가져옵니다. 

In [2]:
# S3 스토리지 관련 정보
bucket = 'jdkimexample'
key = 'train/train.csv'
s3 = boto3.client('s3')

labeled_features = s3.get_object(Bucket=bucket,Key=key)
data = pd.read_csv(labeled_features['Body'])
feat_data = spark.createDataFrame(data)

feat_data.limit(10).toPandas().head(10)

Unnamed: 0,machineID,dt_truncated,volt_rollingmean_12,rotate_rollingmean_12,pressure_rollingmean_12,vibration_rollingmean_12,volt_rollingmean_24,rotate_rollingmean_24,pressure_rollingmean_24,vibration_rollingmean_24,...,volt_rollingstd_24,rotate_rollingstd_24,pressure_rollingstd_24,vibration_rollingstd_24,volt_rollingstd_36,rotate_rollingstd_36,pressure_rollingstd_36,vibration_rollingstd_36,failure,label_e
0,26,2016-01-01 12:00:00,165.702173,455.766882,101.902947,39.639879,165.637453,452.475071,99.82777,39.824437,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,26,2016-01-01 00:00:00,166.603907,447.205223,99.023361,39.672101,167.830647,445.184014,99.127203,39.866664,...,1.189061,2.454594,0.55485,0.447165,1.04379,0.56014,0.332557,0.199284,0.0,0.0
2,26,2015-12-31 12:00:00,169.057387,443.162804,99.231046,40.061227,169.796918,435.977714,100.219854,38.872411,...,0.449784,2.017003,0.583172,0.253893,0.442978,1.071922,0.33281,0.116037,0.0,0.0
3,26,2015-12-31 00:00:00,170.53645,428.792623,101.208663,37.683596,170.932927,437.500517,100.64632,38.129784,...,0.823724,2.002825,0.342686,0.253877,0.677239,1.07041,0.793118,0.171761,0.0,0.0
4,26,2015-12-30 12:00:00,171.329404,446.20841,100.083978,38.575972,168.80177,442.079976,100.462764,38.314688,...,1.278771,1.976804,1.209214,0.271177,0.550194,0.831818,0.193804,0.129767,0.0,0.0
5,26,2015-12-30 00:00:00,166.274135,437.951542,100.84155,38.053403,169.554872,440.233805,101.490006,37.385859,...,0.915386,1.611549,0.478531,0.151568,0.613444,1.413596,0.606638,0.094085,0.0,0.0
6,26,2015-12-29 12:00:00,172.835608,442.516069,102.138461,36.718315,172.801718,442.802266,100.235111,37.056091,...,0.724528,2.909094,0.558389,0.163627,0.603856,2.646676,0.660504,0.08786,0.0,0.0
7,26,2015-12-29 00:00:00,172.767828,443.088463,98.331761,37.393867,174.436946,441.149097,99.964685,38.452211,...,1.127955,4.161043,0.214182,0.304616,0.448994,0.865341,0.536995,0.176725,0.0,0.0
8,26,2015-12-28 12:00:00,176.106064,439.209732,101.59761,39.510554,172.794005,456.519796,98.924274,39.245226,...,0.473582,3.814364,0.292487,0.375115,0.381619,3.117538,0.54313,0.094798,0.0,0.0
9,26,2015-12-28 00:00:00,169.481945,473.829861,96.250937,38.979898,172.361398,464.973274,95.965903,38.913203,...,1.107598,3.267389,0.257612,0.206946,0.778525,1.566241,0.318768,0.240451,0.0,0.0


### 훈련/테스트 데이터 준비
머신러닝의 기본 방법은 모델을 보정하고 테스트하는 데 훈련 때 사용했던 데이터를 사용하지 않는 것입니다. 모델을 평가하기 위해 훈련 데이터와 테스트 데이터로 나눠야하고 일반적으로 데이터의 80%는 모델을 훈련하는데 사용하고, 나머지 10%씩은 매개변수를 보정하고 모델을 평가하는데 사용됩니다.

일반적으로 무작위로 분할해서 사용할 수 있지만 시계열 데이터는 관측값 간에 고유한 상관 관계가 있기 때문에 예지 정비 같은 경우에는 시간을 기준으로 분리하는 것이 더 나은 접근 방식인 경우가 많습니다. 

In [3]:
# define list of input columns for downstream modeling

# We'll use the known label, and key variables.
label_var = ['label_e']
key_cols =['machineID','dt_truncated']

# Then get the remaing feature names from the data
input_features = feat_data.columns

# We'll use the known label, key variables and 
# a few extra columns we won't need.
remove_names = label_var + key_cols + ['failure','model_encoded','model' ]

# Remove the extra names if that are in the input_features list
input_features = [x for x in input_features if x not in set(remove_names)]

input_features

['volt_rollingmean_12',
 'rotate_rollingmean_12',
 'pressure_rollingmean_12',
 'vibration_rollingmean_12',
 'volt_rollingmean_24',
 'rotate_rollingmean_24',
 'pressure_rollingmean_24',
 'vibration_rollingmean_24',
 'volt_rollingmean_36',
 'vibration_rollingmean_36',
 'rotate_rollingmean_36',
 'pressure_rollingmean_36',
 'volt_rollingstd_12',
 'rotate_rollingstd_12',
 'pressure_rollingstd_12',
 'vibration_rollingstd_12',
 'volt_rollingstd_24',
 'rotate_rollingstd_24',
 'pressure_rollingstd_24',
 'vibration_rollingstd_24',
 'volt_rollingstd_36',
 'rotate_rollingstd_36',
 'pressure_rollingstd_36',
 'vibration_rollingstd_36']

스파크 모델은 벡터화된 데이터 프레임이 필요합니다. 여기서 데이터 세트를 변환한 다음 데이터를 훈련 및 테스트 세트로 나눕니다. 이 분할 데이터를 사용하여 9개월 분량의 데이터로 모델을 학습하고 남은 3개월로 모델을 평가합니다.

In [4]:
# assemble features
va = VectorAssembler(inputCols=(input_features), outputCol='features')
feat_data = va.transform(feat_data).select('machineID','dt_truncated','label_e','features')

# set maxCategories so features with > 10 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", 
                               outputCol="indexedFeatures", 
                               maxCategories=10).fit(feat_data)

# fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol="label_e", outputCol="indexedLabel").fit(feat_data)

# split the data into train/test based on date
split_date = "2015-10-30"
training = feat_data.filter(feat_data.dt_truncated < split_date)
testing = feat_data.filter(feat_data.dt_truncated >= split_date)

print(training.count())
print(testing.count())

60334
12808


### 분류 모델
예지 정비에서 특히 문제가 되는 부분은 기계의 결함이 일반적으로 정상 작동에서 드물게 발생한다는 점입니다. 그 결과 레이블 분포에 불균형이 생기게 되고 이러한 불균형은 알고리즘이 다수의 데이터를 가진 클래스를 기준으로 분류하려는 경향이 있어 성능 저하를 발생시킬 수 있습니다. 다수의 데이터를 가진 클래스에 올바른 레이블이 지정될 때 전체 오류가 훨씬 개선되기 때문입니다. 그러나 잘못된 예측의 비용이 매우 높을 때 이는 더 큰 문제를 발생시킬 수 있습니다. 이 문제를 해결하기 위해 소수 데이터를 가진 클래스를 오버 샘플링을 할 수 있습니다. 이런 문제 때문에 정확도 이외의 평가 지표를 살펴보는 것도 중요합니다.

In [6]:
model = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                                      # Maximum depth of the tree. (>= 0) 
                                      # E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'
                                      maxDepth=15,
                                      # Max number of bins for discretizing continuous features. 
                                      # Must be >=2 and >= number of categories for any categorical feature.
                                      maxBins=32, 
                                      # Minimum number of instances each child must have after split. 
                                      # If a split causes the left or right child to have fewer than 
                                      # minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.
                                      minInstancesPerNode=1, 
                                      # Minimum information gain for a split to be considered at a tree node.
                                      minInfoGain=0.0, 
                                      # Criterion used for information gain calculation (case-insensitive). 
                                      # Supported options: entropy, gini')
                                      impurity="gini")

# chain indexers and model in a Pipeline
pipeline_cls_mthd = Pipeline(stages=[labelIndexer, featureIndexer, model])

# train model.  This also runs the indexers.
model_pipeline = pipeline_cls_mthd.fit(training)

이 모델을 평가하기 위해 테스트 데이터 세트에 대한 부품 결함을 예측해봅니다. 테스트 데이터 세트는 모델이 이전에 본 적이 없는 데이터에서 생성이 되었기 때문에 새로운 데이터를 가지고 시뮬레이션을 하는 것과 유사합니다. 

In [7]:
# make predictions. The Pipeline does all the same operations on the test data
predictions = model_pipeline.transform(testing)

# Create the confusion matrix for the multiclass prediction results
# This result assumes a decision boundary of p = 0.5
conf_table = predictions.stat.crosstab('indexedLabel', 'prediction')
confuse = conf_table.toPandas()
confuse.head()

Unnamed: 0,indexedLabel_prediction,0.0,1.0,2.0,3.0,4.0
0,0.0,11477,132,98,94,79
1,1.0,243,62,18,3,3
2,2.0,164,3,45,5,2
3,3.0,175,6,3,26,3
4,4.0,128,10,1,2,26


Confusion matrix는 각각의 실제 부품 결함의 종류는 행에 나열하고 예측된 값은 열에 나타내는 구조입니다. 0.0으로 매겨진 레이블은 부품 결함이 없는 것을 나타내며 1.0에서 4.0까지 번호가 매겨진 레이블은 장비에서 4가지 부품 중 하나에서 결함이 발생했다는 것을 뜻합니다. 예를 들어, 맨 위 행의 세 번째 숫자는 부품에 실제로 결함이 없지만 모델은 부품 2번에서 결함이 발생했을 것이라고 예측한 경우입니다. 

여기서 올바르게 예측한 것은 대각선을 따라 읽을 수 있는 값입니다. 대각선 위의 숫자는 모델에 결함이 발생하지 않았을 때 결함이 발생했다고 예측한 것이고, 아래쪽은 결함이 났는데 결함이 나지 않았다고 예측한 것입니다. 

일반적으로 분류 모델을 평가할 때는 결과를 단일한 지표로 평가하는 것이 편리합니다. 그러나, 문제에 따라 이런 평가를 단일한 지표로 수행하는 것은 불가능합니다. 그래서 아래와 같은 4가지 지표를 통해 계산합니다.

정확도(Accuracy) : 레이블이 지정된 데이터를 얼마나 자주 예측했는지 보고합니다. 클래스에 불균형이 있을 때는 가장 데이터 수가 큰 클래스로 편향될 수 있습니다.

예지 정비 문제에는 클래스 불균형이 존재하기 때문에 아래와 같은 나머지 값도 살펴보는 것이 좋습니다.
정밀도(Precision) : 정밀도는 모델이 true positive를 얼마나 잘 분류하는지 측정합니다. 
재현율(Recall) : 재현율은 모델이 양성 샘플을 얼마나 잘 찾을 수 있는지 측정합니다.
F1 : F1은 정밀도와 재현율을 모두 고려합니다. F1 점수는 정밀도와 재현율의 조화 평균입니다. 

이러한 메트릭은 이진 분류에 가장 적합하지만 다중 클래스 분류에도 유용하게 쓰일 수 있습니다. 

In [8]:
# select (prediction, true label) and compute test error
# select (prediction, true label) and compute test error
# True positives - diagonal failure terms 
tp = confuse['1.0'][1]+confuse['2.0'][2]+confuse['3.0'][3]+confuse['4.0'][4]

# False positves - All failure terms - True positives
fp = np.sum(np.sum(confuse[['1.0', '2.0','3.0','4.0']])) - tp

# True negatives 
tn = confuse['0.0'][0]

# False negatives total of non-failure column - TN
fn = np.sum(np.sum(confuse[['0.0']])) - tn

# Accuracy is diagonal/total 
acc_n = tn + tp
acc_d = np.sum(np.sum(confuse[['0.0','1.0', '2.0','3.0','4.0']]))
acc = acc_n/acc_d

# Calculate precision and recall.
prec = tp/(tp+fp)
rec = tp/(tp+fn)

# Print the evaluation metrics to the notebook
print("Accuracy = %g" % acc)
print("Precision = %g" % prec)
print("Recall = %g" % rec )
print("F1 = %g" % (2.0 * prec * rec/(prec + rec)))
print("")

Accuracy = 0.908495
Precision = 0.256039
Recall = 0.182969
F1 = 0.213423



In [9]:
importances = model_pipeline.stages[2].featureImportances

importances

SparseVector(24, {0: 0.0609, 1: 0.146, 2: 0.0502, 3: 0.0411, 4: 0.0787, 5: 0.0259, 6: 0.0567, 7: 0.0867, 8: 0.0231, 9: 0.019, 10: 0.0423, 11: 0.0169, 12: 0.0251, 13: 0.0338, 14: 0.0312, 15: 0.0378, 16: 0.0254, 17: 0.0322, 18: 0.0265, 19: 0.0264, 20: 0.0289, 21: 0.0276, 22: 0.0296, 23: 0.0279})

In [10]:

# 로컬에 모델 저장하기
model_pipeline.write().overwrite().save('model/pdmrfull.model')


FileNotFoundError: [Errno 2] No such file or directory: 'model/pdmrfull.zip'

In [18]:
# S3에 모델 저장하기
import zipfile
new_zips= zipfile.ZipFile('model/pdmrfull.zip', 'w')

for folder, subfolders, files in os.walk('model/pdmrfull.model'):
    for file in files:
        new_zips.write(os.path.join(folder, file), os.path.relpath(os.path.join(folder,file), 'model'), compress_type = zipfile.ZIP_DEFLATED)
 
new_zips.close()


In [19]:
s3 = boto3.client('s3')
s3.upload_file('model/pdmrfull.zip', bucket, 'model/pdmrfull.zip')


print("Model saved")

Model saved


In [20]:
# 모델 가져오기 
from pyspark.ml.pipeline import PipelineModel
saved_model = PipelineModel.load('model/pdmrfull.model')



In [21]:
bucket = 'jdkimexample'
key = 'model/pdmrfull.zip'
s3 = boto3.resource('s3')
s3.Bucket(bucket).download_file(key, 'pdmrfull.zip')



In [22]:
output_unzip = zipfile.ZipFile('pdmrfull.zip','r')
output_unzip.extractall("")
output_unzip.close()

In [24]:
pipeline = PipelineModel.load('pdmrfull.model')

In [27]:
def run(input_df):
    import json
    response = ''
    try:
        #Get prediction results for the dataframe
        
        # We'll use the known label, key variables and 
        # a few extra columns we won't need.
        key_cols =['label_e','machineID','dt_truncated', 'failure','model_encoded','model' ]

        # Then get the remaing feature names from the data
        input_features = input_df.columns

        # Remove the extra stuff if it's in the input_df
        input_features = [x for x in input_features if x not in set(key_cols)]
        
        # Vectorize as in model building
        va = VectorAssembler(inputCols=(input_features), outputCol='features')
        data = va.transform(input_df).select('machineID','features')
        score = pipeline.transform(data)
        predictions = score.collect()

        #Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        response = ",".join(preds)
    except Exception as e:
        print("Error: {0}",str(e))
        return (str(e))
    
    # Return results
    print(json.dumps(response))
    return json.dumps(response)

In [31]:
key_cols =['label_e','dt_truncated', 'failure','model_encoded','model' ]

# Then get the remaining feature names from the data
input_features = feat_data.columns
# Remove the extra stuff if it's in the input_df
input_features = [x for x in input_features if x not in set(key_cols)]

smple = feat_data.sample(False, .8).limit(1).select(input_features)

smple.toPandas().head()

Unnamed: 0,machineID,features
0,26,"(165.70217320642197, 455.76688230562223, 101.9..."


In [32]:
run(smple)

Error: {0} 'Output column features already exists.'


"'Output column features already exists.'"