In [1]:
import pyspark
print(pyspark.__version__)


3.5.0


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = (
    SparkSession.builder.appName("ML_App")
    .config("spark.executor.memory", "512M")
    .config("spark.executor.cores", "1")
    .master("spark://master:7077")  
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/04 05:33:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

In [3]:
df = spark.read.csv("/proiect/vehicle_claims.csv", inferSchema=True, header=True)

                                                                                

In [4]:
df = df.withColumn("IsManual", when(col("Gearbox") == "Manual", 1).otherwise(0))
df.show()

+-------+---------+------------+--------+---------+------+--------+--------+------------+----------+---------+---------+-------+--------+--------+--------------------+--------+-------+--------------+-----------------+------------------+-----------------+-----------+--------+
|  Maker| Genmodel| Genmodel_ID|Adv_year|Adv_month| Color|Reg_year|Bodytype|Runned_Miles|Engin_size|  Gearbox|Fuel_type|  Price|Seat_num|Door_num|               issue|issue_id|Adv_day|breakdown_date|repair_complexity|       repair_cost|     repair_hours|repair_date|IsManual|
+-------+---------+------------+--------+---------+------+--------+--------+------------+----------+---------+---------+-------+--------+--------+--------------------+--------+-------+--------------+-----------------+------------------+-----------------+-----------+--------+
|Bentley|   Arnage|        10_1|    2018|        4|Silver|  2000.0|  Saloon|       60000|      6.8L|Automatic|   Petrol|21500.0|     5.0|     4.0|    Electrical Issue|     

In [5]:
df.printSchema()

root
 |-- Maker: string (nullable = true)
 |--  Genmodel: string (nullable = true)
 |--  Genmodel_ID: string (nullable = true)
 |-- Adv_year: integer (nullable = true)
 |-- Adv_month: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- Reg_year: double (nullable = true)
 |-- Bodytype: string (nullable = true)
 |-- Runned_Miles: string (nullable = true)
 |-- Engin_size: string (nullable = true)
 |-- Gearbox: string (nullable = true)
 |-- Fuel_type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Seat_num: double (nullable = true)
 |-- Door_num: double (nullable = true)
 |-- issue: string (nullable = true)
 |-- issue_id: integer (nullable = true)
 |-- Adv_day: integer (nullable = true)
 |-- breakdown_date: date (nullable = true)
 |-- repair_complexity: integer (nullable = true)
 |-- repair_cost: double (nullable = true)
 |-- repair_hours: double (nullable = true)
 |-- repair_date: date (nullable = true)
 |-- IsManual: integer (nullable = false)



In [6]:
df.show(2)

+-------+---------+------------+--------+---------+------+--------+--------+------------+----------+---------+---------+-------+--------+--------+----------------+--------+-------+--------------+-----------------+-----------+------------+-----------+--------+
|  Maker| Genmodel| Genmodel_ID|Adv_year|Adv_month| Color|Reg_year|Bodytype|Runned_Miles|Engin_size|  Gearbox|Fuel_type|  Price|Seat_num|Door_num|           issue|issue_id|Adv_day|breakdown_date|repair_complexity|repair_cost|repair_hours|repair_date|IsManual|
+-------+---------+------------+--------+---------+------+--------+--------+------------+----------+---------+---------+-------+--------+--------+----------------+--------+-------+--------------+-----------------+-----------+------------+-----------+--------+
|Bentley|   Arnage|        10_1|    2018|        4|Silver|  2000.0|  Saloon|       60000|      6.8L|Automatic|   Petrol|21500.0|     5.0|     4.0|Electrical Issue|       4|     12|    2018-04-12|                3|      1

In [7]:
df = df.select("Maker", "Engin_size", "issue", "repair_complexity", "repair_cost", "repair_hours", "IsManual")

In [8]:
df.select("repair_complexity").distinct().show()




+-----------------+
|repair_complexity|
+-----------------+
|                1|
|                3|
|                4|
|                2|
+-----------------+



                                                                                

In [9]:
df.show(2)

+-------+----------+----------------+-----------------+-----------+------------+--------+
|  Maker|Engin_size|           issue|repair_complexity|repair_cost|repair_hours|IsManual|
+-------+----------+----------------+-----------------+-----------+------------+--------+
|Bentley|      6.8L|Electrical Issue|                3|      184.5|         6.0|       0|
|Bentley|      6.8L| Brake Pads Worn|                3|    134.375|         6.0|       0|
+-------+----------+----------------+-----------------+-----------+------------+--------+
only showing top 2 rows



In [10]:
train, test = df.randomSplit([0.7, 0.3], seed=7)

In [11]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

                                                                                

Train set length: 187753 records




Test set length: 80502 records


                                                                                

In [12]:
train.show(2)

+-------+----------+---------------+-----------------+-----------+------------+--------+
|  Maker|Engin_size|          issue|repair_complexity|repair_cost|repair_hours|IsManual|
+-------+----------+---------------+-----------------+-----------+------------+--------+
|Bentley|      1.5L|Brake Pads Worn|                3|   204.9975|         6.0|       0|
|Bentley|      3.0L|Brake Pads Worn|                3|     186.25|         6.0|       0|
+-------+----------+---------------+-----------------+-----------+------------+--------+
only showing top 2 rows



In [13]:
train.dtypes

[('Maker', 'string'),
 ('Engin_size', 'string'),
 ('issue', 'string'),
 ('repair_complexity', 'int'),
 ('repair_cost', 'double'),
 ('repair_hours', 'double'),
 ('IsManual', 'int')]

In [14]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [
    x for (x, dataType) in train.dtypes if (((dataType == "double") | (dataType == "int")) & (x != "IsManual"))
]

In [15]:
print(numCols)
print(catCols)

['repair_complexity', 'repair_cost', 'repair_hours']
['Maker', 'Engin_size', 'issue']


In [16]:
train.agg(F.countDistinct("issue")).show()



+---------------------+
|count(DISTINCT issue)|
+---------------------+
|                   15|
+---------------------+



                                                                                

In [17]:
train.groupBy("issue").count().show()



+--------------------+-----+
|               issue|count|
+--------------------+-----+
|  Alternator Failing|12413|
|    Radiator Leaking|12398|
|        Engine Issue|12556|
|     Brake Pads Worn|12586|
|          Flat Tyres|12386|
|      Tyre Alignment|12549|
|    Windscreen Crack|12593|
| Excessive Emissions|12574|
|Steering Wheel Sh...|12437|
|      Gear Box Issue|12438|
|  Sensor Malfunction|12459|
|    Electrical Issue|12559|
|  Transmission Issue|12575|
| Starter Motor Issue|12668|
+--------------------+-----+



                                                                                

In [18]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [19]:
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

In [20]:
string_indexer

[StringIndexer_112be833a641,
 StringIndexer_a577d5b7a071,
 StringIndexer_b30d3f4f80c2]

In [21]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [22]:
one_hot_encoder

[OneHotEncoder_0580166efa5f]

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [25]:
assemblerInput

['repair_complexity',
 'repair_cost',
 'repair_hours',
 'Maker_OneHotEncoder',
 'Engin_size_OneHotEncoder',
 'issue_OneHotEncoder']

In [26]:
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

In [27]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

In [28]:
stages

[StringIndexer_112be833a641,
 StringIndexer_a577d5b7a071,
 StringIndexer_b30d3f4f80c2,
 OneHotEncoder_0580166efa5f,
 VectorAssembler_31c43a10cfe4]

In [29]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

                                                                                

CPU times: user 63.2 ms, sys: 2.51 ms, total: 65.7 ms
Wall time: 5.86 s


In [30]:
pp_df.select(
    "Maker", "Engin_size", "issue", "repair_cost", "repair_hours", "repair_complexity",
    "VectorAssembler_features",
).show(truncate=False)

+-------+----------+------------------+-----------------+------------------+-----------------+-------------------------------------------------------------------------------+
|Maker  |Engin_size|issue             |repair_cost      |repair_hours      |repair_complexity|VectorAssembler_features                                                       |
+-------+----------+------------------+-----------------+------------------+-----------------+-------------------------------------------------------------------------------+
|Bentley|4.0L      |Alternator Failing|1362.0           |6.0               |3                |(172,[0,1,2,33,112,170],[3.0,1362.0,6.0,1.0,1.0,1.0])                          |
|Bentley|4.0L      |Alternator Failing|1650.0           |6.0               |3                |(172,[0,1,2,33,112,170],[3.0,1650.0,6.0,1.0,1.0,1.0])                          |
|Bentley|4.0L      |Alternator Failing|1759.8           |6.0               |3                |(172,[0,1,2,33,112,170],[3.0,17

In [31]:
from pyspark.ml.classification import LogisticRegression

In [32]:
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("IsManual").alias("label"),
)

In [33]:
data.show(1000, truncate=False)

+--------------------------------------------------------------------------------+-----+
|features                                                                        |label|
+--------------------------------------------------------------------------------+-----+
|(172,[0,1,2,33,112,170],[3.0,1362.0,6.0,1.0,1.0,1.0])                           |0    |
|(172,[0,1,2,33,112,170],[3.0,1650.0,6.0,1.0,1.0,1.0])                           |0    |
|(172,[0,1,2,33,112,170],[3.0,1759.8,6.0,1.0,1.0,1.0])                           |0    |
|(172,[0,1,2,33,112,170],[3.0,2579.54,6.0,1.0,1.0,1.0])                          |0    |
|(172,[0,1,2,33,112,170],[3.0,2719.0,6.0,1.0,1.0,1.0])                           |0    |
|(172,[0,1,2,33,112,170],[3.0,2719.0,6.0,1.0,1.0,1.0])                           |0    |
|(172,[0,1,2,33,112,170],[3.0,12708.66414596649,6.0,1.0,1.0,1.0])                |0    |
|(172,[0,1,2,33,112,170],[3.0,15176.81867835596,12564.650603874441,1.0,1.0,1.0]) |0    |
|(172,[0,1,2,33,112,1

                                                                                

In [34]:
%%time
model = LogisticRegression().fit(data)

25/06/04 05:34:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

CPU times: user 37.3 ms, sys: 12.9 ms, total: 50.2 ms
Wall time: 30.2 s


In [35]:
model.summary.areaUnderROC

                                                                                

0.8624946276856541

In [36]:
model.summary.pr.show()

+--------------------+------------------+
|              recall|         precision|
+--------------------+------------------+
|                 0.0|               1.0|
|0.001811982464685...|               1.0|
|0.003292742328300...|               1.0|
|0.004773502191914272|               1.0|
|0.006293229420360448|0.9969135802469136|
|0.007735021919142718|            0.9925|
|0.009254749147588894|0.9895833333333334|
|0.010754992693619094| 0.989247311827957|
|0.012177301509985387| 0.984251968503937|
| 0.01365806137359961|0.9845505617977528|
|0.015119337554797857|0.9847715736040609|
| 0.01660009741841208|0.9849710982658959|
|0.018002922552362396|0.9808917197452229|
|0.019522649780808574|0.9794721407624634|
|0.021061860691670726|0.9800543970988214|
|0.022601071602532878|0.9805579036348268|
| 0.02414028251339503|0.9763593380614657|
|  0.0255820750121773| 0.975482912332838|
| 0.02702386751095957|0.9747013352073085|
|0.028543594739405747|0.9753661784287616|
+--------------------+------------