In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.sql import SparkSession
# Obtain the Spark session used by every cell below (an already-running
# session is reused instead of creating a second one).
spark = (
    SparkSession.builder
    .appName('lab04-PhanLop')
    .getOrCreate()
)

22/01/21 17:12:18 WARN Utils: Your hostname, sheepb-HP-Pavilion-Notebook resolves to a loopback address: 127.0.1.1; using 192.168.1.9 instead (on interface wlo1)
22/01/21 17:12:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


## Đọc và xử lí dữ liệu

In [None]:
# Directory holding the lab's input files; kept with a trailing slash because
# file names are appended to it by plain string concatenation.
data_path = "./Lab04-Data/"

In [None]:
# Load the mushrooms CSV: the first row is the header, fields are
# comma-separated, and column types are inferred by Spark.
mushrooms_csv = data_path + 'mushrooms.csv'
df = spark.read.csv(
    mushrooms_csv,
    sep=',',
    header=True,
    inferSchema=True,
)
df.show()

## a. Tiền xử lí dữ liệu

In [None]:
# Every column except the label is treated as a categorical feature.
label_col = 'class'
raw_feature_cols = [column for column in df.columns if column != label_col]

# Fit one StringIndexer per feature column; each writes "<column>Indexer".
featuresIndexer = [
    StringIndexer(inputCol=column, outputCol=column + "Indexer").fit(df)
    for column in raw_feature_cols
]

# Apply the indexers to a separate frame instead of overwriting `df`.
# Overwriting `df` made this cell fail on a second run, because the
# "<column>Indexer" output columns would already exist in `df`.
indexed_df = df
for featureIndexer in featuresIndexer:
    indexed_df = featureIndexer.transform(indexed_df)

# Assemble all indexed feature columns into a single "features" vector column.
features_col = [column + "Indexer" for column in raw_feature_cols]
vec_assembler = VectorAssembler(inputCols=features_col, outputCol="features")
assembled_df = vec_assembler.transform(indexed_df).select('features', label_col)

# Index the string label into the numeric "indexedLabel" column used by the classifiers.
labelIndexer = StringIndexer(inputCol=label_col, outputCol="indexedLabel").fit(assembled_df)
features_df = labelIndexer.transform(assembled_df)

features_df.show()

In [None]:
# Split the data into train and test sets with an 80:20 ratio.
# A fixed seed keeps the split (and the accuracies reported below) the same
# on every run instead of changing with each execution.
train, test = features_df.randomSplit([0.8, 0.2], seed=42)

## b. Mô hình Decision Tree

In [None]:
# Decision tree trained on the assembled feature vector against the indexed label.
decision_tree = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='indexedLabel',
    maxBins=32,
    maxDepth=20,
)
model_dct = decision_tree.fit(train)

## c. Mô hình Random Forest

In [None]:
# Random forest of 10 trees on the same features/label as the decision tree.
# The forest is randomized, so the seed is fixed to make the trained model
# and its reported accuracy reproducible across runs.
rdf_cls = RandomForestClassifier(
    labelCol='indexedLabel',
    featuresCol='features',
    maxDepth=20,
    numTrees=10,
    seed=42,
)
model_rdf = rdf_cls.fit(train)

## d. Đánh giá 2 mô hình trên tập kiểm thử

In [None]:
# One accuracy evaluator is shared by both models: it compares the
# "prediction" column against "indexedLabel".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)

# Decision tree: predict on the held-out test set, then score.
test_pred_ds = model_dct.transform(test)
accuracy_ds = evaluator.evaluate(test_pred_ds)

# Random forest: same procedure.
test_pred_rdf = model_rdf.transform(test)
accuracy_rdf = evaluator.evaluate(test_pred_rdf)

print("Accuracy on test")
print(f"- accuracy of decisionTree: {accuracy_ds}")
print(f"- accuracy of RandomForest: {accuracy_rdf}")

## e. Sử dụng Pipeline để thiết lập các bước trên thành một bước duy nhất

- Decision Tree

In [None]:
# Start again from the raw CSV.
df = spark.read.csv(data_path + 'mushrooms.csv', inferSchema=True, header=True, sep=',')

# --- Preprocessing -----------------------------------------------------------
# Every column except the label is indexed as a categorical feature.
raw_feature_cols = [column for column in df.columns if column != label_col]
featuresIndexer = [
    StringIndexer(inputCol=column, outputCol=column + "Indexer").fit(df)
    for column in raw_feature_cols
]

# Transform into a separate frame so `df` stays raw and the cell can be re-run.
indexed_df = df
for featureIndexer in featuresIndexer:
    indexed_df = featureIndexer.transform(indexed_df)

features_col = [column + "Indexer" for column in raw_feature_cols]
vec_assembler = VectorAssembler(inputCols=features_col, outputCol="features")
features_df = vec_assembler.transform(indexed_df).select('features', label_col)

# --- Split -------------------------------------------------------------------
# Fixed seed: the same rows land in train/test on every run.
train_df, test_df = features_df.randomSplit([0.8, 0.2], seed=42)

# --- Pipeline ----------------------------------------------------------------
# The label indexer is fitted on the training rows only, so nothing is learned
# from the held-out test set.
labelIndexer = StringIndexer(inputCol=label_col, outputCol="indexedLabel").fit(train_df)

decision_tree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='features', maxDepth=20, maxBins=32)

pipeline_dct = Pipeline(stages=[labelIndexer, decision_tree])

# Hyper-parameter grid: 3 depths x 2 bin counts = 6 candidate models.
params_dct = (
    ParamGridBuilder()
    .addGrid(decision_tree.maxDepth, [5, 10, 20])
    .addGrid(decision_tree.maxBins, [15, 32])
    .build()
)

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy")

# 80% of train_df is used for fitting and 20% for validation; the seed makes
# that internal split, and therefore the selected model, reproducible.
tvs_dct = TrainValidationSplit(
    estimator=pipeline_dct,
    estimatorParamMaps=params_dct,
    evaluator=evaluator,
    trainRatio=0.8,
    seed=42,
)

# Fit all candidates, keep the best one, and report its accuracy on the test set.
tvsFitted = tvs_dct.fit(train_df)
evaluator.evaluate(tvsFitted.transform(test_df))

- Random Forest

In [None]:
# Random forest estimator; the seed is fixed so the randomized training is
# reproducible across runs.
rdf_cls = RandomForestClassifier(
    labelCol='indexedLabel',
    featuresCol='features',
    maxDepth=20,
    numTrees=10,
    seed=42,
)

# Reuses labelIndexer, train_df, test_df and evaluator from the decision-tree
# pipeline cell.
pipeline_rdf = Pipeline(stages=[labelIndexer, rdf_cls])

# Hyper-parameter grid: 3 depths x 3 forest sizes = 9 candidate models.
params_rdf = (
    ParamGridBuilder()
    .addGrid(rdf_cls.maxDepth, [5, 10, 20])
    .addGrid(rdf_cls.numTrees, [5, 10, 15])
    .build()
)

# Stored as `tvs_rdf` so the decision-tree tuner `tvs_dct` is not overwritten
# by the random-forest one. The seed makes the train/validation split
# reproducible.
tvs_rdf = TrainValidationSplit(
    estimator=pipeline_rdf,
    estimatorParamMaps=params_rdf,
    evaluator=evaluator,
    trainRatio=0.8,
    seed=42,
)

# Fit all candidates, keep the best one, and report its accuracy on the test set.
tvsFitted = tvs_rdf.fit(train_df)
evaluator.evaluate(tvsFitted.transform(test_df))