In [1]:
# IMPORT LIBRARIES
try:
    # PYSPARK
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import DataFrame
    import pyspark.sql.types as tp
    import pyspark.sql.functions as F
    
    #Py Spark ML Libraries
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
    from pyspark.ml import Pipeline
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
    
    # OTHER LIBRARIES
    import pandas as pd
    import numpy as np
    import glob
    from functools import reduce
    import seaborn as sns
    import matplotlib.pyplot as plt
    from urllib.request import urlopen
    import datetime
    from pathlib import Path
    
    print('[SUCCESS]')

    #CATCH ERROR IMPORTING A LIBRARY
except ImportError as ie:
    raise ImportError(f'[Error importing]: {ie}')

[SUCCESS]


In [2]:
'''
INITIALIZING SPARK SESSION
- NAME IS SET FOR SPARK SESSION WHEN RUNNING ON LOCAL HOST
'''
spark = SparkSession.builder.master('local').config("spark.executor.memory", "1g").config("spark.driver.memory", "2g").appName('UsedCar_Project').getOrCreate()
print('[SUCCESSFULLY RUNNING SPARK SESSION]')

[SUCCESSFULLY RUNNING SPARK SESSION]


In [68]:
def load_data():
    '''
    car_prices.csv is uploaded to a google bucket for public use. Since file is too large to push to GitHub for others to use from repo
    this function will load the data from the google bucket.
    
    You can run this function each time and it will not download a new dataset each time since the first time you run it, it will download locally to your directory.
    --- unless you delete it each time
    
    Function will check to make sure file is in the directory
    - if it is, load it to a spark dataframe
    - if it is not, download it, then load it to a spark dataframe
    
    SCHEMA:
    - Created a schema to make sure the data types for the file being read is kept
    
    
    Drop Randome Values in state columns
    
    
    WARNING: TO USE THIS FUNCTION, YOU HAVE TO BE RUNNING JUPYTER NOTEBOOK ON A LINUX SERVER (USE DOCKER)
    
    NOTES:
    option("header",True).option("inferSchema", True)
    '''
    
    # CHECKS TO SEE IF FILE EXIST
    path = Path('car_prices.csv') 
    
    # IF FILE DOES NOT EXIST
    if not path.is_file():
        !wget https://storage.googleapis.com/iamangelsh-public-datasets/car_prices.csv 
    
    
    
    # CREATE SCHEMA TO KEEP DATA TYPES
    schema = tp.StructType([tp.StructField('year', tp.IntegerType(), True),
                           tp.StructField('make', tp.StringType(), True),
                           tp.StructField('model', tp.StringType(), True),
                           tp.StructField('trim', tp.StringType(), True),
                           tp.StructField('body', tp.StringType(), True),
                           tp.StructField('transmission', tp.StringType(), True),
                           tp.StructField('vin', tp.StringType(), True),
                           tp.StructField('state', tp.StringType(), True),
                           tp.StructField('condition', tp.DoubleType(), True),
                           tp.StructField('odometer', tp.DoubleType(), True),
                           tp.StructField('color', tp.StringType(), True),
                           tp.StructField('interior', tp.StringType(), True),
                           tp.StructField('seller', tp.StringType(), True),
                           tp.StructField('mmr', tp.IntegerType(), True),
                           tp.StructField('sellingprice', tp.IntegerType(), True),
                           tp.StructField('saledate', tp.StringType(), True)])
    
    
    # LOAD IN DATA WITH SCHEMA
    df = spark.read.csv("car_prices.csv", header = True, sep=",", schema=schema)
    
    # FILTER OUT VIN NUMBERS FROM STATE COLUMN
    df = df.where(F.length(F.col("state")) <= 2)
    
    # DROP ROWS THAT CONTAIN NULL VALUES
    df = df.na.drop('any')
    
    # CREATE THRESHOLD FOR CONDITION COLUMN
    df = df.withColumn(
        'condition', 
        F.when(df.condition > 3.75, 'Great'
        ).when((df.condition >= 2) & (df.condition <= 3.75), 'Average'
        ).when(df.condition < 2, 'Bad'))
    
    # DROP COLUMNS THAT WON'T BE USED
    cols = ('trim', 'vin', 'interior', 'seller')
    df = df.drop(*cols)
    
    
    # USE MM DD YYYY FOR SALEDATE COLUMN
    df = df.withColumn(
        'saledate', F.substring('saledate', 5,11)
        ).withColumn(
        'saledate_year', F.substring('saledate', 7,5)
        ).withColumn(
        'saledate_month', F.substring('saledate', 1,3))
    
    df = df.withColumn(
        'saledate_year', F.col('saledate_year').cast(tp.IntegerType())
        )
    
    # RETURN NEW DATAFRAME
    return df


# LOAD THE DATA
df = load_data()

# SHOW DATA
df.show(5)

# SHOW NUMBER OF COLUMNS AND ROWS
print(f'Number of columns: {len(df.columns)} \nNumber of Rows: {df.count()}')
print()

# SHOW SCHEMA - DATATYPES
df.printSchema()

+----+-----+-------------------+-----+------------+-----+---------+--------+-----+-----+------------+-----------+-------------+--------------+
|year| make|              model| body|transmission|state|condition|odometer|color|  mmr|sellingprice|   saledate|saledate_year|saledate_month|
+----+-----+-------------------+-----+------------+-----+---------+--------+-----+-----+------------+-----------+-------------+--------------+
|2015|  Kia|            Sorento|  SUV|   automatic|   ca|    Great| 16639.0|white|20500|       21500|Dec 16 2014|         2014|           Dec|
|2015|  Kia|            Sorento|  SUV|   automatic|   ca|    Great|  9393.0|white|20800|       21500|Dec 16 2014|         2014|           Dec|
|2014|  BMW|           3 Series|Sedan|   automatic|   ca|    Great|  1331.0| gray|31900|       30000|Jan 15 2015|         2015|           Jan|
|2015|Volvo|                S60|Sedan|   automatic|   ca|    Great| 14282.0|white|27500|       27750|Jan 29 2015|         2015|           Jan|

In [84]:
def transform_df(df, column_names):
    ''' SIMPLE FUNCTION THAT RETURNS 
    A NEW DF BASED ON COLUMN NAMES'''
    return df.select(column_names)


names = ['year', 'condition', 'sellingprice']
df_version2 = transform_df(df, names)
df_version2.show()

+----+---------+------------+
|year|condition|sellingprice|
+----+---------+------------+
|2015|    Great|       21500|
|2015|    Great|       21500|
|2014|    Great|       30000|
|2015|    Great|       27750|
|2014|    Great|       67000|
|2015|      Bad|       10900|
|2014|  Average|       65000|
|2014|  Average|        9800|
|2014|    Great|       32250|
|2014|  Average|       17500|
|2014|    Great|       49750|
|2015|    Great|       17700|
|2015|  Average|       12000|
|2015|    Great|       21500|
|2015|  Average|       14100|
|2014|    Great|       40000|
|2014|      Bad|       17000|
|2014|  Average|       67200|
|2015|      Bad|        7200|
|2014|  Average|       30000|
+----+---------+------------+
only showing top 20 rows



In [98]:
def data_processing(df, categorical_columns):
    '''
    FUNCTION TAKES IN 2 PARAMETERS:
    1. Data Frame --> data frame that you are working with that you want to process
    2. Column Names --> these are the categorical columns that will be used for processing/transformed
    
    Methods applied:
    1. Indexing --> Get index of string columns
    2. One hot encoding --> categorical values to numerical values
    3. Assembler --> vectorizing encoded values
    4. Pipeline --> create a pipeline do bring all these processes together
    
    Returns a transformed model as a dataframe
    '''
    
    # 1. INDEXER
    
    cc = categorical_columns
    indexers = [StringIndexer(inputCol = column, outputCol = f'{column}_indexed') for column in cc]
    
    # 2. One Hot Encoding
    
    encoders = [OneHotEncoder(dropLast = False, inputCol = idx.getOutputCol(), outputCol = f'{idx.getOutputCol()}_encoded') for idx in indexers]
    
    # 3. Assembler --> Vectorize encoded values
    
    assembler = VectorAssembler(inputCols = [encoded_val.getOutputCol() for encoded_val in encoders], outputCol = 'features')
    
    # 4. Pipeline
    
    pipeline = Pipeline(stages = indexers + encoders + [assembler])
    
    
    # Return our transformed moder
    model = pipeline.fit(df_version2)
    
    transformed_df = model.transform(df_version2)
    
    return transformed_df


trans_df = data_processing(df_version2, names)
trans_df.show()

+----+---------+------------+------------+-----------------+--------------------+--------------------+-------------------------+----------------------------+--------------------+
|year|condition|sellingprice|year_indexed|condition_indexed|sellingprice_indexed|year_indexed_encoded|condition_indexed_encoded|sellingprice_indexed_encoded|            features|
+----+---------+------------+------------+-----------------+--------------------+--------------------+-------------------------+----------------------------+--------------------+
|2015|    Great|       21500|        12.0|              1.0|               162.0|     (26,[12],[1.0])|            (3,[1],[1.0])|          (1806,[162],[1.0])|(1835,[12,27,191]...|
|2015|    Great|       21500|        12.0|              1.0|               162.0|     (26,[12],[1.0])|            (3,[1],[1.0])|          (1806,[162],[1.0])|(1835,[12,27,191]...|
|2014|    Great|       30000|         2.0|              1.0|               229.0|      (26,[2],[1.0])|   

In [99]:
####################################################
# Regression using gradient trees regressor
####################################################

# Try to predict selling price using gradient trees regressor
import pyspark.ml.feature as ft

features = ['year','condition_indexed']

In [100]:
# Collate the features together and use the ChiSqSelector to select only the top 2 most important features
featuresCreator = ft.VectorAssembler(inputCols=[col for col in features[1:]], outputCol='features')
selector = ft.ChiSqSelector(numTopFeatures=2, outputCol="selectedFeatures", labelCol='sellingprice')

In [101]:
# In order to predict the selling price, we will use the gradient boosted trees generator
import pyspark.ml.regression as reg
regressor = reg.GBTRegressor(maxIter=15, maxDepth=3, labelCol='sellingprice')

In [109]:
# Split data in train and test 
names = ['year', 'condition_indexed', 'sellingprice']
df_version2 = transform_df(trans_df, names)
df_version2.show()

df_train, df_test = df_version2.randomSplit([0.7,0.3], seed=200)

+----+-----------------+------------+
|year|condition_indexed|sellingprice|
+----+-----------------+------------+
|2015|              1.0|       21500|
|2015|              1.0|       21500|
|2014|              1.0|       30000|
|2015|              1.0|       27750|
|2014|              1.0|       67000|
|2015|              2.0|       10900|
|2014|              0.0|       65000|
|2014|              0.0|        9800|
|2014|              1.0|       32250|
|2014|              0.0|       17500|
|2014|              1.0|       49750|
|2015|              1.0|       17700|
|2015|              0.0|       12000|
|2015|              1.0|       21500|
|2015|              0.0|       14100|
|2014|              1.0|       40000|
|2014|              2.0|       17000|
|2014|              0.0|       67200|
|2015|              2.0|        7200|
|2014|              0.0|       30000|
+----+-----------------+------------+
only showing top 20 rows



In [110]:
# We put it all together into a Pipeline
pipeline = Pipeline(stages=[featuresCreator, selector, regressor])
selling_price = pipeline.fit(df_train)

In [111]:
# Check if mour model performs well on testing data
import pyspark.ml.evaluation as ev

evaluator = ev.RegressionEvaluator(predictionCol="prediction", labelCol='sellingprice')

print(evaluator.evaluate(selling_price.transform(df_test), {evaluator.metricName: 'r2'}))
print(evaluator.evaluate(selling_price.transform(df_test), {evaluator.metricName: 'rmse'}))

0.22911336744248112
8489.413580381832
