Objective:

The objective of this assignment is to create a data engineering pipeline in Python and MySQL that involves ingesting data from a CSV file, preprocessing the data, storing it into a database, and creating a pipeline for this process.

Assignment Description:
You are provided with a CSV file containing sample data. Your task is to create a Jupyter Notebook that performs the following steps:

* Ingest the data from the CSV file.

* Preprocess the data (cleaning, transformation, etc.).

* Store the preprocessed data into a MySQL database.

* Create a data engineering pipeline to automate the above steps.

Data Description:

* The CSV file contains sample data with multiple columns. It is the same data used in the class. 

Submission Guidelines:

Create a Jupyter Notebook containing Python code for the data engineering pipeline.

Include comments and explanations throughout the notebook to explain the code and the steps performed.
Save the CSV file and any other relevant files in the same directory as the notebook.

Ensure that the notebook is well-structured and easy to follow.
Test the code to ensure that it executes correctly without errors.

Submit the following files:
* Jupyter Notebook (.ipynb file)
* CSV file containing sample data
* Any additional files or resources used in the assignment

Note:

* You are free to use any data engineering tools and libraries available in Python (e.g., pandas, SQLAlchemy, etc.).
* Ensure that your code follows best practices and is well-documented.
* Plagiarism will not be tolerated, and any instances of plagiarism will result in penalties.

Assignment Deadline:
15th March 2024
Good luck! If you have any questions or need clarification, feel free to reach out.

In [3]:
!pip install sqlalchemy psycopg2-binary
!pip install pandas

Collecting pandas
  Downloading pandas-2.2.1-cp310-cp310-win_amd64.whl.metadata (19 kB)
Collecting numpy<2,>=1.22.4 (from pandas)
  Using cached numpy-1.26.4-cp310-cp310-win_amd64.whl.metadata (61 kB)
Collecting pytz>=2020.1 (from pandas)
  Using cached pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.2.1-cp310-cp310-win_amd64.whl (11.6 MB)
   ---------------------------------------- 0.0/11.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/11.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/11.6 MB 435.7 kB/s eta 0:00:27
   ---------------------------------------- 0.1/11.6 MB 544.7 kB/s eta 0:00:22
    --------------------------------------- 0.2/11.6 MB 1.3 MB/s eta 0:00:09
   - -------------------------------------- 0.3/11.6 MB 1.6 MB/s eta 0:00:07
   - -------------------------------------- 0.4/11.6 MB 1.8 MB/s eta 0:00:0

In regards of the docker file it is added to be used without the installation process locally. 

First it is necessary to install docker and docker compose. In order to have the postgresql database available. After achiving this. using docker to run `docker-compose -p car_proj up` where `-p` is the flag to name the project. 

In [4]:
# importing necessary libraries
import pandas as pd

In [5]:
from sqlalchemy import create_engine

In [6]:
# connecting to the database created in the docker container
engine = create_engine('postgresql://root:root@localhost:5432/car_prices')

In [7]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x2c295bcc190>

In [8]:
# testing the connection to the db
query = """
SELECT 1 as number;
"""

pd.read_sql(query, con=engine)

Unnamed: 0,number
0,1


In [9]:
df_cars = pd.read_csv('car_prices.csv', parse_dates=['year', 'saledate'])
df_cars

  df_cars = pd.read_csv('car_prices.csv', parse_dates=['year', 'saledate'])


Unnamed: 0,year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate
0,2015-01-01,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,kia motors america inc,20500.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
1,2015-01-01,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,kia motors america inc,20800.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
2,2014-01-01,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,45.0,1331.0,gray,black,financial services remarketing (lease),31900.0,30000.0,Thu Jan 15 2015 04:30:00 GMT-0800 (PST)
3,2015-01-01,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,41.0,14282.0,white,black,volvo na rep/world omni,27500.0,27750.0,Thu Jan 29 2015 04:30:00 GMT-0800 (PST)
4,2014-01-01,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,43.0,2641.0,gray,black,financial services remarketing (lease),66000.0,67000.0,Thu Dec 18 2014 12:30:00 GMT-0800 (PST)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
558832,2015-01-01,Kia,K900,Luxury,Sedan,,knalw4d4xf6019304,in,45.0,18255.0,silver,black,avis corporation,35300.0,33000.0,Thu Jul 09 2015 07:00:00 GMT-0700 (PDT)
558833,2012-01-01,Ram,2500,Power Wagon,Crew Cab,automatic,3c6td5et6cg112407,wa,5.0,54393.0,white,black,i -5 uhlmann rv,30200.0,30800.0,Wed Jul 08 2015 09:30:00 GMT-0700 (PDT)
558834,2012-01-01,BMW,X5,xDrive35d,SUV,automatic,5uxzw0c58cl668465,ca,48.0,50561.0,black,black,financial services remarketing (lease),29800.0,34000.0,Wed Jul 08 2015 09:30:00 GMT-0700 (PDT)
558835,2015-01-01,Nissan,Altima,2.5 S,sedan,automatic,1n4al3ap0fc216050,ga,38.0,16658.0,white,black,enterprise vehicle exchange / tra / rental / t...,15100.0,11100.0,Thu Jul 09 2015 06:45:00 GMT-0700 (PDT)


In [10]:
df_cars.describe()

Unnamed: 0,year,condition,odometer,mmr,sellingprice
count,558837,547017.0,558743.0,558799.0,558825.0
mean,2010-01-15 02:18:33.826035200,30.672365,68320.017767,13769.377495,13611.35881
min,1982-01-01 00:00:00,1.0,1.0,25.0,1.0
25%,2007-01-01 00:00:00,23.0,28371.0,7100.0,6900.0
50%,2012-01-01 00:00:00,35.0,52254.0,12250.0,12100.0
75%,2013-01-01 00:00:00,42.0,99109.0,18300.0,18200.0
max,2015-01-01 00:00:00,49.0,999999.0,182000.0,230000.0
std,,13.402832,53398.542821,9679.967174,9749.501628


In [11]:
df_cars.info

<bound method DataFrame.info of              year    make                model         trim       body  \
0      2015-01-01     Kia              Sorento           LX        SUV   
1      2015-01-01     Kia              Sorento           LX        SUV   
2      2014-01-01     BMW             3 Series   328i SULEV      Sedan   
3      2015-01-01   Volvo                  S60           T5      Sedan   
4      2014-01-01     BMW  6 Series Gran Coupe         650i      Sedan   
...           ...     ...                  ...          ...        ...   
558832 2015-01-01     Kia                 K900       Luxury      Sedan   
558833 2012-01-01     Ram                 2500  Power Wagon   Crew Cab   
558834 2012-01-01     BMW                   X5    xDrive35d        SUV   
558835 2015-01-01  Nissan               Altima        2.5 S      sedan   
558836 2014-01-01    Ford                F-150          XLT  SuperCrew   

       transmission                vin state  condition  odometer   color  \
0 

In [12]:
df_cars.dtypes

year            datetime64[ns]
make                    object
model                   object
trim                    object
body                    object
transmission            object
vin                     object
state                   object
condition              float64
odometer               float64
color                   object
interior                object
seller                  object
mmr                    float64
sellingprice           float64
saledate                object
dtype: object

In [13]:
df_cars.isna().sum().sum()

123376

In [14]:
df_cars_cleaned = df_cars.dropna()
df_cars_cleaned.shape

(472325, 16)

In [15]:
# TODO parse year in year column 
# TODO parse date to UTC in saledate

In [16]:
df_cars_cleaned.to_sql(name='car_prices_data', con=engine, if_exists='replace', index=False)

325

In [17]:
query = """
SELECT * FROM car_prices_data;
"""
pd.read_sql(query, con=engine)

Unnamed: 0,year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate
0,2015-01-01,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,kia motors america inc,20500.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
1,2015-01-01,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,kia motors america inc,20800.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
2,2014-01-01,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,45.0,1331.0,gray,black,financial services remarketing (lease),31900.0,30000.0,Thu Jan 15 2015 04:30:00 GMT-0800 (PST)
3,2015-01-01,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,41.0,14282.0,white,black,volvo na rep/world omni,27500.0,27750.0,Thu Jan 29 2015 04:30:00 GMT-0800 (PST)
4,2014-01-01,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,43.0,2641.0,gray,black,financial services remarketing (lease),66000.0,67000.0,Thu Dec 18 2014 12:30:00 GMT-0800 (PST)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
472320,2011-01-01,BMW,5 Series,528i,Sedan,automatic,wbafr1c53bc744672,fl,39.0,66403.0,white,brown,lauderdale imports ltd bmw pembrok pines,20300.0,22800.0,Tue Jul 07 2015 06:15:00 GMT-0700 (PDT)
472321,2012-01-01,Ram,2500,Power Wagon,Crew Cab,automatic,3c6td5et6cg112407,wa,5.0,54393.0,white,black,i -5 uhlmann rv,30200.0,30800.0,Wed Jul 08 2015 09:30:00 GMT-0700 (PDT)
472322,2012-01-01,BMW,X5,xDrive35d,SUV,automatic,5uxzw0c58cl668465,ca,48.0,50561.0,black,black,financial services remarketing (lease),29800.0,34000.0,Wed Jul 08 2015 09:30:00 GMT-0700 (PDT)
472323,2015-01-01,Nissan,Altima,2.5 S,sedan,automatic,1n4al3ap0fc216050,ga,38.0,16658.0,white,black,enterprise vehicle exchange / tra / rental / t...,15100.0,11100.0,Thu Jul 09 2015 06:45:00 GMT-0700 (PDT)


In [22]:
!pip install "apache-airflow[celery]==2.8.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.3/constraints-3.10.txt"

Collecting cryptography>=0.9.3 (from apache-airflow==2.8.3->apache-airflow[celery]==2.8.3)
  Downloading cryptography-41.0.7-cp37-abi3-win_amd64.whl.metadata (5.3 kB)
Collecting dill>=0.2.2 (from apache-airflow==2.8.3->apache-airflow[celery]==2.8.3)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
     ---------------------------------------- 0.0/152.0 kB ? eta -:--:--
     ---------------------------------------- 0.0/152.0 kB ? eta -:--:--
     -- ------------------------------------- 10.2/152.0 kB ? eta -:--:--
     ------- ----------------------------- 30.7/152.0 kB 445.2 kB/s eta 0:00:01
     --------- --------------------------- 41.0/152.0 kB 330.3 kB/s eta 0:00:01
     ----------------------------- ------ 122.9/152.0 kB 804.6 kB/s eta 0:00:01
     ------------------------------------ 152.0/152.0 kB 908.6 kB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting httpx (from apache-airflow==2.8.3->apache-airflow[cele

In [23]:
from airflow import DAG

AttributeError: module 'sqlalchemy.util' has no attribute 'threading'

In [20]:
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

AttributeError: module 'sqlalchemy.util' has no attribute 'threading'