In [0]:
from pyspark.sql import SparkSession

In [0]:
# Obtain a Spark session for this notebook, registered under the app name 'IMMLDDT'
spark = (
    SparkSession.builder
    .appName('IMMLDDT')
    .getOrCreate()
)

In [0]:
# Input file and its format
file_location = "/FileStore/tables/car_prices.csv"
file_type = "csv"

# Reader settings: infer column types, use the first row as the header, split fields on commas
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = (
    spark.read.format(file_type)
    .options(
        inferSchema=infer_schema,
        header=first_row_is_header,
        sep=delimiter,
    )
    .load(file_location)
)

display(df)

year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate
2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,"kia motors america, inc",20500,21500,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,"kia motors america, inc",20800,21500,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
2014,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,4.5,1331.0,gray,black,financial services remarketing (lease),31900,30000,Thu Jan 15 2015 04:30:00 GMT-0800 (PST)
2015,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,4.1,14282.0,white,black,volvo na rep/world omni,27500,27750,Thu Jan 29 2015 04:30:00 GMT-0800 (PST)
2014,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,4.3,2641.0,gray,black,financial services remarketing (lease),66000,67000,Thu Dec 18 2014 12:30:00 GMT-0800 (PST)
2015,Nissan,Altima,2.5 S,Sedan,automatic,1n4al3ap1fn326013,ca,1.0,5554.0,gray,black,enterprise vehicle exchange / tra / rental / tulsa,15350,10900,Tue Dec 30 2014 12:00:00 GMT-0800 (PST)
2014,BMW,M5,Base,Sedan,automatic,wbsfv9c51ed593089,ca,3.4,14943.0,black,black,the hertz corporation,69000,65000,Wed Dec 17 2014 12:30:00 GMT-0800 (PST)
2014,Chevrolet,Cruze,1LT,Sedan,automatic,1g1pc5sb2e7128460,ca,2.0,28617.0,black,black,enterprise vehicle exchange / tra / rental / tulsa,11900,9800,Tue Dec 16 2014 13:00:00 GMT-0800 (PST)
2014,Audi,A4,2.0T Premium Plus quattro,Sedan,automatic,wauffafl3en030343,ca,4.2,9557.0,white,black,audi mission viejo,32100,32250,Thu Dec 18 2014 12:00:00 GMT-0800 (PST)
2014,Chevrolet,Camaro,LT,Convertible,automatic,2g1fb3d37e9218789,ca,3.0,4809.0,red,black,d/m auto sales inc,26300,17500,Tue Jan 20 2015 04:00:00 GMT-0800 (PST)


In [0]:
# Print the column names and data types of the loaded DataFrame
# (types were inferred by the reader, since inferSchema is enabled above)
df.printSchema()

In [0]:
# Keep only the attributes chosen as predictors, plus the target column (sellingprice)
selected_columns = [
    'year', 'make', 'model', 'trim', 'body',
    'transmission', 'condition', 'odometer', 'color',
    'state', 'interior', 'seller', 'sellingprice',
]
data = df.select(selected_columns)

In [0]:
# Discard every row that has a null in any of the selected columns
data = data.na.drop()

In [0]:
# Create a 70-30 train/test split.
# A fixed seed is passed so that re-running the notebook yields the same split
# (and therefore comparable metrics) instead of a fresh random split each time.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

### Building the Decision Tree Regression Model

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

In [0]:
# Encode each categorical column as a numeric index column named '<column>_index'.
# handleInvalid='keep' assigns values not seen during fitting to an extra index
# instead of raising an error at transform time.

def _index_column(column):
    """Return a StringIndexer mapping `column` to `<column>_index`, keeping unseen values."""
    return StringIndexer(inputCol=column, outputCol=column + '_index', handleInvalid='keep')


make_indexer = _index_column('make')
model_indexer = _index_column('model')
trim_indexer = _index_column('trim')
body_indexer = _index_column('body')
transmission_indexer = _index_column('transmission')
state_indexer = _index_column('state')
color_indexer = _index_column('color')
interior_indexer = _index_column('interior')
seller_indexer = _index_column('seller')
condition_indexer = _index_column('condition')



In [0]:
# Combine the numeric columns and the indexed categorical columns into a single
# vector column named "features"; the order below is the order inside the vector
feature_columns = [
    'year', 'make_index', 'model_index', 'trim_index', 'body_index',
    'transmission_index', 'state_index', 'condition_index', 'odometer',
    'color_index', 'interior_index', 'seller_index',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [0]:
# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features")

In [0]:
# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data

pipe = Pipeline(stages=[make_indexer, model_indexer,trim_indexer,body_indexer,
                                       transmission_indexer, state_indexer,color_indexer,interior_indexer, seller_indexer,condition_indexer, assembler])

In [0]:
# Fit every stage of the pipeline on the training split; the returned fitted
# pipeline is reused below to transform the test split

fit_model=pipe.fit(train_data)

In [0]:
# Apply the fitted pipeline to the held-out test split, keep the transformed
# DataFrame in `results`, and render it

results = fit_model.transform(test_data)
display(results)

year,make,model,trim,body,transmission,condition,odometer,color,state,interior,seller,sellingprice,make_index,model_index,trim_index,body_index,transmission_index,state_index,color_index,interior_index,seller_index,condition_index,features
1990,Honda,Accord,EX,Sedan,automatic,2.0,19279.0,gray,tx,tan,automotive remarketing inc,350,5.0,6.0,14.0,0.0,0.0,2.0,3.0,3.0,96.0,9.0,"Map(vectorType -> dense, length -> 12, values -> List(1990.0, 5.0, 6.0, 14.0, 0.0, 0.0, 2.0, 9.0, 19279.0, 3.0, 3.0, 96.0))"
1990,Lexus,LS 400,Base,Sedan,automatic,3.0,106472.0,white,ca,tan,illest motors,700,14.0,437.0,0.0,0.0,0.0,1.0,1.0,3.0,5863.0,24.0,"Map(vectorType -> dense, length -> 12, values -> List(1990.0, 14.0, 437.0, 0.0, 0.0, 0.0, 1.0, 24.0, 106472.0, 1.0, 3.0, 5863.0))"
1990,Mercedes-Benz,300-Class,300E,Sedan,automatic,2.0,141799.0,white,nv,—,automotive remarketing inc,300,10.0,621.0,1186.0,0.0,0.0,12.0,1.0,4.0,96.0,9.0,"Map(vectorType -> dense, length -> 12, values -> List(1990.0, 10.0, 621.0, 1186.0, 0.0, 0.0, 12.0, 9.0, 141799.0, 1.0, 4.0, 96.0))"
1990,Toyota,Camry,Deluxe,Sedan,automatic,2.0,122877.0,blue,ca,—,charitable auto resource,400,3.0,3.0,151.0,0.0,0.0,1.0,4.0,4.0,255.0,9.0,"Map(vectorType -> dense, length -> 12, values -> List(1990.0, 3.0, 3.0, 151.0, 0.0, 0.0, 1.0, 9.0, 122877.0, 4.0, 4.0, 255.0))"
1991,Honda,Accord,SE,Sedan,automatic,2.0,186903.0,gray,md,blue,purple heart services inc,275,5.0,6.0,1.0,0.0,0.0,13.0,3.0,8.0,82.0,9.0,"Map(vectorType -> dense, length -> 12, values -> List(1991.0, 5.0, 6.0, 1.0, 0.0, 0.0, 13.0, 9.0, 186903.0, 3.0, 8.0, 82.0))"
1991,Mazda,MX-5 Miata,Base,Convertible,automatic,2.0,110154.0,white,nv,—,automotive remarketing inc,800,16.0,302.0,0.0,9.0,0.0,12.0,1.0,4.0,96.0,9.0,"Map(vectorType -> dense, length -> 12, values -> List(1991.0, 16.0, 302.0, 0.0, 9.0, 0.0, 12.0, 9.0, 110154.0, 1.0, 4.0, 96.0))"
1991,Mercedes-Benz,500-Class,500SL,Convertible,automatic,3.0,175568.0,silver,ca,gray,bob baker jeep,1300,10.0,662.0,1113.0,9.0,0.0,1.0,2.0,1.0,229.0,24.0,"Map(vectorType -> dense, length -> 12, values -> List(1991.0, 10.0, 662.0, 1113.0, 9.0, 0.0, 1.0, 24.0, 175568.0, 2.0, 1.0, 229.0))"
1991,Toyota,Camry,Deluxe,Sedan,automatic,2.0,252591.0,gray,ca,gray,honda of serramonte,150,3.0,3.0,151.0,0.0,0.0,1.0,3.0,1.0,1781.0,9.0,"Map(vectorType -> dense, length -> 12, values -> List(1991.0, 3.0, 3.0, 151.0, 0.0, 0.0, 1.0, 9.0, 252591.0, 3.0, 1.0, 1781.0))"
1991,Toyota,Camry,Deluxe,Sedan,automatic,3.0,244226.0,white,fl,blue,autonation toyota winter park,500,3.0,3.0,151.0,0.0,0.0,0.0,1.0,8.0,244.0,24.0,"Map(vectorType -> dense, length -> 12, values -> List(1991.0, 3.0, 3.0, 151.0, 0.0, 0.0, 0.0, 24.0, 244226.0, 1.0, 8.0, 244.0))"
1992,Buick,Park Avenue,Base,Sedan,automatic,1.0,136859.0,blue,ga,blue,rick hendrick chevrolet,700,21.0,378.0,0.0,0.0,0.0,3.0,4.0,8.0,265.0,31.0,"Map(vectorType -> dense, length -> 12, values -> List(1992.0, 21.0, 378.0, 0.0, 0.0, 0.0, 3.0, 31.0, 136859.0, 4.0, 8.0, 265.0))"


In [0]:
# Show the target next to the assembled feature vector for the first rows of the test output
results.select('sellingprice', 'features').show()

##### Evaluating the model

In [0]:
# Compute RMSE between the true selling price and the model's predictions.
# predictionCol must be the numeric column emitted by the regressor stage ("prediction" by
# default); "features" is the assembled input vector and cannot be scored as a prediction.
evaluator = RegressionEvaluator(labelCol="sellingprice", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(results)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [0]:
# The fitted pipeline in this notebook is `fit_model` (there is no variable named `model`).
# Take its final stage, which is the fitted regressor when the pipeline ends with one;
# index 1 would be one of the StringIndexer stages.
treeModel = fit_model.stages[-1]
# summary only
print(treeModel)