In [1]:
import math
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = SparkSession.builder.appName("USA Cars").getOrCreate()

In [3]:
spark

In [6]:
dataset = spark.read.csv("USA_cars_datasets.csv", inferSchema=True, header=True)
dataset.show(3)

+---+-----+------+-------+----+-------------+-------+------+-------------------+---------+----------+-------+------------+
|_c0|price| brand|  model|year| title_status|mileage| color|                vin|      lot|     state|country|   condition|
+---+-----+------+-------+----+-------------+-------+------+-------------------+---------+----------+-------+------------+
|  0| 6300|toyota|cruiser|2008|clean vehicle| 274117| black|  jtezu11f88k007763|159348797|new jersey|    usa|10 days left|
|  1| 2899|  ford|     se|2011|clean vehicle| 190552|silver|  2fmdk3gc4bbb02217|166951262| tennessee|    usa| 6 days left|
|  2| 5350| dodge|    mpv|2018|clean vehicle|  39590|silver|  3c4pdcgg5jt346413|167655728|   georgia|    usa| 2 days left|
+---+-----+------+-------+----+-------------+-------+------+-------------------+---------+----------+-------+------------+
only showing top 3 rows



In [7]:
new = dataset.withColumn("prices", dataset["price"])
new = new.drop("_c0", "lot", "vin", "price")
new.show(3)

+------+-------+----+-------------+-------+------+----------+-------+------------+------+
| brand|  model|year| title_status|mileage| color|     state|country|   condition|prices|
+------+-------+----+-------------+-------+------+----------+-------+------------+------+
|toyota|cruiser|2008|clean vehicle| 274117| black|new jersey|    usa|10 days left|  6300|
|  ford|     se|2011|clean vehicle| 190552|silver| tennessee|    usa| 6 days left|  2899|
| dodge|    mpv|2018|clean vehicle|  39590|silver|   georgia|    usa| 2 days left|  5350|
+------+-------+----+-------------+-------+------+----------+-------+------------+------+
only showing top 3 rows



In [8]:
def conditionToMinutes(word):
    wordList = word.split()
    if(wordList[1] == "days"):
        return int(wordList[0]) * 24 * 60
    elif(wordList[1] == "hours"):
        return int(wordList[0]) * 60
    elif(wordList[1] == "minutes"):
        return int(wordList[0])
    else:
        return 0

conditionToMinutesUDF = F.udf(conditionToMinutes, T.IntegerType())
new = new.withColumn("condition", conditionToMinutesUDF(new.condition))
new.show(3)

+------+-------+----+-------------+-------+------+----------+-------+---------+------+
| brand|  model|year| title_status|mileage| color|     state|country|condition|prices|
+------+-------+----+-------------+-------+------+----------+-------+---------+------+
|toyota|cruiser|2008|clean vehicle| 274117| black|new jersey|    usa|    14400|  6300|
|  ford|     se|2011|clean vehicle| 190552|silver| tennessee|    usa|     8640|  2899|
| dodge|    mpv|2018|clean vehicle|  39590|silver|   georgia|    usa|     2880|  5350|
+------+-------+----+-------------+-------+------+----------+-------+---------+------+
only showing top 3 rows



In [9]:
new.dtypes

[('brand', 'string'),
 ('model', 'string'),
 ('year', 'int'),
 ('title_status', 'string'),
 ('mileage', 'int'),
 ('color', 'string'),
 ('state', 'string'),
 ('country', 'string'),
 ('condition', 'int'),
 ('prices', 'int')]

In [10]:
for i in new.dtypes:
    if(i[1] == "string"):
        print(i)
        new.select(i[0])
        categ = new.select(i[0]).distinct().rdd.flatMap(lambda x:x).collect()
        print("---- done categ")
        exprs = [F.when(F.col(i[0]) == cat, 1).otherwise(0).alias(str(cat)) for cat in categ]
        print("---- done exprs")
        new = new.select(exprs+new.columns)
        print("---- done exprs+new.columns")
        new = new.drop(i[0], categ[0])
        print("---- done drop")

('brand', 'string')
---- done categ
---- done exprs
---- done exprs+new.columns
---- done drop
('model', 'string')
---- done categ
---- done exprs
---- done exprs+new.columns
---- done drop
('title_status', 'string')
---- done categ
---- done exprs
---- done exprs+new.columns
---- done drop
('color', 'string')
---- done categ
---- done exprs
---- done exprs+new.columns
---- done drop
('state', 'string')
---- done categ
---- done exprs
---- done exprs+new.columns
---- done drop
('country', 'string')
---- done categ
---- done exprs
---- done exprs+new.columns
---- done drop


In [11]:
new.show(3)

+----+-------------+-------+--------+--------------+------------+-------+-------+--------------+----------+-----+--------+----------+-------------+------------+--------+------+----+-----------+------+--------+---------+-----+-------+--------+-------+--------+---------+----------+--------+--------+-------+----+--------+-------+---------+----------+---------+-------+--------+--------+-------+-----------+------+---------------------+----------------------+---------------------------------------+--------+------+-------------------+---------+------------------+--------+------+-----+------------+------+------------+---------------+---------------+-----------+-----------+------+------+-----+----+----------------------------------+-------------+---+----+---------+--------+-----+-----------------+---------------------+------+---------+-----+---+-------------+----------+------------+-----------+------------------+-----+-----+----------------------+--------------------------------+----+----------

In [12]:
new.toPandas().to_csv('newDataset.csv')

In [13]:
featuresAssembler = VectorAssembler(inputCols=new.columns[:-1], outputCol="features")

In [14]:
features = featuresAssembler.transform(new)

In [15]:
features.select("features").show(3)

+--------------------+
|            features|
+--------------------+
|(250,[0,9,84,92,1...|
|(250,[0,21,62,92,...|
|(250,[0,38,62,92,...|
+--------------------+
only showing top 3 rows



In [16]:
finalData = features.select("features", "prices")

In [17]:
finalData.show(3)

+--------------------+------+
|            features|prices|
+--------------------+------+
|(250,[0,9,84,92,1...|  6300|
|(250,[0,21,62,92,...|  2899|
|(250,[0,38,62,92,...|  5350|
+--------------------+------+
only showing top 3 rows



In [18]:
train, test = finalData.randomSplit([0.8, 0.2], 3)

In [19]:
regressor = LinearRegression(featuresCol="features", labelCol="prices")
regressor = regressor.fit(train)

In [20]:
pred = regressor.evaluate(test)
evaluation = RegressionEvaluator(labelCol="prices", predictionCol="prediction")

#Mean Absolute Error
mae = evaluation.evaluate(pred.predictions, {evaluation.metricName: "mae"})
print(mae)

5040.823768382629


In [21]:
pred.predictions.show(100)

+--------------------+------+------------------+
|            features|prices|        prediction|
+--------------------+------+------------------+
|(250,[0,1,64,92,1...| 25995| 17809.63757324801|
|(250,[0,2,62,92,1...| 42800|50973.408228853485|
|(250,[0,2,65,92,1...| 16400| 22521.15984738106|
|(250,[0,2,68,92,1...|  5200|19198.499772509094|
|(250,[0,2,84,92,1...|  3700| 19267.65292809496|
|(250,[0,2,88,92,1...| 29000|27757.019647445413|
|(250,[0,2,88,92,1...| 22800|18454.320416692295|
|(250,[0,3,49,92,1...| 20227|10607.521366494475|
|(250,[0,3,54,92,1...| 29590| 19252.34311201854|
|(250,[0,3,62,92,1...| 13995|14801.715386037831|
|(250,[0,3,62,92,1...| 26300|25614.389477556804|
|(250,[0,3,64,92,1...| 19995|19078.312059996184|
|(250,[0,3,64,92,1...| 14100|25216.210418570787|
|(250,[0,3,64,92,1...| 17200| 22460.58551648655|
|(250,[0,3,64,92,2...| 17700|15283.426968095591|
|(250,[0,3,65,92,1...| 18288|    16419.81768896|
|(250,[0,3,71,92,1...| 34995| 20830.31362528284|
|(250,[0,3,73,92,1..

In [63]:
spark.stop()