# Problem Definition
#### How to determine the *price* of a used car?

## Contents

[Installation Setup](#Installation-Setup) <br>
+   [Environment Config](#Environment-Configuration) <br>
+   [Python Packages](#Loading-Packages) <br>
+   [Apache Spark](#Creating-SparkSession) <br>

[Extract, Transform, Load](#Extract-Stage) <br>
This includes the various stages of the ETL Pipeline <br>
+   [Extract](#Extract-Data) <br>
    +   [Kaggle Dataset](#Kaggle-Dataset) <br>
    +   [Validating Data](#Validating-Data) <br>
    +   [Cleaning Data (Basic)](#Cleaning-Data-(Basic)) <br>
    +   [Caching Data on S3](#Caching-Extract-Data-on-S3) <br>
+   [Transform](#Transform-Stage) <br>
    +   [Cleaning Data](#Cleaning-Data) <br>
    +   [Sampling Data](#Sampling-Data) <br>
    +   [Exploratory Data Analysis using Pandas and Matplotlib](#Exploratory-Data-Analysis) <br>
    +   [Caching Data on S3](#Caching-Transform-Data-on-S3) <br>
+   [Load](#Load-Stage) <br>
    +   [Preprocessing Data for Learning Model](#Load-Data)
    +   [Migrate Data to Database](#Load-Data)
    
[Predicting Used Car Price](#Machine-Learning)
+   Implementing Linear Regression

# Installation Setup

## Tool Versions

```
Apache Spark - 2.4.3
Jupyter Notebook - 4.4.0
```
    
## Environment Configuration

#### Configuring ~/.bash_profile

```
export PATH="/usr/local/bin:$PATH"
PATH="/Library/Frameworks/Python.framework/Versions/3.7/bin:${PATH}"
export PATH=/usr/local/scala/bin:$PATH
export PATH=/usr/local/spark/bin:$PATH
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
export PYSPARK_PYTHON=python3.7
```

#### Configuring ~/.bashrc

```
export PYSPARK_PYTHON=/usr/local/bin/python3.7
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3.7
```



### Findspark

Use `findspark` to locate the Spark installation so that the **PySpark** module can be imported with the correct environment variables and dependencies.

In [1]:
import traceback
import findspark
try:
    findspark.init('/usr/local/spark/')
except Exception:
    # format_exc() prints the traceback of the exception that was raised
    print("Error:", traceback.format_exc())

Check the paths before starting the PySpark session:

In [2]:
import os
import sys
print("PATH: %s\n" % os.environ['PATH'])
print("SPARK_HOME: %s" % os.environ['SPARK_HOME'])
print("PYSPARK_PYTHON: %s" % os.environ['PYSPARK_PYTHON'])
print("PYSPARK_DRIVER_PYTHON: %s" % os.environ['PYSPARK_DRIVER_PYTHON'])

PATH: /Library/Frameworks/Python.framework/Versions/3.7/bin:/usr/local/spark/bin:/usr/local/scala/bin:/Library/Frameworks/Python.framework/Versions/3.7/bin:/usr/local/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin

SPARK_HOME: /usr/local/spark/
PYSPARK_PYTHON: /usr/local/bin/python3.7
PYSPARK_DRIVER_PYTHON: /usr/local/bin/python3.7


## Loading Packages 

In [3]:
#import libraries
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib
import sklearn
import scipy
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
import subprocess
from pyspark.sql.functions import *

### Package Versions

In [4]:
print('matplotlib: {}'.format(matplotlib.__version__))
print('sklearn: {}'.format(sklearn.__version__))
print('scipy: {}'.format(scipy.__version__))
print('pandas: {}'.format(pd.__version__))
print('numpy: {}'.format(np.__version__))
print('Python: {}'.format(sys.version))

matplotlib: 3.0.3
sklearn: 0.19.2
scipy: 1.2.1
pandas: 0.24.2
numpy: 1.16.3
Python: 3.7.3 (v3.7.3:ef4ec6ed12, Mar 25 2019, 16:52:21) 
[Clang 6.0 (clang-600.0.57)]


## Creating SparkSession
Load the `hadoop-aws` package so that Spark can access S3:

In [5]:
%set_env PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


env: PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


Create the Spark session in **local mode**, using all available cores on this machine (`local[*]`):

In [6]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark Craigslist") \
    .getOrCreate()

sc = spark.sparkContext

Configure Hadoop connection for S3:

In [7]:
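# Read the AWS credentials from environment variables instead of hard-coding them
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoopConf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])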

Spark jobs can be monitored through the Spark Web UI, available at `localhost:4040/`.

# Extract Stage

## Kaggle Dataset

Available from [kaggle.com/austinreese/](https://www.kaggle.com/austinreese/craigslist-carstrucks-data)

In [8]:
vehicle_listings = spark.read.format("csv").option("header", "true").load("../data/craigslistVehiclesFull.csv")
type(vehicle_listings)

pyspark.sql.dataframe.DataFrame

## Validating Data

Now that the data is loaded as a Spark *dataframe*, let's validate it by looking at the schema, size, samples and statistics of our working data:

In [9]:
vehicle_listings.printSchema()

root
 |-- url: string (nullable = true)
 |-- city: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- make: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- state_fips: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- weather: 

Dimensions of Raw Dataset:

In [10]:
print(vehicle_listings.count(),len(vehicle_listings.columns))

1723065 26


Collect a random sample to see what kind of data populates each column:

In [11]:
vehicle_listings.sample(False,0.0001,10).toPandas().sample(10)

| Unnamed: 0 | url | city | price | year | manufacturer | make | condition | cylinders | fuel | odometer | ... | paint_color | image_url | lat | long | county_fips | county_name | state_fips | state_code | state_name | weather |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 17 | https://lexington.craigslist.org/cto/d/2007-me... | lexington | 3950 | 2007 | mercury | grand marquis ls | excellent | 8 cylinders | gas | 74850.0 | ... | blue | https://images.craigslist.org/01414_dqw8XgIfcQ... | 38.257397 | -83.559598 | 21069.0 | Fleming | 21.0 | KY | Kentucky | 50.0 |
| 60 | https://stockton.craigslist.org/cto/d/92-lexus... | stockton | 500 | 1992 | lexus | es 300 |  |  | gas |  | ... |  | https://images.craigslist.org/00d0d_1IBjY2EAFw... | 37.914091 | -121.421363 | 6077.0 | San Joaquin | 6.0 | CA | California | 59.0 |
| 116 | https://augusta.craigslist.org/cto/d/2013-toyo... | augusta | 22999 | 2013 | toyota | highlander |  |  | gas |  | ... |  | https://images.craigslist.org/00j0j_2dE9yVmja8... | 34.077 | -84.3033 | 13121.0 | Fulton | 13.0 | GA | Georgia | 59.0 |
| 29 | https://kelowna.craigslist.ca/ctd/d/2019-gmc-3... | kelowna | 85475 | 2019 | gmc | sierra 3500hd | new | 8 cylinders | diesel |  | ... | black | https://images.craigslist.org/00M0M_9lsAVea6hZ... | 49.130744 | -123.089055 |  |  |  |  | FAILED |  |
| 81 | https://minneapolis.craigslist.org/ram/ctd/d/1... | minneapolis | 5999 | 1999 | ford | f250 |  | 8 cylinders | gas | 210000.0 | ... |  | https://images.craigslist.org/00X0X_b17n1KVKxY... | 44.968 | -93.1981 | 27123.0 | Ramsey | 27.0 | MN | Minnesota | 43.0 |
| 13 | https://charlestonwv.craigslist.org/ctd/d/2007... | charlestonwv | 25500 | 2007 | ram | 3500 crewcab slt 4x4 d |  |  | diesel | 113284.0 | ... | white | https://images.craigslist.org/00g0g_bHJB1JbXtf... | 39.4991 | -76.9101 | 24013.0 | Carroll | 24.0 | MD | Maryland | 56.0 |
| 51 | https://salem.craigslist.org/ctd/d/2014-dodge-... | salem | 15986 | 2014 | dodge | journey |  |  | gas | 51567.0 | ... | silver | https://images.craigslist.org/00l0l_5Iv8wWXotk... | 44.917924 | -123.045466 | 41047.0 | Marion | 41.0 | OR | Oregon | 50.0 |
| 35 | https://comoxvalley.craigslist.ca/ctd/d/2011-h... | comoxvalley | 12853 | 2011 | honda | cr-v |  |  | gas | 98523.0 | ... |  | https://images.craigslist.org/00505_azLty1PKHh... | 50.045049 | -125.270303 |  |  |  |  | FAILED |  |
| 147 | https://milwaukee.craigslist.org/cto/d/chevy-t... | milwaukee | 2500 | 2001 | chevrolet | tahoe |  |  | gas | 154000.0 | ... |  | https://images.craigslist.org/00707_dfjbctDfmg... | 43.015868 | -88.006429 | 55079.0 | Milwaukee | 55.0 | WI | Wisconsin | 45.0 |
| 86 | https://delaware.craigslist.org/cto/d/1961-nas... | delaware | 9900 | 1961 |  | nash metropolitan | good | 4 cylinders | gas | 48058.0 | ... |  | https://images.craigslist.org/00w0w_1xbz3sFnkU... | 39.7647 | -75.5069 | 10003.0 | New Castle | 10.0 | DE | Delaware | 54.0 |
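With `withReplacement=False`, Spark's `sample()` keeps each row independently with probability `fraction`, so the number of returned rows is only approximately `fraction × count` (about 172 rows here), which is why the result is then trimmed to a fixed size with `.toPandas().sample(10)`. The sketch below is a minimal pure-Python illustration of this Bernoulli sampling idea, not Spark's implementation (Spark samples each partition with its own random generator, so the same seed selects different rows); `bernoulli_sample` is a hypothetical helper.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")

def bernoulli_sample(rows: Sequence[T], fraction: float, seed: int) -> List[T]:
    """Hypothetical helper: keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    return [row for row in rows if rng.random() < fraction]

# One entry per raw listing (1,723,065 rows, as counted above)
rows = range(1_723_065)
sample = bernoulli_sample(rows, fraction=0.0001, seed=10)

# The size varies around 0.0001 * 1,723,065 ≈ 172 instead of being fixed
print(len(sample))
```

Re-running with the same seed returns the same rows, while a different seed gives a sample of a slightly different size.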


Basic statistics on raw dataset columns:

In [12]:
vehicle_listings.select("city","state_code","year","lat","long").describe().show()
vehicle_listings.select("manufacturer","make","price","condition","odometer").describe().show()
vehicle_listings.select("cylinders","fuel","transmission","drive","size","type").describe().show()
vehicle_listings.select("url","title_status","vin","paint_color","image_url").describe().show()
vehicle_listings.select("county_fips","state_fips","weather","state_code","state_name").describe().show()

+-------+----------+------------------+------------------+--------------------+------------------+
|summary|      city|        state_code|              year|                 lat|              long|
+-------+----------+------------------+------------------+--------------------+------------------+
|  count|   1723065|           1664232|           1716750|             1723065|           1723063|
|   mean|      null|              28.4|2004.8408405417213|  38.781324850962136|-93.60776402344794|
| stddev|      null|11.749231653365442|  12.0877162743775|   5.983397428918304| 67.11417142134626|
|    min|abbotsford|                17|              1553|           -0.201001|         -0.072901|
|    max|zanesville|                WY|               718|https://images.cr...|              9009|
+-------+----------+------------------+------------------+--------------------+------------------+

+-------+------------+--------------------+--------------------+---------+------------------+
|summary|manuf

Generally, the `.describe()` method is a good way to start exploring a dataset. For this dataset, there are too many columns, some very unclean, and it is hard to decipher much from the above results. Some rows are clearly misparsed: for example, the maximum `lat` value is an image URL. <br>
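Another reason the statistics above are hard to read is that every column is still a *String*, so `min` and `max` compare values character by character rather than numerically. This is how `year` ends up with a minimum of 1553 and a maximum of 718. The short sketch below reproduces the effect in plain Python, using a few `year` values taken from the outputs above.

```python
# Values taken from the `year` outputs above, still stored as strings
years_as_strings = ["2007", "1992", "1553", "718", "2019"]

# String comparison is character by character: "7" > "2" > "1", so "718" is the maximum
print(min(years_as_strings), max(years_as_strings))  # 1553 718

# After casting to numbers, the ordering is numeric, as intended
years_as_numbers = [float(y) for y in years_as_strings]
print(min(years_as_numbers), max(years_as_numbers))  # 718.0 2019.0
```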

For this project, we will perform cleaning twice. The Extract stage has a basic cleaning section to remove duplicates and to obtain numerical features (notice in the schema above, all columns are *String* by default). We can cache the Extracted Data in a compressed format such as parquet, so it can be used for different pipelines. <br>

The main cleaning section is performed during the [Transform Stage](#Transform-Stage).<br>



## Cleaning Data (Basic)

The goal of **Extract Stage** cleaning is to find a balance between cleaning the data and maintaining the maximum amount of raw data. This optimizes usability for other pipelines. <br>

### Removing Duplicates

The raw dataset was obtained by scraping Craigslist, so each post has a unique url.

In [13]:
print(vehicle_listings.select("url").count())
print(vehicle_listings.select("url").distinct().count())

1723065
1723065


Because every post has its own url, no record in the raw dataset is a complete duplicate of another. Duplicates here are more likely the result of people posting the same vehicle multiple times, or of other errors, and such records can be identical except for their `url`.

These duplicates can therefore be removed by excluding `url` when calling `.drop_duplicates()` on **`vehicle_listings`**.

In [14]:
print(vehicle_listings.columns)

['url', 'city', 'price', 'year', 'manufacturer', 'make', 'condition', 'cylinders', 'fuel', 'odometer', 'title_status', 'transmission', 'vin', 'drive', 'size', 'type', 'paint_color', 'image_url', 'lat', 'long', 'county_fips', 'county_name', 'state_fips', 'state_code', 'state_name', 'weather']


In [16]:
# Deduplicate on every column except 'url', which is unique per post
vehicle_listings_clean=vehicle_listings.drop_duplicates([c for c in vehicle_listings.columns if c != 'url'])

In [17]:
old_rows=vehicle_listings.count()
old_cols=len(vehicle_listings.columns)
new_rows=vehicle_listings_clean.count()
new_cols=len(vehicle_listings_clean.columns)

print("Original Dimensions - ",old_rows,"*",old_cols)
print("New Dimensions - ",new_rows,"*",new_cols)


print("Data kept through Extract: %d percent of records\n-----------------" % (100 * new_rows / old_rows))


Original Dimensions -  1723065 * 26
New Dimensions -  1537438 * 26
Data kept through Extract: 89 percent of records
-----------------
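The effect of leaving `url` out of the deduplication key can be sketched in plain Python. `drop_duplicates` below is a hypothetical helper that mirrors the idea behind Spark's `dropDuplicates(subset)`: rows count as duplicates when they agree on every column in `subset`. Unlike this sketch, Spark does not guarantee *which* of the duplicate rows it keeps. The listings and URLs are made up for illustration.

```python
from typing import Dict, Iterable, List

def drop_duplicates(rows: Iterable[Dict[str, str]], subset: List[str]) -> List[Dict[str, str]]:
    """Hypothetical helper: keep the first row for each distinct combination of `subset` values."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

listings = [
    {"url": "https://example.com/1", "city": "salem", "price": "15986", "make": "journey"},
    {"url": "https://example.com/2", "city": "salem", "price": "15986", "make": "journey"},  # re-post
    {"url": "https://example.com/3", "city": "tampa", "price": "1", "make": "murano"},
]

all_columns = drop_duplicates(listings, ["url", "city", "price", "make"])
without_url = drop_duplicates(listings, [c for c in listings[0] if c != "url"])
print(len(all_columns), len(without_url))  # 3 2
```

Including `url` in the key makes every row unique, which matches the identical `count()` and `distinct().count()` results above.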


### Obtain Numerical Columns
Convert *numerical feature types* from string to float to obtain **Numerical** data columns.

In [18]:
from functools import reduce

cols=["price","year","odometer"]

# Fold over the column names, casting each one on the DataFrame built so far (memo_df)
vehicle_listings_clean = reduce(
            lambda memo_df, col_name: memo_df.withColumn(col_name, memo_df[col_name].cast("float")),
            cols,
            vehicle_listings_clean)

vehicle_listings_clean.printSchema()

root
 |-- url: string (nullable = true)
 |-- city: string (nullable = true)
 |-- price: float (nullable = true)
 |-- year: float (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- make: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: float (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- state_fips: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- weather: str
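In Spark 2.x, `cast("float")` returns null for a string that cannot be parsed as a number instead of raising an error, so unparseable values quietly become null. The sketch below imitates that behaviour for a single record in plain Python and uses the same `functools.reduce` folding pattern as the cell above. `spark_like_float` and `cast_columns` are hypothetical helpers, and Python's 64-bit floats and parsing rules only approximate Spark's 32-bit `float` cast.

```python
from functools import reduce
from typing import Dict, Iterable, Optional

def spark_like_float(value: Optional[str]) -> Optional[float]:
    """Approximate a non-ANSI Spark cast to float: unparseable strings become None (null)."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None

def cast_columns(record: Dict[str, Optional[str]], cols: Iterable[str]) -> Dict[str, object]:
    """Fold over `cols`, casting one column per step, like reduce(... withColumn ...)."""
    return reduce(
        lambda memo, name: {**memo, name: spark_like_float(memo[name])},
        cols,
        record,
    )

# A made-up listing whose odometer field holds text instead of a number
row = {"price": "3950", "year": "2007", "odometer": "n/a", "city": "lexington"}
print(cast_columns(row, ["price", "year", "odometer"]))
# {'price': 3950.0, 'year': 2007.0, 'odometer': None, 'city': 'lexington'}
```

Each step builds a new dict rather than mutating the input, much like `withColumn` returns a new DataFrame.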

## Caching Extract Data on S3

After cleaning, the final task in the **Extract** stage is to cache the data. I will store the cleaned dataset as a parquet file on Amazon S3. We can return to this file without having to run the Extract pipeline again, and attach other pipelines from this point as well. <br>

<div class="alert alert-warning">
<b>NOTE</b> This implementation calls .coalesce(4) before .write() so that the output is written as at most 4 part files, which keeps the number of objects written to and read from S3 small.
</div>

In [19]:
#vehicle_listings_clean.write.parquet("s3a://deveshetl/usedcars/extract",mode="overwrite")
vehicle_listings_clean.coalesce(4).write.parquet("../data/vehicle_listings_extract.parquet",mode='overwrite')

I was unable to connect to S3 directly from the SparkSession (the commented-out `s3a://` line above) because of how Spark handled its credentials, possibly due to a versioning issue. <br>
Instead, we can connect to S3 using a bash script.<br>
The parquet compression brings the file size down to *<30MB*, so this doesn't cause performance issues here.<br>

In [20]:
print("Connecting to AWS S3...")
# subprocess.run waits for the script to finish; check=True raises an error if it fails
subprocess.run(['./S3_connect.command', 'write_extract'], check=True)
print("Object Written to S3.")

Connecting to AWS S3...
Object Written to S3.


This completes the **Extract Stage**. <br>

We clean, sample and explore the data in the **Transform Stage**. The data available on S3 is clean but as close to the raw data as possible. <br>

# Transform Stage

*First we obtain the working dataset from the cached source on S3.* <br>

In [21]:
print("Connecting to AWS S3...")
# Wait for the download to finish before reading the parquet file in the next cell
subprocess.run(['./S3_connect.command', 'read_extract'], check=True)
print("Object Read from S3 to ../data.")

Connecting to AWS S3...
Object Read from S3 to ../data.


In [32]:
#vehicle_listings_clean=spark.read.parquet("s3a://deveshetl/usedcars/extract")
#vehicle_listings_clean=spark.read.parquet("../data/vehicle_listings_read.parquet")

# Parquet files store their own schema, so no schema inference option is needed
vehicle_listings_clean = spark.read.parquet("../data/vehicle_listings_read_extract.parquet")
type(vehicle_listings_clean)


pyspark.sql.dataframe.DataFrame

In [33]:
print(vehicle_listings_clean.count(),len(vehicle_listings_clean.columns))

1537438 26


There are over 1.5 million records and 26 columns, which makes the dataset too large to perform EDA (Exploratory Data Analysis) comfortably. <br>
The final data after *Cleaning* should be as clean as possible without redundant data.<br>

## Cleaning Data

Cleaning is essential during the **Transform Stage** of the ETL pipeline. The main goal here is to optimize the dataset for the end goal of the project: creating a machine learning pipeline to predict the target variable **Price**. <br>

We can also cache the cleaned dataset before sampling and exploration, so that other machine learning projects can reuse it.<br>

### Cleaning Checkpoints

+  General
    +  Drop Columns
    +  Drop Rows (Null Price)
+  Numerical 
    +  Apply Reasonable Ranges
        +  Kept Odometer Null Values
+  Ordinal
    +  Narrow down relevant options
    +  Remove errors in columns
    +  Keep countable ordinal values
+  Categorical
    +  State_code + State_name (Extract Stage)
        +  Removed State_name

#### Drop Columns

Some columns are redundant, so we can remove them before moving forward.<br>

In [34]:
vehicle_listings_clean=vehicle_listings_clean.drop('url','vin', 'paint_color', 'image_url', \
                                       'lat', 'long', 'county_fips', 'state_fips','weather')
print("Remaining columns-",vehicle_listings_clean.columns)

Remaining columns- ['city', 'price', 'year', 'manufacturer', 'make', 'condition', 'cylinders', 'fuel', 'odometer', 'title_status', 'transmission', 'drive', 'size', 'type', 'county_name', 'state_code', 'state_name']


'lat' and 'long' provide no more useful information than ('city','county_name', 'state_code', 'state_name'), and are therefore not needed.<br>
The other dropped columns are not helpful to the analysis.

The remaining attributes are varied; there are three types of features in our data:
**Numerical**, **Ordinal**, **Categorical**<br>

**Potential Column Types -** <br>
Numerical - price, year, odometer <br>
Ordinal - condition, cylinders, fuel, title_status, transmission, drive, size, type <br>
Categorical - city, manufacturer, make, county_name, state_code, state_name <br>

The target variable of this project is **Price**, so any record with a null price is irrelevant. Check whether the dataset contains null prices:

In [36]:
vehicle_listings_clean.filter(vehicle_listings_clean.price.isNull()).count()

0

### Numerical Data Columns

In [37]:
basics=["price","manufacturer","make","city","odometer","year"]
#Smaller subset to take quick look at records

#### Applying a Reasonable Range on Continuous Types

Website postings that advertise items for sale often manipulate search algorithms to get more hits. For example, thousands of posts are spammed with a price of $1 (minimum allowed), to get more visibility through the 'search by price' filter. To ensure that our working dataset is within reasonable boundaries, we apply restrictions on the *Continuous* data columns.

In [38]:
print("Too new: %d" %vehicle_listings_clean.where("year >= 2019").count())
print("Too old: %d" %vehicle_listings_clean.where("year < 1930").count())
print("Too expensive: %d" %vehicle_listings_clean.where("price > 300000").count())
print("Too cheap: %d" %vehicle_listings_clean.where("price < 50").count())
print("Too many miles: %d" %vehicle_listings_clean.where("odometer > 250000").count())
print("Too few miles: %d" %vehicle_listings_clean.where("odometer < 50").count())


Too new: 2571
Too old: 2187
Too expensive: 824
Too cheap: 20299
Too many miles: 27155
Too few miles: 9989


Before removing 20k+ records based on price, let's check whether the spam hypothesis holds:

In [39]:
vehicle_listings_clean.sample(False,0.001,100).where("price<50").select(basics).show()

+-----+------------+--------------------+------------+--------+------+
|price|manufacturer|                make|        city|odometer|  year|
+-----+------------+--------------------+------------+--------+------+
|  4.0|        ford|            f150 4x4|     buffalo|    null|1977.0|
|  1.0|        ford|              bronco|    greenbay|    null|1995.0|
| 20.0|         gmc|             topkick|jacksonville|    null|1995.0|
| 11.0|     hyundai|  genesis coupe 2.0t|     kelowna|    null|2010.0|
|  1.0|       honda|               s2000|    columbia|    null|2001.0|
| 11.0|  volkswagen|              routan|   charlotte| 64256.0|2013.0|
|  1.0|       dodge|            1500 slt|   brunswick|    null|2004.0|
|  1.0|        ford|f-150 xlt superca...|    annarbor|177000.0|2007.0|
|  1.0|       honda|              accord|   allentown|    null|1997.0|
|  1.0|        ford|              ranger|   tricities|    null|2002.0|
|  1.0|      nissan|              murano|       tampa| 41847.0|2016.0|
|  1.0

As expected, this random sample shows that listings with a price below 50 are mostly placeholder prices (often $1) rather than real asking prices, so they are not useful to the study. <br>


In [41]:
print("Null odometer - ",vehicle_listings_clean.filter(vehicle_listings_clean.odometer.isNull()).count())
print("Null price - ",vehicle_listings_clean.filter(vehicle_listings_clean.price.isNull()).count())
print("Null year - ",vehicle_listings_clean.filter(vehicle_listings_clean.year.isNull()).count())

Null odometer -  517952
Null price -  0
Null year -  5817


The null counts show that many listings do not report an odometer value, perhaps to improve the chances of selling the car or simply for lack of information. The question is whether to remove these records altogether.<br>
I would rather keep them: these 500k+ rows still provide useful information about the other attributes, and removing them would shrink the dataset by roughly a third, which is too much.<br>
<br>Apply the restrictions to `vehicle_listings_clean`.<br>
Note: comparing a null value with a number evaluates to null, and `.where()` (like SQL `WHERE`) keeps only rows whose condition is true, so a plain range filter on `odometer` would also drop every row with a null odometer. The SQL query below therefore adds an explicit `OR (odometer IS NULL)` clause to keep those rows.
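The null behaviour can be illustrated with a minimal pure-Python sketch of SQL's three-valued logic, where `None` stands for SQL NULL. The helpers `sql_between`, `sql_or` and `where` are hypothetical and only model how a `WHERE` clause treats an unknown result; they are not Spark APIs.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, Optional[float]]

def sql_between(value: Optional[float], low: float, high: float) -> Optional[bool]:
    """`value >= low AND value <= high`; a NULL input gives an unknown (None) result."""
    if value is None:
        return None
    return low <= value <= high

def sql_or(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    """SQL OR: TRUE wins; otherwise any NULL operand makes the result unknown."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False

def where(rows: List[Row], predicate: Callable[[Row], Optional[bool]]) -> List[Row]:
    """Keep a row only when the predicate is exactly True; False and None both drop it."""
    return [r for r in rows if predicate(r) is True]

rows = [{"odometer": 120000.0}, {"odometer": None}, {"odometer": 10.0}, {"odometer": 300000.0}]

range_only = where(rows, lambda r: sql_between(r["odometer"], 50, 250000))
range_or_null = where(
    rows, lambda r: sql_or(sql_between(r["odometer"], 50, 250000), r["odometer"] is None)
)

print(range_only)     # [{'odometer': 120000.0}]
print(range_or_null)  # [{'odometer': 120000.0}, {'odometer': None}]
```

`IS NULL` itself never returns NULL, which is why adding it with `OR` is enough to keep the unreported odometer values.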

In [42]:
vehicle_listings_clean = vehicle_listings_clean \
                    .where("price >= 50 AND price <= 300000") \
                    .where("year >= 1930 AND year < 2019")

# Register a temporary view for SQL (registerTempTable is deprecated since Spark 2.0)
vehicle_listings_clean.createOrReplaceTempView("df_temp")

# Keep plausible odometer readings as well as unreported (null) ones
vehicle_listings_clean=spark.sql("SELECT * FROM df_temp WHERE (odometer>=50 AND odometer<=250000) OR (odometer IS NULL)")

# Other implementation considered:
# replace null with a value such as "not_reported".
# Problem: odometer could then no longer be treated as continuous data.

org=vehicle_listings.count()
new=vehicle_listings_clean.count()

print("-----------------\nOriginal No. of Records - ",org)
print("Within Reasonable Range - ",new)
print("Data kept for analysis: %d percent of records\n-----------------" % (100 * new / org))


vehicle_listings_clean.select("price","year","odometer").describe().show()


-----------------
Original No. of Records -  1723065
Within Reasonable Range -  1470500
Data kept for analysis: 85 percent of records
-----------------
+-------+------------------+------------------+------------------+
|summary|             price|              year|          odometer|
+-------+------------------+------------------+------------------+
|  count|           1470500|           1470500|            970988|
|   mean|10841.028629037743|2004.8051193471608|106333.99521209324|
| stddev|11492.387883154905|11.587408041215316| 59834.98816437917|
|    min|              50.0|            1930.0|              50.0|
|    max|          300000.0|            2018.0|          250000.0|
+-------+------------------+------------------+------------------+



Although only three attributes, **price, year, odometer**, can be converted directly to numerical values, the other attributes can be transformed to fit the Categorical and Ordinal labels. The difference between categorical and ordinal values is that ordinal values have a clear and restricted ordering. For example, *'condition'* would be ordinal, ranging from excellent to poor.

### Ordinal Data Columns

Ordinal data will play an important role in learning about price. An ordinal variable has a limited number of options that can be ordered. For example, `"condition"` should range from excellent to bad (or, in this dataset, "salvage").
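To make "can be ordered" concrete: one common way to use an ordinal column in a model is to map each label to its rank. The sketch below does this for `condition`. The ordering from *salvage* (worst) to *new* (best) is an assumption, since the notebook does not define one, and `encode_condition` is a hypothetical helper. Unknown labels, such as misparsed values, map to `None`.

```python
from typing import Dict, List, Optional

# Assumed ordering, worst to best (not defined in the original notebook)
CONDITION_ORDER: List[str] = ["salvage", "fair", "good", "excellent", "like new", "new"]
CONDITION_RANK: Dict[str, int] = {name: rank for rank, name in enumerate(CONDITION_ORDER)}

def encode_condition(value: Optional[str]) -> Optional[int]:
    """Hypothetical helper: map a condition label to its rank; nulls and unknown labels give None."""
    if value is None:
        return None
    return CONDITION_RANK.get(value)

print([encode_condition(v) for v in ["new", "good", "salvage", None, 'audio controls"']])
# [5, 2, 0, None, None]
```

Because the ranks preserve the ordering, a model can use "higher is better" directly, which would not make sense for a purely categorical column such as `city`.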

The following columns are of the **Ordinal** types -  <br>
(condition, cylinders, fuel, title_status, transmission, drive, size, type) <br>

**`ordinals_options_all`** is a map from each ordinal column to the distinct non-null values it contains. This will help clean the dataset faster.

In [43]:
ordinals=['condition', 'cylinders', 'fuel', 'title_status', 'transmission', 'drive', 'size', 'type']
ordinals_options_all={}
for ordinal in ordinals: 
    # Keep only truthy values: None (null) and empty strings are skipped,
    # because here we only replace wrong values, not empty ones
    ordinals_options_all[ordinal]=[x[ordinal] for x in vehicle_listings_clean.select(ordinal).distinct().collect() if x[ordinal]]
    print("Distinct ",ordinal," options-", ordinals_options_all[ordinal])
    print("-----------------")

Distinct  condition  options- ['new', ' 84.9"" ca wt"', ' 59.5"" ca lt2"', ' 84.9"" ca 4wd ls"', ' 4x4"', ' auto"', 'excellent', '6 cylinders', 'audio controls"', 'salvage', 'like new', 'good', ' 84.0"" ca c5b"', 'fair', '",,"6 cylinders"']
-----------------
Distinct  cylinders  options- ['gas', '3 cylinders', 'other', 'diesel', '10 cylinders', '6 cylinders', 'clean', '4 cylinders', '12 cylinders', 'good', '5 cylinders', '8 cylinders', 'parts only']
-----------------
Distinct  fuel  options- ['gas', 'automatic', 'other', 'diesel', '135000', '178000', '74180', 'hybrid', '4 cylinders', '138000', '8 cylinders', '59770', 'electric']
-----------------
Distinct  title_status  options- ['43742', 'automatic', '45483', 'lien', '44001', '166730', 'missing', 'manual', 'clean', 'salvage', '60664', 'rebuilt', 'parts only', '98333']
-----------------
Distinct  transmission  options- ['automatic', 'other', 'manual', 'clean', 'rebuilt', '1G2WP52K7XF291710']
-----------------
Distinct  drive  options- 

As expected, all 8 ordinal columns contain values that appear to be errors or were entered in the wrong column. For example, let's look into the "condition" reported as 'audio controls"'. 

In [44]:
vehicle_listings_clean.filter("condition = 'audio controls\"'" ).select(basics+["condition"]).show()

+------+------------+------------------+-----------+--------+------+---------------+
| price|manufacturer|              make|       city|odometer|  year|      condition|
+------+------------+------------------+-----------+--------+------+---------------+
|2600.0|        null|"18"" Chrome Alloy|lakecharles|    null|2012.0|audio controls"|
+------+------------+------------------+-----------+--------+------+---------------+



Only one record matches, and its fields are clearly misaligned (its `make` is `"18"" Chrome Alloy`). 

The next task is to clean these misleading values in each ordinal column. The approach used here makes two `pyspark.sql.dataframe` calls for each distinct option: `count()` to find how often the option occurs and, if it occurs fewer than 10 times, `replace()` to group it under "other" as irrelevant.

**`remove_fakes()`** is a function that takes a column and all distinct values within that column, and evaluates which options are useful. It groups the remaining options under "other".

Time cost: each call to **`remove_fakes()`** runs one full `count()` scan per distinct option, so a column with *k* distinct options costs roughly **O(k·n)**, where *n* is the number of rows; `replace()` is a lazy transformation that only runs as part of later scans. This is simple but not the fastest approach, since a single `groupBy()` aggregation could count all options of a column in one pass.

In [45]:
def remove_fakes(column, all_options, min_count=10):
    """Replace options occurring fewer than `min_count` times in `column` with 'other' (nulls are ignored)."""
    global vehicle_listings_clean
    kept_options=[]
    for option in all_options:
        # A column comparison avoids building a SQL string, which breaks on values containing quotes
        temp_count=vehicle_listings_clean.where(vehicle_listings_clean[column] == option).count()
        if temp_count<min_count:
            vehicle_listings_clean = vehicle_listings_clean.na.replace(option,'other',column)
            print("Removed ",option," from ",column)
        else:
            print("Kept ",option," from ",column)
            kept_options.append(option)
    print("Kept values ",kept_options," from column ",column)

`remove_fakes()` ignores *null* values. This is why the final `distinct().count()` is larger than the number of kept values printed after each clean: the extra entries are *null* and, in some cases, *other*.
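Counting every option of a column in a single pass, and then replacing all rare options at once, avoids one full scan per option. The plain-Python sketch below shows the idea with `collections.Counter`. `collapse_rare` is a hypothetical helper that uses the same threshold of 10 and, like `remove_fakes()`, leaves nulls untouched. In PySpark, the same idea would presumably be `vehicle_listings_clean.groupBy(column).count()` to get all counts in one job, followed by a single `na.replace(rare_values, 'other', column)` call (a list of values can be replaced with one string). Treat this as an untested suggestion rather than the notebook's method.

```python
from collections import Counter
from typing import List, Optional

def collapse_rare(values: List[Optional[str]], min_count: int = 10,
                  other: str = "other") -> List[Optional[str]]:
    """Hypothetical helper: replace values seen fewer than `min_count` times; nulls stay null."""
    counts = Counter(v for v in values if v is not None)  # one counting pass
    rare = {v for v, c in counts.items() if c < min_count}
    return [other if v in rare else v for v in values]    # one replacing pass

# A toy 'fuel' column mixing real options, misplaced values and nulls
fuel = ["gas"] * 12 + ["diesel"] * 10 + ["135000", "automatic", "automatic", None, None]
cleaned = collapse_rare(fuel)
print(Counter(cleaned))
```

Here "135000" and both "automatic" entries are grouped under "other", while "diesel" is kept because it reaches the threshold of 10.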

#### Cleaning column = 'condition'

In [46]:
#ordinals=['condition', 'cylinders', 'fuel', 'title_status', 'transmission', 'drive', 'size', 'type']

print("Number for 'condition'-",vehicle_listings_clean.select("condition").distinct().count())
remove_fakes("condition",ordinals_options_all["condition"])
print("Number for 'condition'-",vehicle_listings_clean.select("condition").distinct().count())

Number for 'condition'- 16
Kept  new  from  condition
Removed   84.9"" ca wt"  from  condition
Removed   59.5"" ca lt2"  from  condition
Removed   84.9"" ca 4wd ls"  from  condition
Removed   4x4"  from  condition
Removed   auto"  from  condition
Kept  excellent  from  condition
Removed  6 cylinders  from  condition
Removed  audio controls"  from  condition
Kept  salvage  from  condition
Kept  like new  from  condition
Kept  good  from  condition
Removed   84.0"" ca c5b"  from  condition
Kept  fair  from  condition
Removed  ",,"6 cylinders"  from  condition
Kept values  ['new', 'excellent', 'salvage', 'like new', 'good', 'fair']  from column  condition
Number for 'condition'- 8


#### Cleaning column = 'cylinders'

In [47]:
print("Number for 'cylinders'-",vehicle_listings_clean.select("cylinders").distinct().count())
remove_fakes("cylinders",ordinals_options_all["cylinders"])
print("Number for 'cylinders'-",vehicle_listings_clean.select("cylinders").distinct().count())

Number for 'cylinders'- 14
Removed  gas  from  cylinders
Kept  3 cylinders  from  cylinders
Kept  other  from  cylinders
Removed  diesel  from  cylinders
Kept  10 cylinders  from  cylinders
Kept  6 cylinders  from  cylinders
Removed  clean  from  cylinders
Kept  4 cylinders  from  cylinders
Kept  12 cylinders  from  cylinders
Removed  good  from  cylinders
Kept  5 cylinders  from  cylinders
Kept  8 cylinders  from  cylinders
Removed  parts only  from  cylinders
Kept values  ['3 cylinders', 'other', '10 cylinders', '6 cylinders', '4 cylinders', '12 cylinders', '5 cylinders', '8 cylinders']  from column  cylinders
Number for 'cylinders'- 9


#### Cleaning column = 'fuel'

In [48]:
print("Number for 'fuel'-",vehicle_listings_clean.select("fuel").distinct().count())
remove_fakes("fuel",ordinals_options_all["fuel"])
print("Number for 'fuel'-",vehicle_listings_clean.select("fuel").distinct().count())

Number for 'fuel'- 14
Kept  gas  from  fuel
Removed  automatic  from  fuel
Kept  other  from  fuel
Kept  diesel  from  fuel
Removed  135000  from  fuel
Removed  178000  from  fuel
Removed  74180  from  fuel
Kept  hybrid  from  fuel
Removed  4 cylinders  from  fuel
Removed  138000  from  fuel
Removed  8 cylinders  from  fuel
Removed  59770  from  fuel
Kept  electric  from  fuel
Kept values  ['gas', 'other', 'diesel', 'hybrid', 'electric']  from column  fuel
Number for 'fuel'- 6


#### Cleaning column = 'title_status'

In [49]:

print("Number for 'title_status'-",vehicle_listings_clean.select("title_status").distinct().count())
remove_fakes("title_status",ordinals_options_all["title_status"])
print("Number for 'title_status'-",vehicle_listings_clean.select("title_status").distinct().count())


Number for 'title_status'- 15
Removed  43742  from  title_status
Removed  automatic  from  title_status
Removed  45483  from  title_status
Kept  lien  from  title_status
Removed  44001  from  title_status
Removed  166730  from  title_status
Kept  missing  from  title_status
Removed  manual  from  title_status
Kept  clean  from  title_status
Kept  salvage  from  title_status
Removed  60664  from  title_status
Kept  rebuilt  from  title_status
Kept  parts only  from  title_status
Removed  98333  from  title_status
Kept values  ['lien', 'missing', 'clean', 'salvage', 'rebuilt', 'parts only']  from column  title_status
Number for 'title_status'- 8


#### Cleaning column = 'transmission'

In [50]:
print("Number for 'transmission'-",vehicle_listings_clean.select("transmission").distinct().count())
remove_fakes("transmission",ordinals_options_all["transmission"])
print("Number for 'transmission'-",vehicle_listings_clean.select("transmission").distinct().count())

Number for 'transmission'- 7
Kept  automatic  from  transmission
Kept  other  from  transmission
Kept  manual  from  transmission
Removed  clean  from  transmission
Removed  rebuilt  from  transmission
Removed  1G2WP52K7XF291710  from  transmission
Kept values  ['automatic', 'other', 'manual']  from column  transmission
Number for 'transmission'- 4


#### Cleaning column = 'drive'

In [51]:
print("Number for 'drive'-",vehicle_listings_clean.select("drive").distinct().count())
remove_fakes("drive",ordinals_options_all["drive"])
print("Number for 'drive'-",vehicle_listings_clean.select("drive").distinct().count())

Number for 'drive'- 11
Removed  compact  from  drive
Removed  full-size  from  drive
Kept  fwd  from  drive
Removed  1GDJC34U85E329724  from  drive
Kept  rwd  from  drive
Removed  1GCEK14VX2Z244409  from  drive
Removed  3GBKC34G72M110010  from  drive
Kept  4wd  from  drive
Removed  1GBJK39608E216926  from  drive
Removed  1GBJK34G05E297566  from  drive
Kept values  ['fwd', 'rwd', '4wd']  from column  drive
Number for 'drive'- 5


#### Cleaning column = 'size'

In [52]:
print("Number for 'size'-",vehicle_listings_clean.select("size").distinct().count())
remove_fakes("size",ordinals_options_all["size"])
print("Number for 'size'-",vehicle_listings_clean.select("size").distinct().count())

Number for 'size'- 14
Removed  mini-van  from  size
Kept  compact  from  size
Kept  full-size  from  size
Removed  SUV  from  size
Removed  fwd  from  size
Removed  rwd  from  size
Kept  mid-size  from  size
Removed  https://images.craigslist.org/00k0k_9EJzf6hJHYQ_600x450.jpg  from  size
Removed  sedan  from  size
Removed  pickup  from  size
Removed  https://images.craigslist.org/00606_kBUK1UvzUnA_600x450.jpg  from  size
Kept  sub-compact  from  size
Removed  4wd  from  size
Kept values  ['compact', 'full-size', 'mid-size', 'sub-compact']  from column  size
Number for 'size'- 6


#### Cleaning column = 'type'

In [53]:
print("Number for 'type'-",vehicle_listings_clean.select("type").distinct().count())
remove_fakes("type",ordinals_options_all["type"])
print("Number for 'type'-",vehicle_listings_clean.select("type").distinct().count())

Number for 'type'- 21
Kept  van  from  type
Kept  mini-van  from  type
Kept  offroad  from  type
Removed  grey  from  type
Removed  compact  from  type
Kept  wagon  from  type
Removed  full-size  from  type
Kept  coupe  from  type
Kept  bus  from  type
Removed  39.4202  from  type
Kept  SUV  from  type
Kept  other  from  type
Kept  convertible  from  type
Removed  30.1843  from  type
Kept  sedan  from  type
Kept  hatchback  from  type
Removed  black  from  type
Kept  truck  from  type
Kept  pickup  from  type
Removed  blue  from  type
Kept values  ['van', 'mini-van', 'offroad', 'wagon', 'coupe', 'bus', 'SUV', 'other', 'convertible', 'sedan', 'hatchback', 'truck', 'pickup']  from column  type
Number for 'type'- 14
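
Many of the rejected values look like they belong to other fields of a listing:
+   mileage-like numbers in `fuel` and `title_status`
+   strings shaped like 17-character VINs in `transmission` and `drive`
+   image URLs in `size`
+   coordinate-like decimals in `type`

One possible explanation is that some rows of the scraped CSV have shifted columns. This is an assumption and is not verified here. The rough heuristic below is hypothetical and not part of the notebook. It tags a rejected value by its likely origin. The VIN rule uses the standard 17-character format, which excludes the letters I, O and Q.

```python
import re

# Modern VINs: 17 characters, digits and capital letters except I, O and Q
VIN_PATTERN = re.compile(r"[A-HJ-NPR-Z0-9]{17}")


def guess_origin(value: str) -> str:
    """Hypothetical heuristic: guess where a rejected categorical value came from."""
    try:
        float(value)
        return "numeric"  # e.g. mileage-like 135000 or coordinate-like 39.4202
    except ValueError:
        pass
    if value.startswith(("http://", "https://")):
        return "url"
    if VIN_PATTERN.fullmatch(value):
        return "vin"
    return "other"  # e.g. a valid value of a different column, such as 'grey'


for v in ["135000", "39.4202", "1G2WP52K7XF291710",
          "https://images.craigslist.org/00k0k_9EJzf6hJHYQ_600x450.jpg", "grey"]:
    print(v, "->", guess_origin(v))
```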


### Categorical Data Columns

Categorical - city, manufacturer, make, county_name, state_code, state_name <br>

During the **Transform** stage, the categorical columns will serve as strata for *Stratified Sampling* before Exploratory Data Analysis begins.

There isn't much cleaning to be done for categorical data. Unlike the other column types, four of these columns (city, county_name, state_code, state_name) describe the listing's location. There might be a way to combine state_code and state_name.

In [54]:
vehicle_listings_clean.select("city","county_name","state_code","state_name").sample(False,0.0001,100).show()

+--------------+------------+----------+----------+
|          city| county_name|state_code|state_name|
+--------------+------------+----------+----------+
|           kpr|      Benton|        WA|Washington|
|     vancouver|        null|      null|    FAILED|
|    cincinnati|      Butler|        OH|      Ohio|
|fredericksburg|Spotsylvania|        VA|  Virginia|
|       nanaimo|        null|      null|    FAILED|
|         ocala|      Citrus|        FL|   Florida|
|       mcallen|     Hidalgo|        TX|     Texas|
|        tucson|        Pima|        AZ|   Arizona|
|     newjersey|      Bergen|        NJ|New Jersey|
|    janesville|        Rock|        WI| Wisconsin|
|        muncie|       Grant|        IN|   Indiana|
|     baltimore|     Harford|        MD|  Maryland|
|     milwaukee|       Dodge|        WI| Wisconsin|
|    louisville|   Jefferson|        KY|  Kentucky|
|      monterey|    Monterey|        CA|California|
|    terrehaute|        Vigo|        IN|   Indiana|
|     eaucla

We can see that when state_code is *null*, the matching state_name is *FAILED*. Since state_name appears to be derived from state_code, keeping both adds no information, so we drop state_name. This leaves 16 columns.
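
"Derived from" can be checked as a functional dependency: every state_code should map to exactly one state_name, and for the columns to be fully interchangeable the reverse should also hold. The sketch below uses plain Python on a few rows copied from the sample table. The helper name `determines` is hypothetical. On the full data, a similar check before dropping the column could be something like `groupBy("state_code").agg(F.countDistinct("state_name"))`, looking for counts above 1. Note that `countDistinct` ignores nulls.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Set


def determines(rows: Iterable[Dict[str, Hashable]], key: str, dependent: str) -> bool:
    """Hypothetical helper: True if each value of `key` maps to exactly one value of `dependent`."""
    seen: Dict[Hashable, Set[Hashable]] = defaultdict(set)
    for row in rows:
        seen[row[key]].add(row[dependent])
    return all(len(values) == 1 for values in seen.values())


# Rows taken from the city/state sample printed by In [54]
rows = [
    {"state_code": "WA", "state_name": "Washington"},
    {"state_code": None, "state_name": "FAILED"},
    {"state_code": "OH", "state_name": "Ohio"},
    {"state_code": "WI", "state_name": "Wisconsin"},
    {"state_code": None, "state_name": "FAILED"},
    {"state_code": "WI", "state_name": "Wisconsin"},
]
print(determines(rows, "state_code", "state_name"))  # True
print(determines(rows, "state_name", "state_code"))  # True
```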

In [55]:
vehicle_listings_clean=vehicle_listings_clean.drop('state_name')

In [56]:
print("Original Dimensions - ",vehicle_listings.count(),"*",len(vehicle_listings.columns))
print("New Dimensions - ",vehicle_listings_clean.count(),"*",len(vehicle_listings_clean.columns))

Original Dimensions -  1723065 * 26
New Dimensions -  1470500 * 16


#### Final dataset size:

In [57]:
# count() returns the number of rows; len(columns) returns the number of columns
old_rows=vehicle_listings.count()
old_cols=len(vehicle_listings.columns)
new_rows=vehicle_listings_clean.count()
new_cols=len(vehicle_listings_clean.columns)

print("Original Dimensions - ",old_rows,"*",old_cols)
print("New Dimensions - ",new_rows,"*",new_cols)

print("Data kept for analysis: %d percent of records\n-----------------" % (100 * new_rows / old_rows))



Original Dimensions -  1723065 * 26
New Dimensions -  1470500 * 16
Data kept for analysis: 85 percent of records
-----------------
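
The percentage follows directly from the row counts: 1,470,500 / 1,723,065 ≈ 0.853, so about 85.3% of the records are kept. For columns, 16 of 26 remain (about 61.5%). `%d` truncates the fraction rather than rounding it, which is why the cell prints 85. A quick check of the arithmetic:

```python
# Dimensions printed by the cell above
rows_before, cols_before = 1723065, 26
rows_after, cols_after = 1470500, 16

row_share = 100 * rows_after / rows_before
col_share = 100 * cols_after / cols_before

print("%d" % row_share)    # "%d" drops the fractional part: 85
print("%.1f" % row_share)  # 85.3
print("%.1f" % col_share)  # 61.5
```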


The used cars dataset is now cleaned and ready for analysis. We can cache this version on S3.

In [61]:
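# Direct write to S3 through the s3a connector (disabled)
#vehicle_listings_clean.write.parquet("s3a://deveshetl/usedcars/transform",mode="overwrite")
# Local Parquet copy, coalesced to at most 4 partitions (so at most 4 part files)
vehicle_listings_clean.coalesce(4).write.parquet("../data/vehicle_listings_transform.parquet",mode='overwrite')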

In [63]:
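import subprocess
print("Connecting to AWS S3...")
# run() waits for the script to exit; check=True raises CalledProcessError if it fails
subprocess.run(['./S3_connect.command','write_transform'], check=True)
print("Object Written to S3.")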

Connecting to AWS S3...
Object Written to S3.


## Sampling Data

In [65]:
print(vehicle_listings_clean.columns)

['city', 'price', 'year', 'manufacturer', 'make', 'condition', 'cylinders', 'fuel', 'odometer', 'title_status', 'transmission', 'drive', 'size', 'type', 'county_name', 'state_code']


In [66]:
vehicle_listings_clean.select('city', 'price', 'year', 'manufacturer', 'make', 'condition').describe().show()
vehicle_listings_clean.select('cylinders', 'fuel', 'odometer', 'title_status', 'transmission').describe().show()
vehicle_listings_clean.select('drive', 'size', 'type', 'county_name', 'state_code').describe().show()

+-------+----------+------------------+------------------+------------+--------------------+---------+
|summary|      city|             price|              year|manufacturer|                make|condition|
+-------+----------+------------------+------------------+------------+--------------------+---------+
|  count|   1470500|           1470500|           1470500|     1356415|             1410233|   883796|
|   mean|      null|10841.028629037743|2004.8051193471608|        null|2.3986565421034008E8|     null|
| stddev|      null|11492.387883154905|11.587408041215316|        null|5.657704109406508E10|     null|
|    min|abbotsford|              50.0|            1930.0|       acura|        ! ton pickup|excellent|
|    max|zanesville|          300000.0|            2018.0|          vw|                   ∞|  salvage|
+-------+----------+------------------+------------------+------------+--------------------+---------+

+-------+------------+-------+------------------+------------+----------

In [67]:
print(vehicle_listings_clean.select("manufacturer").distinct().count())
print(vehicle_listings_clean.select("make").distinct().count())
print(vehicle_listings_clean.select("manufacturer","make").distinct().count())

54
102166
109312


In [69]:
from pyspark.ml.feature import StringIndexer

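# Labels are indexed by descending frequency by default (ford -> 0.0);
# handleInvalid="skip" filters out rows the indexer cannot handle instead of raising an error
indexer = StringIndexer(inputCol="manufacturer", outputCol="Index").setHandleInvalid("skip")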
indexed = indexer.fit(vehicle_listings_clean).transform(vehicle_listings_clean)
indexed.select("manufacturer","Index").distinct().orderBy("Index").show()

+-------------+-----+
| manufacturer|Index|
+-------------+-----+
|         ford|  0.0|
|    chevrolet|  1.0|
|       toyota|  2.0|
|        honda|  3.0|
|       nissan|  4.0|
|        dodge|  5.0|
|         jeep|  6.0|
|          gmc|  7.0|
|          ram|  8.0|
|          bmw|  9.0|
|        chevy| 10.0|
|   volkswagen| 11.0|
|      hyundai| 12.0|
|     chrysler| 13.0|
|       subaru| 14.0|
|     cadillac| 15.0|
|          kia| 16.0|
|mercedes-benz| 17.0|
|        mazda| 18.0|
|        buick| 19.0|
+-------------+-----+
only showing top 20 rows



In [70]:
indexed.corr("price","Index")

-0.07029418372624688
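
By default, `StringIndexer` assigns indices by descending label frequency. Its `stringOrderType` defaults to `"frequencyDesc"`, which is why ford, the most common manufacturer, gets 0.0. The index is therefore a nominal label, not a measurement. Its Pearson correlation with price depends on that arbitrary ordering, so a value such as -0.07 should be read with care. The toy example below uses plain Python and hypothetical prices. It gives correlations of opposite sign for the same data under two orderings. Per-manufacturer price summaries or one-hot encoding are usually more informative for a nominal column.

```python
from typing import Dict, List, Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Plain Pearson correlation coefficient."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / (var_x * var_y) ** 0.5


# Hypothetical toy listings: (manufacturer, price)
listings = [("ford", 10000), ("ford", 12000), ("bmw", 30000), ("kia", 8000), ("kia", 9000)]
prices = [price for _, price in listings]


def encode(order: Dict[str, int]) -> List[int]:
    """Replace each manufacturer with its integer label under the given ordering."""
    return [order[maker] for maker, _ in listings]


corr_a = pearson(encode({"ford": 0, "kia": 1, "bmw": 2}), prices)
corr_b = pearson(encode({"bmw": 0, "ford": 1, "kia": 2}), prices)
print(corr_a > 0, corr_b < 0)  # True True: same data, opposite signs
```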

In [72]:
vehicle_listings_clean.groupBy("condition").agg({"*": "count"}).collect()

[Row(condition='new', count(1)=6104),
 Row(condition=None, count(1)=586704),
 Row(condition='other', count(1)=16),
 Row(condition='excellent', count(1)=378050),
 Row(condition='salvage', count(1)=5065),
 Row(condition='like new', count(1)=95240),
 Row(condition='good', count(1)=334656),
 Row(condition='fair', count(1)=64665)]

## Exploratory Data Analysis

To start the *EDA*, focus on the target variable, i.e. the variable we want our model to predict: **price**.

To explore the dataset, create a **sample pandas dataframe** from the Spark dataframe.
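
In PySpark, such a sample can be taken with `DataFrame.sample(withReplacement=False, fraction=..., seed=...)` followed by `.toPandas()`, which collects the result to the driver. Spark implements sampling without replacement as Bernoulli sampling. Each row is kept independently with probability `fraction`, so the sample size is only approximately `fraction` times the row count. The plain-Python sketch below mimics that behaviour. The helper name `bernoulli_sample` is hypothetical.

```python
import random
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def bernoulli_sample(rows: Iterable[T], fraction: float, seed: int) -> List[T]:
    """Hypothetical helper: keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # seeded generator, so the same seed gives the same sample
    return [row for row in rows if rng.random() < fraction]


sample = bernoulli_sample(range(100_000), fraction=0.01, seed=100)
print(len(sample))  # close to 1,000, but in general not exactly 1,000
```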

Pandas `DataFrame.describe` gives basic statistics, but it doesn't show how well the sample represents the full dataset. One way to check this is to compare the distribution of a single column in the sample against the full dataset. As an example, let us use the *"city"* column.
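
One way to make this comparison concrete is to compute each city's share of the rows in both the population and the sample. The sketch below is plain Python with hypothetical helper names and toy data. On the real data, the per-city counts could come from `groupBy("city").count()` on both dataframes.

```python
from collections import Counter
from typing import Dict, Hashable, List, Sequence, Tuple


def shares(values: Sequence[Hashable]) -> Dict[Hashable, float]:
    """Hypothetical helper: fraction of rows taken by each category."""
    counts = Counter(values)
    total = len(values)
    return {key: count / total for key, count in counts.items()}


def compare_shares(
    population: Sequence[Hashable], sample: Sequence[Hashable]
) -> Tuple[float, List[Hashable]]:
    """Hypothetical helper: largest absolute share gap, and categories absent from the sample."""
    pop, smp = shares(population), shares(sample)
    gap = max(abs(pop[key] - smp.get(key, 0.0)) for key in pop)
    missing = sorted(key for key in pop if key not in smp)
    return gap, missing


# Hypothetical city column: one large, one medium and one small city
population = ["tucson"] * 600 + ["ocala"] * 300 + ["muncie"] * 10
sample = ["tucson"] * 7 + ["ocala"] * 3
gap, missing = compare_shares(population, sample)
print(round(gap, 3), missing)  # muncie is absent from the sample
```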

Simple *random sampling*, like the one above, works well for simple use cases. However, the car records are probably not evenly distributed across cities, so a random sample can over- or under-represent some of them. <br>

Compared to simple random sampling, *stratified sampling* has some advantages. It makes sure that every stratum
of the population is represented in the sample. For the same sample size it can increase precision, or it can
reduce the sample size needed for a given precision. It works well for populations with varied attributes.
This use case will implement **Proportionate Stratified Sampling**, in which each stratum's share of the sample
matches its share of the population.
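
Here is a minimal sketch of proportionate stratified sampling in plain Python, using the city as the stratum. The helper name and the data are hypothetical. Each stratum contributes the same fraction of its rows. A minimum of one row per stratum keeps small cities in the sample, which departs slightly from strict proportionality. PySpark's `DataFrame.sampleBy(col, fractions, seed)` takes a per-stratum fraction dict, so passing the same fraction for every city gives a proportionate design. Like `sample`, however, it samples randomly within each stratum. Per-stratum counts are therefore approximate, and very small strata can still come back empty.

```python
import random
from collections import Counter, defaultdict
from typing import Callable, Dict, Hashable, List, Sequence, TypeVar

T = TypeVar("T")


def proportionate_stratified_sample(
    rows: Sequence[T], stratum: Callable[[T], Hashable], fraction: float, seed: int
) -> List[T]:
    """Hypothetical helper: draw about `fraction` of every stratum, at least one row each."""
    rng = random.Random(seed)
    strata: Dict[Hashable, List[T]] = defaultdict(list)
    for row in rows:
        strata[stratum(row)].append(row)
    sample: List[T] = []
    for members in strata.values():
        # Proportionate allocation n_h = fraction * N_h, with a minimum of 1 row per stratum
        size = max(1, round(fraction * len(members)))
        sample.extend(rng.sample(members, size))  # without replacement within the stratum
    return sample


rows = ([("tucson", i) for i in range(600)] + [("ocala", i) for i in range(300)]
        + [("muncie", i) for i in range(10)])
sample = proportionate_stratified_sample(rows, stratum=lambda row: row[0], fraction=0.05, seed=100)
print(Counter(city for city, _ in sample))  # Counter({'tucson': 30, 'ocala': 15, 'muncie': 1})
```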

## Caching Transform Data on S3

# Load Stage

## Preprocessing Data for Learning Model

## Migrate Data to Database