Import and initiate findspark
Then import pyspark

In [1]:
import findspark
findspark.init("/usr/local/spark")

In [2]:
import pyspark

Instatiate SparkSession with Hive support

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Check point 3") \
        .config("spark.sql.warehouse.dir", "hdfs://localhost:54310/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

# On some clusters the following config setting may be requied
#         .config("hive.metastore.uris", "<value>") 

In [4]:
spark.sql('create database capstone')

DataFrame[]

In [14]:
# Testing Hive integration
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
|    capstone|
|     default|
|        nyse|
|      office|
+------------+



In [18]:
df_cars = spark.read.load("/home/hduser/Downloads/sharedfolder/df_cars_merged.csv", format="csv", sep=",", inferSchema="true", header="true")
df_cars.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- StateorProvince: string (nullable = true)
 |-- City: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: double (nullable = true)
 |-- engine: integer (nullable = true)
 |-- max_power: double (nullable = true)
 |-- seats: integer (nullable = true)
 |-- sold: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- brand: string (nullable = true)



In [19]:
# Testing Hive integration
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [20]:
df_cars.write.mode("append").saveAsTable("capstone.dfcars")

In [21]:
spark.sql('select * from capstone.dfcars').show()

+---+----+-------------+---------+--------------------+----------+------+-----------+------------+------------+-------+------+---------+-----+----+------+----------+
|_c0|year|selling_price|km_driven|     StateorProvince|      City|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold|Region|     brand|
+---+----+-------------+---------+--------------------+----------+------+-----------+------------+------------+-------+------+---------+-----+----+------+----------+
|  0|2014|       450000|   145500|District of Columbia|Washington|Diesel| Individual|      Manual| First Owner|   23.4|  1248|     74.0|    5|   Y|  East|    MARUTI|
|  1|2019|      1149000|     5000|District of Columbia|Washington|Petrol| Individual|      Manual| First Owner|   17.0|  1591|    121.3|    5|   Y|  East|   HYUNDAI|
|  2|2017|       600000|    25000|District of Columbia|Washington|Petrol| Individual|      Manual| Third Owner|  18.16|  1196|     86.8|    5|   Y|  East|      FORD|
|  3

In [22]:
spark.sql('show tables from capstone').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|capstone|   dfcars|      false|
+--------+---------+-----------+



In [23]:
spark.sql('use capstone')

DataFrame[]

### EDA 

In [41]:
numerical_features=['year','selling_price','km_driven','mileage','engine','max_power','seats']

In [43]:
df_cars.describe(numerical_features).show()

+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|summary|              year|    selling_price|        km_driven|           mileage|            engine|        max_power|             seats|
+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|  count|              7906|             7906|             7906|              7906|              7906|             7906|              7906|
|   mean|2013.9839362509485| 649813.720844928|69188.65975208703|19.419860865165695|1458.7088287376675|91.58737351378637|5.4163926132051605|
| stddev|3.8636953387034967|813582.7483541325|56792.29634331763| 4.036263200758886|  503.893056850139|35.74721608448376|0.9592082121984603|
|    min|              1994|            29999|                1|               0.0|               624|             32.8|                 2|
|    max|           

### Checking for missing values

In [44]:
from pyspark.sql.functions import isnan, when, count, col
df_cars.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_cars.columns]).show()

+---+----+-------------+---------+---------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+-----+
|_c0|year|selling_price|km_driven|StateorProvince|City|fuel|seller_type|transmission|owner|mileage|engine|max_power|seats|sold|Region|brand|
+---+----+-------------+---------+---------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+-----+
|  0|   0|            0|        0|              0|   0|   0|          0|           0|    0|      0|     0|        0|    0|   0|     0|    0|
+---+----+-------------+---------+---------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+-----+



In [39]:
import pandas as pd
df_train = pd.get_dummies(df_cars,columns=['fuel','seller_type','transmission','owner','sold','Region'])
df_train.head()

ImportError: No module named 'pandas'

### MODEL BUILDING

In [24]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

In [25]:
features=['year','selling_price','km_driven','StateorProvince','City','fuel','seller_type','transmission', 'owner','mileage','engine','max_power','seats','sold','Region','brand']

In [29]:
lr_data = df_cars.select(col("selling_price").alias("label"), *features)
lr_data.printSchema()

root
 |-- label: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- StateorProvince: string (nullable = true)
 |-- City: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: double (nullable = true)
 |-- engine: integer (nullable = true)
 |-- max_power: double (nullable = true)
 |-- seats: integer (nullable = true)
 |-- sold: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- brand: string (nullable = true)



In [31]:
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")

In [32]:
va_data = vectorAssembler.transform(lr_data)

IllegalArgumentException: 'Data type StringType is not supported.'