In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.3.0-bin-hadoop2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop2"

In [2]:
# install findspark using pip
!pip install findspark

# install pyspark using pip
!pip install pyspark

import findspark
findspark.init("/content/spark-3.3.0-bin-hadoop2")

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 62.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=76d0484fef31092a001082672de7185c1b0db36da704ff09ad46062acba05b90
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451

In [3]:
# upload files to work with
from google.colab import files

# in our case, upload data.csv
files.upload()

Saving data.csv to data.csv


{'data.csv': b'1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors\n2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors\n3,A,VXIO456XLBB630221,,,,2014-07-02,Head on collision\n4,R,VXIO456XLBB630221,,,,2014-08-05,Repair transmission\n5,I,VOME254OOXW344325,Mercedes,E350,2015,2014-02-01,Sold from Carmax\n6,R,VOME254OOXW344325,,,,2015-02-06,Wheel allignment service\n7,R,VXIO456XLBB630221,,,,2015-01-01,Replace right head light\n8,I,EXOA00341AB123456,Mercedes,SL550,2016,2015-01-01,Sold from AceCars\n9,A,VOME254OOXW344325,,,,2015-10-01,Side collision\n10,R,VOME254OOXW344325,,,,2015-09-01,Changed tires\n11,R,EXOA00341AB123456,,,,2015-05-01,Repair engine\n12,A,EXOA00341AB123456,,,,2015-05-03,Vehicle rollover\n13,R,VOME254OOXW344325,,,,2015-09-01,Replace passenger side door\n14,I,UXIA769ABCC447906,Toyota,Camery,2017,2016-05-08,Initial sales from Carmax\n15,R,UXIA769ABCC447906,,,,2020-01-02,Initial sales from Carmax\n16,A,INU45KIOOPA343980,,,,202

In [1]:
#!/usr/bin/env python

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

"""
  The extract_vin_key_value method reads in data from data.csv to
  return a tuple containing the record's vin_number and necessary information.

  Arguments:
    Line - data read from data.csv

  Returns:
    Tuple - key: vin number
            value: tuple of (make, year, and incident type)

"""
def extract_vin_key_value(line: str):
    values = line.split(",") # reads in data from data.csv
    incident_type = values[1]
    vin_num = values[2]
    make = values[3]
    year = values[5]
    PairRDD = (make, year, incident_type)
    return (vin_num, PairRDD)


sc = SparkContext("local", "My Application")
raw_rdd = sc.textFile("data.csv")
vin_kv = raw_rdd.map(lambda x: extract_vin_key_value(x))

# Only records from an Initial Sale have make and year information
# Groups by key to populate make and year to all records using flatMap
enhance_make = vin_kv.groupByKey()\
                     .flatMap(lambda kv: kv[1])\
                     .filter(lambda x: len(x[1]) > 0 and len(x[2]) > 0)\

# Combine make and year for more presentable information
make_kv = enhance_make.map(lambda x: x[0] + '-' + x[1])

# Aggregate using reduceByKey to count each record with the same make and year
make_kv_count = make_kv.map(lambda x: (x, 1))\
                       .reduceByKey(lambda x, y: x+y)

# Entry point to create a dataframe and utilize SQL queries
spark = SparkSession.builder.appName('autoinc_spark').getOrCreate()

columns = ['make_year','count']
df = make_kv_count.toDF(columns)

df.sort(col("count").desc(), col("make_year").asc()).show()

+-------------+-----+
|    make_year|count|
+-------------+-----+
|Mercedes-2015|    2|
|Mercedes-2016|    1|
|  Nissan-2003|    1|
|  Toyota-2017|    1|
+-------------+-----+

