In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [2]:
# Reuse the running SparkContext if this cell is executed again; constructing
# SparkContext(...) a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
spark_conf = pyspark.SparkConf().setMaster("local").setAppName("post-sales-auto")
sc = SparkContext.getOrCreate(spark_conf)

# Each element of raw_rdd is one line of text from the CSV file.
raw_rdd = sc.textFile("sample/data.csv")

In [3]:
def extract_vin_key_value(row):
    """Turn one comma-separated text line into a (key, value) pair.

    The line is stripped of surrounding whitespace and split on ','.
    Column 2 becomes the key; columns 1, 3 and 5 become the value tuple.
    Judging by the notebook output these are the VIN, a record type
    ('I'/'A'/'R'), the make and the year -- TODO confirm against the CSV
    schema.

    Args:
        row: a single line of text with at least six comma-separated fields.

    Returns:
        A tuple ``(fields[2], (fields[1], fields[3], fields[5]))``.

    Raises:
        IndexError: if the line has fewer than six fields.
    """
    fields = row.strip().split(",")
    vin = fields[2]
    details = (fields[1], fields[3], fields[5])
    return (vin, details)

In [4]:
# Key every CSV line by its VIN column; the function is passed directly
# because it already takes exactly one line as its argument.
vin_kv = raw_rdd.map(extract_vin_key_value)

In [5]:
# Bring every (key, (type, make, year)) pair back to the driver for inspection.
vin_kv.collect()

[('VXIO456XLBB630221', ('I', 'Nissan', '2003')),
 ('INU45KIOOPA343980', ('I', 'Mercedes', '2015')),
 ('VXIO456XLBB630221', ('A', '', '')),
 ('VXIO456XLBB630221', ('R', '', '')),
 ('VOME254OOXW344325', ('I', 'Mercedes', '2015')),
 ('VOME254OOXW344325', ('R', '', '')),
 ('VXIO456XLBB630221', ('R', '', '')),
 ('EXOA00341AB123456', ('I', 'Mercedes', '2016')),
 ('VOME254OOXW344325', ('A', '', '')),
 ('VOME254OOXW344325', ('R', '', '')),
 ('EXOA00341AB123456', ('R', '', '')),
 ('EXOA00341AB123456', ('A', '', '')),
 ('VOME254OOXW344325', ('R', '', '')),
 ('UXIA769ABCC447906', ('I', 'Toyota', '2017')),
 ('UXIA769ABCC447906', ('R', '', '')),
 ('INU45KIOOPA343980', ('A', '', ''))]

In [6]:
def populate_make(row):
    """Fill in the missing make/year on the records of one group.

    Each record is a ``(type, make, year)`` sequence. A record whose type is
    not 'I' and whose make AND year are both empty strings receives the
    make/year of the nearest preceding record that carries them. Records that
    appear before any such record are filled from the group's first 'I'
    record, so the result does not depend on the 'I' record being first
    (the order of values inside a group is not guaranteed after a shuffle).

    Args:
        row: iterable of (type, make, year) records belonging to one key.

    Returns:
        A list of ``[type, make, year]`` lists, in the same order as the
        input. Records are left unchanged when no make/year is available.
    """
    group_master = [list(record) for record in row]

    # Seed the carried values from the first 'I' record, wherever it sits.
    # The previous implementation read group_master[i - 1] at i == 0, which
    # silently wrapped around to the LAST record of the group.
    make, year = '', ''
    for record in group_master:
        if record[0] == 'I':
            make, year = record[1], record[2]
            break

    for record in group_master:
        if record[0] != 'I' and record[1] == '' and record[2] == '':
            record[1] = make
            record[2] = year
        else:
            # This record has its own values; later blanks inherit from it.
            make, year = record[1], record[2]

    return group_master

In [7]:
# Gather all records that share a key, back-fill make/year within each group,
# and flatten the per-group lists into a single RDD of [type, make, year].
enhance_make = (
    vin_kv
    .groupByKey()
    .flatMap(lambda key_and_records: populate_make(key_and_records[1]))
)

In [8]:
# Inspect the [type, make, year] records after make/year have been filled in.
enhance_make.collect()

[['I', 'Nissan', '2003'],
 ['A', 'Nissan', '2003'],
 ['R', 'Nissan', '2003'],
 ['R', 'Nissan', '2003'],
 ['I', 'Mercedes', '2015'],
 ['A', 'Mercedes', '2015'],
 ['I', 'Mercedes', '2015'],
 ['R', 'Mercedes', '2015'],
 ['A', 'Mercedes', '2015'],
 ['R', 'Mercedes', '2015'],
 ['R', 'Mercedes', '2015'],
 ['I', 'Mercedes', '2016'],
 ['R', 'Mercedes', '2016'],
 ['A', 'Mercedes', '2016'],
 ['I', 'Toyota', '2017'],
 ['R', 'Toyota', '2017']]

In [88]:
def extract_make_key_value(row):
    """Map a [type, make, year] record to ((make, year), flag).

    The flag is 1 when the record type is 'A' and 0 for every other type, so
    summing the flags per key counts the 'A' records.

    Args:
        row: an indexable record whose items 0, 1, 2 are type, make, year.

    Returns:
        A tuple ``((row[1], row[2]), flag)``.
    """
    flag = 1 if row[0] == 'A' else 0
    make_and_year = (row[1], row[2])
    return make_and_year, flag

In [89]:
# Re-key each record by (make, year) with a 0/1 flag as the value.
make_kv = enhance_make.map(extract_make_key_value)

In [90]:
# Inspect the ((make, year), flag) pairs before they are aggregated.
make_kv.collect()

[(('Nissan', '2003'), 0),
 (('Nissan', '2003'), 1),
 (('Nissan', '2003'), 0),
 (('Nissan', '2003'), 0),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 1),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 1),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2015'), 0),
 (('Mercedes', '2016'), 0),
 (('Mercedes', '2016'), 0),
 (('Mercedes', '2016'), 1),
 (('Toyota', '2017'), 0),
 (('Toyota', '2017'), 0)]

In [113]:
# Add up the 0/1 flags per (make, year) key to get a count for each key.
accidents = make_kv.reduceByKey(lambda running_total, flag: running_total + flag)

In [114]:
# Inspect the summed flag per (make, year) key.
accidents.collect()

[(('Nissan', '2003'), 1),
 (('Mercedes', '2015'), 2),
 (('Mercedes', '2016'), 1),
 (('Toyota', '2017'), 0)]

In [117]:
# Write one "make-year,count" line for every (make, year) with at least one
# counted record.
# - `with` guarantees the file is closed even if collect() or write() raises.
# - Mode 'w' (instead of 'x') lets the cell be re-run: 'x' raises
#   FileExistsError as soon as accidents.txt exists from a previous run.
with open('accidents.txt', 'w') as out_file:
    for (make, year), count in accidents.collect():
        if count >= 1:
            out_file.write('{}-{},{}\n'.format(make, year, count))