In [None]:
import sys
from pyspark import SparkContext

sc = SparkContext(appName="example23")

print(u'Python version ' + sys.version)
print(u'Spark version ' + sc.version)

In [None]:
from pyspark.sql import SparkSession
from pyspark.mllib.random import RandomRDDs
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import sys
from pyspark import SparkContext

spark = SparkSession.builder.appName("query").getOrCreate()
sqlContext = SQLContext(sc)

In [307]:
data=spark.read.format('csv').options(header='true', inferSchema='true').load('predictionQuery.csv')


In [308]:
data.head()

Row(lon=21.2935, lat=49.31503, ROUTE_NUMBER=701429, Trip=7, TripTime=955, vehicleID=1007802921, lastStopOrder=4, lineID=1007040370, lineType=2, DELAY=14, Current_Stop=u'Bardejov,Bas', Current_Stop_ID=83, Current_Time=1614092592420L, destination=u'Svidn\xedk, AS', Dir=1)

In [309]:
data = data.drop('destination')
data = data.drop('Current_Stop')
data = data.drop('lineType')

In [310]:
what= data.describe(["DELAY"])
what.collect()

[Row(summary=u'count', DELAY=u'25756'),
 Row(summary=u'mean', DELAY=u'-43.44079049541854'),
 Row(summary=u'stddev', DELAY=u'167.93181987115523'),
 Row(summary=u'min', DELAY=u'-1839'),
 Row(summary=u'max', DELAY=u'3897')]

In [311]:
print(data.take(1))

[Row(lon=21.2935, lat=49.31503, ROUTE_NUMBER=701429, Trip=7, TripTime=955, vehicleID=1007802921, lastStopOrder=4, lineID=1007040370, DELAY=14, Current_Stop_ID=83, Current_Time=1614092592420L, Dir=1)]


In [312]:
"Trip","vehicleID","lastStopOrder","lineID","lineType"

('Trip', 'vehicleID', 'lastStopOrder', 'lineID', 'lineType')

In [313]:
#do premennej names si ulozim mena atributov
names = data.schema.names
print(names)


['lon', 'lat', 'ROUTE_NUMBER', 'Trip', 'TripTime', 'vehicleID', 'lastStopOrder', 'lineID', 'DELAY', 'Current_Stop_ID', 'Current_Time', 'Dir']


In [314]:
correlations = []
for name in names:
    correlations.extend([data.stat.corr('DELAY',name)])
print(correlations)

[-0.01788125478517071, 0.031882493255894026, 0.006668791614632634, -0.0009770428683254988, 0.017473294447962327, -0.006197023484606729, 0.2818169452090381, -0.0017870236610697718, 1.0, -0.25478019880996966, -0.012797564451171225, 0.003825891131362385]


In [315]:
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LinearSVC

In [316]:
vectorAssembler  = VectorAssembler(inputCols=["ROUTE_NUMBER", "DELAY", "Current_Stop_ID",
                                              "Dir","Current_Time"],outputCol="features").transform(data) 
vhouse_df = vectorAssembler.select(['features', 'DELAY'])
vhouse_df.show(3)
#Rozdelenie dat na trenovaciu a testovaciu mnozinu
training_data, test_data = vhouse_df.randomSplit([0.7, 0.3], seed=123)

+--------------------+-----+
|            features|DELAY|
+--------------------+-----+
|[701429.0,14.0,83...|   14|
|[707445.0,-17.0,9...|  -17|
|[712459.0,117.0,2...|  117|
+--------------------+-----+
only showing top 3 rows



In [317]:
training_data

DataFrame[features: vector, DELAY: int]

In [318]:
training_data.take(1)

[Row(features=DenseVector([701402.0, -311.0, 751.0, 0.0, 1.6140]), DELAY=-311)]

In [319]:
#Linear regression
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='DELAY', maxIter=100, regParam=0.3, elasticNetParam=0.6)
lr_model = lr.fit(training_data)

In [320]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.9982203748391999,0.0,0.0,0.0]
Intercept: -0.0757759635693


In [321]:
trainingSummary = lr_model.summary

In [322]:
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.299787
r2: 0.999997


In [324]:
predictions = lr_model.transform(test_data)
predictions.select("prediction","DELAY").show(50)

+-------------------+-----+
|         prediction|DELAY|
+-------------------+-----+
| -310.5223125385605| -311|
| -310.5223125385605| -311|
| -310.5223125385605| -311|
| -250.6290900482085| -251|
| -250.6290900482085| -251|
| -250.6290900482085| -251|
|-198.72163055657012| -199|
|-198.72163055657012| -199|
| -190.7358675578565| -191|
| -190.7358675578565| -191|
| -178.7572230597861| -179|
| -170.7714600610725| -171|
| -150.8070525642885| -151|
|-138.82840806621812| -139|
|-138.82840806621812| -139|
|-130.84264506750452| -131|
|-130.84264506750452| -131|
|-125.85154319330853| -126|
|-125.85154319330853| -126|
|-118.86400056943413| -119|
|-118.86400056943413| -119|
|-109.88001719588134| -110|
|-109.88001719588134| -110|
| -98.89959307265013|  -99|
| -90.91383007393654|  -91|
| -90.91383007393654|  -91|
| -88.91738932425814|  -89|
| -79.93340595070534|  -80|
| -78.93518557586613|  -79|
| -78.93518557586613|  -79|
| -76.93874482618773|  -77|
| -76.93874482618773|  -77|
| -70.94942257715253

In [325]:
#kontingencna tabulka Logisticka regresia
cf = predictions.crosstab("prediction","DELAY")
cf.show()

+-------------------+---+---+----+----+----+----+----+----+-----+----+----+----+----+---+----+----+----+----+----+----+----+----+-----+----+----+---+----+----+----+----+-----+----+----+----+----+----+----+-----+---+----+----+----+----+----+----+-----+----+----+----+----+---+----+----+-----+----+----+----+----+----+----+-----+----+----+---+----+----+----+----+-----+----+----+----+----+----+----+-----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+-----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+---

In [295]:
#vyhodnotenie Logisticka regresia
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="DELAY", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(labelCol="DELAY", rawPredictionCol="prediction", metricName='areaUnderROC')
acc = evaluatorMulti.evaluate(predictions, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(predictions, {evaluatorMulti.metricName: "f1"})
Precision = evaluatorMulti.evaluate(predictions, {evaluatorMulti.metricName: "weightedPrecision"})
Recall = evaluatorMulti.evaluate(predictions, {evaluatorMulti.metricName: "weightedRecall"})
auc = evaluator.evaluate(predictions)
print('Accuracy score: ',acc)
print('f1: ',f1)
print('Precision: ',Precision)
print('Recall: ',Recall)
print('Auc: ',auc)

('Accuracy score: ', 0.0)
('f1: ', 0.0)
('Precision: ', 0.0)
('Recall: ', 0.0)
('Auc: ', 1.0)


In [297]:
#Gradient-boosted tree regression
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'DELAY', maxIter=10)
gbt_model = gbt.fit(training_data)
gbt_predictions = gbt_model.transform(test_data)
gbt_predictions.select('prediction', 'DELAY', 'features').show(5)


+-------------------+-----+--------------------+
|         prediction|DELAY|            features|
+-------------------+-----+--------------------+
| -315.9569174710719| -311|[701402.0,-311.0,...|
| -315.9569174710719| -311|[701402.0,-311.0,...|
| -315.9569174710719| -311|[701402.0,-311.0,...|
|-239.93423803789946| -251|[701402.0,-251.0,...|
|-239.93423803789946| -251|[701402.0,-251.0,...|
+-------------------+-----+--------------------+
only showing top 5 rows



In [298]:
gbt_evaluator = RegressionEvaluator(
    labelCol="DELAY", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 37.4395


In [299]:
#vyhodnotenie Logisticka regresia
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="DELAY", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(labelCol="DELAY", rawPredictionCol="prediction", metricName='areaUnderROC')
acc = evaluatorMulti.evaluate(gbt_predictions, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(gbt_predictions, {evaluatorMulti.metricName: "f1"})
Precision = evaluatorMulti.evaluate(gbt_predictions, {evaluatorMulti.metricName: "weightedPrecision"})
Recall = evaluatorMulti.evaluate(gbt_predictions, {evaluatorMulti.metricName: "weightedRecall"})
auc = evaluator.evaluate(gbt_predictions)
print('Accuracy score: ',acc)
print('f1: ',f1)
print('Precision: ',Precision)
print('Recall: ',Recall)
print('Auc: ',auc)


('Accuracy score: ', 0.0)
('f1: ', 0.0)
('Precision: ', 0.0)
('Recall: ', 0.0)
('Auc: ', 0.9983967508846527)
