In [1]:
import findspark
findspark.init('/usr/local/spark')

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, BooleanType, DateType

In [4]:
spark = SparkSession.builder.appName("Python Spark SQL example") \
        .config("spark.sql.warehouse.dir", "hdfs://localhost:54310/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

In [5]:
carDF = spark.read.csv('Car_sales_transactions.csv', 
                       inferSchema = True, header = True)

In [6]:
carDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)
              for c in carDF.schema.names]).show()

+--------+----+----+-------------+---------+---------+----------+-----------+----+-----------+------------+-----+-------+------+---------+-----+
|Sales_ID|Name|Year|Selling_Price|km_Driven|City_Code|State_Code|Postal_Code|Fuel|Seller_Type|Transmission|Owner|Mileage|Engine|Max_Power|Seats|
+--------+----+----+-------------+---------+---------+----------+-----------+----+-----------+------------+-----+-------+------+---------+-----+
|       0|   0|   0|            0|        0|        0|         0|          0|   0|          0|           0|    0|      0|     0|        0|    0|
+--------+----+----+-------------+---------+---------+----------+-----------+----+-----------+------------+-----+-------+------+---------+-----+



In [7]:
carDF = carDF.withColumn('new1', split(col('Engine'), ' ').getItem(0).cast('float')) \
        .withColumn('new2', split(col('Name'), ' ').getItem(0)) \
        .withColumn('new3', split(col('Mileage'), ' ').getItem(0).cast('float')) \
        .withColumn('new4', split(col('Max_Power'), ' ').getItem(0).cast('float')) \
        .withColumn('Postal_Code', col('Postal_Code').cast(StringType())) \
        .withColumn('Seats', col('Seats').cast(StringType()))
carDF = carDF.drop('Engine', 'Name', 'Mileage', 'Max_Power')
carDF = carDF.withColumnRenamed('new1', 'Engine') \
        .withColumnRenamed('new2', 'Name') \
        .withColumnRenamed('new3', 'Mileage') \
        .withColumnRenamed('new4', 'Max_Power')

In [8]:
carDF.printSchema()

root
 |-- Sales_ID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Selling_Price: integer (nullable = true)
 |-- km_Driven: integer (nullable = true)
 |-- City_Code: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Postal_Code: string (nullable = true)
 |-- Fuel: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Seats: string (nullable = true)
 |-- Engine: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Mileage: float (nullable = true)
 |-- Max_Power: float (nullable = true)



In [9]:
carDF.select('Engine').show(5)

+------+
|Engine|
+------+
|1248.0|
|1498.0|
|1497.0|
|1396.0|
|1298.0|
+------+
only showing top 5 rows



In [10]:
catCols = ['City_Code', 'State_Code', 'Postal_Code', 'Fuel', 'Seller_Type',
          'Transmission', 'Owner', 'Seats', 'Name']
carDF.select([countDistinct(c).alias(c)
              for c in catCols]).show()

+---------+----------+-----------+----+-----------+------------+-----+-----+----+
|City_Code|State_Code|Postal_Code|Fuel|Seller_Type|Transmission|Owner|Seats|Name|
+---------+----------+-----------+----+-----------+------------+-----+-----+----+
|       29|        18|         61|   4|          3|           2|    5|    9|  31|
+---------+----------+-----------+----+-----------+------------+-----+-----+----+



## Regression using PySpark MLlib
we will use the Random Forest Regression model. The maximum classes in the categorical variable in the dataset is 61, hence we will set this as the max_category value in the preprocessing procedure

In [60]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, DecisionTreeRegressor, GBTRegressor
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import VectorIndexer, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT

In [61]:
featCols = [c for c in carDF.schema.names if \
            c != 'Selling_Price' and c != 'Sales_ID']
targetCol = ['Selling_Price']
numVar = [c for c in featCols if c not in catCols and targetCol]

In [166]:
inputData = carDF.select(col('Selling_Price').alias('label'), 
                         *featCols)
# inputData = carDF.select(*(col(c).cast('float') for c in numVar), 
#                          *catCols)
inputData
inputData.printSchema()

root
 |-- label: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- km_Driven: integer (nullable = true)
 |-- City_Code: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Postal_Code: string (nullable = true)
 |-- Fuel: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Seats: string (nullable = true)
 |-- Engine: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Mileage: float (nullable = true)
 |-- Max_Power: float (nullable = true)



In [167]:
indexers = [StringIndexer(inputCol = c, 
                         outputCol = 'indexedFeatures{0}'.format(c), handleInvalid = 'skip') for c in catCols]

encoders = [OneHotEncoder(inputCol = indexer.getOutputCol(), 
                         outputCol = 'indexedFeaturesEnc{0}'.format(indexer.getOutputCol())) \
            for indexer in indexers]

assembler = VectorAssembler(inputCols = [*(encoder.getOutputCol() \
                                         for encoder in encoders), *numVar], 
                             outputCol = "features1")
vecIndexer = VectorIndexer(inputCol = 'features1', outputCol = 'features', maxCategories = 61)

# scaler = StandardScaler(inputCol = 'scaled_features', outputCol = 'features')

# rf = RandomForestRegressor(featuresCol='features')
dt = GBTRegressor(featuresCol = 'features')

# lr = LinearRegression()

pipeline = Pipeline(stages = indexers + encoders + [assembler, vecIndexer, dt])

# inputData = pipelineInData.fit(inputData).transform(inputData)

In [168]:
# inputData = pipelineInData.fit(inputData).transform(inputData)

In [169]:
(trainingData, testData) = inputData.randomSplit([0.7, 0.3])

In [170]:
trainingData.describe().show()

+-------+-----------------+------------------+-----------------+---------+----------+-----------------+------+----------------+------------+-----------+------------------+------------------+----------+------------------+-----------------+
|summary|            label|              Year|        km_Driven|City_Code|State_Code|      Postal_Code|  Fuel|     Seller_Type|Transmission|      Owner|             Seats|            Engine|      Name|           Mileage|        Max_Power|
+-------+-----------------+------------------+-----------------+---------+----------+-----------------+------+----------------+------------+-----------+------------------+------------------+----------+------------------+-----------------+
|  count|             5578|              5578|             5578|     5578|      5578|             5578|  5578|            5578|        5578|       5578|              5578|              5578|      5578|              5578|             5578|
|   mean|649840.9671925422|2013.991215489422

In [171]:
testData.describe().show()

+-------+-----------------+------------------+-----------------+---------+----------+------------------+------+----------------+------------+-----------+------------------+------------------+----------+-----------------+-----------------+
|summary|            label|              Year|        km_Driven|City_Code|State_Code|       Postal_Code|  Fuel|     Seller_Type|Transmission|      Owner|             Seats|            Engine|      Name|          Mileage|        Max_Power|
+-------+-----------------+------------------+-----------------+---------+----------+------------------+------+----------------+------------+-----------+------------------+------------------+----------+-----------------+-----------------+
|  count|             2328|              2328|             2328|     2328|      2328|              2328|  2328|            2328|        2328|       2328|              2328|              2328|      2328|             2328|             2328|
|   mean|649748.4372852234|2013.966494845360

In [172]:
model = pipeline.fit(trainingData)

In [173]:
predictions = model.transform(testData)

In [174]:
predictions.printSchema()

root
 |-- label: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- km_Driven: integer (nullable = true)
 |-- City_Code: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Postal_Code: string (nullable = true)
 |-- Fuel: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Seats: string (nullable = true)
 |-- Engine: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Mileage: float (nullable = true)
 |-- Max_Power: float (nullable = true)
 |-- indexedFeaturesCity_Code: double (nullable = true)
 |-- indexedFeaturesState_Code: double (nullable = true)
 |-- indexedFeaturesPostal_Code: double (nullable = true)
 |-- indexedFeaturesFuel: double (nullable = true)
 |-- indexedFeaturesSeller_Type: double (nullable = true)
 |-- indexedFeaturesTransmission: double (nullable = true)
 |-- indexedFeaturesOwner: double (nullable = true)
 |-- indexedF

In [175]:
predictions.select('prediction', 'label', 'features').show()

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|63463.624881547126|40000|(157,[16,31,52,10...|
| 71727.91227122312|40000|(157,[13,28,66,10...|
|63463.624881547126|40000|(157,[4,28,70,106...|
| 67383.26673443643|40000|(157,[19,40,55,10...|
| 72085.42227472259|42000|(157,[33,60,106,1...|
| 69185.17817093592|45000|(157,[23,30,73,10...|
| 68165.78042183329|45000|(157,[4,28,78,106...|
|  100896.884711803|45000|(157,[2,30,82,106...|
| 81928.76916663839|45957|(157,[11,32,74,10...|
| 143319.9035499095|46000|(157,[27,59,106,1...|
| 73088.21690844283|50000|(157,[19,40,55,10...|
|103670.92338599057|50000|(157,[19,40,55,10...|
| 85084.14528649345|55000|(157,[8,29,53,106...|
|103739.06752529118|55000|(157,[3,28,46,106...|
| 91813.30144227596|55000|(157,[0,35,80,105...|
| 88378.04116363778|55000|(157,[16,31,52,10...|
| 81213.41698549713|58000|(157,[26,44,103,1...|
|  85915.5725257833|58000|(157,[20,41,64

In [176]:
evaluator = RegressionEvaluator(labelCol = 'label', predictionCol = 'prediction', metricName = 'rmse')

In [165]:
result = predictions.withColumn('result', (predictions['prediction'] - predictions['label'])**2)
print(result.select(mean('result')).collect())

[Row(avg(result)=23324757504.865894)]


In [177]:
rmse = evaluator.evaluate(predictions)
print('Root mean squares error (RMSE) on test data = %g' % rmse)

Root mean squares error (RMSE) on test data = 232205


In [35]:
rfmodel = model.stages
print(rfmodel)

[StringIndexer_4cb3a2680aacd577ebc2, StringIndexer_4fd381b1115fcb818a18, StringIndexer_4ec0b020b94d3bbbee3a, StringIndexer_4bdcb1b3907d61aebaf1, StringIndexer_420583e0bd97d1485b92, StringIndexer_4123ae24ffa5d78d8c91, StringIndexer_4cedb04b295868b51d4c, StringIndexer_46629b2ec6c692db1e7c, StringIndexer_42469099ec7d239b6e6a, OneHotEncoder_4120ad27cf2c3a267619, OneHotEncoder_433abe3e0eb3991e824f, OneHotEncoder_4caa90fb6571aa1a8e16, OneHotEncoder_4ca7aacc3f99cf09410c, OneHotEncoder_4e779e71b290d623c9a1, OneHotEncoder_456fa77b6dc082ae1dc1, OneHotEncoder_4185ba581553d5161086, OneHotEncoder_4a72a31efe6e910d9538, OneHotEncoder_42b9a07d53782335c8cf, VectorAssembler_43079c5d78d1f8a8f2bf, VectorIndexer_47c5b9f2695bca47cdc0, RandomForestRegressionModel (uid=RandomForestRegressor_46c78233ddfbd429bf33) with 20 trees]


In [None]:
featureIndexer = VectorIndexer(inputCol = "features", 
                               outputCol = 'indexedFeatures',
                              maxCategories = 61).fit(inputData)