In [1]:
import pyspark
import findspark
from pyspark import SparkFiles
import json
findspark.init()
findspark.find()
from pyspark.sql.types import DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel

from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext

from pyspark.sql.functions import isnan, when, count, col

# Download zip file 

In [2]:
import requests, zipfile
from io import BytesIO
print('Downloading started')

#Defining the zip file URL
url = 'https://github.com/abhashpanwar/used-car-price-prediction/blob/master/vehiclesFinalData.zip?raw=true'

# Split URL to get the file name
filename = url.split('/')[-1]

# Downloading the file by sending the request to the URL
req = requests.get(url)
print('Downloading Completed')

# extracting the zip file contents
zipfile= zipfile.ZipFile(BytesIO(req.content))
zipfile.extractall(r'D:/Project')

Downloading started
Downloading Completed


# SparkSession

In [3]:
# load mongo data
# CarData = db name
# car = collection name
inputreq="mongodb://127.0.0.1/CarData.car"
outputreq="mongodb://127.0.0.1/CarData.car"

In [4]:
spark=SparkSession\
        .builder\
        .appName("carprediction")\
        .config("spark.mongodb.input.uri",inputreq)\
        .config("spark.mongodb.output.uri",outputreq)\
        .config("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:2.4.2")\
        .getOrCreate()

In [6]:
raw_df=spark.read.option("header","true").options(inferSchema='True',delimiter=',').csv(SparkFiles.get(r"D:/project/vehiclesFinal.csv"))
raw_df.show()  

+----------+--------------------+------+------------+--------------------+---------+-----------+-----------+--------+------------+------------+---------+---------+-----------+-----------+-------+--------+--------+
|        id|              region|  year|manufacturer|               model|condition|  cylinders|       fuel|odometer|title_status|transmission|    drive|     size|       type|paint_color|    lat|    long|   price|
+----------+--------------------+------+------------+--------------------+---------+-----------+-----------+--------+------------+------------+---------+---------+-----------+-----------+-------+--------+--------+
|7119256118|       mohave county|2012.0|        jeep|             patriot| like new|4 cylinders|        gas|247071.0|       clean|   automatic|      4wd|full-size|    offroad|     silver|34.4554|-114.269|  3495.0|
|7120880186|        oregon coast|2014.0|         bmw|        328i m-sport|     good|5 cylinders|        gas| 76237.0|       clean|   automatic| 

In [7]:
raw_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- region: string (nullable = true)
 |-- year: double (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: double (nullable = true)
 |-- price: double (nullable = true)



In [10]:
raw_df.select([count(when(isnan(c), c)).alias(c) for c in raw_df.columns]).show()

+---+------+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+----+-----------+---+----+-----+
| id|region|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|size|type|paint_color|lat|long|price|
+---+------+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+----+-----------+---+----+-----+
|  0|     0|   0|           0|    0|        0|        0|   0|       0|           0|           0|    0|   0|   0|          0|  0|   0|    0|
+---+------+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+----+-----------+---+----+-----+



# drop column region and model

In [11]:
car=raw_df.drop("region","model")
car.show(10)

+----------+------+------------+---------+-----------+----+--------+------------+------------+-----+---------+-----------+-----------+-------+--------+-------+
|        id|  year|manufacturer|condition|  cylinders|fuel|odometer|title_status|transmission|drive|     size|       type|paint_color|    lat|    long|  price|
+----------+------+------------+---------+-----------+----+--------+------------+------------+-----+---------+-----------+-----------+-------+--------+-------+
|7119256118|2012.0|        jeep| like new|4 cylinders| gas|247071.0|       clean|   automatic|  4wd|full-size|    offroad|     silver|34.4554|-114.269| 3495.0|
|7120880186|2014.0|         bmw|     good|5 cylinders| gas| 76237.0|       clean|   automatic|  rwd|full-size|      sedan|       grey|46.1837|-123.824|13750.0|
|7115048251|2001.0|       dodge|excellent|6 cylinders| gas|199000.0|       clean|   automatic|  4wd|full-size|    offroad|       grey|34.9352|-81.9654| 2300.0|
|7119250502|2004.0|   chevrolet|excellen

# change data type accordingly

In [12]:
car_df = car.withColumn("odometer", car["odometer"].cast(DoubleType()))
car_df = car_df.withColumn("lat", car_df["lat"].cast(DoubleType()))
car_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- year: double (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: double (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- price: double (nullable = true)



In [13]:
car_df.select([count(when(isnan(c), c)).alias(c) for c in car_df.columns]).show()

+---+----+------------+---------+---------+----+--------+------------+------------+-----+----+----+-----------+---+----+-----+
| id|year|manufacturer|condition|cylinders|fuel|odometer|title_status|transmission|drive|size|type|paint_color|lat|long|price|
+---+----+------------+---------+---------+----+--------+------------+------------+-----+----+----+-----------+---+----+-----+
|  0|   0|           0|        0|        0|   0|       0|           0|           0|    0|   0|   0|          0|  0|   0|    0|
+---+----+------------+---------+---------+----+--------+------------+------------+-----+----+----+-----------+---+----+-----+



# to write the data into mongodb

In [None]:
car_df.write.format("com.mongodb.spark.sql.DefaultSource").option("database","CarData").option("collection", "car").save()