In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql import SparkSession

# Start a local Spark session on all available cores (getOrCreate returns the
# already-running session if one exists, so re-running this cell is harmless).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Car sales report')
    .getOrCreate()
)

In [2]:
# Explicit schema for data.csv. Columns are mapped by position: the file has no
# header row (the reader's default), consistent with df.show() below starting
# at incident 1.
# NOTE(review): Spark's file-based readers do not enforce nullable=False, so a
# missing value in a "non-nullable" column still loads as null.
schema = StructType([
    StructField('incident_id', IntegerType(), False),
    StructField('incident_type', StringType(), False),
    StructField('vin', StringType(), False),
    StructField('make', StringType(), True),
    StructField('model', StringType(), True),
    StructField('year', IntegerType(), True),
    StructField('incident_date', DateType(), False),
    StructField('description', StringType(), False),
])

# Load the raw incident records.
df = (
    spark.read.format('csv')
    .schema(schema)
    .load('data.csv')
)

# Expose the data to Spark SQL as "auto_data". createOrReplaceTempView is the
# supported replacement for the deprecated registerTempTable; re-running the
# cell simply replaces the view.
df.createOrReplaceTempView("auto_data")

In [3]:
# Preview the first 20 loaded rows (long cell values are truncated for display).
df.show(n=20, truncate=True)

+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|incident_id|incident_type|              vin|    make| model|year|incident_date|         description|
+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|          1|            I|VXIO456XLBB630221|  Nissan|Altima|2003|   2002-05-08|Initial sales fro...|
|          2|            I|INU45KIOOPA343980|Mercedes|  C300|2015|   2014-01-01|Sold from EuroMotors|
|          3|            A|VXIO456XLBB630221|    null|  null|null|   2014-07-02|   Head on collision|
|          4|            R|VXIO456XLBB630221|    null|  null|null|   2014-08-05| Repair transmission|
|          5|            I|VOME254OOXW344325|Mercedes|  E350|2015|   2014-02-01|    Sold from Carmax|
|          6|            R|VOME254OOXW344325|    null|  null|null|   2015-02-06|Wheel allignment ...|
|          7|            R|VXIO456XLBB630221|    null|  null|null|   2015-01-01|Re

In [4]:
# Build a VIN -> (make, year) lookup. In the sample data only the initial-sale
# ('I') rows carry make/year; later incidents for the same VIN have nulls.
# MAX ignores nulls, so it picks up the populated value for each VIN.
# NOTE(review): if one VIN ever had conflicting makes/years, MAX would silently
# pick the greatest value.
# The aggregates are aliased explicitly instead of renaming Spark's generated
# "max(...)" column names; withColumnRenamed silently does nothing if such a
# name doesn't match.
lookup_df = spark.sql("""
    SELECT vin,
           MAX(make) AS make_filled,
           MAX(year) AS year_filled
    FROM auto_data
    GROUP BY vin
""")

lookup_df.show()

+-----------------+-----------+-----------+
|              vin|make_filled|year_filled|
+-----------------+-----------+-----------+
|VOME254OOXW344325|   Mercedes|       2015|
|UXIA769ABCC447906|     Toyota|       2017|
|VXIO456XLBB630221|     Nissan|       2003|
|INU45KIOOPA343980|   Mercedes|       2015|
|EXOA00341AB123456|   Mercedes|       2016|
+-----------------+-----------+-----------+



In [5]:
# Replace each record's make/year with the per-VIN values from lookup_df, so
# follow-up incidents (null make/year in the raw data) inherit them.
# lookup_df is built from the same table, so every non-null VIN has a match and
# the inner join drops nothing (rows with a null vin would not match).
joined_df = (
    df.join(lookup_df, on='vin', how='inner')
    .select(
        'incident_type',
        F.col('make_filled').alias('make'),
        F.col('year_filled').alias('year'),
    )
)

# Keep accident ('A') records only and expose them to Spark SQL as "accidents".
# createOrReplaceTempView replaces the deprecated registerTempTable.
accident_df = joined_df.filter(F.col('incident_type') == 'A')
accident_df.createOrReplaceTempView('accidents')
accident_df.show()

+-------------+--------+----+
|incident_type|    make|year|
+-------------+--------+----+
|            A|Mercedes|2015|
|            A|  Nissan|2003|
|            A|Mercedes|2015|
|            A|Mercedes|2016|
+-------------+--------+----+



In [6]:
# Count accidents per (make, year). GROUP BY alone guarantees no row order, so
# ORDER BY is added to make the result, and the report written from it,
# list rows in a stable order from run to run.
result_df = spark.sql("""
    SELECT make, year, COUNT(incident_type) AS accidents
    FROM accidents
    GROUP BY make, year
    ORDER BY make, year
""")
result_df.show()

+--------+----+---------+
|    make|year|accidents|
+--------+----+---------+
|Mercedes|2015|        2|
|Mercedes|2016|        1|
|  Nissan|2003|        1|
+--------+----+---------+



In [7]:
# Write the report as CSV with a header row. coalesce(1) collapses the data to a
# single partition, so the "output" directory gets one part file; any previous
# run's output is overwritten.
(
    result_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .option("sep", ",")
    .csv("output")
)