In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('carPricing').getOrCreate()


In [2]:
tmpData = spark.read.csv('cars.csv',inferSchema=True,header=True)

In [3]:
tmpData.printSchema()

root
 |-- _id: string (nullable = true)
 |-- body_color: string (nullable = true)
 |-- body_status: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- fuel_consumption: double (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- price: long (nullable = true)
 |-- volume: double (nullable = true)
 |-- year: integer (nullable = true)



In [4]:
tmpData.head(3)[2].asDict()

{'_id': '62c022d45ac6166d04f2a9c8',
 'body_color': 'سفید',
 'body_status': 'دور رنگ',
 'body_type': 'suv',
 'brand': 'nissan',
 'engine': '6 سیلندر P40 کاربراتوری ',
 'fuel_consumption': 17.0,
 'fuel_type': 'دوگانه سوز',
 'mileage': 500000,
 'model': 'patrol4doorir',
 'price': 210000000,
 'volume': 4.0,
 'year': 1990}

In [5]:
tmpData.count()

16777

### remove null data

In [6]:
tmpData.na.drop().count()

16777

In [7]:
tmpData.columns

['_id',
 'body_color',
 'body_status',
 'body_type',
 'brand',
 'engine',
 'fuel_consumption',
 'fuel_type',
 'mileage',
 'model',
 'price',
 'volume',
 'year']

In [8]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

In [9]:
indexer = StringIndexer(inputCols=['body_color',
 'body_status',
 'body_type',
 'brand',
 'engine','fuel_type',
 'model'],
outputCols=['body_color_indexed',
 'body_status_indexed',
 'body_type_indexed',
 'brand_indexed',
 'engine_indexed','fuel_type_indexed',
 'model_indexed'])

In [10]:
indexedData = indexer.fit(tmpData).transform(tmpData)

In [11]:
indexedData.printSchema()

root
 |-- _id: string (nullable = true)
 |-- body_color: string (nullable = true)
 |-- body_status: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- fuel_consumption: double (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- price: long (nullable = true)
 |-- volume: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- body_status_indexed: double (nullable = false)
 |-- engine_indexed: double (nullable = false)
 |-- fuel_type_indexed: double (nullable = false)
 |-- model_indexed: double (nullable = false)
 |-- brand_indexed: double (nullable = false)
 |-- body_color_indexed: double (nullable = false)
 |-- body_type_indexed: double (nullable = false)



In [12]:
indexedData.columns

['_id',
 'body_color',
 'body_status',
 'body_type',
 'brand',
 'engine',
 'fuel_consumption',
 'fuel_type',
 'mileage',
 'model',
 'price',
 'volume',
 'year',
 'body_status_indexed',
 'engine_indexed',
 'fuel_type_indexed',
 'model_indexed',
 'brand_indexed',
 'body_color_indexed',
 'body_type_indexed']

#### normalized

In [13]:
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Normalizer

In [14]:
assembler = VectorAssembler(inputCols=[
 'fuel_consumption','mileage','volume',
 'year'],outputCol='features')

In [15]:
assembledData = assembler.transform(indexedData)

In [16]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")

In [17]:
l1NormData = normalizer.transform(assembledData)

In [18]:
l1NormData.head(2)[0].asDict()

{'_id': '62c0216f5ac6166d04f28fb9',
 'body_color': 'قهوه ای',
 'body_status': 'کامل رنگ',
 'body_type': 'suv',
 'brand': 'nissan',
 'engine': '6 سیلندر P40 کاربراتوری ',
 'fuel_consumption': 17.0,
 'fuel_type': 'بنزینی',
 'mileage': 200000,
 'model': 'patrol4doorir',
 'price': 165000000,
 'volume': 4.0,
 'year': 1989,
 'body_status_indexed': 13.0,
 'engine_indexed': 61.0,
 'fuel_type_indexed': 0.0,
 'model_indexed': 122.0,
 'brand_indexed': 13.0,
 'body_color_indexed': 7.0,
 'body_type_indexed': 3.0,
 'features': DenseVector([17.0, 200000.0, 4.0, 1989.0]),
 'normFeatures': DenseVector([0.0001, 1.0, 0.0, 0.0099])}

In [19]:
assembler = VectorAssembler(inputCols=['body_status_indexed',
 'engine_indexed',
 'fuel_type_indexed',
 'model_indexed',
 'brand_indexed',
 'body_color_indexed',
 'body_type_indexed','normFeatures'],outputCol='featuress')

In [20]:
assembledData = assembler.transform(l1NormData)

In [21]:
assembledData.printSchema()

root
 |-- _id: string (nullable = true)
 |-- body_color: string (nullable = true)
 |-- body_status: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- fuel_consumption: double (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- price: long (nullable = true)
 |-- volume: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- body_status_indexed: double (nullable = false)
 |-- engine_indexed: double (nullable = false)
 |-- fuel_type_indexed: double (nullable = false)
 |-- model_indexed: double (nullable = false)
 |-- brand_indexed: double (nullable = false)
 |-- body_color_indexed: double (nullable = false)
 |-- body_type_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- normFeatures: vector (nullable = true)
 |-- featuress: vector (nullable = true)



In [22]:
assembledData.head(2)[0].asDict()

{'_id': '62c0216f5ac6166d04f28fb9',
 'body_color': 'قهوه ای',
 'body_status': 'کامل رنگ',
 'body_type': 'suv',
 'brand': 'nissan',
 'engine': '6 سیلندر P40 کاربراتوری ',
 'fuel_consumption': 17.0,
 'fuel_type': 'بنزینی',
 'mileage': 200000,
 'model': 'patrol4doorir',
 'price': 165000000,
 'volume': 4.0,
 'year': 1989,
 'body_status_indexed': 13.0,
 'engine_indexed': 61.0,
 'fuel_type_indexed': 0.0,
 'model_indexed': 122.0,
 'brand_indexed': 13.0,
 'body_color_indexed': 7.0,
 'body_type_indexed': 3.0,
 'features': DenseVector([17.0, 200000.0, 4.0, 1989.0]),
 'normFeatures': DenseVector([0.0001, 1.0, 0.0, 0.0099]),
 'featuress': DenseVector([13.0, 61.0, 0.0, 122.0, 13.0, 7.0, 3.0, 0.0001, 1.0, 0.0, 0.0099])}

In [23]:
assembledData.select('featuress').head(2)

[Row(featuress=DenseVector([13.0, 61.0, 0.0, 122.0, 13.0, 7.0, 3.0, 0.0001, 1.0, 0.0, 0.0099])),
 Row(featuress=DenseVector([9.0, 61.0, 1.0, 122.0, 13.0, 0.0, 3.0, 0.0001, 1.0, 0.0, 0.0083]))]

In [24]:
data = assembledData.select('featuress','price')

In [25]:
# normalizer = Normalizer(inputCol="featuress", outputCol="features")

In [27]:
# data = normalizer.transform(data)

In [28]:
data.show()

+--------------------+---------+
|           featuress|    price|
+--------------------+---------+
|[13.0,61.0,0.0,12...|165000000|
|[9.0,61.0,1.0,122...|200000000|
|[9.0,61.0,1.0,122...|210000000|
|[0.0,40.0,0.0,85....|150000000|
|[13.0,61.0,0.0,12...|182000000|
|[3.0,61.0,0.0,122...|150000000|
|[0.0,61.0,0.0,122...|200000000|
|[9.0,61.0,0.0,122...|230000000|
|[9.0,40.0,0.0,85....|190000000|
|[9.0,40.0,0.0,85....|175000000|
|[3.0,40.0,0.0,85....|230000000|
|[3.0,61.0,0.0,122...|240000000|
|[3.0,61.0,1.0,122...|188000000|
|[9.0,61.0,1.0,122...|194000000|
|[3.0,40.0,0.0,85....|250000000|
|[2.0,40.0,0.0,85....|269000000|
|[9.0,61.0,1.0,122...|280000000|
|[3.0,61.0,1.0,122...|290000000|
|[9.0,61.0,1.0,122...|320000000|
|[9.0,12.0,0.0,8.0...| 73000000|
+--------------------+---------+
only showing top 20 rows



In [29]:
trainData,testData = data.randomSplit([0.7,0.3])

In [30]:
trainData.describe().show()
testData.describe().show()

+-------+-------------------+
|summary|              price|
+-------+-------------------+
|  count|              11776|
|   mean|7.274013571144701E8|
| stddev|9.204594402577426E8|
|    min|           17000000|
|    max|        11200000000|
+-------+-------------------+

+-------+-------------------+
|summary|              price|
+-------+-------------------+
|  count|               5001|
|   mean|7.069755684863027E8|
| stddev|8.798371526762748E8|
|    min|           45000000|
|    max|         8500000000|
+-------+-------------------+



##### importing linear reg

In [31]:
from pyspark.ml.regression import LinearRegression

In [32]:
lr =  LinearRegression(labelCol='price',featuresCol="featuress")

In [33]:
Model = lr.fit(trainData)

In [34]:
testResult  = Model.evaluate(testData)

In [35]:
testResult.r2

0.24394789178202136

In [36]:
testResult.rootMeanSquaredError

764952971.7402788

In [37]:
result = Model.transform(testData)

In [38]:
result.show()

+--------------------+----------+--------------------+
|           featuress|     price|          prediction|
+--------------------+----------+--------------------+
|(11,[1,3,4,7,9,10...|5400000000|1.9208773780145307E9|
|(11,[1,3,4,7,9,10...| 760000000| 4.505759764741422E8|
|(11,[1,3,4,7,9,10...| 760000000| 4.505759764741422E8|
|(11,[1,3,4,7,9,10...| 760000000| 4.505759764741422E8|
|(11,[1,3,4,7,9,10...| 760000000| 4.505759764741422E8|
|(11,[1,3,4,7,9,10...| 830000000| 4.505759764741422E8|
|(11,[1,3,4,7,9,10...| 214000000|2.1331898005695117E8|
|(11,[1,3,4,7,9,10...| 208000000|2.1424480686038935E8|
|(11,[1,3,4,7,9,10...| 209000000|2.1424480686038935E8|
|(11,[1,3,4,7,9,10...| 179000000|-2.26807957546195...|
|(11,[1,3,4,7,9,10...| 182000000|-2.26807957546195...|
|(11,[1,3,4,7,9,10...| 183000000|-2.26807957546195...|
|(11,[1,3,4,7,9,10...| 180000000|-2.26052545162038...|
|(11,[1,3,4,7,9,10...| 460000000|  3.71944129751598E8|
|(11,[1,3,4,7,9,10...| 465000000|  3.71944129751598E8|
|(11,[1,3,

In [39]:
testResult.meanSquaredError

5.851530489742839e+17

In [40]:
from pyspark.sql.functions import corr

In [41]:
tmpData.columns

['_id',
 'body_color',
 'body_status',
 'body_type',
 'brand',
 'engine',
 'fuel_consumption',
 'fuel_type',
 'mileage',
 'model',
 'price',
 'volume',
 'year']

In [42]:
tmpData.select(corr('price','volume')).show()

+-------------------+
|corr(price, volume)|
+-------------------+
| 0.5401012573001207|
+-------------------+



In [43]:
tmpData.select(corr('price','year')).show()

+--------------------+
|   corr(price, year)|
+--------------------+
|-0.10136644955809855|
+--------------------+



In [44]:
tmpData.select(corr('price','mileage')).show()

+--------------------+
|corr(price, mileage)|
+--------------------+
|-0.04637832017438...|
+--------------------+



In [45]:
tmpData.select(corr('price','engine')).show()

+-------------------+
|corr(price, engine)|
+-------------------+
|               null|
+-------------------+



In [46]:
indexerE = StringIndexer(inputCol='engine',outputCol='indexedEngine')

In [47]:
idkdata = indexerE.fit(tmpData).transform(tmpData)

In [48]:
idkdata.printSchema()

root
 |-- _id: string (nullable = true)
 |-- body_color: string (nullable = true)
 |-- body_status: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- fuel_consumption: double (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- price: long (nullable = true)
 |-- volume: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- indexedEngine: double (nullable = false)



In [49]:
idkdata.select(corr('price','indexedEngine')).show()

+--------------------------+
|corr(price, indexedEngine)|
+--------------------------+
|       0.19838536183082636|
+--------------------------+



In [50]:
tmpData.groupBy('engine').count().count()

107