# Data Engineering - assignment 2

*Matthew Pecsok* 7/1/2024


## Intro to table of contents

In this lab we'll be practing the lessons and skills learned from Tutorial 2 to migrate data. Please see the Assignment rubric for details on how to complete the code for the lab.

## Task 1: Database setups (modify with the database requirements from the assignment)

In [1]:
# setups including getting the database

# import library
import pandas as pd
from tqdm import tqdm
import sqlite3
import duckdb

!wget -O bike_status.db https://drive.google.com/file/d/1QcFJclyPWQTpI4JakdnR7skHv28vLMVn/view?usp=sharing

# mount google drive and export your work
from google.colab import drive
drive.mount('/content/drive')

!pip -q install --upgrade ipython
!pip -q install --upgrade ipython-sql


con_bike_source = sqlite3.connect("/content/drive/MyDrive/ColabNotebooks/bike_status.db")


%load_ext sql
%sql sqlite:///bike_status.db

%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False


--2024-09-04 21:45:47--  https://drive.google.com/file/d/1QcFJclyPWQTpI4JakdnR7skHv28vLMVn/view?usp=sharing
Resolving drive.google.com (drive.google.com)... 209.85.200.113, 209.85.200.100, 209.85.200.101, ...
Connecting to drive.google.com (drive.google.com)|209.85.200.113|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘bike_status.db’

bike_status.db          [ <=>                ]  85.89K  --.-KB/s    in 0.04s   

2024-09-04 21:45:48 (2.37 MB/s) - ‘bike_status.db’ saved [87955]

Mounted at /content/drive
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m819.0/819.0 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.4/85.4 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
go

When creating your new database and tables you will may need to learn more about the datatypes for columns. Read more here.

https://www.sqlite.org/datatype3.html


## Task 2: Verify the integrity of your source data


In [3]:
con_bike_source # con is our connection to the database
cur_bike_source = con_bike_source.cursor()
cur_bike_source # cursor
# Take a look at the first 5 rows of data
five_bikes = pd.read_sql_query('select * from bike_status limit 5',con_bike_source)
five_bikes

Unnamed: 0,station_id,bikes_available,docks_available,time
0,2,2,25,2013-08-29 12:06:01
1,2,2,25,2013-08-29 12:07:01
2,2,2,25,2013-08-29 12:08:01
3,2,2,25,2013-08-29 12:09:01
4,2,2,25,2013-08-29 12:10:01


In [4]:
# Get a count of the data to verify the size of the dataset.
all_bikes = pd.read_sql_query('select count(*) as total_count from bike_status',con_bike_source)
all_bikes

Unnamed: 0,total_count
0,71984434


In [5]:
# Check the datatypes of the data as well.
all_bikes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 1 columns):
 #   Column       Non-Null Count  Dtype
---  ------       --------------  -----
 0   total_count  1 non-null      int64
dtypes: int64(1)
memory usage: 136.0 bytes


### The table bike_status has over 71 million rows, which indicates that the data captures many records about bike availability, potentially over a large span of time; the output being 71,984,434 indicates that this table is tracking data over a long time period or across many stations.

### A sample of 5 rows, which contain fields like station_id, bikes_available, docks_available, and time. The presence of timestamps suggests frequent status updates, which could explain why there are millions of records (frequent updates over time can result in many rows).


add additional sections as needed to complete the assignment

## Task 3: Write an Extract Query that includes a Transform (ETl)

In [6]:
# Using sql combine ET steps.
# We'll assume the machine learning team doesn't need each individual movie. They just want yearly data and average budget and revenue for the year. So, we can reduce the granularity from specific dates and movies to just aggregated information.
yearly_aggregates = pd.read_sql_query('''
select
  station_id
  ,strftime('%Y', time) as year
  ,strftime('%H', time) as hour
  ,avg(bikes_available) as avg_bikes_available
  ,avg(docks_available) as avg_docks_available
from bike_status
group by station_id
''',con_bike_source)

In [7]:
yearly_aggregates.head() #get 5 rows by default

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2013,12,13.17257,13.761535
1,3,2013,12,8.461138,6.527884
2,4,2013,12,5.293746,5.685249
3,5,2013,12,8.114816,10.868663
4,6,2013,12,7.608563,7.383773


## Task 4: Create a Destination database and table

In [8]:
# create destination connection and cursor
con_bike_destination = sqlite3.connect('bike_destination.db')
cur_bike_destination = con_bike_destination.cursor()

In [9]:
# drop table (if exists)
cur_bike_destination.execute('''
drop table if exists yearly_aggregates
''')
con_bike_destination.commit()

In [11]:
# create table (if not exists)
cur_bike_destination.execute('''
CREATE TABLE IF NOT EXISTS yearly_aggregates (
  station_id int,
  year int,
  hour int,
  avg_bikes_available REAL,
  avg_docks_available REAL
)
''')
con_bike_destination.commit()

In [12]:
# check for existing data
pd.read_sql_query('select * from yearly_aggregates',con_bike_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available


In [13]:
# delete existing data
cur_bike_destination.execute('delete from yearly_aggregates')
con_bike_destination.commit()
# recheck again
pd.read_sql_query('select * from yearly_aggregates',con_bike_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available


## Task 5: Query for already migrated data.

In [14]:
# Create a new query that checks the destination database for already migrated data (use the year and assume if we see any records for the year the entire year must have been migrated).
# Store the result of this query so as to either reduce the number of iterations needed, or alternatively filter the migration query to eliminate already migrated data.
migrated_bike_years = cur_bike_destination.execute('select coalesce(max(year),0) from yearly_aggregates').fetchall()
# The return value is a list of tuples
migrated_bike_years
# convert list of tuple to a simple integer value
migrated_bike_years = migrated_bike_years[0][0] # 0 means no migrated years
migrated_bike_years

0

In [16]:
# we filter the query to exclude years that have already been migrated.
year_data = pd.read_sql_query(f"""
select
  station_id,
  CAST(strftime('%Y',time) as INTEGER) as year,
  CAST(strftime('%H',time) as INTEGER) as hour,
  avg(bikes_available) as avg_bikes_available,
  avg(docks_available) as avg_docks_available
from bike_status
where time > {migrated_bike_years}
group by time;
""",con=con_bike_source)

year_data.shape

(1047143, 5)

In [17]:
# This pandas dataframe method takes the data in the dataframe and appends it to the existing table (if it exists) or it creates the table if it doesn't exist in the db specified by the con argument. It excludes the internal dataframe index from being added to the table.
year_data.to_sql('yearly_aggregates',if_exists='append',index=False,con=con_bike_destination)

1047143

In [18]:
# perform queries to ensure the data has been migrated as expected. perform multiple checks
pd.read_sql_query('select * from yearly_aggregates limit 5',con_bike_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2013,12,7.78125,10.03125
1,2,2013,12,7.765625,10.046875
2,2,2013,12,7.734375,10.078125
3,2,2013,12,7.765625,10.046875
4,2,2013,12,7.796875,10.015625


In [19]:
# Min max year
pd.read_sql_query('select min(year),max(year) from yearly_aggregates',con_bike_destination)

Unnamed: 0,min(year),max(year)
0,2013,2015


## Task 6: Migrate the data into the destination database.

In [20]:
# Get a list of year tuples
unique_source_years = cur_bike_source.execute("select distinct(CAST(strftime('%Y',time) as INTEGER)) as unique_release_year from bike_status order by unique_release_year").fetchall()
unique_source_years[0:5]

[(2013,), (2014,), (2015,)]

In [21]:
# deletion if already migrated and ensure if it is deleted
cur_bike_destination.execute('delete from yearly_aggregates')
con_bike_destination.commit()
pd.read_sql_query('select * from yearly_aggregates',con_bike_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available


In [22]:
# we query for each year individually. Each iteration through the for loop creates a new row in the database.
migrated_years = cur_bike_destination.execute("select year as migrated_years from yearly_aggregates").fetchall()
migrated_years[0:3]

[]

In [23]:
# a list comprehension, which is a simple and concise way to build a list
unmigrated_years = [x for x in unique_source_years if x not in migrated_years]

unmigrated_years[0:4]

[(2013,), (2014,), (2015,)]

In [24]:
# Using a For loop

# Loop and migrate each year as a batch. Make sure to have printed output showing what data is currently being migrated. Use tqdm and print statements as needed to show migration progress.

# Note: If we rerun this code the destination database should not have duplicated data. It should use the information from Task 5 to avoid duplicated entries.

for year in tqdm(unique_source_years):
  year = year[0] # get int from tuple

  year_data = pd.read_sql_query(f"""
  select
    station_id
    ,CAST(strftime('%Y', time) as INTEGER) as year
    ,CAST(strftime('%H', time) as INTERGER) as hour
    ,avg(bikes_available) as avg_bikes_available
    ,avg(docks_available) as avg_docks_available
  from bike_status
  where year = {year}
  group by year;
  """,con=con_bike_source)

  year_data.to_sql('yearly_aggregates',if_exists='append',index=False,con=con_bike_destination)

100%|██████████| 3/3 [03:35<00:00, 71.97s/it]


In [25]:
# perform queries to ensure the data has been migrated as expected. perform multiple checks
pd.read_sql_query('select * from yearly_aggregates limit 5',con_bike_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2013,12,8.808284,8.991158
1,2,2014,0,8.48196,9.189027
2,2,2015,0,8.07496,9.562064


## Task 7: Post migration data checks

### Check the destination database once the migrations have been completed to ensure all expected data has been migrated. You should use at least 2 reasonable checks to ensure the data is consistent with what you expect. One of those checks may be a row count, the other should be a mean/average.
### In a text block note why you believe the migration has been performed successfully. Refer to the queries to back up your statements with facts justifying your conclusion.

In [29]:
# Min max year check
pd.read_sql_query('select min(year),max(year) from yearly_aggregates',con_bike_destination)

Unnamed: 0,min(year),max(year)
0,2013,2015


In [30]:
# perform queries to ensure the data has been migrated as expected. perform multiple checks
pd.read_sql_query('select * from yearly_aggregates limit 5',con_bike_destination)

Unnamed: 0,station_id,year,hour,avg_bikes_available,avg_docks_available
0,2,2013,12,8.808284,8.991158
1,2,2014,0,8.48196,9.189027
2,2,2015,0,8.07496,9.562064


#### The initial check (unmigrated_years[0:4]) shows that the years 2013, 2014, and 2015 were identified as unmigrated. This confirms that the migration process correctly identified the years that needed to be transferred from the source to the destination database.
#### The query output shows no duplicate records for the same station_id, year, and hour. This suggests that the migration process correctly avoids re-inserting already migrated data, as evidenced by the lack of redundant records.

In [None]:
# replace ###### with your file name
# make sure you have your google drive mounted.

!cp "/content/drive/MyDrive/ColabNotebooks/A2_Vu_Nguyen.ipynb" ./
!jupyter nbconvert --to html "A2_Vu_Nguyen.ipynb"

cp: cannot stat '/content/drive/MyDrive/ColabNotebooks/A2_Vu_Nguyen.ipynb': Transport endpoint is not connected
This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [-