In [1]:
import org.apache.spark.ml.{PipelineModel , Pipeline}
import org.apache.spark.ml.classification.{DecisionTreeClassifier,
                                          RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, VectorIndexer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.util.Random

Intitializing Scala interpreter ...

Spark Web UI available at http://localhost:4040
SparkContext available as 'sc' (version = 2.4.6, master = local[*], app id = local-1633189663748)
SparkSession available as 'spark'


import org.apache.spark.ml.{PipelineModel, Pipeline}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, VectorIndexer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.util.Random


In [2]:
val dataWithoutHeader = spark.read.
    option("inferSchema",true).
    option("header",false).
    csv("Data/covtype/covtype.data")

dataWithoutHeader: org.apache.spark.sql.DataFrame = [_c0: int, _c1: int ... 53 more fields]


In [None]:
dataWithoutHeader.first
//디시전 트리를 사용하려면 헤더 이름 즉 피쳐 이름이 있어야 한다.
//그렇기 떄문에 아래쪽에서 피쳐 이름을 넣어주도록 하자

In [3]:
dataWithoutHeader.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [None]:
val colNames = Seq(
    "Elevation","Aspect","Slope",
    "Horizontal_Distance_To_Hydrology","Vertical_Distance_To_Hydrology",
    "Horizontal_Distance_To_Roadways",
    "Hillshade_9am","Hillshade_Noon","Hillshade_3pm",
    "Horizontal_Distance_To_Fire_Points") ++
    ((0 until 4).map(i => s"Wilderness_Area_$i")) ++
    ((0 until 40).map(i => s"Soil_Type_$i")) ++ Seq("Cover_Type")

val data = dataWithoutHeader.toDF(colNames:_*).withColumn("Cover_Type",$"Cover_Type".cast("double"))

data.printSchema()

//칼럼 이름을 넣어주기 윟서 이전에 본 피쳐에 대한 정보가 있는 인포메이션 파일을 보고 
//그에 맞추어서 다 이름을 넣어주면 된다. 위 코드에서는 칼럼의 시퀀스로 만들어서 안에 넣어주었다.
//스키마를 학인하면 헤더가 잘 들어가 있는 것을 볼 수 있다.

In [None]:
val Array(trainData, testData) = data.randomSplit(Array(0.9 , 0.1))
//tr데이터와 test 데이터를 0.9와 0.1로 나누었다.
trainData.cache()
testData.cache()
//계속 볼거니까 캐시에 올렸다.

In [9]:
val inputCols = trainData.columns.filter(_ != "Cover_Type")
val assembler = new VectorAssembler().
    setInputCols(inputCols).
    setOutputCol("featureVector")

//스파크에서 ml로 디시전 트리를 돌려보기 위해서는 모든 인풋 피쳐 칼럼이 하나의 칼럼으로
//구성되어있다. 이걸 위해 벡터어셈블러를 이용할 것이다.
//tr데이터 칼럼 중에 프리딕션 해야하는 아웃풋 타입만 제외하고 나머지는 다 모았다.
//그 모든 것을 가지고 벡터 어셈블러를 만들 것이다. 그리고 그걸 피쳐벡터라는 이름으로 만들겠다는 코드이다.

inputCols: Array[String] = Array(Elevation, Aspect, Slope, Horizontal_Distance_To_Hydrology, Vertical_Distance_To_Hydrology, Horizontal_Distance_To_Roadways, Hillshade_9am, Hillshade_Noon, Hillshade_3pm, Horizontal_Distance_To_Fire_Points, Wilderness_Area_0, Wilderness_Area_1, Wilderness_Area_2, Wilderness_Area_3, Soil_Type_0, Soil_Type_1, Soil_Type_2, Soil_Type_3, Soil_Type_4, Soil_Type_5, Soil_Type_6, Soil_Type_7, Soil_Type_8, Soil_Type_9, Soil_Type_10, Soil_Type_11, Soil_Type_12, Soil_Type_13, Soil_Type_14, Soil_Type_15, Soil_Type_16, Soil_Type_17, Soil_Type_18, Soil_Type_19, Soil_Type_20, Soil_Type_21, Soil_Type_22, Soil_Type_23, Soil_Type_24, Soil_Type_25, Soil_Type_26, Soil_Type_27, Soil_Type_28, Soil_Type_29, Soil_Type_30, Soil_Type_31, Soil_Type_32, Soil_Type_33, Soil_Type_34, S...

In [10]:
val assembledTrainData = assembler.transform(trainData)
assembledTrainData.select("featureVector").show(truncate = false)

//tr데이터에 대해서 실제 트랜스폼을 돌려서 어셈블된 tr데이터를 만들 것이다. 여기서 피쳐벡터를 보여줘 하면
//하나로 만들어진 피쳐벡터를 볼 수 있다.

+-----------------------------------------------------------------------------------------------------+
|featureVector                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1863.0,37.0,17.0,120.0,18.0,90.0,217.0,202.0,115.0,769.0,1.0,1.0])  |
|(54,[0,1,2,5,6,7,8,9,13,18],[1874.0,18.0,14.0,90.0,208.0,209.0,135.0,793.0,1.0,1.0])                 |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1879.0,28.0,19.0,30.0,12.0,95.0,209.0,196.0,117.0,778.0,1.0,1.0])   |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1888.0,33.0,22.0,150.0,46.0,108.0,209.0,185.0,103.0,735.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,14],[1889.0,28.0,22.0,150.0,23.0,120.0,205.0,185.0,108.0,759.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1889.0,353.0,30.0,95.0,39.0,67.0,153.0,172.0,146.0,600.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1896.0,337.0,12.0,30.0,6.0,175

assembledTrainData: org.apache.spark.sql.DataFrame = [Elevation: int, Aspect: int ... 54 more fields]


In [None]:
val classifier = new DecisionTreeClassifier().
    setSeed(Random.nextLong()).
    setLabelCol("Cover_Type").
    setFeatureCol("featureVector").
    setPredictionCol("prediction")

//랜덤 시드를 세틸한 후 예측할 레이블 칼럼이 뭐냐 해서 안에 넣어주고 이후 그럼 사용할 
//피쳐 벡터는 뭐냐 해서 피쳐벡터 넣어주고 그럼 예측한 내용을 어디다 할거냐 해서 
//이름을 프리딕션으로 지은 칼럼에 넣어주기로 한 코드이다.

In [11]:
val model = classifier.fit(assembledTrainData)
println(model.toDebugString)

//코어개수가 많이 없으면 커널이 죽을 수도 있다. 투디버그스트링을 하면 학습 이후의 만들어진 디시전 트리를 눈으로
//확인 가능하다.

<console>: 50: error: not found: value classifier

In [None]:
model.featureImportances.toArray.zip(inputCols).
    sorted.reverse.foreach(println)

//어떤 피쳐가 중요한지 확인할 수 있다. 즉 엔트로피가 낮은 피쳐들이 들어갈 것 이다.
//설명가능한 머신러닝 모델이라는 것이다.

In [None]:
val predictions = model.transform(assembledTrainData)

predictions.select("Cover_Type","prediction","probability").
    show(truncate = false)

//커버타입 , 프리딕션 , 그게 얼마나 맞는지에 대한 확률을 볼 수 있다.
//결과가 맘에 들지 않을 수도 있다. 우리가 하이퍼파라미터 튜닝을 하지 않았기 때문이다.

In [None]:
val evaluator = new MulticlassClassificationEvaluator().
    setLabelCol("Cover_Type").
    setPredictionCol("prediction")

val accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)

//모델을 이벨류에이션 하기위해 이벨류에이터를 만든다.
//이후 커버타입과 프리딕션을 비교하고 애큐러시를 만들고 정확도를 기반으로 이벨류에이션
//하겠다는 것이다.

In [None]:
val predictionRDD = predictions.
 select("prediction","Cover_Type").
 as[(Double,Double)].rdd

val multiclassMetrics = new MulticlassMetrics(predictionRDD)
println(multiclassMetrics.confusionMatrix)

//모든 타겟 밸류에 대해 로우와 칼럼으로 되어있는 7 바이 7 메트릭스를 보자.
//각각의 로우는 actual correct value / 각각의 칼럼은 predicted value 이다.
//즉 정확한 판단 결과라면 대각선에만 값이 있어야 한다.
//클래스 1번을 2번으로 판별하거나 이런 잘못 오인한 클래스파이어가 없어야 좋은 것이다.
//즉 대각선에 값들이 모여있다면 좋은 프리딕션 일 것이고 아니면 나쁜 프리딕션 일 것이다.
//RDD를 통해서 바꾸어 주고 , 그걸 가지고 다시 컨퓨전메트릭스를 만든다.

In [None]:
//디시전 트리의 하이퍼파라미터의 튜닝을 한다.

//뎁스라는 것인데 디시전 트리의 뎁스를 리밋을 걸어주는 것이다.
//우리는 최대한 심플한 트리를 만들고 싶기 때문에 걸어줄 수 있따.

//빈스는 디시전 룰의 개수를 의미한다. 디시전 룰의 개수가 최대한 작은 것이 좋아서
//맥시멈 값을 정할 수 있다.

//Impurity는 엔트로피와 같은 뜻이다.

//미니멈 인포메이션 게인 즉 , 인포메이션 게인을 가장 큰걸 선택하려 하는데 
//이걸 설정하면 이것보단 큰 걸 택해야 한다는 옵션이고 이걸 다 못넘으면 우리는
//이후를 정할 수 없다는 뜻이다.

//여기 나온 것들은 모두 디시전 트리의 컴플렉시티를 조정하겠다는 의미이다.

In [None]:
val classifierHyper = new DecisionTreeClassifier().
setSeed(Random.nextLong()).
setLabelCol("Cover_Type").
setFeatureCol("featureVector").
setPredictionCol("prediction")

val pipeline = new Pipeline().setStages(Array(assembler, classifierHyper))

//파이프라인을 만들어야 하는데 벡터 어셈블러가 인풋 피쳐를 만드는 것이어서 그게 있어야 하고
//디시전 트리 클래스파이어가 있어야 한다.

//우리가 루프를 돌면서 할때 파이프라인이 어셈블러만들어서 클래스파이어 만들고 이런 과정을
//묶는 다는 개념이다.

In [None]:
//가장 좋은 하이퍼파라미터의 조합을 찾기위해 ParamGridBuilder를 사용한다.

val paramGrid = new ParamGridBuilder().
addGrid(classifierHyper.impurity, Seq("gini","entropy")).
addGrid(classifierHyper.maxDepth,Seq(1,5)).
addGrid(classifierHyper.maxBins, Seq(40,80)).
addGrid(classifierHyper.minInfoGain,Seq(0,0,0.05)).
build()

//위의 코드처럼 세팅을 해놓아야 한다.
//지니계수인지 엔트로피 인지 , 뎁스는 1인지 5로 할지 이런식으로 2개씩 4종류의 하이퍼 파라미터를
//세팅했다. 즉 16개의 모델을 학습하고 베스트 파라미터 조합을 찾겠다는 마인드 이다.
//메모리가 부족하거나 너무 느리면 코어를 늘리거나 메모리를 늘리는 것이 좋다.

In [None]:
//벨리데이션 모델을 만들고 파이프 라인을 만들게 되는데 모델을 이벨류에이션 하고 하이퍼
//파라미터 서칭까지 하는 것을 파이프라인으로 만들게 된다.

val multiclassEval = new MulticlassClassificationEvaluator().
setLabelCol("Cober_Type").
setPredictionCol("prediction").
setMetricName("accuracy")

val validator = new TrainValidationSplit().
setSeed(Random.nextLong()).
setEstimator(pipeline).
setEvaluator(multiclassEval).
setEstimatorParamMaps(paramGrid).
setTrainRatio(0.9)

val validatorModel = validator.fit(trainData)

//90프로를 tr에 쓰고 나머지를 벨리데이션에 쓰겠다는 것을 라티오 비율로 두었다.
//커널이 죽거나 시간이 오래 걸리면 하이퍼 파라미터 개수를 줄이거나 메모리 늘리거나
//코어 늘리거나 등으로 해결해보자.

In [None]:
val bestModel = vaildatorModel.bestModel
bestModel.asInstanceOf[PipelineModel].stages.last.extractParamMap

//위의 코드를 통해서 베스트 모델을 볼 수 있다.

In [None]:
val paramAndMetrics = validatorModel.validationMetrics.
zip(validatorModel.getEstimatorParamMaps).sortBy(-_._1)
paramsAndMetrics.foreach{ case (metric, params) =>
println(metric)
println(params)
println()
}
//베스트 모댈과 하이퍼 파라미터 조합에 대한 정확도도 다음과 같이 볼 수 있다.

In [None]:
validatorModel.validationMetrics.max
multiclassEval.evaluate(bestMode.transform(testData))

//이후 구한 베스트 모델에 대해서 test데이터를 넣어서 결과를 확인해보면
//앞에서 구했던 하이퍼 파라미터 없이 만든 디시전 트리보다는
//좋은 결과가 나올 것이라는 것을 알 수 있다.