In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas_profiling as pp

In [4]:
import findspark
findspark.init('d:\\3kurs1sem\\semestrone\\JUPYTER\\spark\\spark')
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("HR") \
    .getOrCreate()

data = spark.read.csv(r'd:\3kurs1sem\semestrone\JUPYTER\Fractal\COURSE3\GroupAssigment\data\DataPreproccesed.csv',
                    header='true', 
                    inferSchema='true')




In [5]:
data.show()

+-----+---------+----------------+-------------+------------------+-----------------+-----------------+---------------------+--------------+-----------------------+--------------------+-----------+-------+---------------------------------+-----------------------------+----------------------------------+-----------------+------------------------+------------------+-----------------+-----------------+-----------------------------+-------------------------+-----------------------+---------------------+--------------------------------+------+-----------+-----------+-----------+-----------+------------------------+------------------------------+----------------+-------------------------------+--------------------------+---------------------------+------------------------+-----------------------------+----------------------+---------------------+-------------------+-------------------+-------------------+----------------------------+-------------------------------+---------------------------

In [26]:
data = data.drop('car_ID')

## Lets split CarName into Company & Model

In [31]:
from pyspark.sql.functions import split
split_col = pyspark.sql.functions.split(data['CarName'], ' ')
data = data.withColumn('Company', split_col.getItem(0))
data = data.withColumn('Model', split_col.getItem(1))

In [35]:
data = data.drop('CarName','Model')

In [48]:
data.groupBy('Company').count().show(40)

+-----------+-----+
|    Company|count|
+-----------+-----+
|     jaguar|    3|
|      buick|    8|
| mitsubishi|   13|
|     toyota|   32|
|       saab|    6|
|    peugeot|   11|
|   plymouth|    7|
|       audi|    7|
|alfa-romero|    3|
|        bmw|    8|
|      dodge|    9|
|      mazda|   17|
|      isuzu|    4|
|    porsche|    5|
|  chevrolet|    3|
|      honda|   13|
| volkswagen|   12|
|    mercury|    1|
|    renault|    2|
|     nissan|   17|
|     subaru|   12|
|     Nissan|    1|
|      volvo|   11|
+-----------+-----+



# Replace incorrect company names 

In [46]:
from pyspark.sql.functions import *
data = data.withColumn('Company', regexp_replace('Company', 'maxda', 'mazda'))
data = data.withColumn('Company', regexp_replace('Company', 'porcshce', 'porsche'))
data = data.withColumn('Company', regexp_replace('Company', 'toyouta', 'toyota'))
data = data.withColumn('Company', regexp_replace('Company', 'vokswagen', 'volkswagen'))
data = data.withColumn('Company', regexp_replace('Company', 'vw', 'volkswagen'))

In [47]:
data.groupBy('Company').count().show(40)

+-----------+-----+
|    Company|count|
+-----------+-----+
|     jaguar|    3|
|      buick|    8|
| mitsubishi|   13|
|     toyota|   32|
|       saab|    6|
|    peugeot|   11|
|   plymouth|    7|
|       audi|    7|
|alfa-romero|    3|
|        bmw|    8|
|      dodge|    9|
|      mazda|   17|
|      isuzu|    4|
|    porsche|    5|
|  chevrolet|    3|
|      honda|   13|
| volkswagen|   12|
|    mercury|    1|
|    renault|    2|
|     nissan|   17|
|     subaru|   12|
|     Nissan|    1|
|      volvo|   11|
+-----------+-----+



# Create new feature CarVol base on carlength,carwidth,carheight

In [61]:
data = data.withColumn('CarVol',data['carlength']*data['carwidth']*data['carheight'])


In [64]:
data = data.drop('carwidth','carheight','carlength')

In [172]:
X_data = data

In [173]:
X_data.show()

+---------+--------+----------+----------+-----------+----------+--------------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+-----------------+
|symboling|fueltype|aspiration|doornumber|    carbody|drivewheel|enginelocation|wheelbase|curbweight|enginetype|cylindernumber|enginesize|fuelsystem|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|    price|    Company|           CarVol|
+---------+--------+----------+----------+-----------+----------+--------------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+-----------------+
|        3|     gas|       std|       two|convertible|       rwd|         front|     88.6|      2548|      dohc|          four|       130|      mpfi|     3.47|  2.68|             9.0|       111|   5000|     21|    

# Create dummy variables

In [174]:
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

dummy_list = ['Company', 'carbody', 'drivewheel',
                         'enginetype', 'cylindernumber', 'fuelsystem',
                         'fueltype', 'doornumber', 'aspiration', 'enginelocation']
for col_name in dummy_list:
    stringIndexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Index")
    model = stringIndexer.fit(X_data)
    indexed = model.transform(X_data)
    encoder = OneHotEncoder(dropLast=False, inputCol=col_name + "_Index", outputCol=col_name + "_Vec")
    encoded = encoder.transform(indexed)
    X_data = encoded.drop(col_name + "_Index", col_name)

In [175]:
X_data.show()

+---------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------------+---------------+-------------+--------------+--------------+------------------+--------------+-------------+--------------+--------------+------------------+
|symboling|wheelbase|curbweight|enginesize|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|    price|           CarVol|    Company_Vec|  carbody_Vec|drivewheel_Vec|enginetype_Vec|cylindernumber_Vec|fuelsystem_Vec| fueltype_Vec|doornumber_Vec|aspiration_Vec|enginelocation_Vec|
+---------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------------+---------------+-------------+--------------+--------------+------------------+--------------+-------------+--------------+--------------+------------------+
|        3|     88.6|      2548|       130|     3.47|  2.68|             9.0|       111|   5000| 

## Symboling is ordinal categorical varibale, we just replase sign for better interpretation -3 = risky, +3 safety

In [176]:
X_data = X_data.withColumn('symboling', X_data.symboling * -1)

In [177]:
X_data.printSchema()

root
 |-- symboling: integer (nullable = true)
 |-- wheelbase: double (nullable = true)
 |-- curbweight: integer (nullable = true)
 |-- enginesize: integer (nullable = true)
 |-- boreratio: double (nullable = true)
 |-- stroke: double (nullable = true)
 |-- compressionratio: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- peakrpm: integer (nullable = true)
 |-- citympg: integer (nullable = true)
 |-- highwaympg: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- CarVol: double (nullable = true)
 |-- Company_Vec: vector (nullable = true)
 |-- carbody_Vec: vector (nullable = true)
 |-- drivewheel_Vec: vector (nullable = true)
 |-- enginetype_Vec: vector (nullable = true)
 |-- cylindernumber_Vec: vector (nullable = true)
 |-- fuelsystem_Vec: vector (nullable = true)
 |-- fueltype_Vec: vector (nullable = true)
 |-- doornumber_Vec: vector (nullable = true)
 |-- aspiration_Vec: vector (nullable = true)
 |-- enginelocation_Vec: vector (nullable = true

# Check NaN or Duplicates

In [178]:
X_data = X_data.drop_duplicates()

In [179]:
from pyspark.sql import functions as F
X_data.where(F.isnull(F.col("wheelbase"))).show()

+---------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+------+-----------+-----------+--------------+--------------+------------------+--------------+------------+--------------+--------------+------------------+
|symboling|wheelbase|curbweight|enginesize|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|price|CarVol|Company_Vec|carbody_Vec|drivewheel_Vec|enginetype_Vec|cylindernumber_Vec|fuelsystem_Vec|fueltype_Vec|doornumber_Vec|aspiration_Vec|enginelocation_Vec|
+---------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+------+-----------+-----------+--------------+--------------+------------------+--------------+------------+--------------+--------------+------------------+
+---------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+------+-----------+-----------+-----------

## Make our model

In [180]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['symboling', 'wheelbase', 'curbweight', 'enginesize', 'boreratio',
                                               'stroke', 'compressionratio', 'horsepower', 'peakrpm', 'citympg', 'highwaympg',
                                               'CarVol', 'Company_Vec','carbody_Vec','drivewheel_Vec','enginetype_Vec',
                                              'cylindernumber_Vec','fuelsystem_Vec','fueltype_Vec','doornumber_Vec',
                                              'aspiration_Vec','enginelocation_Vec'], outputCol = 'features')
vX_data = vectorAssembler.transform(X_data)
vX_data = vX_data.select(['features', 'price'])
vX_data.show(3)

+--------------------+-------+
|            features|  price|
+--------------------+-------+
|(73,[1,2,3,4,5,6,...| 7295.0|
|(73,[1,2,3,4,5,6,...|11259.0|
|(73,[0,1,2,3,4,5,...|13415.0|
+--------------------+-------+
only showing top 3 rows



In [186]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=1000, regParam=1000, elasticNetParam=0.8)
lr_model = lr.fit(vX_data)
print("Coefficients: " + str(lr_model.coefficients))

Coefficients: [0.0,0.0,3.5301870680762204,53.75818610293014,0.0,0.0,0.0,30.160537609571875,0.0,0.0,0.0,0.0018009830786524558,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3405.143702435464,3994.1229783693852,0.0,0.0,0.0,2200.0694009152835,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,414.18297282698,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1742.6876497451362,0.0,0.0,346.5634397839349,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1929.4721754714055,1929.4721754721988]
Intercept: -4101.522262528426


In [187]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2805.573687
r2: 0.876064


In [204]:
import numpy as np
import pandas as pd
pd.Series(np.array(lr_model.coefficients)).sort_values(ascending=False).head()

23    3994.122978
22    3405.143702
27    2200.069401
72    1929.472175
41     414.182973
dtype: float64

# Importance Features 23 22 27 72 41

Now lets try Forest

In [208]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'price')
dt_model = dt.fit(vX_data)
dt_predictions = dt_model.transform(vX_data)
dt_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 1543.12


In [209]:
dt_model.featureImportances

SparseVector(73, {0: 0.0008, 1: 0.0121, 2: 0.2218, 3: 0.7038, 4: 0.0068, 5: 0.0052, 7: 0.0167, 10: 0.0154, 11: 0.0023, 12: 0.0062, 35: 0.0004, 36: 0.0011, 50: 0.0062, 58: 0.0012})

# Importance Features 3 2 1 4
(wheelbase, curbweight, enginesize)