<a href="https://colab.research.google.com/github/PeterEvansDS/UsingSpark/blob/main/PricePrediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Price Prediction of Used Cars Using PySpark

The problem with standard Python data science libraries such as NumPy and Pandas is that they run on a single machine and hold the data in its main memory. This becomes a limitation once datasets grow large, as is increasingly common with the advent of big data.

PySpark is the Python interface for Apache Spark, an engine for distributed data processing. It lets data science applications run on clusters of networked machines, giving them access to far more memory and computing power.

This notebook takes a dataset of used car sales (1.45 GB) and uses PySpark to build a basic regression model that predicts the sale price. Although the dataset is not that large, the same methods would apply unchanged to a much larger one.

## Setup

#### Mount Drive in order to access the data

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#### Set Up PySpark

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://mirrors.gethosted.online/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [4]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [5]:
!pip install -q findspark

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [7]:
import findspark
findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

In [8]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
     |████████████████████████████████| 212.4 MB 65 kB/s
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 67.1 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=776e6630fcb85f0026fbdb63b2867a3fab2162ec802899ffaa58986c275ef51a
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [10]:
spark

In [11]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2021-09-20 21:22:03--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 18.205.222.128, 54.161.241.46, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|18.205.222.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-09-20 21:22:03 (91.0 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://b566-34-86-200-57.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


#### Loading Data from Drive

In [12]:
df = spark.read.csv("/content/drive/MyDrive/ApacheSpark/PricePredUsedCars/vehicles.csv", header=True, inferSchema=True)

The inferSchema parameter tells Spark to scan the data and determine the type of each column itself, rather than relying on the user to supply the types. Supplying a schema explicitly is faster, because inference requires an extra pass over the data.
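The following is a deliberately simplified model of what inference does. Every non-null value in a column is tested against progressively wider types. A single value that does not fit pushes the whole column to the next type, and the last fallback is string.

This is an illustration only, assuming just three candidate types. Spark's real CSV inference considers more types, such as timestamps and booleans. The sample values are hypothetical.

```python
from typing import Iterable, Optional

# Simplified candidate types, narrowest first (an assumption, not Spark's full list).
_TYPE_ORDER = ["integer", "double", "string"]


def _fits(value: str, type_name: str) -> bool:
    """Return True if the raw CSV string can be parsed as the given type."""
    if type_name == "integer":
        try:
            int(value)
            return True
        except ValueError:
            return False
    if type_name == "double":
        try:
            float(value)
            return True
        except ValueError:
            return False
    return True  # every value fits in a string


def infer_column_type(values: Iterable[Optional[str]]) -> str:
    """Widen the column type until every non-null value fits (one full pass)."""
    current = 0  # index into _TYPE_ORDER
    for value in values:
        if value is None or value == "":
            continue  # nulls and empty cells do not constrain the type
        while not _fits(value, _TYPE_ORDER[current]):
            current += 1  # terminates: "string" always fits
    return _TYPE_ORDER[current]


print(infer_column_type(["2014", "2010", None]))       # integer
print(infer_column_type(["57923", "71229.5"]))         # double
print(infer_column_type(["2014", "great condition"]))  # string
```

The last call shows why a single stray text value is enough to make a numeric-looking column come out as a string.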

## Explore the Data

First, let's look at the shape of the dataframe and a sample of its rows.

In [13]:
df.count(), len(df.columns)

(441802, 26)

In [14]:
df.show(5)

+----------+--------------------+--------------------+--------------------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+---------+-----------+------+-----+----+----+------------+
|        id|                 url|              region|          region_url|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| VIN|drive|size|type|paint_color|image_url|description|county|state| lat|long|posting_date|
+----------+--------------------+--------------------+--------------------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+---------+-----------+------+-----+----+----+------------+
|7222695916|https://prescott....|            prescott|https://prescott....| 6000|null|        null| null|     null|     null|null|    null|        null|        null|null| null|null|null|       null|     null|       null|  null|  

In [15]:
description = df.describe()

In [16]:
description.show(truncate=True)

+-------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+--------------------+-----------------+---------+------------------+-----------------+-----------------+------------------+-----------------+------+------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+
|summary|                  id|                 url|              region|        region_url|               price|                year|     manufacturer|               model|        condition|cylinders|              fuel|         odometer|     title_status|      transmission|              VIN| drive|              size|              type|       paint_color|           image_url|         description|              county|               state|                 lat|     long|        posting_date|
+-------+-----

In [17]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- posting_date: string (nu

The aim of this exercise is to build a simple regression model that predicts the price. To keep things simple, the predictors are limited to the *year* and the mileage, recorded in the *odometer* column.

In [18]:
df = df.select(['year', 'odometer', 'price'])

## Cleaning the Data

The schema above shows that every column, including the numeric ones, was read in as a string. The selected columns therefore need to be cast to a numeric type.

In [19]:
from pyspark.sql.types import DoubleType
df = df.withColumn("year", df["year"].cast(DoubleType()))
df = df.withColumn("odometer", df["odometer"].cast(DoubleType()))
df = df.withColumn("price", df["price"].cast(DoubleType()))
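This behaviour is worth knowing. With Spark's default (non-ANSI) settings, cast does not raise an error on a string it cannot parse; it returns null instead. This step can therefore create new nulls, and the null filter below catches those as well.

The helper below is a plain-Python stand-in for that behaviour, written for illustration. Spark's own string parsing differs in edge cases. The example row is hypothetical.

```python
from typing import Optional


def cast_to_double(raw: Optional[str]) -> Optional[float]:
    """Approximate a non-ANSI Spark cast to DoubleType: unparsable input becomes None."""
    if raw is None:
        return None
    try:
        return float(raw.strip())
    except ValueError:
        return None  # Spark yields null here instead of raising


# Hypothetical row where the price field holds text rather than a number.
row = {"year": "2014", "odometer": "57923", "price": "great condition"}
cleaned = {col: cast_to_double(val) for col, val in row.items()}
print(cleaned)  # {'year': 2014.0, 'odometer': 57923.0, 'price': None}
```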

**Null Values**

Looking back at the description of the dataframe above, each of the chosen features, along with the price, has some null values. This can be inferred because describe() counts only non-null values, and their counts are less than the total number of rows.

However, only around 10-20k records have a null in any one of these columns, out of more than 400,000 in total. The most sensible option is therefore to exclude every record with a null in any of the chosen columns.

In [21]:
condition = (df.year.isNotNull() & df.odometer.isNotNull() & df.price.isNotNull())
df = df.filter(condition)
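PySpark also offers a built-in shorthand for this filter: df.dropna(subset=['year', 'odometer', 'price']). With its default how='any', it drops a row if any of the listed columns is null. The rule is simple enough to show in plain Python on a few hypothetical rows:

```python
from typing import Dict, List, Optional

Record = Dict[str, Optional[float]]


def drop_rows_with_nulls(rows: List[Record], columns: List[str]) -> List[Record]:
    """Keep only rows where every listed column is non-null (the how='any' rule)."""
    return [r for r in rows if all(r.get(c) is not None for c in columns)]


rows = [
    {"year": 2014.0, "odometer": 57923.0, "price": 33590.0},
    {"year": None, "odometer": 71229.0, "price": 22590.0},
    {"year": 2017.0, "odometer": None, "price": None},
]
kept = drop_rows_with_nulls(rows, ["year", "odometer", "price"])
print(len(kept))  # 1
```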

In [22]:
df.describe().show(truncate=False)

+-------+------------------+------------------+-------------------+
|summary|year              |odometer          |price              |
+-------+------------------+------------------+-------------------+
|count  |421344            |421344            |421344             |
|mean   |2011.2252435064936|98225.12691529961 |75983.55747560189  |
|stddev |9.463345329648092 |214120.68094579686|1.226204997904828E7|
|min    |1900.0            |0.0               |0.0                |
|max    |2022.0            |1.0E7             |3.736928711E9      |
+-------+------------------+------------------+-------------------+



The number of records has dropped from 441,802 to 421,344. That is a reduction of 20,458, in line with the roughly 20k expected.

**Extreme/Incorrect Values**

The null values have been removed, but the summary statistics show that some of the remaining values are clearly wrong. The maximum price is about 3.7 billion and the maximum odometer reading is 10 million miles, which look like placeholder or maximum values allowed by the car listing website. At the other end, the minimum price is 0 and the earliest year is 1900. To address this and prepare the data for training, each numerical column is limited to the range between its 10th and 90th percentiles.

Credit to this Stack Overflow question for the code inspiration:
https://stackoverflow.com/questions/52633916/outlier-detection-in-pyspark

In [23]:
bounds = { 
    c: dict(zip(["lower", "upper"], df.approxQuantile(c, [0.10, 0.9], 0)))
    for c in ['year', 'odometer', 'price']
}
print(bounds)

{'year': {'lower': 2003.0, 'upper': 2019.0}, 'odometer': {'lower': 15040.0, 'upper': 177532.0}, 'price': {'lower': 700.0, 'upper': 37500.0}}
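Passing 0 as the third argument (relativeError) to approxQuantile asks for exact quantiles. That is affordable here but can be expensive on very large data. A small positive value such as 0.01 trades precision for speed.

The bounds themselves amount to the sketch below, which applies the nearest-rank definition of a percentile to an in-memory list. Nearest-rank is an assumption made for illustration. Spark's algorithm may select a neighbouring value, so its results can differ slightly.

```python
import math
from typing import Dict, List


def nearest_rank_percentile(values: List[float], p: float) -> float:
    """Smallest value with at least a fraction p of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


def percentile_bounds(values: List[float], lower: float = 0.10,
                      upper: float = 0.90) -> Dict[str, float]:
    """Return {'lower': ..., 'upper': ...}, the same shape as one entry of bounds."""
    return {
        "lower": nearest_rank_percentile(values, lower),
        "upper": nearest_rank_percentile(values, upper),
    }


prices = [float(p) for p in range(100, 1100, 100)]  # 100.0, 200.0, ..., 1000.0
print(percentile_bounds(prices))  # {'lower': 100.0, 'upper': 900.0}
```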


In [24]:
import pyspark.sql.functions as f
within_bounds = df.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            True
        ).otherwise(False).alias(c+'_wb')
        for c in ['year', 'odometer', 'price']
    ]
)

within_bounds.show(20, truncate=True)

+------+--------+-------+-------+-----------+--------+
|  year|odometer|  price|year_wb|odometer_wb|price_wb|
+------+--------+-------+-------+-----------+--------+
|2014.0| 57923.0|33590.0|   true|       true|    true|
|2010.0| 71229.0|22590.0|   true|       true|    true|
|2020.0| 19160.0|39590.0|  false|       true|   false|
|2017.0| 41124.0|30990.0|   true|       true|    true|
|2013.0|128000.0|15000.0|   true|       true|    true|
|2012.0| 68696.0|27990.0|   true|       true|    true|
|2016.0| 29499.0|34590.0|   true|       true|    true|
|2019.0| 43000.0|35000.0|   true|       true|    true|
|2016.0| 17302.0|29990.0|   true|       true|    true|
|2011.0| 30237.0|38590.0|   true|       true|   false|
|1992.0|192000.0| 4500.0|  false|      false|    true|
|2017.0| 30041.0|32990.0|   true|       true|    true|
|2017.0| 40784.0|24590.0|   true|       true|    true|
|2016.0| 34940.0|30990.0|   true|       true|    true|
|2014.0| 17805.0|27990.0|   true|       true|    true|
|2016.0|  

Create a column that is true only when all three values fall within their bounds, and use it to filter the main dataframe.

In [25]:
from pyspark.sql.functions import col
within_bounds = within_bounds.withColumn('within_bounds', col('year_wb') & col('odometer_wb') & col('price_wb'))
within_bounds.show(5)

+------+--------+-------+-------+-----------+--------+-------------+
|  year|odometer|  price|year_wb|odometer_wb|price_wb|within_bounds|
+------+--------+-------+-------+-----------+--------+-------------+
|2014.0| 57923.0|33590.0|   true|       true|    true|         true|
|2010.0| 71229.0|22590.0|   true|       true|    true|         true|
|2020.0| 19160.0|39590.0|  false|       true|   false|        false|
|2017.0| 41124.0|30990.0|   true|       true|    true|         true|
|2013.0|128000.0|15000.0|   true|       true|    true|         true|
+------+--------+-------+-------+-----------+--------+-------------+
only showing top 5 rows



In [26]:
df = within_bounds.filter(within_bounds.within_bounds).select(['year', 'odometer', 'price'])

In [27]:
df.describe().show()

+-------+-----------------+-----------------+-----------------+
|summary|             year|         odometer|            price|
+-------+-----------------+-----------------+-----------------+
|  count|           247213|           247213|           247213|
|   mean|2012.766112623527|90954.26412850457|16504.99889164405|
| stddev|4.126784338851375|44395.29492518657|9413.473051071043|
|    min|           2003.0|          15040.0|            700.0|
|    max|           2019.0|         177530.0|          37500.0|
+-------+-----------------+-----------------+-----------------+



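The feature values are now all in a reasonable range. However, the number of records has dropped substantially, from 421,344 to 247,213 (about 41% removed), because a row is kept only if all three columns fall within their bounds.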
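A back-of-the-envelope check shows why the drop is so large. Each bound keeps roughly the central 80% of one column, but a row must pass all three filters. If the columns were independent and each filter kept exactly 80%, only about half the rows would survive.

The share actually kept is somewhat higher. That is consistent with year and odometer being correlated, and with ties at the boundaries, since between is inclusive. The independence figure is an idealised assumption, not a property of this data.

```python
# Row counts taken from the describe() outputs above.
rows_before = 421344
rows_after = 247213

per_column_keep = 0.80                  # central 80% of each column (10th to 90th percentile)
independent_keep = per_column_keep ** 3  # idealised: three independent filters
observed_keep = rows_after / rows_before

print(f"if independent: {independent_keep:.3f}")  # if independent: 0.512
print(f"observed:       {observed_keep:.3f}")     # observed:       0.587
```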

## Building a model

To train a model, Spark ML needs all the predictors in a single column of vectors. A **VectorAssembler** is the transformer that builds this column.

It takes the chosen predictor columns and, for each row, combines their values into one vector. That vector is added to the dataframe as a new column, here called *predictors*.
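Conceptually, the assembler does something like the plain-Python sketch below for each row. It reads the input columns in the order given by inputCols and stores their values together in one new column.

This is a simplified model. The real transformer produces Spark ML Vector objects (which may be dense or sparse) rather than Python lists. By default it also raises an error on null inputs (handleInvalid='error').

```python
from typing import Dict, List


def assemble(row: Dict[str, float], input_cols: List[str],
             output_col: str) -> Dict[str, object]:
    """Return a copy of the row with input_cols packed, in order, into output_col."""
    out: Dict[str, object] = dict(row)
    out[output_col] = [float(row[c]) for c in input_cols]
    return out


row = {"year": 2014.0, "odometer": 57923.0, "price": 33590.0}
assembled = assemble(row, ["year", "odometer"], "predictors")
print(list(assembled))           # ['year', 'odometer', 'price', 'predictors']
print(assembled["predictors"])   # [2014.0, 57923.0]
```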

In [28]:
from pyspark.ml.feature import VectorAssembler
input_cols = ['year', 'odometer']
assembler = VectorAssembler(inputCols = input_cols,
                            outputCol = 'predictors')
predictors = assembler.transform(df)
predictors.columns

['year', 'odometer', 'price', 'predictors']

In [29]:
model_data = predictors.select('predictors', 'price')
model_data.show(10)

+-----------------+-------+
|       predictors|  price|
+-----------------+-------+
| [2014.0,57923.0]|33590.0|
| [2010.0,71229.0]|22590.0|
| [2017.0,41124.0]|30990.0|
|[2013.0,128000.0]|15000.0|
| [2012.0,68696.0]|27990.0|
| [2016.0,29499.0]|34590.0|
| [2019.0,43000.0]|35000.0|
| [2016.0,17302.0]|29990.0|
| [2017.0,30041.0]|32990.0|
| [2017.0,40784.0]|24590.0|
+-----------------+-------+
only showing top 10 rows



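Split the data into training (80%) and test (20%) sets. No seed is passed, so the split, and therefore the metrics below, will vary slightly between runs.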

In [30]:
train_data,test_data = model_data.randomSplit([0.8,0.2])
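randomSplit does not cut the data at an exact 80/20 boundary. Each row is assigned to a split at random, so the split sizes are only approximately proportional to the weights. randomSplit also accepts a seed argument when a reproducible split is needed.

The plain-Python sketch below mimics per-row random assignment with a fixed seed. It illustrates the idea only and is not Spark's implementation, which samples within each partition.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper edges, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    edges: List[float] = []
    running = 0.0
    for w in weights:
        running += w / total
        edges.append(running)
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, edge in enumerate(edges):
            # The last split also catches any floating-point shortfall below 1.0.
            if u < edge or i == len(edges) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(10_000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 8000 and 2000, not exactly
```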

Train a linear regression model on the training set, then evaluate it on the test set.

In [32]:
from pyspark.ml.regression import LinearRegression
lin_reg = LinearRegression(featuresCol = 'predictors', labelCol='price')
lin_reg_model = lin_reg.fit(train_data)
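With its default settings (regParam=0.0, fitIntercept=True), LinearRegression fits ordinary least squares. It finds the intercept and coefficients that minimise the sum of squared errors on the training data, and the fitted model exposes them as lin_reg_model.intercept and lin_reg_model.coefficients.

For a model this small, the same solution can be written down directly through the normal equations (AᵀA)β = Aᵀy. Here A is the predictor matrix with a leading column of ones. The sketch below solves them in plain Python as an illustration of what is being estimated. Spark chooses its own solver and standardises features internally, which, without regularisation, should not change the solution.

```python
from typing import List, Sequence, Tuple


def fit_ols(X: Sequence[Sequence[float]], y: Sequence[float]) -> Tuple[float, List[float]]:
    """Fit y ~ b0 + X.b by solving the normal equations (A^T A) beta = A^T y.

    Returns (intercept, coefficients). Uses Gaussian elimination with partial
    pivoting; raises ZeroDivisionError if the system is singular.
    """
    A = [[1.0, *map(float, row)] for row in X]  # leading 1.0 gives the intercept
    p = len(A[0])
    # Augmented system [A^T A | A^T y].
    M: List[List[float]] = []
    for i in range(p):
        lhs = [sum(a[i] * a[j] for a in A) for j in range(p)]
        rhs = sum(a[i] * yi for a, yi in zip(A, y))
        M.append(lhs + [rhs])
    # Forward elimination with partial pivoting.
    for col in range(p):
        pivot = max(range(col, p), key=lambda r: abs(M[r][col]))
        M[col], M[pivot] = M[pivot], M[col]
        for r in range(col + 1, p):
            factor = M[r][col] / M[col][col]
            for c in range(col, p + 1):
                M[r][c] -= factor * M[col][c]
    # Back substitution.
    beta = [0.0] * p
    for i in reversed(range(p)):
        s = M[i][p] - sum(M[i][j] * beta[j] for j in range(i + 1, p))
        beta[i] = s / M[i][i]
    return beta[0], beta[1:]


# Points lying exactly on y = 5 + 2*x1 - 0.5*x2, so the fit should recover the plane.
X = [(1.0, 2.0), (2.0, 1.0), (3.0, 5.0), (4.0, 3.0), (5.0, 8.0)]
y = [5 + 2 * a - 0.5 * b for a, b in X]
intercept, coefs = fit_ols(X, y)
print(round(intercept, 6), [round(c, 6) for c in coefs])  # 5.0 [2.0, -0.5]
```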

In [33]:
pred = lin_reg_model.evaluate(test_data)

## Evaluate

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='price', metricName='rmse')

In [37]:
rmse = evaluator.evaluate(pred.predictions)

In [38]:
rmse

7311.221393253998

In [39]:
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})

In [40]:
r2

0.40070553115533625
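The two metrics above are standard. RMSE is the square root of the mean squared prediction error, in the same units as the price. R² is one minus the ratio of the residual sum of squares to the total sum of squares around the mean label.

The summary returned by lin_reg_model.evaluate also exposes both directly, as pred.rootMeanSquaredError and pred.r2. A plain-Python version, written for illustration (the function names are chosen so they do not overwrite the rmse and r2 variables above):

```python
import math
from typing import Sequence


def rmse_score(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error."""
    n = len(y_true)
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / n)


def r2_score(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


y_true = [10.0, 20.0, 30.0, 40.0]
y_pred = [12.0, 18.0, 33.0, 37.0]
print(round(rmse_score(y_true, y_pred), 4))  # 2.5495
print(round(r2_score(y_true, y_pred), 3))    # 0.948
```

Because R² = 1 − MSE / Var(y), the two numbers can be cross-checked against the earlier describe() output. With a filtered price standard deviation of about 9,413, 1 − (7,311 / 9,413)² ≈ 0.40, in line with the reported R². The match is only approximate, because the test set's standard deviation differs slightly from that of the full filtered data.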

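So year and mileage alone are not enough to make a good model. On the test set, the RMSE is about 7,300, and the model explains only about 40% of the variance in price (R² ≈ 0.40). Next time, some of the other, categorical variables will need to be added to see whether the model can be improved.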