# Overview

In this mini project, you will practice using Spark transformations to solve traditional
MapReduce data problems. It demonstrates Spark's significant advantages over the
Hadoop MapReduce framework, in both code simplicity and in-memory processing
performance, which suit chained MapReduce use cases best.

Consider an automobile tracking platform that keeps track of the history of incidents after a new
vehicle is sold by the dealer. Such incidents include further private sales, repairs and accident
reports. This provides a good reference for second-hand buyers to understand the vehicles they
are interested in.

# Objective:
Find the total number of accidents per make and year of the car.

## Data:

In [39]:
s = [('incident_id', 'INT',''), \
     ('incident_type','STRING','(I: initial sale, A: accident, R: repair)'), \
     ('vin_number','STRING',''), \
     ('make','STRING','(The brand of the car, only populated with incident type “I”)'), \
     ('model','STRING','(The model of the car, only populated with incident type “I”)'), \
     ('year','STRING','(The year of the car, only populated with incident type “I”)') ,\
     ('incident_date','DATE','(Date of the incident occurrence)'), \
     ('description','STRING','')]
df_schema = spark.createDataFrame(s, "Column: string, Type: string, Info: string")
df_schema.show(truncate=False)

+-------------+------+-------------------------------------------------------------+
|Column       |Type  |Info                                                         |
+-------------+------+-------------------------------------------------------------+
|incident_id  |INT   |                                                             |
|incident_type|STRING|(I: initial sale, A: accident, R: repair)                    |
|vin_number   |STRING|                                                             |
|make         |STRING|(The brand of the car, only populated with incident type “I”)|
|model        |STRING|(The model of the car, only populated with incident type “I”)|
|year         |STRING|(The year of the car, only populated with incident type “I”) |
|incident_date|DATE  |(Date of the incident occurrence)                            |
|description  |STRING|                                                             |
+-------------+------+-------------------------------------------------------------+

## Step 1. Filter out accident incidents with make and year
Since we only care about accident records, we should filter out records having an incident type
other than “A”. However, accident records don't carry the make and year fields, because the data
is designed to avoid redundancy. So our first step is to propagate the make and year info from the
type I record to all other records of the same vehicle.

### 1.1 Read the input data CSV file
Use the Spark context object to read the input file to create the input RDD:

`sc = SparkContext("local", "My Application")`
<br>
`raw_rdd = sc.textFile("data.csv")`

The PySpark kernel for Jupyter notebooks automatically initializes the SparkContext, so creating `sc` is not required here.
In the Python script used by run.sh, however, the following imports are required to initialize the environment:

`import pyspark`
<br>
`from pyspark.sql import SparkSession`
<br>
`from pyspark import SparkContext`

In [41]:
#pyspark kernel for jupyter notebook automatically initializes SparkContext variable sc
spark.createDataFrame(sc.getConf().getAll(), "Configuration: string, Value: string").show(truncate = False)
raw_rdd = sc.textFile("data.csv")
df_schema.show(truncate=False)
raw_rdd.collect()

+----------------------------------+--------------------------------------------------+
|Configuration                     |Value                                             |
+----------------------------------+--------------------------------------------------+
|spark.app.startTime               |1654788065112                                     |
|spark.executor.id                 |driver                                            |
|spark.driver.port                 |44203                                             |
|spark.app.name                    |PySparkShell                                      |
|spark.sql.catalogImplementation   |hive                                              |
|spark.rdd.compress                |True                                              |
|spark.driver.host                 |10.0.1.137                                        |
|spark.app.id                      |local-1654788066024                               |
|spark.serializer.objectStreamRe

['1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors',
 '2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors',
 '3,A,VXIO456XLBB630221,,,,2014-07-02,Head on collision',
 '4,R,VXIO456XLBB630221,,,,2014-08-05,Repair transmission',
 '5,I,VOME254OOXW344325,Mercedes,E350,2015,2014-02-01,Sold from Carmax',
 '6,R,VOME254OOXW344325,,,,2015-02-06,Wheel allignment service',
 '7,R,VXIO456XLBB630221,,,,2015-01-01,Replace right head light',
 '8,I,EXOA00341AB123456,Mercedes,SL550,2016,2015-01-01,Sold from AceCars',
 '9,A,VOME254OOXW344325,,,,2015-10-01,Side collision',
 '10,R,VOME254OOXW344325,,,,2015-09-01,Changed tires',
 '11,R,EXOA00341AB123456,,,,2015-05-01,Repair engine',
 '12,A,EXOA00341AB123456,,,,2015-05-03,Vehicle rollover',
 '13,R,VOME254OOXW344325,,,,2015-09-01,Replace passenger side door',
 '14,I,UXIA769ABCC447906,Toyota,Camery,2017,2016-05-08,Initial sales from Carmax',
 '15,R,UXIA769ABCC447906,,,,2020-01-02,Initial sales from Carmax',

### 1.1 Perform map operation
We need to propagate make and year to the accident records (incident type A), using
vin_number as the aggregation key. Therefore, the map output key should be vin_number, and the
value should be the make and year along with the incident type. In Spark, the “groupByKey”
function requires a pair RDD, so the map operation must produce a (key, value) tuple for each
record.

`vin_kv = raw_rdd.map(lambda x: extract_vin_key_value(x))`
<br>
`# Please implement method extract_vin_key_value()`

In [46]:
def extract_vin_key_value(row):
    auto = row.strip().split(",")
    return (auto[2], (auto[1], auto[3], auto[5]))

# Use the VIN as key and (incident_type, make, year) as value
vin_kv = raw_rdd.map(lambda row: extract_vin_key_value(row))

vin_kv.collect()

[('VXIO456XLBB630221', ('I', 'Nissan', '2003')),
 ('INU45KIOOPA343980', ('I', 'Mercedes', '2015')),
 ('VXIO456XLBB630221', ('A', '', '')),
 ('VXIO456XLBB630221', ('R', '', '')),
 ('VOME254OOXW344325', ('I', 'Mercedes', '2015')),
 ('VOME254OOXW344325', ('R', '', '')),
 ('VXIO456XLBB630221', ('R', '', '')),
 ('EXOA00341AB123456', ('I', 'Mercedes', '2016')),
 ('VOME254OOXW344325', ('A', '', '')),
 ('VOME254OOXW344325', ('R', '', '')),
 ('EXOA00341AB123456', ('R', '', '')),
 ('EXOA00341AB123456', ('A', '', '')),
 ('VOME254OOXW344325', ('R', '', '')),
 ('UXIA769ABCC447906', ('I', 'Toyota', '2017')),
 ('UXIA769ABCC447906', ('R', '', '')),
 ('INU45KIOOPA343980', ('A', '', ''))]

### 1.2 Perform group aggregation to populate make and year to all the records
Like the reducer in the MapReduce framework, Spark's “groupByKey” function shuffles the data
so that all records sharing the same key end up in the same group.
Within each vin_number group, we first iterate through the records to find the one that
has make and year populated, and capture those values as group-level master info. As we filter and
output the accident records, each one is modified by adding the master info
captured in the first iteration.

`enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(kv[1]))`
<br>
`# Please implement method populate_make()`

In [43]:
def populate_make_printer(row):
    # Verbose variant of populate_make() that traces each step for debugging
    print('NEW ROW\n')
    group_row = [list(i) for i in row]
    print('group_row: ' + str(group_row))

    # First pass: capture make and year from the initial sale ('I') record
    make, year = '', ''
    for rec in group_row:
        if rec[0] == 'I':
            make, year = rec[1], rec[2]
            break
    print('make: ' + str(make))
    print('year: ' + str(year))

    # Second pass: fill make and year on records that lack them
    for rec in group_row:
        print('record: ' + str(rec))
        if rec[0] != 'I' and rec[1] == '' and rec[2] == '':
            print('incident type is not I and make/year are blank; filling them in\n')
            rec[1] = make
            rec[2] = year

    print('RETURN\n')
    return group_row

def populate_make(row):
    # Materialize the grouped records as mutable lists: [incident_type, make, year]
    group_row = [list(i) for i in row]
    # First pass: capture make and year from the initial sale ('I') record.
    # The order of records within a group is not guaranteed, so search for it.
    make, year = '', ''
    for rec in group_row:
        if rec[0] == 'I':
            make, year = rec[1], rec[2]
            break
    # Second pass: fill make and year on records that lack them
    for rec in group_row:
        if rec[0] != 'I' and rec[1] == '' and rec[2] == '':
            rec[1] = make
            rec[2] = year
    return group_row

# Populate vehicle make and year for records missing these values
enhance_make = vin_kv.groupByKey().flatMap(lambda row: populate_make(row[1]))
enhance_make.collect()

[['I', 'Mercedes', '2015'],
 ['R', 'Mercedes', '2015'],
 ['A', 'Mercedes', '2015'],
 ['R', 'Mercedes', '2015'],
 ['R', 'Mercedes', '2015'],
 ['I', 'Mercedes', '2016'],
 ['R', 'Mercedes', '2016'],
 ['A', 'Mercedes', '2016'],
 ['I', 'Toyota', '2017'],
 ['R', 'Toyota', '2017'],
 ['I', 'Nissan', '2003'],
 ['A', 'Nissan', '2003'],
 ['R', 'Nissan', '2003'],
 ['R', 'Nissan', '2003'],
 ['I', 'Mercedes', '2015'],
 ['A', 'Mercedes', '2015']]
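
To make the group-and-propagate step concrete without a Spark cluster, here is a minimal plain-Python sketch; it is an illustration, not the notebook's implementation. The helpers `group_by_key` and `populate_make_sketch` are hypothetical names introduced for this example. The sample deliberately places the 'I' record second, since Spark does not guarantee the order of values within a group.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Group (key, value) pairs by key; a local stand-in for RDD.groupByKey()."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def populate_make_sketch(records):
    """Copy make and year from the 'I' record onto every record of one vehicle."""
    make, year = '', ''
    for incident_type, rec_make, rec_year in records:
        if incident_type == 'I':
            make, year = rec_make, rec_year
            break
    return [[incident_type, make, year] for incident_type, _, _ in records]

# Three records for one VIN, ordered so that the 'I' record is not first
pairs = [
    ('VXIO456XLBB630221', ('A', '', '')),
    ('VXIO456XLBB630221', ('I', 'Nissan', '2003')),
    ('VXIO456XLBB630221', ('R', '', '')),
]

# flatMap equivalent: concatenate the per-group output lists
enhanced = [rec
            for records in group_by_key(pairs).values()
            for rec in populate_make_sketch(records)]
print(enhanced)
```

Searching the group for the 'I' record, rather than assuming it comes first, keeps the result independent of how the shuffle happens to order the values.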

## Step 2. Count the number of accident occurrences per vehicle make and year

### 2.1 Perform map operation
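The goal of this step is to count the number of accident records for each make and year combination,
given the result we derived previously. The output key should be the combination of vehicle
make and year. The value should be a count of 1 for each accident record; the implementation
below emits 0 for all other records so that they do not contribute to the sum.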

`make_kv = enhance_make.map(lambda x: extract_make_key_value(x))`
<br>
`# Please implement method extract_make_key_value()`

In [50]:
def extract_make_key_value(row):
    # Key: (make, year). Value: 1 for an accident record, 0 for any other record
    if row[0] == 'A':
        return (row[1], row[2]), 1
    else:
        return (row[1], row[2]), 0

# Map every record to ((make, year), count); non-accident records contribute 0
make_kv = enhance_make.map(lambda x: extract_make_key_value(x))

make_kv.collect()

[(('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 1),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2016'), 0),
 (('Mercedes', '2016'), 0),
 (('Mercedes', '2016'), 1),
 (('Toyota', '2017'), 0),
 (('Toyota', '2017'), 0),
 (('Nissan', '2003'), 0),
 (('Nissan', '2003'), 1),
 (('Nissan', '2003'), 0),
 (('Nissan', '2003'), 0),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 1)]

### 2.2 Aggregate the key and count the number of records in total per key
Use Spark's “reduceByKey” function to sum the values (1 for each accident record) per key.
The result is each make and year combination key along with its total accident count.

In [60]:
# Count accident incidents by make and year
acc_make_year = make_kv.reduceByKey(lambda x, y: x + y)

output = []
for accident in acc_make_year.collect():
    if accident[1] >= 1:
        output.append('{}-{},{}'.format(accident[0][0], accident[0][1], accident[1]))
print(output)

['Mercedes-2015,2', 'Mercedes-2016,1', 'Nissan-2003,1']
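
As a rough illustration of what “reduceByKey” does with the `lambda x, y: x + y` function, the plain-Python sketch below folds values that share a key. The helper `reduce_by_key` and the shortened sample list `make_kv_sample` are hypothetical and only mimic Spark's behavior locally.

```python
def reduce_by_key(pairs, func):
    """Fold values sharing a key with func; a local stand-in for RDD.reduceByKey()."""
    totals = {}
    for key, value in pairs:
        # The first value seen for a key starts the fold; later ones are combined
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# A shortened sample in the same ((make, year), count) shape as make_kv
make_kv_sample = [
    (('Mercedes', '2015'), 0),
    (('Mercedes', '2015'), 1),
    (('Toyota', '2017'), 0),
    (('Nissan', '2003'), 1),
    (('Mercedes', '2015'), 1),
]

totals = reduce_by_key(make_kv_sample, lambda x, y: x + y)
print(totals)

# Keep only the keys that have at least one accident
accidents = {key: count for key, count in totals.items() if count >= 1}
print(accidents)
```

Keys whose records are all non-accidents end up with a total of 0, which is why a final `>= 1` check is needed when zero-count rows are not wanted.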


## Step 3. Save the result to HDFS as text
The output file should look similar to this.

`Nissan-2003,1`
<br>
`BMW-2008,10`
<br>
`MERCEDES-2013,2`

In [63]:
# First to a plain text file for a sanity check
# ('w' overwrites an existing file, so the cell can be re-run)
results = acc_make_year.collect()
with open('output.txt', 'w') as file:
    for acc in results:
        file.write('{}-{},{}\n'.format(acc[0][0], acc[0][1], acc[1]))

# Now as text part files; saveAsTextFile fails if the output folder already exists
rdd_out = acc_make_year.map(lambda acc: '{}-{},{}'.format(acc[0][0], acc[0][1], acc[1]))
rdd_out.saveAsTextFile('HDFS_text_results')
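
The cell above writes every key, including makes with zero accidents. If only makes with at least one accident are wanted, as in the printed list of Step 2.2, one possible approach is sketched below in plain Python. The helper `format_record` and the hard-coded `results` list are assumptions for illustration, and the file goes to a temporary directory so no real output is touched.

```python
import os
import tempfile

def format_record(record):
    """Render ((make, year), count) as 'make-year,count'."""
    (make, year), count = record
    return '{}-{},{}'.format(make, year, count)

# Hypothetical stand-in for acc_make_year.collect()
results = [
    (('Mercedes', '2015'), 2),
    (('Toyota', '2017'), 0),
    (('Nissan', '2003'), 1),
]

# Drop makes with zero accidents, then format each remaining record
lines = [format_record(rec) for rec in results if rec[1] >= 1]

# Write to a temporary directory so the sketch does not touch real output files
out_path = os.path.join(tempfile.mkdtemp(), 'output.txt')
with open(out_path, 'w') as out_file:
    out_file.write('\n'.join(lines) + '\n')

# Read the file back to confirm what was written
with open(out_path) as in_file:
    written = in_file.read().splitlines()
print(written)
```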

My run.sh file consists of:
    
`#!/bin/sh`
<br>
`python autoinc_spark.py > execution_log.txt`
<br>
`echo '\nsingle file results printed to output.txt'`
<br>
`echo '\nresults saved as text file in HDFS format in HDFS_text_results folder'`
<br>

Running `./run.sh` produces the file output.txt and partition files in the HDFS_text_results folder, containing the following records:

`Mercedes-2015,2`
<br>
`Mercedes-2016,1`
<br>
`Toyota-2017,0`
<br>
`Nissan-2003,1`