Import and initiate findspark
Then import pyspark

In [1]:
import findspark
findspark.init("/usr/local/spark")

In [2]:
import pyspark

Instatiate SparkSession with Hive support

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Check point 3") \
        .config("spark.sql.warehouse.dir", "hdfs://localhost:54310/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

# On some clusters the following config setting may be requied
#         .config("hive.metastore.uris", "<value>") 

In [4]:
spark.sql('create database capstone')

DataFrame[]

In [14]:
# Testing Hive integration
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
|    capstone|
|     default|
|        nyse|
|      office|
+------------+



In [18]:
df_cars = spark.read.load("/home/hduser/Downloads/sharedfolder/df_cars_merged.csv", format="csv", sep=",", inferSchema="true", header="true")
df_cars.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- StateorProvince: string (nullable = true)
 |-- City: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: double (nullable = true)
 |-- engine: integer (nullable = true)
 |-- max_power: double (nullable = true)
 |-- seats: integer (nullable = true)
 |-- sold: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- brand: string (nullable = true)



In [19]:
# Testing Hive integration
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [20]:
df_cars.write.mode("append").saveAsTable("capstone.dfcars")

In [21]:
spark.sql('select * from capstone.dfcars').show()

+---+----+-------------+---------+--------------------+----------+------+-----------+------------+------------+-------+------+---------+-----+----+------+----------+
|_c0|year|selling_price|km_driven|     StateorProvince|      City|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold|Region|     brand|
+---+----+-------------+---------+--------------------+----------+------+-----------+------------+------------+-------+------+---------+-----+----+------+----------+
|  0|2014|       450000|   145500|District of Columbia|Washington|Diesel| Individual|      Manual| First Owner|   23.4|  1248|     74.0|    5|   Y|  East|    MARUTI|
|  1|2019|      1149000|     5000|District of Columbia|Washington|Petrol| Individual|      Manual| First Owner|   17.0|  1591|    121.3|    5|   Y|  East|   HYUNDAI|
|  2|2017|       600000|    25000|District of Columbia|Washington|Petrol| Individual|      Manual| Third Owner|  18.16|  1196|     86.8|    5|   Y|  East|      FORD|
|  3

In [22]:
spark.sql('show tables from capstone').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|capstone|   dfcars|      false|
+--------+---------+-----------+



In [23]:
spark.sql('use capstone')

DataFrame[]

### Profiling

In [59]:
df_cars.dtypes

[('_c0', 'int'),
 ('year', 'int'),
 ('selling_price', 'int'),
 ('km_driven', 'int'),
 ('StateorProvince', 'string'),
 ('City', 'string'),
 ('fuel', 'string'),
 ('seller_type', 'string'),
 ('transmission', 'string'),
 ('owner', 'string'),
 ('mileage', 'double'),
 ('engine', 'int'),
 ('max_power', 'double'),
 ('seats', 'int'),
 ('sold', 'string'),
 ('Region', 'string'),
 ('brand', 'string')]

In [41]:
numerical_features=['year','selling_price','km_driven','mileage','engine','max_power','seats']

In [43]:
df_cars.describe(numerical_features).show()

+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|summary|              year|    selling_price|        km_driven|           mileage|            engine|        max_power|             seats|
+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|  count|              7906|             7906|             7906|              7906|              7906|             7906|              7906|
|   mean|2013.9839362509485| 649813.720844928|69188.65975208703|19.419860865165695|1458.7088287376675|91.58737351378637|5.4163926132051605|
| stddev|3.8636953387034967|813582.7483541325|56792.29634331763| 4.036263200758886|  503.893056850139|35.74721608448376|0.9592082121984603|
|    min|              1994|            29999|                1|               0.0|               624|             32.8|                 2|
|    max|           

### Checking for missing values

In [44]:
from pyspark.sql.functions import isnan, when, count, col
df_cars.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_cars.columns]).show()

+---+----+-------------+---------+---------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+-----+
|_c0|year|selling_price|km_driven|StateorProvince|City|fuel|seller_type|transmission|owner|mileage|engine|max_power|seats|sold|Region|brand|
+---+----+-------------+---------+---------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+-----+
|  0|   0|            0|        0|              0|   0|   0|          0|           0|    0|      0|     0|        0|    0|   0|     0|    0|
+---+----+-------------+---------+---------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+-----+



In [60]:
def calculate_bounds(df):
    bounds = {
        c: dict(
            zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
        )
        for c,d in zip(df.columns, df.dtypes) if d[1] == "int" or d[1]=="double"
    }

    for c in bounds:
        iqr = bounds[c]['q3'] - bounds[c]['q1']
        bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
        bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)

    return bounds

In [61]:
calculate_bounds(df_cars)

{'_c0': {'max': 11859.5, 'min': -3952.5, 'q1': 1977.0, 'q3': 5930.0},
 'engine': {'max': 2159.5, 'min': 619.5, 'q1': 1197.0, 'q3': 1582.0},
 'km_driven': {'max': 187500.0, 'min': -56500.0, 'q1': 35000.0, 'q3': 96000.0},
 'max_power': {'max': 152.925,
  'min': 17.124999999999993,
  'q1': 68.05,
  'q3': 102.0},
 'mileage': {'max': 30.63, 'min': 8.470000000000002, 'q1': 16.78, 'q3': 22.32},
 'seats': {'max': 5.0, 'min': 5.0, 'q1': 5.0, 'q3': 5.0},
 'selling_price': {'max': 1320000.0,
  'min': -360000.0,
  'q1': 270000.0,
  'q3': 690000.0},
 'year': {'max': 2024.5, 'min': 2004.5, 'q1': 2012.0, 'q3': 2017.0}}

In [50]:
import pyspark.sql.functions as f
def flag_outliers(df, id_col):
    bounds = calculate_bounds(df)
    outliers = {}

    return df.select(c, id_col,
        *[
            f.when(
                ~f.col(c).between(bounds[c]['min'], bounds[c]['max']),"yes"
            ).otherwise("no").alias(c+'_outlier')
        ]
    )

In [63]:
for c in numerical_features:
    flag_outliers(df_cars,c).show()

+----+----+------------+
|year|year|year_outlier|
+----+----+------------+
|2014|2014|          no|
|2019|2019|          no|
|2017|2017|          no|
|2016|2016|          no|
|2015|2015|          no|
|2018|2018|          no|
|2009|2009|          no|
|2010|2010|          no|
|2019|2019|          no|
|2014|2014|          no|
|2015|2015|          no|
|2019|2019|          no|
|2013|2013|          no|
|2018|2018|          no|
|2010|2010|          no|
|2016|2016|          no|
|2011|2011|          no|
|2016|2016|          no|
|2015|2015|          no|
|1999|1999|         yes|
+----+----+------------+
only showing top 20 rows

+-------------+-------------+---------------------+
|selling_price|selling_price|selling_price_outlier|
+-------------+-------------+---------------------+
|       450000|       450000|                   no|
|      1149000|      1149000|                   no|
|       600000|       600000|                   no|
|       540000|       540000|                   no|
|       63

### We observe outliers in the dataset and therefore they need to be handled

In [71]:
from pyspark.ml.feature import StringIndexer

In [72]:
indexer = StringIndexer(inputCol="fuel", outputCol="fuel_numeric")
indexed = indexer.fit(df_cars).transform(df_cars)
indexed.show()


+---+----+-------------+---------+--------------------+----------+------+-----------+------------+------------+-------+------+---------+-----+----+------+----------+------------+
|_c0|year|selling_price|km_driven|     StateorProvince|      City|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold|Region|     brand|fuel_numeric|
+---+----+-------------+---------+--------------------+----------+------+-----------+------------+------------+-------+------+---------+-----+----+------+----------+------------+
|  0|2014|       450000|   145500|District of Columbia|Washington|Diesel| Individual|      Manual| First Owner|   23.4|  1248|     74.0|    5|   Y|  East|    MARUTI|         0.0|
|  1|2019|      1149000|     5000|District of Columbia|Washington|Petrol| Individual|      Manual| First Owner|   17.0|  1591|    121.3|    5|   Y|  East|   HYUNDAI|         1.0|
|  2|2017|       600000|    25000|District of Columbia|Washington|Petrol| Individual|      Manual| Third 

In [73]:
from pyspark.ml.feature import OneHotEncoder

In [76]:
encoder = OneHotEncoder(inputCol=["transmission"],
                       outputCol=["transmission_vect"])

TypeError: Invalid param value given for param "outputCol". Could not convert <class 'list'> to string type

### MODEL BUILDING

In [24]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

In [25]:
features=['year','selling_price','km_driven','StateorProvince','City','fuel','seller_type','transmission', 'owner','mileage','engine','max_power','seats','sold','Region','brand']

In [29]:
lr_data = df_cars.select(col("selling_price").alias("label"), *features)
lr_data.printSchema()

root
 |-- label: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- StateorProvince: string (nullable = true)
 |-- City: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: double (nullable = true)
 |-- engine: integer (nullable = true)
 |-- max_power: double (nullable = true)
 |-- seats: integer (nullable = true)
 |-- sold: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- brand: string (nullable = true)



In [31]:
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")

In [32]:
va_data = vectorAssembler.transform(lr_data)

IllegalArgumentException: 'Data type StringType is not supported.'