<a href="https://colab.research.google.com/github/ilya-lykov/google_colab_labs/blob/main/3_lab/Laba_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as F
from pyspark.sql import Window as W
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import matplotlib.pyplot as plt

In [4]:
spark = (SparkSession
    .builder
    .appName("Test")
    .getOrCreate()
)

In [6]:
acc = spark.read.csv("/content/drive/MyDrive/University/data/2014/DfTRoadSafety_Accidents_2014.csv", header=True, inferSchema=True)
veh = spark.read.csv("/content/drive/MyDrive/University/data/2014/DfTRoadSafety_Vehicles_2014.csv", header=True, inferSchema=True)

In [9]:
accidents = acc.join(veh, ["Accident_Index"], "inner")
accidents.show()

+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-------------------+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+-----------------+------------+-----------------------+-----------------+--------------------------------+-----------------+------------------------+-------------------------+---------------------------+--------------------------+-------------------+----------------------------+-------------------------+-------------+-------------+------------------+------------

In [14]:
accidents = (accidents.select(
    "Weather_Conditions",
    "Engine_Capacity_(CC)",
    "Age_of_Vehicle",
    "Sex_of_Driver",
    "Age_of_Driver",
    "Age_Band_of_Driver",
    "Speed_limit")
    .filter((F.col("Engine_Capacity_(CC)")>0) &
           ((F.col("Age_of_Vehicle")>0)) &
           ((F.col("Age_of_Driver")>0)) &
           ((F.col("Age_Band_of_Driver")>0)) &
           ((F.col("Speed_limit")>0)))
    )
accidents.show()

+------------------+--------------------+--------------+-------------+-------------+------------------+-----------+
|Weather_Conditions|Engine_Capacity_(CC)|Age_of_Vehicle|Sex_of_Driver|Age_of_Driver|Age_Band_of_Driver|Speed_limit|
+------------------+--------------------+--------------+-------------+-------------+------------------+-----------+
|                 2|                1997|             2|            1|           49|                 8|         30|
|                 1|                1598|             4|            1|           49|                 8|         30|
|                 1|                 999|             2|            1|           27|                 6|         30|
|                 1|                1596|             9|            1|           32|                 6|         30|
|                 1|                 124|             3|            1|           42|                 7|         30|
|                 1|                2776|             9|            1|  

In [15]:
encoder = OneHotEncoder(
    inputCols=["Weather_Conditions", "Sex_of_Driver", "Age_Band_of_Driver"],
    outputCols=["Weather_Conditions_vec", "Sex_of_Driver_vec", "Age_Band_of_Driver_vec"]
)

In [17]:
accidents_encoded = encoder.fit(accidents).transform(accidents)
accidents_encoded.show()

+------------------+--------------------+--------------+-------------+-------------+------------------+-----------+----------------------+-----------------+----------------------+
|Weather_Conditions|Engine_Capacity_(CC)|Age_of_Vehicle|Sex_of_Driver|Age_of_Driver|Age_Band_of_Driver|Speed_limit|Weather_Conditions_vec|Sex_of_Driver_vec|Age_Band_of_Driver_vec|
+------------------+--------------------+--------------+-------------+-------------+------------------+-----------+----------------------+-----------------+----------------------+
|                 2|                1997|             2|            1|           49|                 8|         30|         (9,[2],[1.0])|    (3,[1],[1.0])|        (11,[8],[1.0])|
|                 1|                1598|             4|            1|           49|                 8|         30|         (9,[1],[1.0])|    (3,[1],[1.0])|        (11,[8],[1.0])|
|                 1|                 999|             2|            1|           27|                

In [19]:
assembler = VectorAssembler(
    inputCols=["Weather_Conditions_vec", "Sex_of_Driver_vec", "Age_Band_of_Driver_vec", "Engine_Capacity_(CC)", "Age_of_Vehicle"],
    outputCol="features"
)

assembled_data = assembler.transform(accidents_encoded)

assembled_data.select("features").show(truncate=False)

+----------------------------------------------+
|features                                      |
+----------------------------------------------+
|(25,[2,10,20,23,24],[1.0,1.0,1.0,1997.0,2.0]) |
|(25,[1,10,20,23,24],[1.0,1.0,1.0,1598.0,4.0]) |
|(25,[1,10,18,23,24],[1.0,1.0,1.0,999.0,2.0])  |
|(25,[1,10,18,23,24],[1.0,1.0,1.0,1596.0,9.0]) |
|(25,[1,10,19,23,24],[1.0,1.0,1.0,124.0,3.0])  |
|(25,[1,10,20,23,24],[1.0,1.0,1.0,2776.0,9.0]) |
|(25,[1,10,19,23,24],[1.0,1.0,1.0,124.0,13.0]) |
|(25,[1,10,19,23,24],[1.0,1.0,1.0,1995.0,3.0]) |
|(25,[1,10,18,23,24],[1.0,1.0,1.0,125.0,3.0])  |
|(25,[1,11,22,23,24],[1.0,1.0,1.0,1598.0,9.0]) |
|(25,[2,10,20,23,24],[1.0,1.0,1.0,2198.0,4.0]) |
|(25,[1,11,19,23,24],[1.0,1.0,1.0,124.0,15.0]) |
|(25,[2,10,19,23,24],[1.0,1.0,1.0,1968.0,1.0]) |
|(25,[5,10,20,23,24],[1.0,1.0,1.0,1998.0,6.0]) |
|(25,[10,19,23,24],[1.0,1.0,125.0,4.0])        |
|(25,[2,10,19,23,24],[1.0,1.0,1.0,2143.0,3.0]) |
|(25,[1,10,20,23,24],[1.0,1.0,1.0,2979.0,10.0])|
|(25,[1,10,20,23,24]

In [27]:
lr = LinearRegression(featuresCol="features", labelCol="Engine_Capacity_(CC)")

lr_model = lr.fit(assembled_data)

predictions = lr_model.transform(assembled_data)

predictions.select("features", "prediction").show(truncate=False)

+----------------------------------------------+------------------+
|features                                      |prediction        |
+----------------------------------------------+------------------+
|(25,[2,10,20,23,24],[1.0,1.0,1.0,1997.0,2.0]) |1996.999996548706 |
|(25,[1,10,20,23,24],[1.0,1.0,1.0,1598.0,4.0]) |1597.9999875851536|
|(25,[1,10,18,23,24],[1.0,1.0,1.0,999.0,2.0])  |998.9999984779228 |
|(25,[1,10,18,23,24],[1.0,1.0,1.0,1596.0,9.0]) |1596.000005199235 |
|(25,[1,10,19,23,24],[1.0,1.0,1.0,124.0,3.0])  |123.99997397948245|
|(25,[1,10,20,23,24],[1.0,1.0,1.0,2776.0,9.0]) |2775.9999960981827|
|(25,[1,10,19,23,24],[1.0,1.0,1.0,124.0,13.0]) |123.99997936900402|
|(25,[1,10,19,23,24],[1.0,1.0,1.0,1995.0,3.0]) |1994.9999832205524|
|(25,[1,10,18,23,24],[1.0,1.0,1.0,125.0,3.0])  |124.99999470009512|
|(25,[1,11,22,23,24],[1.0,1.0,1.0,1598.0,9.0]) |1598.000002713162 |
|(25,[2,10,20,23,24],[1.0,1.0,1.0,2198.0,4.0]) |2197.999998619371 |
|(25,[1,11,19,23,24],[1.0,1.0,1.0,124.0,15.0]) |

In [33]:
print(lr_model)
evaluator = RegressionEvaluator(labelCol="Engine_Capacity_(CC)", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print(f"R2: {r2}")

LinearRegressionModel: uid=LinearRegression_a4511da0d2c0, numFeatures=25
R2: 1.0
