# 2 часть

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
filename_data = 'mycsv.csv'
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
csv = csv.drop(csv._c0).withColumn('label', when(col('Rating') >= 3.2, 1).otherwise(0))
csv.drop(csv['Rating']).show(10)


+--------------------+----------------+----------------+----+------------------+----+--------------+------------+----------------+-----+
|              App Id|        Category|Maximum Installs|Free|             Price|Size|Content Rating|Ad Supported|In App Purchases|label|
+--------------------+----------------+----------------+----+------------------+----+--------------+------------+----------------+-----+
| com.ishakwe.gakondo|       Adventure|              15|True|2.2742768824640542| 10M|      Everyone|       False|           false|    0|
|com.webserveis.ba...|           Tools|            7662|True|2.2742768824640542|2.9M|      Everyone|        True|           false|    1|
|com.doantiepvien.crm|    Productivity|              58|True|2.2742768824640542|3.7M|      Everyone|       False|           false|    0|
|cst.stJoseph.ug17...|   Communication|              19|True|2.2742768824640542|1.8M|      Everyone|        True|           false|    1|
|com.horodyski.grower|           Tools|  

In [3]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 860683  Testing Rows: 368116


In [4]:
strIdx = StringIndexer(inputCols = ['Category'], 
                       outputCols = ['CategoryIdx'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['CategoryIdx'], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ['Maximum Installs', 'Price'], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label", 
                        featuresCol="features", 
                        maxIter=30,
                        regParam=0.3)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [5]:
pipelineModel = pipeline.fit(train)

In [6]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueLabel").show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[1.0,0.0212509584...|       0.0|        1|
|[0.0,0.1121700076...|       0.0|        1|
|[23.0,0.008544199...|       0.0|        1|
|[13.0,0.021250958...|       0.0|        0|
|[3.0,1.0954102311...|       0.0|        0|
|[5.0,0.2882024318...|       0.0|        1|
|[13.0,0.310877423...|       1.0|        0|
|[19.0,0.002902837...|       0.0|        1|
|[7.0,0.1340234417...|       0.0|        1|
|[18.0,0.003286230...|       0.0|        0|
|[23.0,0.116003943...|       0.0|        1|
|[7.0,0.0478146565...|       0.0|        1|
|[8.0,7.6678716179...|       0.0|        0|
|[6.0,0.0288640595...|       0.0|        0|
|[7.0,0.2059371234...|       0.0|        1|
|[1.0,0.1693504217...|       0.0|        1|
|[1.0,0.0435973271...|       0.0|        1|
|[1.0,0.0375178004...|       0.0|        1|
|[0.0,0.1258078650...|       0.0|        1|
|[1.0,0.0118852010...|       0.0

In [7]:
print(pred_df)
tp = float(pred_df.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(pred_df.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(pred_df.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(pred_df.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

DataFrame[App Id: string, Category: string, Rating: double, Maximum Installs: int, Free: string, Price: double, Size: string, Content Rating: string, Ad Supported: string, In App Purchases: boolean, trueLabel: int, CategoryIdx: double, catFeatures: vector, idxCatFeatures: vector, numFeatures: vector, normFeatures: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            36570.0|
|       FP|             6793.0|
|       TN|           216343.0|
|       FN|           108410.0|
|Precision| 0.8433457094758204|
|   Recall|0.25224168850875983|
|       F1| 0.3883340501107023|
+---------+-------------------+



In [9]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(pred_df)
print ("AUR = ", aur)

AUR =  0.8420298623974344


In [10]:
paramGrid = ParamGridBuilder().\
    addGrid(lr.maxIter, [30, 40, 60]).\
    addGrid(lr.regParam, [0.6, 0.8, 0.9]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), estimatorParamMaps=paramGrid, 
                    numFolds=2)

In [11]:
cv_model = cv.fit(train)

In [12]:
newPrediction = cv_model.transform(test)

In [13]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            25825.0|
|       FP|             4618.0|
|       TN|           218518.0|
|       FN|           119155.0|
|Precision| 0.8483066714844135|
|   Recall|0.17812801765760794|
|       F1| 0.2944311749314514|
+---------+-------------------+



In [14]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.578716059596049


# 1 часть

In [34]:
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
# csv = csv.withColumn('Rating', csv.Rating.cast(IntegerType()))
csv.show(10)

+---+--------------------+----------------+-----------------+----------------+----+------------------+----+--------------+------------+----------------+
|_c0|              App Id|        Category|           Rating|Maximum Installs|Free|             Price|Size|Content Rating|Ad Supported|In App Purchases|
+---+--------------------+----------------+-----------------+----------------+----+------------------+----+--------------+------------+----------------+
|  0| com.ishakwe.gakondo|       Adventure|2.203142098575241|              15|True|2.2742768824640542| 10M|      Everyone|       False|           false|
|  1|com.webserveis.ba...|           Tools|              4.4|            7662|True|2.2742768824640542|2.9M|      Everyone|        True|           false|
|  2|com.doantiepvien.crm|    Productivity|2.203142098575241|              58|True|2.2742768824640542|3.7M|      Everyone|       False|           false|
|  3|cst.stJoseph.ug17...|   Communication|              5.0|              19|True

In [66]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("Rating", "trueRating")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 859398  Testing Rows: 369401


In [67]:
strIdx = StringIndexer(inputCols = ['Category'], 
                       outputCols = ['CategoryIdx'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['CategoryIdx'], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ['Maximum Installs', 'Price'], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
rfr = RandomForestRegressor(featuresCol = 'features', 
                      labelCol='Rating',
                      numTrees = 10,
                      maxDepth=2,
                      maxBins = 54)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, rfr])

In [68]:
pipelineModel = pipeline.fit(train)

In [69]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueRating").show()

+--------------------+------------------+-----------------+
|            features|        prediction|       trueRating|
+--------------------+------------------+-----------------+
|[25.0,8.215576733...|2.8726565871252197|2.203142098575241|
|[9.0,0.0031766896...|2.7826774568279093|2.203142098575241|
|[19.0,0.004874575...|2.8726565871252197|2.203142098575241|
|[37.0,0.018019498...|2.8726565871252197|              4.9|
|[12.0,0.051977215...|3.1781970847105447|              3.4|
|[6.0,1.0954102311...|2.8042760999555467|2.203142098575241|
|[21.0,0.001697885...|2.7826774568279093|2.203142098575241|
|[16.0,0.847354584...| 3.506894106966494|              4.4|
|[12.0,1.095410231...|2.7826774568279093|2.203142098575241|
|[11.0,0.045185672...|3.4635777976761277|              5.0|
|[14.0,0.187698543...| 3.506894106966494|              5.0|
|[35.0,0.002026508...|2.8726565871252197|2.203142098575241|
|[13.0,0.389911271...| 3.506894106966494|              3.1|
|[3.0,0.7407163982...|3.4635129797069966

In [70]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="trueRating", metricName="rmse")

In [71]:
# RMSE
rmse = regressionEvaluator.evaluate(pred_df)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(pred_df)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(pred_df)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(pred_df)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 0.96
The MSE for the random forest regression model is 0.93
The R2 for the random forest regression model is 0.21
The MAE for the random forest regression model is 0.84


In [72]:
param_grid = ParamGridBuilder().\
    addGrid(rfr.numTrees, [10, 15, 20]).\
    addGrid(rfr.maxDepth, [1, 2, 4]).\
    addGrid(rfr.maxBins , [54, 108, 216]).\
    build()

In [73]:
cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=param_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="Rating", \
                                metricName="rmse"), \
                    numFolds=2)

In [None]:
cv_model = cv.fit(train)