Bluedrive Data Engineer Assessment

Project Requirements:

1. Download the dataset Vehicle_sales_data.csv from Kaggle
2. Set up a PostgreSQL instance using Docker Compose
3. Design a suitable star schema for the dataset and create the tables and relationships in your PostgreSQL instance
4. Create an ELT/ETL pipeline to extract, transform, and load the data into the tables in your PostgreSQL instance
5. Document the entire process in a Jupyter Notebook
6. Test the integrity and accuracy of the data in the PostgreSQL instance

Tools and Prerequisites:

1. WSL2: Ubuntu-24.04 - the general environment for developing the project
2. Docker Desktop - for running the PostgreSQL instance
3. Kaggle API - for fetching the dataset
4. Pandas - library to be used in ETL pipeline
5. PostgreSQL Client - for communicating with the PSQL instance using the terminal

Step 1. Downloading the dataset from Kaggle

    a. Sign up for a Kaggle account at https://www.kaggle.com/ and create your API token. The token is used to download the dataset from the terminal through the Kaggle API.

    b. Download your API token, named kaggle.json, and copy it to the '~/.config/kaggle' directory.

    c. Install Kaggle API using the command: 'pip install kaggle'
    
    d. Run the command: 'kaggle datasets download syedanwarafridi/vehicle-sales-data'
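
Before running the download, it can help to confirm that the token is where the Kaggle CLI looks for it. The sketch below is a hypothetical helper (check_kaggle_token is not part of the Kaggle API). It assumes the token lives at ~/.config/kaggle/kaggle.json as in step b, that it contains the "username" and "key" fields, and that it should be readable only by its owner.

```python
import json
import stat
from pathlib import Path

DEFAULT_TOKEN = Path.home() / ".config" / "kaggle" / "kaggle.json"

def check_kaggle_token(path=DEFAULT_TOKEN):
    """Return a list of problems found with the token file (empty list = looks fine)."""
    path = Path(path)
    if not path.is_file():
        return [f"{path} does not exist"]
    problems = []
    # Any group/other permission bit set means other users can access the key.
    if stat.S_IMODE(path.stat().st_mode) & 0o077:
        problems.append("token is accessible by other users; consider chmod 600")
    try:
        token = json.loads(path.read_text())
    except json.JSONDecodeError:
        return problems + ["token is not valid JSON"]
    if not isinstance(token, dict):
        return problems + ["token is not a JSON object"]
    for key in ("username", "key"):
        if key not in token:
            problems.append(f"missing field: {key}")
    return problems

print(check_kaggle_token() or "token looks fine")
```

An empty result only says the file is present and well-formed; whether the credentials are accepted is still decided by the 'kaggle datasets list' call.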

In [None]:
#install the kaggle API
%pip install kaggle

In [None]:
#test the kaggle API by displaying the available datasets
#if there are errors, make sure that you have copied kaggle.json to the ~/.config/kaggle/ folder
!kaggle datasets list


In [None]:
#download the needed dataset, unzip, and rename it to Vehicle_sales_data.csv
#if you don't have the bluedrive_project in your home directory, create it first before running this code
!kaggle datasets download syedanwarafridi/vehicle-sales-data -p ~/bluedrive_project --unzip \
&& mv ~/bluedrive_project/car_prices.csv  ~/bluedrive_project/Vehicle_sales_data.csv

Step 2. Set up a PostgreSQL instance using Docker Compose

a. Install Docker Desktop from: https://www.docker.com/products/docker-desktop/
b. Set up Docker Desktop to run with WSL integration. See the instructions here: https://docs.docker.com/desktop/features/wsl/
c. Create the docker-compose.yml file
d. Create a .env file in the folder where the yml file is located to store sensitive information about the PSQL instance
e. Run the command 'docker-compose up' inside the folder where the yml file is located. Make sure Docker Desktop is running when doing this.
f. Test the PSQL instance by connecting to it using psql-client

Here is the docker-compose.yml file for creating a PostgreSQL instance. Copy the code below and save it as 'docker-compose.yml' in the '~/bluedrive_project/' folder.

services:
  db:
    image: postgres:16  # Use the desired PostgreSQL version; 16 was used because the latest psql client version available was 16
    container_name: postgres_instance
    environment:
      POSTGRES_USER: ${POSTGRES_USER}       # Username for the PostgreSQL instance stored in .env
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}  # Password for the PostgreSQL instance stored in .env
      POSTGRES_DB: ${POSTGRES_DB}      # Name of the database to create stored in .env    
    ports:
      - "5433:5432"  # Map host port 5433 to the container's PostgreSQL port 5432

Here is a sample .env file containing the sensitive information. Edit as needed and save it in the same folder as the yml file.

POSTGRES_USER=yourusername
POSTGRES_PASSWORD=yourpassword
POSTGRES_DB=yourdatabase
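
The same three values are needed again when connecting from Python or psql. As a minimal sketch, the hypothetical helpers below (parse_env and build_dsn are illustrative names, not part of Docker or PostgreSQL) read simple KEY=VALUE lines and assemble a connection URL. The parser is deliberately simpler than Docker Compose's own .env handling (no quoting or interpolation), and the default port 5433 is the host port published in the docker-compose.yml above.

```python
from urllib.parse import quote

def parse_env(text):
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values

def build_dsn(env, host="localhost", port=5433):
    """Build a postgresql:// URL; user and password are percent-encoded."""
    user = quote(env["POSTGRES_USER"], safe="")
    password = quote(env["POSTGRES_PASSWORD"], safe="")
    return f"postgresql://{user}:{password}@{host}:{port}/{env['POSTGRES_DB']}"

sample = """POSTGRES_USER=yourusername
POSTGRES_PASSWORD=yourpassword
POSTGRES_DB=yourdatabase
"""
print(build_dsn(parse_env(sample)))
# postgresql://yourusername:yourpassword@localhost:5433/yourdatabase
```

Keeping the credentials in .env and deriving everything else from it avoids copying the password into notebook cells.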


In [None]:
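#start the PSQL instance in the background (-d) so the notebook cell does not block
#each ! command runs in its own shell, so cd and docker-compose must be chained with &&
!cd ~/bluedrive_project/ && docker-compose up -d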

In [None]:
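#Now that there is a running instance of PSQL in Docker, install psql-client to connect to it. Edit yoursudopassword as needed
#-y answers apt's confirmation prompt, which cannot be answered interactively from a notebook cell
!echo yoursudopassword | sudo -S apt install -y postgresql-client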

In [None]:
#connect to your PSQL instance. Remember the credentials set earlier in the .env file and use those to login
!psql -U yourusername -h localhost -p 5433 -d yourdatabase
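
If the connection is refused, the container may still be starting. The sketch below is a hypothetical helper (wait_for_port is an illustrative name) that polls the published port until a TCP connection succeeds. It assumes the host port 5433 from the docker-compose.yml above. An open port only shows that something is listening, not that PostgreSQL has finished initializing.

```python
import socket
import time

def wait_for_port(host="localhost", port=5433, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

Calling wait_for_port() before the psql cell gives the container up to 30 seconds to come up.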

In [None]:
#stop the PSQL instance
#cd and docker-compose are chained with && because each ! command runs in its own shell
!cd ~/bluedrive_project/ && docker-compose down


Step 3. Design a star schema suitable for the dataset and create tables and their relationships in the PSQL instance

a. The ERD tool in pgAdmin was used to create the tables and their relationships
b. The SQL commands were generated after creating the ERD
c. Connect to your psql instance and run the SQL commands to create the tables

From the original dataset, the following tables will be constructed:

1. dateDimTable - information on the date of the sale such as year, month, day, quarter, etc.
2. sellerDimTable - seller information such as seller name and seller id
3. stateDimTable - state information such as state abbreviation and state id
4. vehicleDimTable - information on vehicle's year, make, body, transmission, etc.
5. salesFactTable - vehicle id, date id, seller id, state id, selling price, mmr, odometer, condition, vin

Here is the generated SQL command for creating the tables and their relationships. Run this code in the terminal while connected to your PSQL instance.

-- This script was generated by the ERD tool in pgAdmin 4.
-- Please log an issue at https://github.com/pgadmin-org/pgadmin4/issues/new/choose if you find any bugs, including reproduction steps.
BEGIN;


CREATE TABLE IF NOT EXISTS public."dateDimTable"
(
    date_id integer NOT NULL,
    saledate date NOT NULL,
    saledate_year integer NOT NULL,
    saledate_month integer NOT NULL,
    saledate_monthname character varying(20) NOT NULL,
    saledate_day integer NOT NULL,
    saledate_weekday integer NOT NULL,
    saledate_weekdayname character varying(20) NOT NULL,
    quarter integer NOT NULL,
    quartername character varying(2) NOT NULL,
    PRIMARY KEY (date_id)
);

CREATE TABLE IF NOT EXISTS public."vehicleDimTable"
(
    vehicle_id integer NOT NULL,
    year integer NOT NULL,
    make character varying(30) NOT NULL,
    model character varying(30) NOT NULL,
    "trim" character varying(100) NOT NULL,
    body character varying(30) NOT NULL,
    transmission character varying(30) NOT NULL,
    color character varying(30) NOT NULL,
    interior character varying(30) NOT NULL,
    PRIMARY KEY (vehicle_id)
);

CREATE TABLE IF NOT EXISTS public."sellerDimTable"
(
    seller_id integer NOT NULL,
    seller character varying(50) NOT NULL,
    PRIMARY KEY (seller_id)
);

CREATE TABLE IF NOT EXISTS public."stateDimTable"
(
    state_id integer NOT NULL,
    state character varying(5) NOT NULL,
    PRIMARY KEY (state_id)
);

CREATE TABLE IF NOT EXISTS public."salesFactTable"
(
    sale_id integer NOT NULL,
    vin character varying(50) NOT NULL,
    vehicle_id integer NOT NULL,
    state_id integer NOT NULL,
    seller_id integer NOT NULL,
    mmr numeric(9, 2) NOT NULL,
    sellingprice numeric(9, 2) NOT NULL,
    odometer numeric(11, 2) NOT NULL,
    condition integer NOT NULL,
    date_id integer NOT NULL,
    PRIMARY KEY (sale_id)
);

ALTER TABLE IF EXISTS public."salesFactTable"
    ADD FOREIGN KEY (vehicle_id)
    REFERENCES public."vehicleDimTable" (vehicle_id) MATCH SIMPLE
    ON UPDATE NO ACTION
    ON DELETE NO ACTION
    NOT VALID;


ALTER TABLE IF EXISTS public."salesFactTable"
    ADD FOREIGN KEY (state_id)
    REFERENCES public."stateDimTable" (state_id) MATCH SIMPLE
    ON UPDATE NO ACTION
    ON DELETE NO ACTION
    NOT VALID;


ALTER TABLE IF EXISTS public."salesFactTable"
    ADD FOREIGN KEY (seller_id)
    REFERENCES public."sellerDimTable" (seller_id) MATCH SIMPLE
    ON UPDATE NO ACTION
    ON DELETE NO ACTION
    NOT VALID;


ALTER TABLE IF EXISTS public."salesFactTable"
    ADD FOREIGN KEY (date_id)
    REFERENCES public."dateDimTable" (date_id) MATCH SIMPLE
    ON UPDATE NO ACTION
    ON DELETE NO ACTION
    NOT VALID;

END;

Run the command '\dt' to check the tables. There should be 5 tables:

1. dateDimTable
2. sellerDimTable
3. stateDimTable 
4. vehicleDimTable 
5. salesFactTable 

Step 4. Create an ETL/ELT pipeline

a. Install pandas
b. Read the dataset using pandas and make a dataframe
c. Handle duplicate values
d. Impute missing values
e. Create additional columns in the dataframe suitable for the star schema in PSQL
f. Create multiple dataframes from the original dataframe mirroring the schema of the tables in PSQL
g. Add unique IDs to each dataframe
h. Load the dataframes into CSV files
i. Import the CSV files into PSQL 

In [None]:
#install pandas
%pip install pandas

In [None]:
#import libraries
import pandas as pd

In [None]:
#read the dataset into a pandas dataframe
df = pd.read_csv(r'~/bluedrive_project/Vehicle_sales_data.csv')
#get the initial length of the df
start_length = len(df)

In [None]:
#handle duplicates
#the vin column is a good way to check for duplicates since this should be unique per vehicle
df.drop_duplicates(subset='vin', inplace=True)
#get the length of the df after dropping duplicates
length_after_duplicates = len(df)
print(f'Dropped Rows:{start_length-length_after_duplicates} ({(start_length-length_after_duplicates)*100/start_length}%)')
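
As a small illustration of what this step does, the sketch below applies the same call to a made-up four-row frame (the values are invented for the example). It shows that the first occurrence of each vin is kept and that the surviving rows keep their original index labels, so the index has gaps afterwards.

```python
import pandas as pd

# Toy data: vin "A1" appears twice
toy = pd.DataFrame({
    "vin": ["A1", "B2", "A1", "C3"],
    "sellingprice": [100, 200, 150, 300],
})

deduped = toy.drop_duplicates(subset="vin")  # keep="first" is the default
dropped = len(toy) - len(deduped)

print(f"Dropped Rows: {dropped} ({dropped * 100 / len(toy):.1f}%)")  # Dropped Rows: 1 (25.0%)
print(deduped.index.tolist())  # [0, 1, 3]
```

The gap in the index matters later: any operation that aligns on index labels must use the frame's own index rather than a fresh 0..n-1 range.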

In [None]:
#Let's try to see what will happen if we drop all rows with missing values
#make a copy of df
df_test = df.copy()
df_test.dropna(inplace=True)
length_after_dropping_all_na = len(df_test)
print(f'Dropped Rows:{start_length-length_after_dropping_all_na} ({(start_length-length_after_dropping_all_na)*100/start_length}%)')

As seen, almost 16.8% of the rows would be lost if every row with a missing value were dropped. For this reason, missing values are imputed instead, using two strategies:

1. Impute missing values with the mode value of the target column referenced to a relevant column
2. Impute missing values with the mean value of the target column
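
The two strategies can be sketched on a toy frame (the values are invented for the example). Strategy 1 fills a missing model with the most frequent model of the same make; strategy 2 fills a missing condition with the column mean, truncated to an integer.

```python
import pandas as pd

toy = pd.DataFrame({
    "make": ["Kia", "Kia", "Kia", "BMW", "BMW", None],
    "model": ["Sorento", "Sorento", None, "X5", None, None],
    "condition": [5.0, None, 3.0, 4.0, None, 2.0],
})

# Strategy 1: most frequent model within each make
mode_per_make = toy.groupby("make")["model"].agg(
    lambda s: s.mode().iloc[0] if not s.mode().empty else "Unknown"
)
# map() returns a Series on toy's own index, so fillna aligns row by row
toy["model"] = toy["model"].fillna(toy["make"].map(mode_per_make))

# Strategy 2: column mean, truncated to an integer
toy["condition"] = toy["condition"].fillna(int(toy["condition"].mean()))

print(toy)
```

The last row has no make to look up, so its model stays missing; rows like that are the ones removed by the dropna calls that follow each imputation.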


In [None]:
#define two functions, one for imputing missing values with the mode value, and another one for imputing missing values with the mean

#function to return a df where missing values are imputed using the mode based on a relevant column
def fillna_with_mode(target_col, reference_col, dataframe):
    
    #most frequent target value per reference group ('Unknown' if the group has no non-null values)
    most_freq = dataframe.groupby(reference_col)[target_col] \
        .agg(lambda x: x.mode().iloc[0] if not x.mode().empty else 'Unknown')
    
    #map() keeps the dataframe's own index, so fillna lines up row by row
    #(a merge would reset the index and misalign rows after drop_duplicates)
    dataframe[target_col] = dataframe[target_col].fillna(dataframe[reference_col].map(most_freq))
    return dataframe

#function to impute missing values in the target column with the mean value of the target column
def fillna_with_mean(target_col, dataframe):
    
    mean_value = int(dataframe[target_col].mean())
    dataframe[target_col] = dataframe[target_col].fillna(mean_value)
    return dataframe

In [None]:
#handle missing values for the make column
#fill missing make values using mode of make grouped by seller
df = fillna_with_mode('make', 'seller', df)
df.dropna(subset='make', inplace=True)

In [None]:
#fill missing model values using mode of model grouped by make
df = fillna_with_mode('model', 'make', df)
df.dropna(subset='model', inplace=True)

In [None]:
#fill missing trim values using mode of trim grouped by model
df = fillna_with_mode('trim', 'model', df)
df.dropna(subset='trim', inplace=True)

#fill missing body values using mode of body grouped by model
df = fillna_with_mode('body', 'model', df)
df.dropna(subset='body', inplace=True)

#fill missing transmission values using mode of transmission grouped by model
df = fillna_with_mode('transmission', 'model', df)
df.dropna(subset='transmission', inplace=True)

In [None]:
#fill missing values in the condition column with the mean value of the condition column
df = fillna_with_mean('condition', df)
df.dropna(subset='condition', inplace=True)

In [None]:
#drop rows for other missing values and incorrect input

#keep only rows whose state value is exactly 2 characters long
df = df[df['state'].str.len() == 2]

#drop rows with missing odometer values
df.dropna(subset='odometer', inplace=True)

#drop rows with missing color values and — values
df.dropna(subset='color', inplace=True)
df = df[df['color'] != '—']

#drop rows with missing mmr values
df.dropna(subset='mmr', inplace=True)

In [None]:
#get the length of the df after the imputations and drops
end_length = len(df)
print(f'Dropped Rows:{start_length-end_length} ({(start_length-end_length)*100/start_length}%)')

As seen, only about 6.3% of the rows are dropped now, down from 16.8%.

In [None]:
#the code below will expand the saledate column in preparation for the star schema tables

def generate_weekday(dataframe):
    weekday_map = {'Mon':1, 'Tue':2, 'Wed':3, 'Thu':4, 'Fri':5, 'Sat':6, 'Sun':7}
    dataframe['saledate_weekday'] = dataframe['saledate_weekdayname'].map(weekday_map)
    return dataframe

#parse the saledate column to get just the date part and store in a new column
df['saledate_new'] = df['saledate'].str[4:15]
#parse the saledate column to get just the weekdayname and store in a new column
df['saledate_weekdayname'] = df['saledate'].str[0:3]
#add a new column saledate_weekday corresponding to weekdayname, Mon = 1, Tue = 2, etc.
df = generate_weekday(df)
#format saledate_new to datetime
df['saledate_new'] = pd.to_datetime(df['saledate_new'])
#drop the saledate column then rename saledate_new to saledate
df.drop('saledate', axis=1, inplace=True)
df.rename(columns={'saledate_new': 'saledate'}, inplace=True)
#add a column for year, month and day
df['saledate_year'] = df['saledate'].dt.year
df['saledate_month'] = df['saledate'].dt.month
df['saledate_monthname'] = df['saledate'].dt.month_name()
df['saledate_day'] = df['saledate'].dt.day
#add a quarter and a quartername column
df['quarter'] = df['saledate'].dt.quarter
df['quartername'] = 'Q' + df['quarter'].astype(str)
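
To make the slicing concrete, the sketch below applies the same positions to a single string using only the standard library. The sample string is an assumption about the raw saledate format, inferred from the [0:3] and [4:15] slices above; expand_saledate is a hypothetical helper, not part of the pipeline.

```python
from datetime import datetime

def expand_saledate(raw):
    """Split one raw saledate string into the dateDimTable fields."""
    weekday_name = raw[0:3]                                  # e.g. "Tue"
    day = datetime.strptime(raw[4:15], "%b %d %Y").date()    # e.g. "Dec 16 2014"
    quarter = (day.month - 1) // 3 + 1
    return {
        "saledate": day.isoformat(),
        "saledate_year": day.year,
        "saledate_month": day.month,
        "saledate_monthname": day.strftime("%B"),
        "saledate_day": day.day,
        "saledate_weekday": day.isoweekday(),  # Mon=1 ... Sun=7, same as weekday_map
        "saledate_weekdayname": weekday_name,
        "quarter": quarter,
        "quartername": f"Q{quarter}",
    }

# Assumed sample value in the raw format
row = expand_saledate("Tue Dec 16 2014 12:30:00 GMT-0800 (PST)")
print(row["saledate"], row["saledate_weekday"], row["quartername"])  # 2014-12-16 2 Q4
```

Note that the time and timezone parts are discarded, so the stored date is the local calendar date written in the source string.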

In [None]:
#the code below casts each column to its correct data type
df['year'] = df['year'].astype(int)
df['make'] = df['make'].astype('string')
df['model'] = df['model'].astype('string')
df['trim'] = df['trim'].astype('string')
df['body'] = df['body'].astype('string')
df['transmission'] = df['transmission'].astype('string')
df['vin'] = df['vin'].astype('string')
df['state'] = df['state'].astype('string')
df['condition'] = df['condition'].astype(int)
df['odometer'] = df['odometer'].astype(float)
df['color'] = df['color'].astype('string')
df['interior'] = df['interior'].astype('string')
df['seller'] = df['seller'].astype('string')
df['mmr'] = df['mmr'].astype(float)
df['sellingprice'] = df['sellingprice'].astype(float)
df['saledate'] = pd.to_datetime(df['saledate'])
df['saledate_year'] = df['saledate_year'].astype(int)
df['saledate_month'] = df['saledate_month'].astype(int)
df['saledate_monthname'] = df['saledate_monthname'].astype('string')
df['saledate_day'] = df['saledate_day'].astype(int)
df['saledate_weekdayname'] = df['saledate_weekdayname'].astype('string')
df['saledate_weekday'] = df['saledate_weekday'].astype(int)
df['quarter'] = df['quarter'].astype(int)
df['quartername'] = df['quartername'].astype('string')

In [None]:
#final check for missing values before preparing the dataframes for the star schema
#columns with missing values will be True, otherwise, False
print(f'Number of Rows: {len(df)}')
print(df.isnull().any())

The next steps in the ETL process will be the creation of multiple dataframes that will mirror the schema of the tables prepared earlier in the PSQL instance.
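
The idea can be sketched on a toy frame before applying it to the full dataset: each dimension holds one row per distinct value plus a 1-based surrogate key, and the fact table keeps the measures and swaps the descriptive columns for those keys. The data and the build_dimension helper below are made up for illustration.

```python
import pandas as pd

flat = pd.DataFrame({
    "seller": ["acme", "zenith", "acme", "zenith"],
    "state": ["ca", "ca", "tx", "ca"],
    "sellingprice": [21500.0, 9000.0, 12000.0, 8000.0],
})

def build_dimension(frame, col, id_col):
    """One row per distinct value of col, with a 1-based surrogate key."""
    dim = frame[[col]].drop_duplicates().reset_index(drop=True)
    dim[id_col] = dim.index + 1
    return dim

seller_dim = build_dimension(flat, "seller", "seller_id")
state_dim = build_dimension(flat, "state", "state_id")

# Look up the keys, then drop the descriptive columns from the fact table
fact = (
    flat.merge(seller_dim, on="seller", how="left")
        .merge(state_dim, on="state", how="left")
        .drop(columns=["seller", "state"])
)
fact["sale_id"] = range(1, len(fact) + 1)
print(fact)
```

Joining the fact table back to the dimensions on the key columns reproduces the original flat rows, which is the property the verification query at the end relies on.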

In [None]:
#code for generating unique IDs for the multiple dataframes

def generate_id(dataframe, target_col):
    #map each unique value to a sequential ID starting at 1, in order of first appearance
    unique_values = dataframe[target_col].unique()
    return {value: idx for idx, value in enumerate(unique_values, start=1)}

In [None]:
#dateDimTable

dateDimTable = df[['saledate', 'saledate_year', 'saledate_month', 'saledate_monthname','saledate_day',
                   'saledate_weekday', 'saledate_weekdayname', 'quarter', 'quartername', 
                   ]]
dateDimTable = dateDimTable.copy()
dateDimTable['date_id'] = dateDimTable['saledate'].map(generate_id(dateDimTable, 'saledate'))
dateDimTable.drop_duplicates(inplace=True)
dateDimTable.to_csv('~/bluedrive_project/dateDimTable.csv', index=False)

#sellerDimTable
sellerDimTable = df[['seller']]
sellerDimTable = sellerDimTable.copy()
sellerDimTable['seller_id'] = sellerDimTable['seller'].map(generate_id(sellerDimTable, 'seller'))
sellerDimTable.drop_duplicates(inplace=True)
sellerDimTable.to_csv('~/bluedrive_project/sellerDimTable.csv', index=False)

#stateDimTable
stateDimTable = df[['state']]
stateDimTable = stateDimTable.copy()
stateDimTable['state_id'] = stateDimTable['state'].map(generate_id(stateDimTable, 'state'))
stateDimTable.drop_duplicates(inplace=True)
stateDimTable.to_csv('~/bluedrive_project/stateDimTable.csv', index=False)

In [None]:
#vehicleDimTable
#slightly different from the first 3 above: the unique ID of vehicleDimTable is based on the unique combination
#of its columns and not just one column
vehicleDimTable = df[['year', 'make', 'model', 'trim', 'body', 'transmission', 'color', 'interior']].drop_duplicates()
vehicleDimTable = vehicleDimTable.copy()
vehicleDimTable['vehicle_id'] = vehicleDimTable.reset_index().index+1
vehicleDimTable.to_csv('~/bluedrive_project/vehicleDimTable.csv', index=False)

In [None]:
#salesFactTable
#merge the salesFactTable with the DimTables to incorporate the DimTables' IDs into the salesFactTable
salesFactTable = df.copy()
salesFactTable = salesFactTable.merge(dateDimTable[['date_id', 'saledate']], on='saledate', how='left')
salesFactTable = salesFactTable.merge(sellerDimTable[['seller_id', 'seller']], on='seller', how='left')
salesFactTable = salesFactTable.merge(stateDimTable[['state_id', 'state']], on='state', how='left')
salesFactTable = salesFactTable.merge(vehicleDimTable[['vehicle_id', 'year', 'make', 'model', 'trim', 'body', 
                                                       'transmission', 'color', 'interior']], 
                                      on=['year', 'make', 'model', 'trim', 'body', 'transmission', 'color', 'interior'], 
                                      how='left')

#Drop Redundant Columns (i.e., columns that are now part of the dimension tables) leaving the Dimtables IDs only and some other fact columns
salesFactTable = salesFactTable.drop(columns=['year', 'make', 'model', 'trim', 'body', 'transmission',
                                              'color', 'interior', 'saledate', 'seller', 'state',
                                              'saledate_weekdayname','saledate_weekday', 'saledate_year',
                                              'saledate_month', 'saledate_monthname', 'saledate_day',
                                              'quarter', 'quartername'])
salesFactTable['sale_id'] = salesFactTable.reset_index().index+1


salesFactTable.to_csv('~/bluedrive_project/salesFactTable.csv', index=False)

At this point, there should be 5 CSV files in your project directory:

1. dateDimTable.csv
2. sellerDimTable.csv
3. stateDimTable.csv
4. vehicleDimTable.csv
5. salesFactTable.csv

The final step is to load these CSV files into their respective tables in the PSQL instance

Do the following:

1. Start the PSQL instance in Docker
2. Connect to your PSQL instance
3. Import the CSV files into the tables using the \copy command
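
The column list in each \copy command has to follow the column order of the CSV file, not the order in the CREATE TABLE statement. As a hedged sketch, the hypothetical helper below (copy_command is an illustrative name) reads the header row of a CSV file and builds the command text from it, which avoids typing the column lists by hand.

```python
import csv
from pathlib import Path

def copy_command(table, csv_path):
    """Build the psql copy meta-command with columns in CSV header order."""
    # expanduser() lets Python read paths that start with ~
    with open(Path(csv_path).expanduser(), newline="") as handle:
        header = next(csv.reader(handle))
    columns = ",".join(header)
    return f"\\copy \"{table}\"({columns}) FROM '{csv_path}' DELIMITER ',' CSV HEADER;"
```

For example, copy_command("sellerDimTable", "~/bluedrive_project/sellerDimTable.csv") would return the command text for the seller dimension, ready to paste into psql. Load the four dimension tables before the fact table so that its foreign keys can be resolved.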

Importing the CSV files to the PSQL instance:


\copy "dateDimTable"(saledate,saledate_year,saledate_month,saledate_monthname,saledate_day,saledate_weekday,saledate_weekdayname,quarter,quartername,date_id) FROM '~/bluedrive_project/dateDimTable.csv' DELIMITER ',' CSV HEADER;

\copy "sellerDimTable"(seller,seller_id) FROM '~/bluedrive_project/sellerDimTable.csv' DELIMITER ',' CSV HEADER;

\copy "stateDimTable"(state,state_id) FROM '~/bluedrive_project/stateDimTable.csv' DELIMITER ',' CSV HEADER;

\copy "vehicleDimTable"(year,make,model,trim,body,transmission,color,interior,vehicle_id) FROM '~/bluedrive_project/vehicleDimTable.csv' DELIMITER ',' CSV HEADER;

\copy "salesFactTable"(vin,condition,odometer,mmr,sellingprice,date_id,seller_id,state_id,vehicle_id,sale_id) FROM '~/bluedrive_project/salesFactTable.csv' DELIMITER ',' CSV HEADER;


Use the query below to verify the integrity of your tables:

SELECT v.year, v.make, v.model, v.trim, v.body, v.transmission, s.vin, st.state,
       s.condition, s.odometer, v.color, v.interior, sel.seller, s.sellingprice, s.mmr, d.saledate
FROM "salesFactTable" AS s
LEFT JOIN "dateDimTable" AS d ON s.date_id = d.date_id
LEFT JOIN "sellerDimTable" AS sel ON sel.seller_id = s.seller_id
LEFT JOIN "stateDimTable" AS st ON st.state_id = s.state_id
LEFT JOIN "vehicleDimTable" AS v ON v.vehicle_id = s.vehicle_id
ORDER BY s.sale_id  -- without an ORDER BY, LIMIT 1 may return any row
LIMIT 1;

The query should output the first row of the original dataset CSV file:

year | make |  model  | trim | body | transmission |        vin        | state | condition | odometer | color | interior |         seller          | sellingprice |   mmr    |  saledate  
------+------+---------+------+------+--------------+-------------------+-------+-----------+----------+-------+----------+-------------------------+--------------+----------+------------
 2015 | Kia  | Sorento | LX   | SUV  | automatic    | 5xyktca69fg566472 | ca    |         5 | 16639.00 | white | black    | kia motors america  inc |     21500.00 | 20500.00 | 2014-12-16
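
A single matching row is a spot check. A broader integrity test is to look for fact rows whose foreign key has no matching dimension row. The sketch below demonstrates the query pattern (LEFT JOIN, then filter on a NULL dimension key) against an in-memory SQLite database as a stand-in, with made-up rows, so it can run without the Docker instance. Against the PostgreSQL instance the same pattern applies, but the mixed-case table names need double quotes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL instance
conn.executescript("""
CREATE TABLE sellerDimTable (seller_id INTEGER PRIMARY KEY, seller TEXT NOT NULL);
CREATE TABLE salesFactTable (
    sale_id INTEGER PRIMARY KEY,
    seller_id INTEGER NOT NULL,
    sellingprice NUMERIC NOT NULL
);
INSERT INTO sellerDimTable VALUES (1, 'acme'), (2, 'zenith');
-- sale 3 points at seller 3, which does not exist in the dimension
INSERT INTO salesFactTable VALUES (1, 1, 21500), (2, 2, 9000), (3, 3, 12000);
""")

orphans = conn.execute("""
    SELECT f.sale_id
    FROM salesFactTable AS f
    LEFT JOIN sellerDimTable AS d ON d.seller_id = f.seller_id
    WHERE d.seller_id IS NULL
""").fetchall()

print(orphans)  # [(3,)]
```

On a correctly loaded star schema the result should be empty for every dimension. Comparing SELECT COUNT(*) on "salesFactTable" with the row count printed in the notebook is a complementary check on completeness.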