Лабораторная работа №2-2

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()
filename_data = 'us-dealers-used-cleaned.csv'
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
csv = csv.withColumn('miles', csv.miles.cast(IntegerType()))
csv.show(10)

+-------------+-----+------+----+-------------+-------------+---------+------------+-----------+
|           id|price| miles|year|         make|        model|body_type|transmission|engine_size|
+-------------+-----+------+----+-------------+-------------+---------+------------+-----------+
|38b2f52e-8f5d|20998|115879|2015|    Chevrolet|Express Cargo|Cargo Van|   Automatic|        4.8|
|97ba4955-ccf0|27921|  7339|2018|          BMW|           i3|Hatchback|   Automatic|        0.6|
|be1da9fd-0f34|11055| 39798|2018|   Mitsubishi|    Mirage G4|    Sedan|   Automatic|        1.2|
|84327e45-6cb6|52997| 28568|2019|    Chevrolet|     Colorado|   Pickup|   Automatic|        2.8|
|43847b9a-6fed| 3995|137537|2000|        Dodge|   Ram Pickup|   Pickup|      Manual|        5.2|
|8d10256f-3be9| 6500| 74274|2010|    Chevrolet|          HHR| Mini Mpv|   Automatic|        2.2|
|3c539e0f-3eb8|23024|131286|2016|    Chevrolet|     Colorado|   Pickup|   Automatic|        2.8|
|dffc4e35-48e7|16995|110615|20

In [2]:
csv = csv.drop(csv.id).withColumn('label', when(col('engine_size') >=2.6, 1).otherwise(0))
csv.drop(csv.engine_size).show()

+-----+------+----+-------------+-------------+---------+------------+-----+
|price| miles|year|         make|        model|body_type|transmission|label|
+-----+------+----+-------------+-------------+---------+------------+-----+
|20998|115879|2015|    Chevrolet|Express Cargo|Cargo Van|   Automatic|    1|
|27921|  7339|2018|          BMW|           i3|Hatchback|   Automatic|    0|
|11055| 39798|2018|   Mitsubishi|    Mirage G4|    Sedan|   Automatic|    0|
|52997| 28568|2019|    Chevrolet|     Colorado|   Pickup|   Automatic|    1|
| 3995|137537|2000|        Dodge|   Ram Pickup|   Pickup|      Manual|    1|
| 6500| 74274|2010|    Chevrolet|          HHR| Mini Mpv|   Automatic|    0|
|23024|131286|2016|    Chevrolet|     Colorado|   Pickup|   Automatic|    1|
|16995|110615|2011|Mercedes-Benz|    CLS-Class|    Coupe|   Automatic|    1|
| 5870|144159|2004|    Chevrolet|  TrailBlazer|      SUV|   Automatic|    1|
|27999| 60122|2008|     Maserati|  GranTurismo|    Coupe|   Automatic|    1|

In [3]:
splits = csv.randomSplit([0.85, 0.15])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
print("Training Rows:", train.count(), " Testing Rows:", test.count())

Training Rows: 1951691  Testing Rows: 344505


In [4]:
strIdx = StringIndexer(inputCols = ['make', 'model', 'transmission','body_type'], 
                       outputCols = ['make_index', 'model_index', 'transmission_index','body_type_index'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['make_index', 'model_index', 'transmission_index','body_type_index'], 
                          outputCol="features_cat")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), 
                       outputCol = "features_index", 
                       handleInvalid = "keep")
numVect = VectorAssembler(inputCols = ['miles', 'price'], 
                          outputCol="features_num", 
                          handleInvalid = "keep")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), 
                      outputCol="features_norm")
featVect = VectorAssembler(inputCols=["features_index", "features_norm"], 
                           outputCol="features", 
                           handleInvalid = "keep")
rf = RandomForestClassifier(labelCol="label", 
                            featuresCol="features", 
                            numTrees=10,  
                            maxDepth=2, 
                            maxBins=900)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, rf])

In [5]:
pipelineModel = pipeline.fit(train)

In [6]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueLabel").show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[19.0,143.0,0.0,1...|       0.0|        0|
|[3.0,76.0,0.0,6.0...|       1.0|        1|
|[2.0,328.0,0.0,11...|       0.0|        0|
|[25.0,320.0,1.0,1...|       0.0|        0|
|[4.0,11.0,0.0,1.0...|       0.0|        0|
|[0.0,0.0,0.0,2.0,...|       1.0|        1|
|[2.0,114.0,0.0,1....|       1.0|        1|
|[3.0,5.0,0.0,1.0,...|       0.0|        0|
|[2.0,318.0,1.0,5....|       0.0|        0|
|[6.0,47.0,0.0,0.0...|       1.0|        1|
|[16.0,595.0,1.0,2...|       0.0|        1|
|[5.0,16.0,0.0,0.0...|       0.0|        0|
|[15.0,140.0,0.0,4...|       0.0|        1|
|[26.0,89.0,1.0,7....|       0.0|        0|
|[16.0,368.0,1.0,2...|       0.0|        1|
|[15.0,120.0,1.0,1...|       0.0|        0|
|[2.0,555.0,1.0,1....|       0.0|        0|
|[2.0,555.0,1.0,1....|       0.0|        0|
|[8.0,44.0,0.0,0.0...|       1.0|        1|
|(6,[1,5],[4.0,0.0...|       0.0

In [7]:
tp = float(pred_df.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(pred_df.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(pred_df.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(pred_df.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           86975.0|
|       FP|           21472.0|
|       TN|          213037.0|
|       FN|           23322.0|
|Precision|0.8020046658736526|
|   Recall|0.7885527258220986|
|       F1|0.7952218117982666|
+---------+------------------+



In [8]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(pred_df)
print ("AUR = ", aur)

AUR =  0.909421202881171


In [5]:
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10,15,20]).addGrid(rf.maxDepth, [1, 2, 4]).addGrid(rf.maxBins, [1000, 1200, 1400]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), estimatorParamMaps=paramGrid, 
                    numFolds=2)

In [6]:
cv_model = cv.fit(train)

In [7]:
newPrediction = cv_model.transform(test)

In [8]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           91423.0|
|       FP|           22232.0|
|       TN|          211818.0|
|       FN|           19032.0|
|Precision|0.8043904799612863|
|   Recall|0.8276945362364764|
|       F1|0.8158761322564811|
+---------+------------------+



In [9]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.8663531429313123
