In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Car_Price_Prediction').getOrCreate()
spark

In [3]:
# read the dataset
df = spark.read.csv('../data/Car details v3.csv', header=True, inferSchema=True)

display(df)

DataFrame[name: string, year: int, selling_price: int, km_driven: int, fuel: string, seller_type: string, transmission: string, owner: string, mileage: string, engine: string, max_power: string, torque: string, seats: int]

In [4]:
df.head(2)

[Row(name='Maruti Swift Dzire VDI', year=2014, selling_price=450000, km_driven=145500, fuel='Diesel', seller_type='Individual', transmission='Manual', owner='First Owner', mileage='23.4 kmpl', engine='1248 CC', max_power='74 bhp', torque='190Nm@ 2000rpm', seats=5),
 Row(name='Skoda Rapid 1.5 TDI Ambition', year=2014, selling_price=370000, km_driven=120000, fuel='Diesel', seller_type='Individual', transmission='Manual', owner='Second Owner', mileage='21.14 kmpl', engine='1498 CC', max_power='103.52 bhp', torque='250Nm@ 1500-2500rpm', seats=5)]

In [5]:
# check the schema and datatypes of columns
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- torque: string (nullable = true)
 |-- seats: integer (nullable = true)



In [6]:
df.describe().show()

+-------+--------------------+------------------+-----------------+------------------+------+----------------+------------+-----------+--------+-------+---------+--------------------+----------------+
|summary|                name|              year|    selling_price|         km_driven|  fuel|     seller_type|transmission|      owner| mileage| engine|max_power|              torque|           seats|
+-------+--------------------+------------------+-----------------+------------------+------+----------------+------------+-----------+--------+-------+---------+--------------------+----------------+
|  count|                8128|              8128|             8128|              8128|  8128|            8128|        8128|       8128|    7907|   7907|     7913|                7906|            7907|
|   mean|                null|2013.8040108267717|638271.8077017716| 69819.51082677166|  null|            null|        null|       null|    null|   null|      0.0|                null|5.41671936259

In [7]:
# drop rows with null values

df = df.na.drop()
df.describe().show()

+-------+--------------------+------------------+-----------------+------------------+------+----------------+------------+-----------+--------+-------+---------+--------------------+------------------+
|summary|                name|              year|    selling_price|         km_driven|  fuel|     seller_type|transmission|      owner| mileage| engine|max_power|              torque|             seats|
+-------+--------------------+------------------+-----------------+------------------+------+----------------+------------+-----------+--------+-------+---------+--------------------+------------------+
|  count|                7906|              7906|             7906|              7906|  7906|            7906|        7906|       7906|    7906|   7906|     7906|                7906|              7906|
|   mean|                null|2013.9839362509485| 649813.720844928| 69188.65975208703|  null|            null|        null|       null|    null|   null|     null|                null|5.416

In [8]:
import pandas as pd

# change column 'name' to 'brand' and standardize mileage

# copy the spark df schema
schema = df.schema

# create pandas df
pandas_df = df.toPandas()

# store standardized mileage values
correct_mileage= []

# replace 'name' with 'brand' and prepare some columns for changing datatypes by deleting unit 
for i in range(0, len(pandas_df)):
    name = pandas_df['name'].iloc[i]
    name_splitted = name.split()
    brand = name_splitted[0]
    pandas_df['name'].iloc[i] = brand
    
    if pandas_df['mileage'].iloc[i].endswith('km/kg'):
        mile = pandas_df['mileage'].iloc[i][:-6]
        mile = float(mile)*1.40
        correct_mileage.append(float(mile))
    elif pandas_df['mileage'].iloc[i].endswith('kmpl'):
        mile = pandas_df['mileage'].iloc[i][:-6]
        correct_mileage.append(float(mile))

    pandas_df['engine'].iloc[i] = pandas_df['engine'].iloc[i][:-4]
    pandas_df['max_power'].iloc[i] = pandas_df['max_power'].iloc[i][:-5]

# replace mileage with standardized values
pandas_df['mileage'] = correct_mileage

# update the df with the new value using the pandas df
df = spark.createDataFrame(pandas_df,schema=schema)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pandas_df['name'].iloc[i] = brand
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pandas_df['engine'].iloc[i] = pandas_df['engine'].iloc[i][:-4]
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pandas_df['max_power'].iloc[i] = pandas_df['max_power'].iloc[i][:-5]


KeyboardInterrupt: 

In [None]:
pandas_df.head()

In [None]:
del pandas_df

In [None]:
# rename some colums
df = df.withColumnRenamed('mileage', 'mileage_kmpl')
df = df.withColumnRenamed('engine', 'engine_CC')
df = df.withColumnRenamed('max_power', 'max_power_bhp')

# drop 'torque'
df = df.drop('torque')

In [None]:
# change datatypes of some columns
#df.withColumn("mileage_kmpl",df.mileage_kmpl.cast('float'))
df = df.selectExpr("cast(name as string) name",
    "cast(year as int) year",
    "cast(selling_price as int) selling_price",
    "cast(km_driven as int) km_driven",
    "cast(fuel as string) fuel",
    "cast(seller_type as string) seller_type",
    "cast(transmission as string) transmission",
    "cast(owner as string) owner",
    "cast(mileage_kmpl as float) mileage_kmpl",
    "cast(engine_CC as int) engine_CC",
    "cast(max_power_bhp as float) max_power_bhp",
    "cast(seats as int) seats")

df.printSchema()

In [None]:
df.head(3)

In [None]:
from pyspark.ml.feature import StringIndexer

# string indexing for regression
indexer = StringIndexer(inputCols=['name', 'fuel', 'seller_type', 'transmission', 'owner'],
                        outputCols=['name_indexed', 'fuel_indexed', 'seller_type_indexed', 'transmission_indexed', 'owner_indexed'])

df_reg = indexer.fit(df).transform(df)
df_reg.show()

In [None]:
df_reg.columns

In [None]:
from pyspark.ml.feature import VectorAssembler

# group together all independent features
feature_assembler = VectorAssembler(inputCols=['year', 'km_driven', 'mileage_kmpl', 'engine_CC', 'max_power_bhp', 'seats', 'name_indexed', 'fuel_indexed',
                           'seller_type_indexed', 'transmission_indexed', 'owner_indexed'], outputCol='independent_Features')

output = feature_assembler.transform(df_reg)

In [None]:
output.select('independent_Features').show()

In [None]:
finalized_data = output.select('independent_Features', 'selling_price')
finalized_data.show()

In [None]:
from pyspark.ml.regression import LinearRegression

# create linear regression

# train+test split
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol='independent_Features', labelCol='selling_price')
regressor = regressor.fit(train_data)

In [None]:
regressor.coefficients

In [None]:
regressor.intercept

In [None]:
# predictions
pred_results = regressor.evaluate(test_data)

# final comparison
pred_results.predictions.show()

In [None]:
# preformance metrics
pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanSquaredError