# Data Engineering - Assignment 2

Ahsan Ahmad 9/2/2024


## Task 1: Download data and install/import packages

In [2]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Set up the correct file path for the database
db_file_path = '/content/drive/My Drive/bike_status.db'

# Install necessary packages
!pip -q install --upgrade ipython
!pip -q install --upgrade ipython-sql

# Connect to the SQLite database
import sqlite3
con_bike_status = sqlite3.connect(db_file_path)

# Load the SQL extension and connect to the database
%load_ext sql
%sql sqlite:///{db_file_path}

# Configure SQL Magic
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

# Import DuckDB if needed
import duckdb


Mounted at /content/drive
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m819.0/819.0 kB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.4/85.4 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-colab 1.0.0 requires ipython==7.34.0, but you have ipython 8.27.0 which is incompatible.[0m[31m
[0mTraceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/sqlalchemy/engine/base.py", line 146, in __init__
    self._dbapi_connection = engine.raw_connection()
  File "/usr/local/lib/python3.10/dist-packages/sqlalchemy/engine/base.py", line 3302, in raw_connection
    return self.pool.connect()
  File "/usr/local/lib/python3.10/dist-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionF

In [3]:
import pandas as pd
import sqlite3
from tqdm import tqdm

In [4]:
con_bike_status # display the source-database connection object to confirm it is open

<sqlite3.Connection at 0x7e64a2580640>

In [5]:
# Cursor on the source database; later cells use it for raw SQL whose results
# are fetched as lists of tuples (e.g. the distinct-years query in Task 6).
cur_bike_status = con_bike_status.cursor()
cur_bike_status # cursor (displayed to confirm it was created)

<sqlite3.Cursor at 0x7e646f8ebbc0>

## Task 2: Verify the integrity of your source data

In [55]:
# Verify the source data: preview the first five rows of bike_status to check
# the column set and the one-minute spacing of the `time` values.
pd.read_sql_query(sql="""
SELECT *
FROM bike_status
LIMIT 5
""", con=con_bike_status)

Unnamed: 0,station_id,bikes_available,docks_available,time
0,2,2,25,2013-08-29 12:06:01
1,2,2,25,2013-08-29 12:07:01
2,2,2,25,2013-08-29 12:08:01
3,2,2,25,2013-08-29 12:09:01
4,2,2,25,2013-08-29 12:10:01


Selecting the first 5 rows shows output that matches the sample in the assignment. The four columns (station_id, bikes_available, docks_available, time) hold the expected kinds of values, and the time column advances in 1-minute increments, as expected for per-minute status data.

In [7]:
# Count the rows in the source table (the assignment expects roughly 72 million).
row_count = pd.read_sql_query(
    'select count(1) as row_count from bike_status', con_bike_status
)

# End the cell with the DataFrame so Jupyter renders it as a table
# (print() would fall back to a plain-text dump).
row_count

   row_count
0   71984434


The total row count comes out to be 71,984,434 or 71.98 million which is comparable to the expected output of approximately 72 million rows.

In [8]:
# Check the declared column names and types of the source table.
# PRAGMA table_info returns one row per column: cid, name, type, notnull,
# dflt_value, pk.
data_types = pd.read_sql("PRAGMA table_info(bike_status)", con_bike_status)

# Turn the visual comparison with the assignment description into an explicit
# check, so a schema mismatch stops the notebook here.
expected_schema = {
    'station_id': 'INTEGER',
    'bikes_available': 'INTEGER',
    'docks_available': 'INTEGER',
    'time': 'TIMESTAMP',
}
assert dict(zip(data_types['name'], data_types['type'])) == expected_schema, data_types

# Rich display rather than print(), so the schema renders as a table.
data_types

   cid             name       type  notnull dflt_value  pk
0    0       station_id    INTEGER        0       None   0
1    1  bikes_available    INTEGER        0       None   0
2    2  docks_available    INTEGER        0       None   0
3    3             time  TIMESTAMP        0       None   0


The column names and data types match the assignment description. station_id, bikes_available and docks_available are INTEGER columns, and time is a TIMESTAMP column. This is expected, since each row records a station's bike status for one minute over the three years of data (2013–2015).

## Task 3: Write an Extract Query that includes a Transform (ETL)

In [60]:
# Aggregate the 2013 source rows to one row per (station, hour of day),
# averaging the bikes and docks available in that hour over all 2013 days.
# The year is filtered on the raw `time` expression with a bound parameter,
# not on the `year` output alias: using a SELECT alias in WHERE is a
# non-standard SQLite extension, and this matches the filter used by the
# migration loop in Task 6. Ordering by hour as well as station makes the
# row order deterministic.
yearly_agg_2013 = pd.read_sql_query("""
select
  station_id,
  CAST(strftime('%Y',time) as INTEGER) as year,
  CAST(strftime('%H',time) as INTEGER) as hour,
  avg(bikes_available) as avg_bikes_available,
  avg(docks_available) as avg_docks_available
from bike_status
where strftime('%Y',time) = ?
group by station_id, year, hour
order by station_id, hour
""", con_bike_status, params=('2013',))

yearly_agg_2013.head()

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2013,0,14.319557,12.672916
1,2,2013,1,14.271015,12.722753
2,2,2013,2,14.211712,12.780781
3,2,2013,3,14.205377,12.787118
4,2,2013,4,14.219918,12.773397


The query above extracts all rows from 2013 and groups them by station and hour of day, averaging the bikes and docks available in each group. This reproduces the output shown in the assignment description. The same query is written for 2014 and 2015 below.

In [61]:
# Aggregate the 2014 source rows to one row per (station, hour of day),
# averaging the bikes and docks available in that hour over all 2014 days.
# The year is filtered on the raw `time` expression with a bound parameter,
# not on the `year` output alias: using a SELECT alias in WHERE is a
# non-standard SQLite extension, and this matches the filter used by the
# migration loop in Task 6. Ordering by hour as well as station makes the
# row order deterministic.
yearly_agg_2014 = pd.read_sql_query("""
select
  station_id,
  CAST(strftime('%Y',time) as INTEGER) as year,
  CAST(strftime('%H',time) as INTEGER) as hour,
  avg(bikes_available) as avg_bikes_available,
  avg(docks_available) as avg_docks_available
from bike_status
where strftime('%Y',time) = ?
group by station_id, year, hour
order by station_id, hour
""", con_bike_status, params=('2014',))

yearly_agg_2014.head()

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2014,0,13.04601,13.907154
1,2,2014,1,12.971158,13.981997
2,2,2014,2,12.901719,14.051283
3,2,2014,3,12.888889,14.06434
4,2,2014,4,12.895522,14.057992


In [62]:
# Aggregate the 2015 source rows to one row per (station, hour of day),
# averaging the bikes and docks available in that hour over all 2015 days.
# The year is filtered on the raw `time` expression with a bound parameter,
# not on the `year` output alias: using a SELECT alias in WHERE is a
# non-standard SQLite extension, and this matches the filter used by the
# migration loop in Task 6. Ordering by hour as well as station makes the
# row order deterministic.
yearly_agg_2015 = pd.read_sql_query("""
select
  station_id,
  CAST(strftime('%Y',time) as INTEGER) as year,
  CAST(strftime('%H',time) as INTEGER) as hour,
  avg(bikes_available) as avg_bikes_available,
  avg(docks_available) as avg_docks_available
from bike_status
where strftime('%Y',time) = ?
group by station_id, year, hour
order by station_id, hour
""", con_bike_status, params=('2015',))

yearly_agg_2015.head()

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2015,0,13.143123,13.732626
1,2,2015,1,13.121102,13.755043
2,2,2015,2,13.005096,13.870592
3,2,2015,3,13.049931,13.826543
4,2,2015,4,13.045614,13.830921


## Task 4: Create a Destination database and table

In [79]:
# Destination side of the ETL: a separate local SQLite file that will receive
# the aggregated yearly data, plus a cursor for running DDL/DML against it.
con_bike_status_destination = sqlite3.connect(database='bike_status_destination.db')
cur_bike_status_destination = con_bike_status_destination.cursor()

In [80]:
# Drop any yearly_agg table left over from a previous run so the next cell
# recreates it with the intended schema. The connection's context manager
# commits once the statement succeeds.
with con_bike_status_destination:
    cur_bike_status_destination.execute("""
DROP TABLE IF EXISTS yearly_agg
""")

In [81]:
# Create the destination table. Column types mirror the extract query:
# INTEGER for the station/year/hour keys, REAL for the two averages.
# The connection's context manager commits once the statement succeeds.
with con_bike_status_destination:
    cur_bike_status_destination.execute("""
CREATE TABLE IF NOT EXISTS yearly_agg (
  station_id INTEGER,
  year INTEGER,
  hour INTEGER,
  avg_bikes_available REAL,
  avg_docks_available REAL
)
""")

In [82]:
# Check for existing data: a frame with headers but no rows means the freshly
# created destination table is empty.
pd.read_sql_query(sql="""
SELECT *
FROM yearly_agg
LIMIT 5
""", con=con_bike_status_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available


In [83]:
# Remove every row from the destination table; the connection's context
# manager commits the DELETE once it succeeds.
with con_bike_status_destination:
    cur_bike_status_destination.execute("""
DELETE FROM yearly_agg
""")

## Task 5: Query for already migrated data.

In [84]:
# Task 5: find the latest year already present in the destination table.
# MAX() returns NULL on an empty table; COALESCE maps that to 0 so the query
# always produces exactly one row.
migrated_bike_status_years = (
    cur_bike_status_destination
    .execute('select coalesce(max(year),0) from yearly_agg')
    .fetchall()
)

migrated_bike_status_years

[(0,)]

## Task 6: Migrate the data into the destination database.

In [85]:
# Distinct calendar years present in the source, ascending, as a list of
# 1-tuples such as (2013,).
unique_source_years = (
    cur_bike_status
    .execute("select distinct(CAST(strftime('%Y',time) as INTEGER)) as unique_year from bike_status order by unique_year")
    .fetchall()
)
unique_source_years[:3]

[(2013,), (2014,), (2015,)]

In [86]:
# Empty the destination table so the migration below starts from zero rows;
# the context manager commits the DELETE once it succeeds.
with con_bike_status_destination:
    cur_bike_status_destination.execute('delete from yearly_agg')

In [87]:
# Ensuring the deletion was a success: a frame with column headers but no rows
# means yearly_agg is now empty.

pd.read_sql_query('select * from yearly_agg',con_bike_status_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available


In [88]:
# Years already present in the destination table, as 1-tuples like (2013,).
# DISTINCT matters: without it the query returns one tuple per migrated row
# (thousands of duplicates once data has been loaded), which also makes the
# membership test in the next cell needlessly slow.
migrated_years = cur_bike_status_destination.execute(
    'select distinct year as migrated_years from yearly_agg order by year'
).fetchall()

migrated_years[0:3]

[]

In [89]:
# Source years not yet present in the destination, keeping the ascending order
# of unique_source_years. Membership is tested against a set, so the cost is
# linear instead of len(unique_source_years) * len(migrated_years) list scans.
already_migrated = set(migrated_years)
unmigrated_years = [x for x in unique_source_years if x not in already_migrated]

unmigrated_years[0:3]

[(2013,), (2014,), (2015,)]

In [90]:
# Migrate one source year per iteration: aggregate that year to one row per
# (station, hour of day) and append the batch to the destination table.
# Each iteration appends many rows (one per station-hour), not a single row.
# Rows in unmigrated_years are 1-tuples like (2013,), unpacked in the for-target.
for (year,) in tqdm(unmigrated_years, desc='migrating years'):
  # Bind the year as a query parameter rather than interpolating it into the
  # SQL text; also order by hour so each batch is written in a fixed order.
  year_data = pd.read_sql_query("""
  select
    station_id,
    CAST(strftime('%Y',time) AS INTEGER) as year,
    CAST(strftime('%H',time) AS INTEGER) as hour,
    avg(bikes_available) as avg_bikes_available,
    avg(docks_available) as avg_docks_available
  from bike_status
  where strftime('%Y',time) = ?
  group by station_id, year, hour
  order by station_id, hour
  """, con_bike_status, params=(str(year),))

  year_data.to_sql('yearly_agg', con_bike_status_destination, if_exists='append', index=False)

100%|██████████| 3/3 [03:55<00:00, 78.55s/it]


In [91]:
# Checking if the data migration is as we expected: show five destination rows
# to compare against the 2013 preview from Task 3. There is no ORDER BY, so
# which five rows appear is up to SQLite (presumably insertion order — verify
# if the comparison matters).

pd.read_sql_query('select * from yearly_agg limit 5',con_bike_status_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2013,0,14.319557,12.672916
1,2,2013,1,14.271015,12.722753
2,2,2013,2,14.211712,12.780781
3,2,2013,3,14.205377,12.787118
4,2,2013,4,14.219918,12.773397


## Task 7: Post migration data checks

In [98]:
# Checking the row counts for each year dataset before migration.
# .shape is (rows, columns) of the 2013 aggregate DataFrame built in Task 3.

yearly_agg_2013.shape

(1584, 5)

In [99]:
yearly_agg_2014.shape  # (rows, columns) of the 2014 aggregate DataFrame from Task 3

(1680, 5)

In [100]:
yearly_agg_2015.shape  # (rows, columns) of the 2015 aggregate DataFrame from Task 3

(1680, 5)

In [101]:
# Checking the row counts for the migrated dataset, and asserting that they
# equal the total rows of the three yearly aggregates from Task 3, so a
# mismatch stops the notebook instead of relying on a visual comparison.
migrated_row_count = pd.read_sql_query(
    'select count(1) as row_count from yearly_agg', con_bike_status_destination
)
expected_row_count = sum(
    len(frame) for frame in (yearly_agg_2013, yearly_agg_2014, yearly_agg_2015)
)
assert migrated_row_count.loc[0, 'row_count'] == expected_row_count, (
    f"migrated {migrated_row_count.loc[0, 'row_count']} rows, expected {expected_row_count}"
)

migrated_row_count

Unnamed: 0,row_count
0,4944


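The row counts of the three yearly batches before migration sum to 1584 + 1680 + 1680 = 4944. The row count of the migrated table shown above matches this sum, so every yearly batch was written to the destination exactly once, and the migration has been successful. Because the Task 3 queries and the migration loop use the same aggregation, this check confirms that the load is complete, not that the aggregation itself is correct.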

In [109]:
# Getting the average of the bikes_available and docks_available for the dataset before migration.
# This is a plain mean over every minute-level row of the source table.

pd.read_sql_query('select avg(bikes_available) as avg_bikes_available, avg(docks_available) as avg_docks_available from bike_status',con_bike_status)

Unnamed: 0,avg_bikes_available,avg_docks_available
0,8.394812,9.284729


In [107]:
# Getting the average of the avg_bikes_available and avg_docks_available for the dataset after migration.
# NOTE: this is an unweighted mean of the per-(station, year, hour) averages,
# so every group counts equally no matter how many source rows it summarises;
# it is therefore not expected to equal the row-level mean from the source.

pd.read_sql_query('select avg(avg_bikes_available) as avg_bikes_available, avg(avg_docks_available) as avg_docks_available from yearly_agg',con_bike_status_destination)

Unnamed: 0,avg_bikes_available,avg_docks_available
0,8.379449,9.27371


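The two sets of averages differ by about 0.015 (bikes: 8.394812 vs 8.379449) and 0.011 (docks: 9.284729 vs 9.27371).

This gap is expected and is not caused by rounding or by the volume of data. The source figure is a plain mean over all ~72 million minute-level rows. The migrated figure is an unweighted mean of the per-station, per-year, per-hour averages, so each group counts equally no matter how many source rows it summarises. For example, the earliest previewed rows are from late August 2013, so 2013 groups likely summarise fewer rows than 2014 or 2015 groups. The two numbers therefore measure slightly different quantities and should agree closely but not exactly.

Together with the matching row tally above, I believe the migration has been successful.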

In [None]:
# Export this notebook to HTML.
# replace ###### with your file name
# make sure you have your google drive mounted.
# NOTE(review): "######" is a template placeholder; both commands fail until it
# is replaced. This path uses "MyDrive" while db_file_path in the setup cell
# uses "My Drive" — confirm which mount name this runtime actually exposes.

!cp "/content/drive/MyDrive/Colab Notebooks/######.ipynb" ./
!jupyter nbconvert --to html "######.ipynb"