In [1]:
##Importing PySpark Library and building Spark Session

import pyspark

from pyspark.sql import SparkSession
from pyspark.context import SparkContext 
sc = SparkSession.builder.master("local[*]").config("spark.mongodb.input.uri","mongodb://localhost:27017/Dbda.aj").config("spark.mongodb.output.uri","mongodb://localhost:27017/Dbda.aj").config("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").getOrCreate()

In [2]:
from pyspark.sql.functions import *

df=sc.read.options(inferSchema='True',delimiter=',',header='True').csv("vehicles.csv")
df.show(5)
df.count()

+---+----------+--------------------+------+--------------------+-----+------+------------+--------------------+---------+-----------+------+--------+------------+------------+-----------------+-----+---------+------+-----------+--------------------+--------------------+-----+---------+----------+--------------------+
|_c0|        id|                 url|region|          region_url|price|  year|manufacturer|               model|condition|  cylinders|  fuel|odometer|title_status|transmission|              VIN|drive|     size|  type|paint_color|           image_url|         description|state|      lat|      long|        posting_date|
+---+----------+--------------------+------+--------------------+-----+------+------------+--------------------+---------+-----------+------+--------+------------+------------+-----------------+-----+---------+------+-----------+--------------------+--------------------+-----+---------+----------+--------------------+
|  0|7240372487|https://auburn.cr...|aub

475057

In [3]:
#from pyspark import SparkFiles

#ucd_url=("https://projectknow.s3.ap-south-1.amazonaws.com/vehicles.csv")
#sc.sparkContext.addFile(ucd_url)
#df=sc.read.csv(SparkFiles.get("vehicles.csv"), header=True, inferSchema=True)

#df.count()

In [4]:
# Removing Unwanted Columns

ucd = df.drop('_c0', 'id', 'url', 'region_url', 'image_url','VIN','description','posting_date','size','state')
ucd.show(5)

+------+-----+------+------------+--------------------+---------+-----------+------+--------+------------+------------+-----+------+-----------+---------+----------+
|region|price|  year|manufacturer|               model|condition|  cylinders|  fuel|odometer|title_status|transmission|drive|  type|paint_color|      lat|      long|
+------+-----+------+------------+--------------------+---------+-----------+------+--------+------------+------------+-----+------+-----------+---------+----------+
|auburn|35990|2010.0|   chevrolet|corvette grand sport|     good|8 cylinders|   gas| 32742.0|       clean|       other|  rwd| other|       null|    32.59|    -85.48|
|auburn| 7500|2014.0|     hyundai|              sonata|excellent|4 cylinders|   gas| 93600.0|       clean|   automatic|  fwd| sedan|       null|  32.5475|  -85.4682|
|auburn| 4900|2006.0|         bmw|             x3 3.0i|     good|6 cylinders|   gas| 87046.0|       clean|   automatic| null|   SUV|       blue|32.616807|-85.464149|
|aub

In [5]:
# Deleting fully blank rows

ucd=ucd.na.drop("all")
ucd.count()

467076

In [6]:
# Finding how many null values are present in each column

ucd.select([count(when(isnull(c), c)).alias(c) for c in ucd.columns]).show()

+------+-----+----+------------+-----+---------+---------+-----+--------+------------+------------+------+------+-----------+-----+-----+
|region|price|year|manufacturer|model|condition|cylinders| fuel|odometer|title_status|transmission| drive|  type|paint_color|  lat| long|
+------+-----+----+------------+-----+---------+---------+-----+--------+------------+------------+------+------+-----------+-----+-----+
|     0| 5357|6880|       24442|11756|   200105|   178306|10553|   62620|        9928|        9863|141804|120504|     148608|18574|18203|
+------+-----+----+------------+-----+---------+---------+-----+--------+------------+------------+------+------+-----------+-----+-----+



In [7]:
# Since Price column is the target column Null Values must be Removed  

ucd=ucd.na.drop(subset=['price'])

# Removing Price Outliers (i.e. Resale Price Of Car will not be less than $100 and More than $500K)
ucd = ucd[((ucd['price'] < 500000) & (ucd['price'] > 100))]

# Finding how many null values are present in remaining columns and also checking the count of rows
ucd.select([count(when(isnull(c), c)).alias(c) for c in ucd.columns]).show()
ucd.count()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+------+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| drive|  type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+------+-----------+----+----+
|     0|    0|1114|       16260| 4328|   168966|   152697|2741|   48300|        1970|        1881|122026|104104|     126770|7606|7237|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+------+-----------+----+----+



421889

In [8]:
# Dropping Blank/Null values from Year column 

ucd=ucd.na.drop(subset=['year'])

# Finding how many null values are present in remaining columns and also checking the count of rows
ucd.select([count(when(isnull(c), c)).alias(c) for c in ucd.columns]).show()
ucd.count()


+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+------+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| drive|  type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+------+-----------+----+----+
|     0|    0|   0|       15149| 4221|   167852|   152579|2498|   48193|        1620|        1774|121767|103923|     126598|7499|7130|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+------+-----------+----+----+



420775

In [9]:
# Finding Distict Values of "Title_status" column

distinct_ids = [x.title_status for x in ucd.select('title_status').distinct().collect()]
distinct_ids

[' 330',
 None,
 ' Extended ',
 'lien',
 ' 535',
 'missing',
 'clean',
 ' 645',
 'salvage',
 '53312.0',
 'rebuilt',
 'parts only',
 '323ci']

In [10]:
# Most Customerd prefer Clean Cars so eliminating rest and just including clean cars

ucd=ucd.filter(ucd['title_status'].isin(["clean"]))

# Finding how many null values are present in remaining columns and also checking the count of rows
ucd.select([count(when(isnull(c), c)).alias(c) for c in ucd.columns]).show()
ucd.count()


+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| drive| type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+
|     0|    0|   0|       14367| 3944|   161738|   147764|1779|   45367|           0|        1529|116757|98438|     121450|7408|6924|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+



403123

In [11]:
# Finding Distict Values of Transmission column

distinct_ids = [x.transmission for x in ucd.select('transmission').distinct().collect()]
distinct_ids

[None, 'automatic', 'other', 'manual']

In [12]:
# Eliminating Blanks from Transmission column since the count is small it won't impact

ucd=ucd.na.drop(subset=['transmission'])

# Finding how many null values are present in remaining columns and also checking the count of rows
ucd.select([count(when(isnull(c), c)).alias(c) for c in ucd.columns]).show()
ucd.count()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| drive| type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+
|     0|    0|   0|       14251| 3893|   160282|   146293|1755|   45287|           0|           0|115993|98415|     121129|7267|6783|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+



401594

In [13]:
# Finding Distict Values of fuel column

distinct_ids = [x.fuel for x in ucd.select('fuel').distinct().collect()]
distinct_ids

['gas', None, 'other', 'diesel', 'hybrid', 'electric']

In [14]:
# Eliminating Blanks from Fuel column since the count is small it won't impact

ucd=ucd.na.drop(subset=['fuel'])

# Finding how many null values are present in remaining columns and also checking the count of rows
ucd.select([count(when(isnull(c), c)).alias(c) for c in ucd.columns]).show()
ucd.count()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| drive| type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+
|     0|    0|   0|       14188| 3893|   158533|   145572|   0|   45261|           0|           0|115586|98209|     120877|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+------+-----+-----------+----+----+



399839

In [15]:
# Finding Distict Values of Drive column

distinct_ids = [x.drive for x in ucd.select('drive').distinct().collect()]
distinct_ids

[None, 'fwd', 'rwd', '4wd']

In [16]:
# Replacing Blank/Null Values of Drive column with "Other"

ucd_1=ucd.na.fill("other",subset=['drive'])

# Finding how many null values are present in remaining columns
ucd_1.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()


+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+-----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive| type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+-----+-----------+----+----+
|     0|    0|   0|       14188| 3893|   158533|   145572|   0|   45261|           0|           0|    0|98209|     120877|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+-----+-----------+----+----+



In [17]:
# Finding Distict Values of Type column

distinct_ids = [x.type for x in ucd_1.select('type').distinct().collect()]
distinct_ids
ucd_1.count()

399839

In [18]:
#ucd_1.coalesce(1).write.format('csv').save("check", header='true')

In [19]:
# Filling Blank Values of Type column with "Other"

ucd_1=ucd_1.na.fill("other",subset=['type'])

# Finding how many null values are present in remaining columns
ucd_1.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()


+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|     0|    0|   0|       14188| 3893|   158533|   145572|   0|   45261|           0|           0|    0|   0|     120877|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+



In [20]:
# Finding Distict Values of Type column

distinct_ids = [x.paint_color for x in ucd_1.select('paint_color').distinct().collect()]
distinct_ids

['orange',
 'grey',
 'green',
 'yellow',
 None,
 'silver',
 'purple',
 'white',
 'red',
 'custom',
 'black',
 'brown',
 'blue']

In [21]:
# Filling Blank Values of Paint_color column with "Unknown"

ucd_1=ucd_1.na.fill("unknown",subset=['paint_color'])

# Finding how many null values are present in remaining columns
ucd_1.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|     0|    0|   0|       14188| 3893|   158533|   145572|   0|   45261|           0|           0|    0|   0|          0|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+



In [22]:
ucd_1.printSchema()

root
 |-- region: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- drive: string (nullable = false)
 |-- type: string (nullable = false)
 |-- paint_color: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



In [23]:
# Changing datatype of columns price & odometer to integer

ucd_1 = ucd_1.withColumn("price",ucd_1["price"].cast("int"))
ucd_1 = ucd_1.withColumn("odometer",ucd_1["odometer"].cast("int"))
ucd_1.printSchema()

root
 |-- region: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: integer (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- drive: string (nullable = false)
 |-- type: string (nullable = false)
 |-- paint_color: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



In [24]:
# Finding Mean of Odometer and replacing null values with the mean 

meanValue=ucd_1.select(mean(ucd_1.odometer)).collect()
print(type(meanValue))
print('mean value of odometer', meanValue[0][0])
mean_odometer=meanValue[0][0]
#now using men_sales value to fill the nulls in sales column
ucd_1=ucd_1.na.fill(mean_odometer,subset=['odometer'])
ucd_1.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()

<class 'list'>
mean value of odometer 104744.54248148503
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|     0|    0|   0|       14188| 3893|   158533|   145572|   0|       0|           0|           0|    0|   0|          0|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+



In [25]:
# Replacing Blanks of columns of Manufacturer & Model with "Other"

ucd_1=ucd_1.na.fill("other",subset=['manufacturer','model'])
ucd_1.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|     0|    0|   0|           0|    0|   158533|   145572|   0|       0|           0|           0|    0|   0|          0|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+



In [26]:
# Filling Blank/Null values in condition column with "not specified"

ucd_1=ucd_1.na.fill("not specified",subset=["condition"])
ucd_1.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|     0|    0|   0|           0|    0|        0|   145572|   0|       0|           0|           0|    0|   0|          0|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+



In [27]:
distinct_ids = [x.cylinders for x in ucd_1.select('cylinders').distinct().collect()]
distinct_ids

[None,
 '3 cylinders',
 'other',
 '10 cylinders',
 '6 cylinders',
 '4 cylinders',
 '12 cylinders',
 '5 cylinders',
 '8 cylinders']

In [28]:
# Filling Blank/Null values in cylinders column with "other"

ucd_2=ucd_1.na.fill("other",subset=["cylinders"])
ucd_2.select([count(when(isnull(c), c)).alias(c) for c in ucd_1.columns]).show()

+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|region|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+
|     0|    0|   0|           0|    0|        0|        0|   0|       0|           0|           0|    0|   0|          0|7257|6773|
+------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+



In [29]:
distinct_ids = [x.model for x in ucd_2.select('model').distinct().collect()]
distinct_ids

['lacrosse',
 'dts',
 '4runner',
 'hr-v',
 'cts sedan',
 'transit connect cargo van',
 'charger se',
 'sentra sv',
 'azera',
 'toyta camry',
 '2500 diesel',
 'sonata hybrid',
 'explorer limited 4wd',
 'q40',
 'equinox ltz sport',
 'avalon limited',
 'sentra sr sedan 4d',
 'cherokee trailhawk',
 'hse',
 'caravan se',
 'silverado 2500 work truck dump bed!',
 '2019 3/4 ton cargo vans',
 'journey se sport utility',
 'savana cargo 2500 3dr',
 'aerostar',
 'passat 2.0l turbo cc',
 'r 320 cdi',
 'All makes and Models',
 'pontaic firebird',
 'range evoque hse',
 'cheyenne',
 'Inifniti G37',
 '27 Bugatti',
 'xk8',
 'journey 75,000 miles 7',
 'cls63 amg',
 'Isuzu trooper limited',
 'sierra 3500 utility',
 'TOYOTA, PRERUNNER V6 TACOMA',
 'benz 300sl',
 'ls 460l',
 'Manual AWD',
 'xc60 t6 momentum sport',
 'verano base',
 'F-100 Custom Cab',
 'a4 2.0t premium plus awd',
 'crv lx awd',
 'benz s320 lwb',
 'silverado 1500 4x4 4dr kc',
 'borrego',
 'montego premier',
 'f 250 xlt crew cab',
 'eclipse g

In [30]:
# Fetching only Required Information from Model column and combining manufarure column with model column as new column car_model

df_model=ucd_2.withColumn("model", split(col("model"),"/").getItem(0))
distinct_ids = [x.model for x in df_model.select('model').distinct().collect()]
distinct_ids

['lacrosse',
 'dts',
 '4runner',
 'jetta tdi w',
 'hr-v',
 'cts sedan',
 'transit connect cargo van',
 'charger se',
 'sentra sv',
 'azera',
 'toyta camry',
 '2500 diesel',
 'sonata hybrid',
 'explorer limited 4wd',
 'q40',
 'equinox ltz sport',
 'avalon limited',
 'sentra sr sedan 4d',
 'cherokee trailhawk',
 'hse',
 'caravan se',
 'silverado 2500 work truck dump bed!',
 'journey se sport utility',
 'savana cargo 2500 3dr',
 'aerostar',
 'passat 2.0l turbo cc',
 'r 320 cdi',
 'All makes and Models',
 'pontaic firebird',
 'range evoque hse',
 'cheyenne',
 'Inifniti G37',
 '27 Bugatti',
 'xk8',
 'journey 75,000 miles 7',
 'cls63 amg',
 'Isuzu trooper limited',
 'sierra 3500 utility',
 'TOYOTA, PRERUNNER V6 TACOMA',
 'benz 300sl',
 'ls 460l',
 'Manual AWD',
 'xc60 t6 momentum sport',
 'verano base',
 'F-100 Custom Cab',
 'a4 2.0t premium plus awd',
 'crv lx awd',
 'benz s320 lwb',
 'silverado 1500 4x4 4dr kc',
 'borrego',
 'montego premier',
 'f 250 xlt crew cab',
 'eclipse gs',
 'passat

In [31]:
df_model=df_model.withColumn("model", split(col("model")," ").getItem(0))

distinct_ids = [x.model for x in df_model.select('model').distinct().collect()]
distinct_ids

['4runner',
 'lacrosse',
 'monte',
 'dts',
 'hr-v',
 'azera',
 'borrego',
 'q40',
 'hse',
 'trail',
 'aerostar',
 'cheyenne',
 'xk8',
 '07',
 'vanden',
 'K',
 '2011Chevy',
 'lierty',
 'LT',
 'sonta',
 'cl200',
 'ecape',
 '"thunderbird',
 'tc7500',
 'Toyoyta',
 'l-179',
 'v1500',
 'f-750',
 'verano',
 'sequioa',
 'STUDEBAKER',
 'c350',
 'fsuperduty',
 '944',
 'rolls',
 'Fia',
 'CAMARO',
 'chryler',
 'lmtv',
 '220s',
 'Avanti',
 'navigator￼',
 'isusu',
 'CHEVELLE',
 'k10pu',
 '800',
 'c-30',
 'ct',
 'Srx',
 'mkt',
 'viper',
 'Volkswagen',
 'hinda',
 'vandura',
 'm45x',
 'concours',
 'SUBURU',
 'WHEELCHAIR',
 'Sunbeam',
 'x5m',
 'tle',
 'SAJWA2GB3CLV27609',
 'e24',
 'spectrum,',
 'premiere',
 'hyuandai',
 'CCHEVROLET',
 '3200',
 'f2',
 'vnl64t730',
 'ESCALADE',
 'vtx',
 'VOLSKWAGEN',
 'sportage',
 'pacifica',
 '7',
 '328d',
 'clk-class',
 'es300h',
 '124',
 'pao',
 '7.3',
 '1953',
 'Subary',
 'Hoda',
 'r320',
 'f6',
 'n600',
 'gx350l',
 'g-1500',
 'tires',
 'cheroke',
 'Econoline',
 '51',

In [32]:
dff=df_model.withColumn('car_model',concat_ws(" ",df_model.manufacturer,df_model.model))
dff.show(5)

#df_model.withColumn('car_model',df_model=df1.car_model).show()

+------+-----+------+------------+--------+---------+-----------+----+--------+------------+------------+-----+------+-----------+---------+----------+------------------+
|region|price|  year|manufacturer|   model|condition|  cylinders|fuel|odometer|title_status|transmission|drive|  type|paint_color|      lat|      long|         car_model|
+------+-----+------+------------+--------+---------+-----------+----+--------+------------+------------+-----+------+-----------+---------+----------+------------------+
|auburn|35990|2010.0|   chevrolet|corvette|     good|8 cylinders| gas|   32742|       clean|       other|  rwd| other|    unknown|    32.59|    -85.48|chevrolet corvette|
|auburn| 7500|2014.0|     hyundai|  sonata|excellent|4 cylinders| gas|   93600|       clean|   automatic|  fwd| sedan|    unknown|  32.5475|  -85.4682|    hyundai sonata|
|auburn| 4900|2006.0|         bmw|      x3|     good|6 cylinders| gas|   87046|       clean|   automatic|other|   SUV|       blue|32.616807|-85.4

In [33]:
# Finding car age and adding a new column named car_age

dff = dff.withColumn("year",dff["year"].cast("int"))

dff.printSchema()

root
 |-- region: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- manufacturer: string (nullable = false)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = false)
 |-- cylinders: string (nullable = false)
 |-- fuel: string (nullable = true)
 |-- odometer: integer (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- drive: string (nullable = false)
 |-- type: string (nullable = false)
 |-- paint_color: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- car_model: string (nullable = false)



In [34]:
dff=dff.withColumn("car_age", (2021-dff['year']))
dff.show(2)

+------+-----+----+------------+--------+---------+-----------+----+--------+------------+------------+-----+-----+-----------+-------+--------+------------------+-------+
|region|price|year|manufacturer|   model|condition|  cylinders|fuel|odometer|title_status|transmission|drive| type|paint_color|    lat|    long|         car_model|car_age|
+------+-----+----+------------+--------+---------+-----------+----+--------+------------+------------+-----+-----+-----------+-------+--------+------------------+-------+
|auburn|35990|2010|   chevrolet|corvette|     good|8 cylinders| gas|   32742|       clean|       other|  rwd|other|    unknown|  32.59|  -85.48|chevrolet corvette|     11|
|auburn| 7500|2014|     hyundai|  sonata|excellent|4 cylinders| gas|   93600|       clean|   automatic|  fwd|sedan|    unknown|32.5475|-85.4682|    hyundai sonata|      7|
+------+-----+----+------------+--------+---------+-----------+----+--------+------------+------------+-----+-----+-----------+-------+-----

In [35]:
ucd_3 = dff.drop('model')
ucd_3.show(5)

+------+-----+----+------------+---------+-----------+----+--------+------------+------------+-----+------+-----------+---------+----------+------------------+-------+
|region|price|year|manufacturer|condition|  cylinders|fuel|odometer|title_status|transmission|drive|  type|paint_color|      lat|      long|         car_model|car_age|
+------+-----+----+------------+---------+-----------+----+--------+------------+------------+-----+------+-----------+---------+----------+------------------+-------+
|auburn|35990|2010|   chevrolet|     good|8 cylinders| gas|   32742|       clean|       other|  rwd| other|    unknown|    32.59|    -85.48|chevrolet corvette|     11|
|auburn| 7500|2014|     hyundai|excellent|4 cylinders| gas|   93600|       clean|   automatic|  fwd| sedan|    unknown|  32.5475|  -85.4682|    hyundai sonata|      7|
|auburn| 4900|2006|         bmw|     good|6 cylinders| gas|   87046|       clean|   automatic|other|   SUV|       blue|32.616807|-85.464149|            bmw x3| 

In [36]:
ucd_3.select([count(when(isnull(c), c)).alias(c) for c in ucd_3.columns]).show()
ucd_3.count()

+------+-----+----+------------+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+---------+-------+
|region|price|year|manufacturer|condition|cylinders|fuel|odometer|title_status|transmission|drive|type|paint_color| lat|long|car_model|car_age|
+------+-----+----+------------+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+---------+-------+
|     0|    0|   0|           0|        0|        0|   0|       0|           0|           0|    0|   0|          0|7257|6773|        0|      0|
+------+-----+----+------------+---------+---------+----+--------+------------+------------+-----+----+-----------+----+----+---------+-------+



399839

In [37]:
df_cl=ucd_3.withColumn("cylinders", split(col("cylinders")," ").getItem(0))
df_cl.show(2)

+------+-----+----+------------+---------+---------+----+--------+------------+------------+-----+-----+-----------+-------+--------+------------------+-------+
|region|price|year|manufacturer|condition|cylinders|fuel|odometer|title_status|transmission|drive| type|paint_color|    lat|    long|         car_model|car_age|
+------+-----+----+------------+---------+---------+----+--------+------------+------------+-----+-----+-----------+-------+--------+------------------+-------+
|auburn|35990|2010|   chevrolet|     good|        8| gas|   32742|       clean|       other|  rwd|other|    unknown|  32.59|  -85.48|chevrolet corvette|     11|
|auburn| 7500|2014|     hyundai|excellent|        4| gas|   93600|       clean|   automatic|  fwd|sedan|    unknown|32.5475|-85.4682|    hyundai sonata|      7|
+------+-----+----+------------+---------+---------+----+--------+------------+------------+-----+-----+-----------+-------+--------+------------------+-------+
only showing top 2 rows



In [38]:
distinct_ids = [x.cylinders for x in df_cl.select('cylinders').distinct().collect()]
distinct_ids

['3', '8', '5', 'other', '6', '10', '4', '12']

In [39]:
# replace missing number of cylinders with median of car model & delete 'other'

from pyspark.sql import *

df_test=df_cl.where(df_cl['cylinders'] != 'other')
df_test2 = df_test.withColumn("cylinders",df_test["cylinders"].cast("int"))
df_res=df_test2.approxQuantile("cylinders", [0.5], 0.25)
df_res


[4.0]

In [40]:
ucd_4=ucd_4.withColumn('cylinders', regexp_replace('cylinders', 'other', '4'))
distinct_ids = [x.cylinders for x in ucd_4.select('cylinders').distinct().collect()]
distinct_ids

NameError: name 'ucd_4' is not defined

In [None]:
ucd_4=df_cl.withColumn('fuel', regexp_replace('fuel', 'other', 'gas'))
distinct_ids = [x.fuel for x in ucd_4.select('fuel').distinct().collect()]
distinct_ids

In [None]:
ucd_4.show(5)

In [None]:
ucd_4 = ucd_4.withColumn("year",ucd_4["year"].cast("string"))
ucd_4.printSchema()

In [None]:
mnd = sc.read.format("mongo").option("uri","mongodb://localhost:27017/Dbda.ucd_4").load()

ucd_4.write.format("mongo").option("uri","mongodb://localhost:27017/Dbda.ucd_4").mode("append").save()
ucd_4.show()

In [None]:
distinct_ids = [x.region for x in ucd_4.select('region').distinct().collect()]
distinct_ids

In [None]:
ucd_4.count()

In [None]:
#ucd_4.coalesce(1).write.format('csv').save("cleaned.csv", header='true')

In [None]:
advcl = ucd_4.where(ucd_4['condition'] != 'not specified')
advcl = advcl.where(ucd_4['transmission'] != 'other')
advcl = advcl.where(ucd_4['drive'] != 'other')
advcl = advcl.where(ucd_4['type'] != 'other')
advcl = advcl.where(ucd_4['paint_color'] != 'unknown')
advcl.show(5)
advcl.count()

In [None]:
mnd1 = sc.read.format("mongo").option("uri","mongodb://localhost:27017/Dbda.Advcln").load()

advcl.write.format("mongo").option("uri","mongodb://localhost:27017/Dbda.Advcln").mode("append").save()
advcl.show()