In [1]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Session-level settings, kept together so they are easy to find and tune.
APP_NAME = 'Automobile'
# This notebook only shuffles a handful of rows, so one shuffle partition is
# plenty; Spark's default (200) would spread the tiny groupBy result over many
# mostly-empty partitions. With 1, the aggregated result written at the end of
# the notebook should land in a single CSV part file.
SHUFFLE_PARTITIONS = 1

spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

# Partition management: number of partitions used when shuffling data for
# joins/aggregations.
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/13 18:10:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Column layout of data/auto.csv (the file has no header row). Types are
# declared up front rather than inferred; every field is nullable.
# NOTE(review): `year` is read as a string, not an integer — confirm intended.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('incident_id', IntegerType()),
        ('incident_type', StringType()),
        ('vin_number', StringType()),
        ('make', StringType()),
        ('model', StringType()),
        ('year', StringType()),
        ('incident_date', DateType()),
        ('description', StringType()),
    )
])

# Load the headerless, comma-separated file with the schema above. The frame
# is cached because the filtering cells that follow each scan it again.
auto = spark.read.csv('data/auto.csv', schema=schema, sep=',', header=False)
auto.cache()
auto.show()

                                                                                

+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|incident_id|incident_type|       vin_number|    make| model|year|incident_date|         description|
+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|          1|            I|VXIO456XLBB630221|  Nissan|Altima|2003|   2002-05-08|Initial sales fro...|
|          2|            I|INU45KIOOPA343980|Mercedes|  C300|2015|   2014-01-01|Sold from EuroMotors|
|          3|            A|VXIO456XLBB630221|    null|  null|null|   2014-07-02|   Head on collision|
|          4|            R|VXIO456XLBB630221|    null|  null|null|   2014-08-05| Repair transmission|
|          5|            I|VOME254OOXW344325|Mercedes|  E350|2015|   2014-02-01|    Sold from Carmax|
|          6|            R|VOME254OOXW344325|    null|  null|null|   2015-02-06|Wheel allignment ...|
|          7|            R|VXIO456XLBB630221|    null|  null|null|   2015-01-01|Re

In [3]:
# Keep only incident_type 'I' records. In the sample data these are the rows
# that carry make/year, and their descriptions read like sales entries
# (e.g. "Sold from Carmax"); presumably 'I' = initial sale — confirm.
# Filter *before* projecting: the predicate needs `incident_type`, which the
# select drops. Filtering after the select only works because Spark's analyzer
# quietly re-adds the missing column.
incidents_filtered = (
    auto
    .where(F.col('incident_type') == 'I')
    .select('vin_number', 'make', 'year')
)
incidents_filtered.show()

+-----------------+--------+----+
|       vin_number|    make|year|
+-----------------+--------+----+
|VXIO456XLBB630221|  Nissan|2003|
|INU45KIOOPA343980|Mercedes|2015|
|VOME254OOXW344325|Mercedes|2015|
|EXOA00341AB123456|Mercedes|2016|
|UXIA769ABCC447906|  Toyota|2017|
+-----------------+--------+----+



In [4]:
# Keep only incident_type 'A' records. In the sample data their descriptions
# are collisions/rollovers; presumably 'A' = accident — confirm.
# Filter *before* projecting so the predicate references a column that is
# still present, instead of relying on Spark re-adding the dropped
# `incident_type` column.
accidents_filtered = (
    auto
    .where(F.col('incident_type') == 'A')
    .select('vin_number', 'incident_date', 'description')
)
accidents_filtered.show()

+-----------------+-------------+-----------------+
|       vin_number|incident_date|      description|
+-----------------+-------------+-----------------+
|VXIO456XLBB630221|   2014-07-02|Head on collision|
|VOME254OOXW344325|   2015-10-01|   Side collision|
|EXOA00341AB123456|   2015-05-03| Vehicle rollover|
|INU45KIOOPA343980|   2020-05-01|   Side collision|
+-----------------+-------------+-----------------+



In [5]:
# Attach accident details to each vehicle's 'I' record by VIN. This is an
# inner join (the default), so only vehicles with at least one accident are
# kept, and a VIN with several accidents yields one row per accident.
# The accidents side is broadcast-hinted, which assumes it stays small enough
# to ship to every executor.
# NOTE(review): if a VIN ever has more than one 'I' row, each of its accidents
# is duplicated here — consider deduplicating incidents_filtered by vin_number.
joinedDF = incidents_filtered.join(F.broadcast(accidents_filtered), 'vin_number')
joinedDF.show()

+-----------------+--------+----+-------------+-----------------+
|       vin_number|    make|year|incident_date|      description|
+-----------------+--------+----+-------------+-----------------+
|VXIO456XLBB630221|  Nissan|2003|   2014-07-02|Head on collision|
|INU45KIOOPA343980|Mercedes|2015|   2020-05-01|   Side collision|
|VOME254OOXW344325|Mercedes|2015|   2015-10-01|   Side collision|
|EXOA00341AB123456|Mercedes|2016|   2015-05-03| Vehicle rollover|
+-----------------+--------+----+-------------+-----------------+



In [6]:
# Count joined rows per (make, year). Assuming one 'I' row per VIN, each joined
# row is one accident, so total_num is the accident count for that make/year.
# groupBy already limits the output to the grouping keys plus the aggregate,
# so no preceding select is needed.
# NOTE(review): groupBy does not guarantee row order; add
# .orderBy('make', 'year') if a stable order in the output is required.
result = joinedDF.groupBy('make', 'year').agg(F.count('*').alias('total_num'))
result.show()

+--------+----+---------+
|    make|year|total_num|
+--------+----+---------+
|  Nissan|2003|        1|
|Mercedes|2015|        2|
|Mercedes|2016|        1|
+--------+----+---------+



In [8]:
# Persist the per-(make, year) counts as CSV with a header row, replacing
# whatever is already stored under output/.
result.write.csv("output/", mode='overwrite', header=True)