# Importing dependencies/libraries needed + initial setup

In [1]:
#installing pyspark in this environment

!pip install pyspark

Collecting pyspark
  Using cached pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c7a1f6a2a89604b9cedd5de1dad6206b4485b9d288f7090c5a6e5638af22037c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [30]:
#importing the packages that I'm going to use to combine the three csv files into one dataframe and clean them

from google.colab import files
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.functions import year
from pyspark.sql.functions import col


In [3]:
# Create (or reuse) the Spark session used by every cell below.
# spark.sql.legacy.timeParserPolicy is set to LEGACY for the whole session.
# NOTE(review): presumably this is required by the to_date() conversions further down - confirm.

spark = (
    SparkSession.builder
    .appName("Data Engineering Activity - State Farm")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [4]:
# Load each of the three source CSV files into its own DataFrame.
# The header option makes Spark take the column names from the first row of each file.

households_df = spark.read.option("header", True).csv("HOUSEHOLDS.csv")
customers_df = spark.read.option("header", True).csv("CUSTOMERS.csv")
cars_df = spark.read.option("header", True).csv("CARS.csv")

# Doing initial examination of each dataframe to look for anything suspicious or unusual - will address as needed.

In [5]:
# Preview the first five household rows.
# Observation: the ZIP header carries a trailing space ("ZIP "); it is renamed in the following cell.

households_df.show(5)

+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|    HH_ID|  CUST_ID|CAR_ID|Active HH|HH Start Date|  Phone Number| ZIP |State|Country|Referral Source|
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|219790301|801198110|844435|        1|     11/18/22|(709) 379-9036|70442|   OK|    USA|          Other|
|219790301|281855167|410619|        1|     11/18/22|(740) 565-4060|70442|   OK|    USA|          Other|
|219790301|688373183|192812|        1|     11/18/22|(117) 457-9582|70442|   OK|    USA|          Other|
|219790301|752746800|752033|        1|     11/18/22|(536) 797-5920|70442|   OK|    USA|          Other|
|464806390|114187354| 23783|        1|      10/9/20|(152) 373-1773|42706|   NY|    USA|          Event|
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
only showing top 5 rows



In [6]:
# Normalise the household column names, then preview the result:
#   "ZIP "  -> "ZIP"       drops the trailing space in the header
#   "State" -> "HH State"  keeps it distinct from the "State" column that comes from CARS.csv

households_df = (
    households_df
    .withColumnRenamed("ZIP ", "ZIP")
    .withColumnRenamed("State", "HH State")
)
households_df.show(5)

+---------+---------+------+---------+-------------+--------------+-----+--------+-------+---------------+
|    HH_ID|  CUST_ID|CAR_ID|Active HH|HH Start Date|  Phone Number|  ZIP|HH State|Country|Referral Source|
+---------+---------+------+---------+-------------+--------------+-----+--------+-------+---------------+
|219790301|801198110|844435|        1|     11/18/22|(709) 379-9036|70442|      OK|    USA|          Other|
|219790301|281855167|410619|        1|     11/18/22|(740) 565-4060|70442|      OK|    USA|          Other|
|219790301|688373183|192812|        1|     11/18/22|(117) 457-9582|70442|      OK|    USA|          Other|
|219790301|752746800|752033|        1|     11/18/22|(536) 797-5920|70442|      OK|    USA|          Other|
|464806390|114187354| 23783|        1|      10/9/20|(152) 373-1773|42706|      NY|    USA|          Event|
+---------+---------+------+---------+-------------+--------------+-----+--------+-------+---------------+
only showing top 5 rows



In [7]:
# Preview the first five customer rows.
# Observation: columns _c6 through _c15 show only NULL in this preview. The author reports verifying
# that they are empty by inspecting the CSV and running df.info() on a separate pandas DataFrame.
# They are dropped in the following cell before continuing.

customers_df.show(5)

+---------+-------------+------+--------------+--------------------+------+----+----+----+----+----+----+----+----+----+----+
|  CUST_ID|Date of Birth|Gender|Marital Status|     Employment Type|Income| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|
+---------+-------------+------+--------------+--------------------+------+----+----+----+----+----+----+----+----+----+----+
|801198110|    11/6/2002|     F|             D|             Student|     0|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|281855167|    6/23/1955|     F|             D|             Retired|     0|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|688373183|    8/16/1996|     M|             M|Office and Admini...| 42052|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|752746800|    8/22/1936|     M|             M|             Retired|     0|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|114187354|    3/18/2008|     M|             D|   Sales and Related| 60879|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NUL

In [8]:
# Remove the unnamed, empty columns (_c6 through _c15) identified in the preview above,
# then display the cleaned customers DataFrame.

empty_column_names = [f"_c{position}" for position in range(6, 16)]
customer_cleaned_df = customers_df.drop(*empty_column_names)
customer_cleaned_df.show(5)

+---------+-------------+------+--------------+--------------------+------+
|  CUST_ID|Date of Birth|Gender|Marital Status|     Employment Type|Income|
+---------+-------------+------+--------------+--------------------+------+
|801198110|    11/6/2002|     F|             D|             Student|     0|
|281855167|    6/23/1955|     F|             D|             Retired|     0|
|688373183|    8/16/1996|     M|             M|Office and Admini...| 42052|
|752746800|    8/22/1936|     M|             M|             Retired|     0|
|114187354|    3/18/2008|     M|             D|   Sales and Related| 60879|
+---------+-------------+------+--------------+--------------------+------+
only showing top 5 rows



In [9]:
# Preview the first five car rows.
# Observation: the key column is headed "Car ID" here, whereas the other DataFrames use "CAR_ID";
# it is renamed in the following cell so the names are uniform.

cars_df.show(5)

+------+--------+-----+----+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|Car ID|  Status|State|Year|         Make|Body Style|Vehicle Value|Annual Miles Driven|Business Use|Antique Vehicle|Lien|Lease|Driver Safety Discount|Vehicle Safety Discount|Claim Payout|6 Month Premium Amount|
+------+--------+-----+----+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|844435|In Force|   OK|1990|Manufacturer7|    4 door|        50000|                 56|           0|              1|   1|    0|                     0|                      1|           0|                 42.89|
|410619|In Force|   OK|2019|Manufacturer5|       SUV|      8151.75|              12136|           0|              0|   0|    1|                     0|      

In [10]:
# Normalise the car column names, then preview the result:
#   "Car ID" -> "CAR_ID"     matches the key name used in the households DataFrame
#   "State"  -> "Car State"  keeps it distinct from the households "HH State" column

cars_df = (
    cars_df
    .withColumnRenamed("Car ID", "CAR_ID")
    .withColumnRenamed("State", "Car State")
)
cars_df.show(5)

+------+--------+---------+----+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|CAR_ID|  Status|Car State|Year|         Make|Body Style|Vehicle Value|Annual Miles Driven|Business Use|Antique Vehicle|Lien|Lease|Driver Safety Discount|Vehicle Safety Discount|Claim Payout|6 Month Premium Amount|
+------+--------+---------+----+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|844435|In Force|       OK|1990|Manufacturer7|    4 door|        50000|                 56|           0|              1|   1|    0|                     0|                      1|           0|                 42.89|
|410619|In Force|       OK|2019|Manufacturer5|       SUV|      8151.75|              12136|           0|              0|   0|    1|         

# Checking each dataframe for duplicates and removing them if necessary

**Considerations: my methodology for handling duplicates was shaped by missing business knowledge.**
I removed only 'true' duplicates from each dataframe - meaning rows whose values are identical in every column. I did this because I don't know the specific business rules for handling duplicates in individual columns. For example: I don't know whether the customer df is allowed to contain duplicated CUST_ID values. That is a business rule I would want to verify before removing such rows from the dataframe. As such, I have decided to proceed with only 'true' duplicates removed. *There ended up being no true duplicates.*

In [11]:
# Remove 'true' duplicates (rows identical in every column) from each DataFrame.
# dropDuplicates() without a subset compares all columns, which is what the explicit
# full-column lists expressed.

household_no_duplicates_df = households_df.dropDuplicates()
customer_no_duplicates_df = customer_cleaned_df.dropDuplicates()
car_no_duplicates_df = cars_df.dropDuplicates()


In [13]:
# Count the rows before removing duplicates.
row_count_before = cars_df.count()

# Drop 'true' duplicates, i.e. rows that are identical in every column. dropDuplicates() with no
# subset compares all columns, so the check does not depend on a hand-typed column list whose
# spelling and casing must be kept in sync with the DataFrame (the key column is named 'CAR_ID').
df_no_duplicates = cars_df.dropDuplicates()

# Count the rows that remain after de-duplication.
row_count_after = df_no_duplicates.count()

# Equal counts mean no row was dropped, i.e. the DataFrame held no true duplicates.
if row_count_before == row_count_after:
    print("No duplicates found.")
else:
    print("Duplicates found and removed.")

# I repeated this for each of the three provided dataframes and found there were no 'true' duplicates.
# I did this to verify/visualize the duplicates being removed. This step isn't necessary - it was purely
# for my confirmation of duplicate handling.

No duplicates found.


# Joining the dataframes.
I joined the household df to the customers df on CUST_ID, then joined that combined df to the cars df on CAR_ID, because cars.csv doesn't contain CUST_ID.

In [14]:
# Join households to customers on the shared CUST_ID key.
# The de-duplicated DataFrames are used so that the duplicate-removal step above actually feeds
# the pipeline; joining the raw DataFrames would silently carry any true duplicates forward.
# An inner join keeps only the CUST_ID values that are present on both sides.

initial_joined_households_customers_df = household_no_duplicates_df.join(
    customer_no_duplicates_df, on='CUST_ID', how='inner'
)
initial_joined_households_customers_df.show(5)

+---------+---------+------+---------+-------------+--------------+-----+--------+-------+---------------+-------------+------+--------------+--------------------+------+
|  CUST_ID|    HH_ID|CAR_ID|Active HH|HH Start Date|  Phone Number|  ZIP|HH State|Country|Referral Source|Date of Birth|Gender|Marital Status|     Employment Type|Income|
+---------+---------+------+---------+-------------+--------------+-----+--------+-------+---------------+-------------+------+--------------+--------------------+------+
|100000879|275102439| 62733|        1|       9/8/94|(614) 544-8736|91345|      MS|    USA|         Friend|     4/5/1934|     M|             M|             Retired|     0|
|100001310|230609660|326012|        1|     10/28/92|(646) 714-1424|81287|      IN|    USA|  Advertisement|   11/10/1924|     F|             M|             Retired|     0|
|100003675|338410484|676707|        1|     12/26/22|(696) 693-1646|60712|      OH|    USA|          Event|    9/13/2006|     M|             M|   

In [15]:
# Join the households+customers DataFrame to the cars DataFrame on the shared CAR_ID key.
# The de-duplicated cars DataFrame is used so that the duplicate-removal step above actually feeds
# the pipeline. An inner join keeps only the CAR_ID values that are present on both sides.

final_combined_df = initial_joined_households_customers_df.join(
    car_no_duplicates_df, on='CAR_ID', how='inner'
)
final_combined_df.show(5)

+------+---------+---------+---------+-------------+--------------+-----+--------+-------+---------------+-------------+------+--------------+--------------------+------+--------------------+---------+----+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|CAR_ID|  CUST_ID|    HH_ID|Active HH|HH Start Date|  Phone Number|  ZIP|HH State|Country|Referral Source|Date of Birth|Gender|Marital Status|     Employment Type|Income|              Status|Car State|Year|         Make|Body Style|Vehicle Value|Annual Miles Driven|Business Use|Antique Vehicle|Lien|Lease|Driver Safety Discount|Vehicle Safety Discount|Claim Payout|6 Month Premium Amount|
+------+---------+---------+---------+-------------+--------------+-----+--------+-------+---------------+-------------+------+--------------+--------------------+------+--------------------+---------+----+-------------+--

# Combined Dataframe cleanup
Checking data types and changing them to the desired types if necessary

In [16]:
# Print the schema of the joined DataFrame to inspect the column data types.
# The types are changed in the following cells based on the assumed business use.

final_combined_df.printSchema()

root
 |-- CAR_ID: string (nullable = true)
 |-- CUST_ID: string (nullable = true)
 |-- HH_ID: string (nullable = true)
 |-- Active HH: string (nullable = true)
 |-- HH Start Date: string (nullable = true)
 |-- Phone Number: string (nullable = true)
 |-- ZIP: string (nullable = true)
 |-- HH State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Referral Source: string (nullable = true)
 |-- Date of Birth: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Employment Type: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Car State: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Vehicle Value: string (nullable = true)
 |-- Annual Miles Driven: string (nullable = true)
 |-- Business Use: string (nullable = true)
 |-- Antique Vehicle: string (nullable = tr

In [17]:
# Convert the two date columns from string to DateType, using a pattern that matches how each
# column is written in the source data:
#   "HH Start Date" looks like 11/18/22 or 10/9/20   -> "M/d/yy"   (two-digit year)
#   "Date of Birth" looks like 11/6/2002 or 4/5/1934 -> "M/d/yyyy" (four-digit year)
# Single-letter M and d are used for both columns because months and days appear without zero
# padding in the data; the pattern now describes the data instead of relying on lenient parsing.

final_combined_df = final_combined_df.withColumn("HH Start Date", to_date(col("HH Start Date"), "M/d/yy"))
final_combined_df = final_combined_df.withColumn("Date of Birth", to_date(col("Date of Birth"), "M/d/yyyy"))
final_combined_df.show(5)


+------+---------+---------+---------+-------------+--------------+-----+--------+-------+---------------+-------------+------+--------------+--------------------+------+--------------------+---------+----+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|CAR_ID|  CUST_ID|    HH_ID|Active HH|HH Start Date|  Phone Number|  ZIP|HH State|Country|Referral Source|Date of Birth|Gender|Marital Status|     Employment Type|Income|              Status|Car State|Year|         Make|Body Style|Vehicle Value|Annual Miles Driven|Business Use|Antique Vehicle|Lien|Lease|Driver Safety Discount|Vehicle Safety Discount|Claim Payout|6 Month Premium Amount|
+------+---------+---------+---------+-------------+--------------+-----+--------+-------+---------------+-------------+------+--------------+--------------------+------+--------------------+---------+----+-------------+--

In [18]:
# Cast the columns below to integer, based on my assumption of the business need.
# CUST_ID, CAR_ID and HH_ID are deliberately kept as strings in case alphanumeric characters
# need to be added later.
# NOTE(review): casting ZIP to int would drop any leading zeros (e.g. "02134" -> 2134), and casting
# Claim Payout to int would truncate any fractional amounts - confirm neither occurs in the data.

columns_to_integer = [
    "Active HH", "ZIP", "Income", "Year", "Annual Miles Driven",
    "Business Use", "Antique Vehicle", "Lien", "Lease",
    "Driver Safety Discount", "Vehicle Safety Discount", "Claim Payout"]

# Apply all casts in one withColumns() call. Calling withColumn() once per column in a loop adds a
# separate projection to the query plan for every column; a single call replaces them all at once
# and leaves the column order unchanged.

final_combined_df = final_combined_df.withColumns(
    {name: col(name).cast("int") for name in columns_to_integer}
)



In [19]:
# Cast the two monetary columns to fixed-point decimals so later financial calculations do not
# depend on string values:
#   "Vehicle Value"          -> decimal(10,2)
#   "6 Month Premium Amount" -> decimal(20,4)

vehicle_value_decimal = col("Vehicle Value").cast("decimal(10,2)")
premium_amount_decimal = col("6 Month Premium Amount").cast("decimal(20,4)")

final_combined_df = (
    final_combined_df
    .withColumn("Vehicle Value", vehicle_value_decimal)
    .withColumn("6 Month Premium Amount", premium_amount_decimal)
)

# All columns are now in the (assumed) correct type. Printing the schema and row count

In [20]:
# Print the schema again to confirm that the date, integer and decimal casts above were applied.

final_combined_df.printSchema()

root
 |-- CAR_ID: string (nullable = true)
 |-- CUST_ID: string (nullable = true)
 |-- HH_ID: string (nullable = true)
 |-- Active HH: integer (nullable = true)
 |-- HH Start Date: date (nullable = true)
 |-- Phone Number: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- HH State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Referral Source: string (nullable = true)
 |-- Date of Birth: date (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Employment Type: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Car State: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Vehicle Value: decimal(10,2) (nullable = true)
 |-- Annual Miles Driven: integer (nullable = true)
 |-- Business Use: integer (nullable = true)
 |-- Antique Vehicle: integer (nul

In [21]:
# Show the total number of rows in the final combined DataFrame.

final_combined_df.count()

845805

In [34]:
# Save the final DataFrame as Parquet in the local working directory, as per the instructions.
# "overwrite" mode replaces any output already present at that path.

final_combined_df.write.mode("overwrite").parquet("combined_data.parquet")


In [33]:
# Mount Google Drive in the Colab runtime for quicker file access.
drive.mount('/content/drive')

# Save a second copy of the Parquet output to Google Drive.
# "overwrite" mode replaces any output already present at that path.
final_combined_df.write.mode("overwrite").parquet("/content/drive/My Drive/combined_data.parquet")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
