In [33]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()
filename_data = 'datasetautoru.csv'
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
csv = csv.withColumn('mileage', csv.mileage.cast(IntegerType()))
csv.show(10)

+---+----------+---------+----------+----+-------+-----------------+---------+------------+--------------+--------+------+------------+---------+
|_c0|     brand|    model|  offer_id|year|mileage|        body_type|fuel_type|engine_power|  transmission|   drive| wheel|owners_count|    price|
+---+----------+---------+----------+----+-------+-----------------+---------+------------+--------------+--------+------+------------+---------+
|  0|    Toyota|     Vitz|1102756899|2001|   6236|    хэтчбек 5 дв.|   Бензин|        70.0|автоматическая|передний|Правый|           2| 230000.0|
|  1|LADA (ВАЗ)|     2107|1102754237|1995|  96000|            седан|   Бензин|        75.0|  механическая|  задний| Левый|           2|  35000.0|
|  2|    Daewoo|   Nubira|1102763224|1998|  50000|            седан|   Бензин|       106.0|  механическая|передний| Левый|           3|  45000.0|
|  3| Chevrolet|     Aveo|1102757199|2014| 119120|            седан|   Бензин|       115.0|  механическая|передний| Левый|  

In [34]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("price", "truePrice")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 192445  Testing Rows: 82309


In [35]:
strIdx = StringIndexer(inputCols = ['brand', 'model', 'fuel_type','transmission', 'wheel', 'body_type', 'drive'], 
                       outputCols = ['brand_index', 'model_index', 'fuel_type_index', 'transmission_index', 'wheel_index', 'body_type_index', 'drive_index'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['brand_index', 'model_index', 'fuel_type_index','transmission_index', 'wheel_index', 'body_type_index', 'drive_index'],
                          outputCol="features_cat")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), 
                       outputCol = "features_index", 
                       handleInvalid = "keep")
numVect = VectorAssembler(inputCols = ["mileage",'engine_power', 'year', 'owners_count'], 
                          outputCol="features_num", 
                          handleInvalid = "keep")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), 
                      outputCol="features_norm")
featVect = VectorAssembler(inputCols=["features_index", "features_norm"], 
                           outputCol="features", 
                           handleInvalid = "keep")
rfr = RandomForestRegressor(featuresCol = 'features', 
                      labelCol='price',
                      numTrees = 10,
                      maxDepth=2,
                      maxBins = 181834)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, rfr])

In [36]:
pipelineModel = pipeline.fit(train)

In [37]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "truePrice").show()

+--------------------+------------------+---------+
|            features|        prediction|truePrice|
+--------------------+------------------+---------+
|[2.0,146.0,0.0,1....| 282783.1949918218| 230000.0|
|(11,[1,6,7,8,9,10...|196411.45801311987|  35000.0|
|(11,[0,1,7,8,9,10...|  659393.305265277| 520000.0|
|[4.0,91.0,0.0,1.0...| 760140.8078540338|1199000.0|
|[5.0,50.0,0.0,3.0...| 522252.0363037003| 590000.0|
|[3.0,118.0,0.0,1....| 500836.5029053331| 530000.0|
|[10.0,130.0,0.0,2...| 785790.8398750708| 889000.0|
|[2.0,148.0,3.0,1....|  656096.349398652|1100000.0|
|[9.0,8.0,0.0,0.0,...|417142.30158954306| 470000.0|
|[5.0,111.0,1.0,1....| 561656.4087856311| 600000.0|
|[3.0,30.0,0.0,2.0...| 785790.8398750708|1280000.0|
|[2.0,62.0,0.0,1.0...|  534119.376854248| 500000.0|
|(11,[7,8,9],[0.17...|366947.62678586156| 561000.0|
|[17.0,58.0,0.0,2....| 689263.9647799084| 985000.0|
|[4.0,29.0,0.0,0.0...| 775724.8335529368| 777000.0|
|[2.0,285.0,0.0,1....| 282783.1949918218| 300000.0|
|(11,[0,1,7,

In [38]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="truePrice", metricName="rmse")

In [39]:
# RMSE
rmse = regressionEvaluator.evaluate(pred_df)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(pred_df)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(pred_df)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(pred_df)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 190384.21
The MSE for the random forest regression model is 36246146646.34
The R2 for the random forest regression model is 0.64
The MAE for the random forest regression model is 140186.90


In [40]:
param_grid = ParamGridBuilder().\
    addGrid(rfr.numTrees, [10, 15, 20]).\
    addGrid(rfr.maxDepth, [1, 2, 4]).\
    addGrid(rfr.maxBins , [181834, 362432, 724864]).\
    build()

In [41]:
cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=param_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="price", \
                                metricName="rmse"), \
                    numFolds=2)

In [42]:
cv_model = cv.fit(train)

In [43]:
newPrediction = cv_model.transform(test)

In [44]:
# RMSE
rmse = regressionEvaluator.evaluate(newPrediction)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(newPrediction)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(newPrediction)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(newPrediction)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 105395.86
The MSE for the random forest regression model is 22737123505.68
The R2 for the random forest regression model is 0.77
The MAE for the random forest regression model is 105395.86


# 2 часть

In [45]:
csv = csv.drop(csv._c0).withColumn('label', when(col('mileage') >= 150942.9, 1).otherwise(0))
csv.drop(csv.mileage).show()

+----------+---------+----------+----+-----------------+---------+------------+----------------+--------+------+------------+-----------------+-----+
|     brand|    model|  offer_id|year|        body_type|fuel_type|engine_power|    transmission|   drive| wheel|owners_count|            price|label|
+----------+---------+----------+----+-----------------+---------+------------+----------------+--------+------+------------+-----------------+-----+
|    Toyota|     Vitz|1102756899|2001|    хэтчбек 5 дв.|   Бензин|        70.0|  автоматическая|передний|Правый|           2|         230000.0|    0|
|LADA (ВАЗ)|     2107|1102754237|1995|            седан|   Бензин|        75.0|    механическая|  задний| Левый|           2|          35000.0|    0|
|    Daewoo|   Nubira|1102763224|1998|            седан|   Бензин|       106.0|    механическая|передний| Левый|           3|          45000.0|    0|
| Chevrolet|     Aveo|1102757199|2014|            седан|   Бензин|       115.0|    механическая|пере

In [46]:
splits = csv.randomSplit([0.85, 0.15])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
print("Training Rows:", train.count(), " Testing Rows:", test.count())

Training Rows: 233867  Testing Rows: 40887


In [47]:
strIdx = StringIndexer(inputCols = ['brand', 'model', 'fuel_type','transmission', 'wheel', 'body_type', 'drive'], 
                       outputCols = ['brand_index', 'model_index', 'fuel_type_index', 'transmission_index', 'wheel_index', 'body_type_index', 'drive_index'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['brand_index', 'model_index', 'fuel_type_index', 'transmission_index', 'wheel_index', 'body_type_index', 'drive_index'], 
                          outputCol="features_cat")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), 
                       outputCol = "features_index", 
                       handleInvalid = "keep")
numVect = VectorAssembler(inputCols = ['engine_power', 'year', 'owners_count'], 
                          outputCol="features_num", 
                          handleInvalid = "keep")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), 
                      outputCol="features_norm")
featVect = VectorAssembler(inputCols=["features_index", "features_norm"], 
                           outputCol="features", 
                           handleInvalid = "keep")
lr = LogisticRegression(labelCol="label", 
                        featuresCol="features", 
                        maxIter=10,
                        regParam=0.3)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [48]:
pipelineModel = pipeline.fit(train)

In [49]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueLabel").show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[57.0,1359.0,0.0,...|       1.0|        1|
|[57.0,1142.0,0.0,...|       0.0|        0|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|[17.0,131.0,1.0,0...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|(10,[0,1,7,8],[17...|       0.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|[17.0,590.0,0.0,0...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|(10,[0,1,7,8,9],[...|       1.0|        1|
|[17.0,133.0,0.0,1...|       1.0|        1|
|[17.0,133.0,0.0,3...|       0.0|        0|
|[17.0,133.0,0.0,3...|       0.0|        0|
|[17.0,133.0,0.0,3...|       0.0|        0|
|(10,[0,1,3,7,8],[...|       0.0|        0|
|[17.0,133.0,0.0,3...|       0.0

In [50]:
tp = float(pred_df.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(pred_df.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(pred_df.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(pred_df.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            9584.0|
|       FP|            3553.0|
|       TN|           19091.0|
|       FN|            8659.0|
|Precision|0.7295425135114562|
|   Recall|0.5253521898810503|
|       F1|0.6108349267049076|
+---------+------------------+



In [51]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(pred_df)
print ("AUR = ", aur)

AUR =  0.778810336933759


In [52]:
paramGrid = ParamGridBuilder().\
    addGrid(lr.maxIter, [10, 20, 30]).\
    addGrid(lr.regParam, [0.3, 0.5, 0.7]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), estimatorParamMaps=paramGrid, 
                    numFolds=2)

In [53]:
cv_model = cv.fit(train)

In [54]:
newPrediction = cv_model.transform(test)

In [55]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            9584.0|
|       FP|            3553.0|
|       TN|           19091.0|
|       FN|            8659.0|
|Precision|0.7295425135114562|
|   Recall|0.5253521898810503|
|       F1|0.6108349267049076|
+---------+------------------+



In [56]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.6842226414870717
