![003](assets/images/013/003.png)

[Last time](https://andrewyewcy.com/MySQL-and-phpMyAdmin-on-Docker/), MySQL and phpMyAdmin were set up using Docker containers to store the bicycle rides data from Bixi. In this article, we explore how to Extract, Transform, and Load (ETL) the raw data into the MySQL database using SQLAlchemy in Python.

# Motivation and Introduction

In the age of Big Data, the required data is unlikely to be stored in a single MySQL database or within a single DBMS. As such, rather than using a tool specific to MySQL for data ingestion, Python was chosen as the central tool to process raw data for ingestion into the MySQL database. This avoids the need to use various GUIs to manage data ingestion into different DBMS, and also consolidates the process into a central location for easier maintenance and improvement.

Below are some examples of where Python can be used for data ingestion:
- **SQL Databases** using [`SQLAlchemy`](https://www.sqlalchemy.org/), a Python toolkit that allows data scientists to connect to many DBMS aside from MySQL such as PostgreSQL and SQLite, all within a single Python file.
- **NoSQL Databases** like `MongoDB`, `Redis` and `ScyllaDB`, which are tools more familiar to software developers. These do not need to follow the structured relational table format of SQL databases.
- **Serverless databases on the cloud** like `Amazon Aurora`
- **Stream data**, such as that handled by `Apache Kafka`

Using Python also allows the use of the Pandas library, which simplifies the code for performing ETL and chunking the raw data.

# Overview of Setup

## Environments using Docker Compose

The environments needed can be easily replicated on any computer with Docker installed using this [Docker compose file](https://github.com/andrewyewcy/docker/blob/main/setup.yaml).

Place the Docker compose YAML file into your working directory, then run the below line in Terminal to set up the environment:

In [None]:
# Run below line in Terminal, with your current directory in Terminal same as your chosen working directory
docker-compose -f setup.yaml up

3 Docker containers and a Docker network connecting them all will be set up automatically:
- Container1: MySQL on port 3306
- Container2: phpMyAdmin on port 8080
- Container3: `jupyter/pyspark-notebook` on port 10000, containing most Python data science packages and Apache Spark for dealing with big data later. [details](https://jupyter-docker-stacks.readthedocs.io/en/latest/using/selecting.html#jupyter-pyspark-notebook)

Once the containers are up and running, access JupyterLab by going to `localhost:10000` on a browser of your choice and inputting the access token, which can be found in the same terminal shown in blue below.

![004](assets/images/013/004.png)

The contents of your working directory will be within the `work` folder shown below after accessing JupyterLab, and any changes made will be preserved.

![005](assets/images/013/005.png)

## `mysqlclient` for SQLAlchemy

No additional packages need to be installed except for `mysqlclient`, which is needed by SQLAlchemy to connect Python to MySQL. This is done using the below steps:

In [None]:
# 1. Opening a new terminal window, check the name of the Python container
docker ps

# 2. activate an interactive terminal within the Python container
docker exec -it {name_of_python_container} /bin/bash

# 3. use conda to install mysqlclient
conda install -c conda-forge mysqlclient

# 4. when done, exit the interactive terminal within the Docker container
exit
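
After the installation, one quick way to confirm that the driver is visible to the notebook kernel is to check whether its import name can be resolved. The sketch below assumes that `mysqlclient` is importable under its usual module name `MySQLdb`; `driver_available` is a hypothetical helper written for this illustration and is not part of SQLAlchemy.

```python
import importlib.util


def driver_available(module_name):
    """Return True if a top-level module can be found by the current interpreter."""
    return importlib.util.find_spec(module_name) is not None


# mysqlclient installs a module named MySQLdb (assumption stated above).
# False suggests the conda install did not reach the kernel's environment.
print(f"MySQLdb importable: {driver_available('MySQLdb')}")
```

If this prints `False` inside JupyterLab, restarting the kernel after the install is usually the first thing to try.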

## Import Packages and Define Functions

In [23]:
# import required packages
import pandas as pd
import os
import zipfile
import shutil

In [None]:
def print_list(items):
    '''
    This function takes a list and iterates over it to print out the contents of the list.

    Inputs
    ---
    items: a Python list (named `items` to avoid shadowing the built-in `list`)

    Outputs
    ---
    printout of list content with index
    '''
    
    if len(items) == 0:
        print("Passed list is empty.")
    else:
        for index, item in enumerate(items):
            print(f"Item {index + 1} / {len(items)}: {item}")

In [153]:
def unzip_folder(zip_file_list):
    '''
    This function takes a list of ZIP files and iterates over each ZIP file to decompress the contents into a folder of the same name
    The ZIP data must be placed within the 'data' folder of your working directory

    Inputs
    ---
    zip_file_list: a list of zip file names

    Outputs
    ---
    printout of current decompression progress
    zip_df: a DataFrame containing the ZIP file and its contents
    '''
    
    # Initiate blank DataFrame to store log details about each unzipped file
    zip_df = pd.DataFrame(
        columns = ['zip_file', 'contents']
    )
    
    # Iterating over each zip file
    for index, zip_file in enumerate(zip_file_list):
        
        # Define path to each zip file
        path_to_zip_file = 'data/' + zip_file
        
        # Define directory to hold the extracted contents (ZIP file name without '.zip')
        directory_to_extract_to = 'data/' + zip_file[:-4]

        # Create above directory; do nothing if it already exists
        os.makedirs(directory_to_extract_to, exist_ok = True)
        
        # With each zipfile opened as a zipfile object
        with zipfile.ZipFile(path_to_zip_file,'r') as zip_ref:
            
            # Create a temporary DataFrame to store log information about zipfiles
            temp_df = pd.DataFrame(columns = ['zip_file','contents'])
            
            # Gather the contents within each zipfile
            temp_df['contents'] = zip_ref.namelist()
            
            # Label which zipfile the contents were extracted from
            temp_df['zip_file'] = zip_file
            
            # Concatenate the log for specific opened zipfile with rest of logs
            zip_df = pd.concat([zip_df, temp_df]).reset_index(drop = True)
            
            # Extract all contents out of zipfile into specified directory
            zip_ref.extractall(directory_to_extract_to)

        print(f"Unzipped file {index + 1} of {len(zip_file_list)}.", end = '\r')
        
    return zip_df

In [167]:
def flatten_subfolders(folder_list):
    '''
    This function takes a list of folders and iterates over each folder to flatten any subfolders within it, then removes the empty subfolders

    Inputs
    ---
    folder_list: a list of folders containing subfolders

    Outputs
    ---
    printout of current flattening progress
    '''
    
    
    for index, folder in enumerate(folder_list):

        # Define main folder
        main_folder = 'data/' + folder
    
        # Identify all subfolders within the main folder
        subfolders = [folder.path for folder in os.scandir(main_folder) if folder.is_dir()]
    
        # Iterating through each subfolder
        for subfolder in subfolders:
    
            # Iterating through each file within each subfolder
            for file in os.listdir(subfolder):
                
                # Define origin filepath, i.e. the file within the subfolder to be moved
                origin = os.path.join(subfolder, file)
                
                # Define destination filepath, i.e. the main folder with all the other data
                destination = os.path.join(main_folder, file)
                
                # Move file from origin within subfolder out to main folder
                shutil.move(origin,destination)
    
            # Remove subfolder after all files have been moved
            shutil.rmtree(subfolder)
    
        print(f"Flattened subfolder {index + 1} of {len(folder_list)}.", end = '\r')

# Decompress the Raw Data

Before making any decisions in database architecture and design, it is imperative to understand what the data will be used for. For this case, the Bixi bicycle ride data will be: 
-  combined with rides data from other companies to generate an analytics dashboard to monitor operations and the membership program
-  used to perform time series analysis and machine learning to possibly predict bicycle needs across stations and times of day for better bicycle distribution

[Previously](https://andrewyewcy.com/Systematically-Web-Scrape-Multiple-Data-Files-from-Websites/), the rides data were web-scraped from the Bixi website and stored as unpacked ZIP files in a single folder. A problem was that the granularity of the station data was not consistent across years, making it difficult to assign station data to rides data. To remedy this, we will be starting again from the ZIP files.

In [150]:
# Use web-scrape log files to identify the zip files previously web-scraped
log_df = pd.read_csv('logs/log_df.csv')

# Visually examine the zip folders
zip_file_list = log_df.loc[:,'file_name'].to_list()
print_list(zip_file_list)

Item 1 / 16: biximontrealrentals2014-f040e0.zip
Item 2 / 16: biximontrealrentals2015-69fdf0.zip
Item 3 / 16: biximontrealrentals2016-912f00.zip
Item 4 / 16: biximontrealrentals2017-d4d086.zip
Item 5 / 16: biximontrealrentals2018-96034e.zip
Item 6 / 16: biximontrealrentals2019-33ea73.zip
Item 7 / 16: biximontrealrentals2020-8e67d9.zip
Item 8 / 16: 2021-donnees-ouvertes-464ae6.zip
Item 9 / 16: 20220104-stations-f82036.zip
Item 10 / 16: 20220105-donnees-ouvertes-0d544b.zip
Item 11 / 16: 20220106-donnees-ouvertes-f45195.zip
Item 12 / 16: 20220107-donnees-ouvertes-8aa623.zip
Item 13 / 16: 20220108-donnees-ouvertes-816bd4.zip
Item 14 / 16: 20220109-donnees-ouvertes-519d43.zip
Item 15 / 16: 20220110-donnees-ouvertes-5079e8.zip
Item 16 / 16: 20220111-donnees-ouvertes-e1c737.zip


The bicycle rides data were contained in 16 ZIP files, with one file for each year except 2022, the most recent year, which was split across several files. To access the contents within each file, the defined `unzip_folder` function was used.

In [155]:
# Unzip all 16 folders
zip_df = unzip_folder(zip_file_list)

Unzipped file 16 of 16
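
Before extracting, it can be useful to peek inside an archive, since `namelist()` reads only the archive's index. The sketch below is a minimal illustration under stated assumptions: the member names in the in-memory archive are made up, and `list_zip_contents` and `folder_name_for` are hypothetical helpers. It also shows `os.path.splitext` as an alternative to slicing off the last four characters of the file name.

```python
import io
import os
import zipfile


def list_zip_contents(zip_source):
    """Return the member names of a ZIP archive without extracting it."""
    with zipfile.ZipFile(zip_source, "r") as zip_ref:
        return zip_ref.namelist()


def folder_name_for(zip_file_name):
    """Derive the extraction folder name by dropping the file extension."""
    return os.path.splitext(zip_file_name)[0]


# Build a small in-memory archive with made-up member names for illustration
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zip_ref:
    zip_ref.writestr("BixiMontrealRentals2014/OD_2014-04.csv", "start_date\n")
    zip_ref.writestr("BixiMontrealRentals2014/Stations_2014.csv", "code\n")

print(list_zip_contents(buffer))
print(folder_name_for("biximontrealrentals2014-f040e0.zip"))
```

A member name containing a `/`, as in this made-up archive, indicates that the archive holds a subfolder, which is the situation the flattening step is meant to handle.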

Then all subfolders within each unzipped rides data folder were flattened to access the files easily.

In [168]:
# Define list of folders to perform flattening for: exclude ZIP files and hidden files
folder_list = [folder for folder in os.listdir('data/') if folder[-4:] != '.zip' and folder[0:1] != '.']

# Flatten all subfolders within the 16 unzipped folders
flatten_subfolders(folder_list)

Flattened subfolder 16 of 16.
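
One caveat of flattening is that two subfolders may hold files with the same name; depending on the platform, moving the second file can then overwrite the first or raise an error. The sketch below is one possible pre-check, not part of the original workflow: `find_name_collisions` is a hypothetical helper, and the folder and file names in the demonstration are made up.

```python
import os
import tempfile
from collections import Counter


def find_name_collisions(main_folder):
    """Return file names that occur more than once across main_folder and its direct subfolders."""
    names = []
    for entry in os.scandir(main_folder):
        if entry.is_dir():
            # Files inside a subfolder would be moved up into main_folder
            names.extend(os.listdir(entry.path))
        else:
            names.append(entry.name)
    return sorted(name for name, count in Counter(names).items() if count > 1)


# Demonstration on a throwaway directory with made-up folder and file names
with tempfile.TemporaryDirectory() as main_folder:
    for subfolder in ("part_a", "part_b"):
        os.mkdir(os.path.join(main_folder, subfolder))
        with open(os.path.join(main_folder, subfolder, "rides.csv"), "w") as f:
            f.write("start_date\n")
    collisions = find_name_collisions(main_folder)

print(collisions)
```

An empty list means the folder can be flattened without any file names clashing.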

In [169]:
# Visually examine flattened folders
print_list(folder_list)

Item 1 / 16: 2021-donnees-ouvertes-464ae6
Item 2 / 16: 20220104-stations-f82036
Item 3 / 16: 20220105-donnees-ouvertes-0d544b
Item 4 / 16: 20220106-donnees-ouvertes-f45195
Item 5 / 16: 20220107-donnees-ouvertes-8aa623
Item 6 / 16: 20220108-donnees-ouvertes-816bd4
Item 7 / 16: 20220109-donnees-ouvertes-519d43
Item 8 / 16: 20220110-donnees-ouvertes-5079e8
Item 9 / 16: 20220111-donnees-ouvertes-e1c737
Item 10 / 16: biximontrealrentals2014-f040e0
Item 11 / 16: biximontrealrentals2015-69fdf0
Item 12 / 16: biximontrealrentals2016-912f00
Item 13 / 16: biximontrealrentals2017-d4d086
Item 14 / 16: biximontrealrentals2018-96034e
Item 15 / 16: biximontrealrentals2019-33ea73
Item 16 / 16: biximontrealrentals2020-8e67d9


# Sample the Raw Data

Although data sampling can be done using Pandas in Python, Terminal and Bash were used instead to perform this sanity check. Command-line tools are typically faster than Pandas at counting rows, and they display the data as plain text without any consideration of data types, making them well suited to quickly examining large data files.

In [None]:
# Perform row count for the year 2014 using terminal
# Run this for loop line by line in terminal
for file in *.csv
    do
    wc -l "$file"
    done

![check_lines](assets/images/013/001.png)

As seen above, there are roughly 3 million rides in the year 2014, meaning that the whole dataset will contain roughly 27 million rows detailing the Bixi bicycle rides in Montreal over 9 years. The sheer number of rows justifies the use of Python and MySQL over traditional spreadsheets, as the latter are unable to handle data of this size.
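
Note that `wc -l` counts every line, including the header, so each per-file count is one higher than the number of rides. For reference, the same check can be sketched in plain Python by streaming the file line by line. This is a minimal sketch under stated assumptions: the files are UTF-8 text, no field contains an embedded line break, and `count_data_rows` is a hypothetical helper. The sample file is made up.

```python
import os
import tempfile


def count_data_rows(csv_path, has_header=True):
    """Count data rows in a CSV file by streaming it, without loading it into memory."""
    with open(csv_path, "r", encoding="utf-8", errors="replace") as f:
        total_lines = sum(1 for _ in f)
    # Exclude the header line from the count when one is present
    if has_header and total_lines > 0:
        return total_lines - 1
    return total_lines


# Demonstration on a small made-up file
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "sample_rides.csv")
    with open(sample_path, "w", encoding="utf-8") as f:
        f.write("start_date,duration_sec\n")
        f.write("2014-04-15 00:01,1061\n")
        f.write("2014-04-15 00:01,615\n")
    n_rows = count_data_rows(sample_path)

print(f"Data rows: {n_rows}")
```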

In [None]:
# Visually examine first 10 rows of rides data in the file OD_2014-10.csv within the year 2014
less -NS OD_2014-10.csv | head

![002](assets/images/013/002.png)

From above, the rides data contains 6 columns or features:
1. `start_date`: the datetime when a Bixi bicycle is checked out from a dock
2. `start_station_code`: the docking station where the Bixi bicycle was checked out from
3. `end_date`: the datetime when a Bixi bicycle is returned to a dock
4. `end_station_code`: the docking station where the checked out bicycle was returned to
5. `duration_sec`: the total time of the ride in seconds
6. `is_member`: the membership status of the customer who used the Bixi bicycle

![006](assets/images/013/006.png)

Repeating the procedure for the bicycle docking stations data reveals 4 columns:
1. `code`: the code of the station, used to map station info to rides table
2. `name`: the human readable name of the station
3. `latitude`: the latitude of the station
4. `longitude`: the longitude of the station
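
The two column layouts listed above can also be checked programmatically, which helps when many files need inspecting. The sketch below reads only the header row of a CSV and reports which layout it matches. It assumes the column names are exactly those listed above; whether every year follows them is not verified here. `classify_header` is a hypothetical helper and the sample inputs are made up.

```python
import csv
import io

RIDES_COLUMNS = [
    "start_date", "start_station_code", "end_date",
    "end_station_code", "duration_sec", "is_member",
]
STATIONS_COLUMNS = ["code", "name", "latitude", "longitude"]


def classify_header(file_obj):
    """Read the first row of a CSV and report which known layout it matches."""
    header = next(csv.reader(file_obj), [])
    header = [column.strip().lower() for column in header]
    if header == RIDES_COLUMNS:
        return "rides"
    if header == STATIONS_COLUMNS:
        return "stations"
    return "unknown"


# Made-up in-memory samples standing in for real files
rides_sample = io.StringIO(
    "start_date,start_station_code,end_date,end_station_code,duration_sec,is_member\n"
)
stations_sample = io.StringIO("code,name,latitude,longitude\n")

print(classify_header(rides_sample))
print(classify_header(stations_sample))
```

Files reported as `unknown` would need a closer look before being loaded into either table.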

# Designing Tables and Relationships

Now that we have a better understanding of the data, it is clear that at least two tables need to be created in the database: `rides` and `stations`.

The `rides` table will contain the data about each ride and is connected to the `stations` table through the station `code` column. However, the exact longitude and latitude of each docking station may change over time due to circumstances such as road repairs. This implies that the `stations` data for 2014 may not be applicable in 2015 and other years. In terms of impact, distance per ride calculations will change depending on the latitude and longitude used, and this should be considered when deciding station locations.
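
To make the impact on distance calculations concrete, below is a minimal sketch of the haversine formula, a common way to estimate the straight-line distance between two coordinates. It is an illustration only and not necessarily the method used later in this series. The coordinates are made up, and the Earth is approximated as a sphere with a mean radius of 6371 km.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; a spherical approximation


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (latitude, longitude) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (
        math.sin(d_phi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    )
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# Made-up example: the same station recorded 0.001 degrees of latitude apart in two years
shift_km = haversine_km(45.5000, -73.5700, 45.5010, -73.5700)
print(f"Shift between the two records: {shift_km * 1000:.0f} m")
```

Even a shift of this size changes every distance computed to or from that station, which is why rides need to be matched to the station data of the right period.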

Furthermore, as mentioned above, unlike all other years which had 1 `stations` table per year, the year 2022 had 1 `stations` table per month, complicating the granularity and thus the method of connecting the `rides` and `stations` tables consistently throughout the years.

To solve this issue of granularity, a third table is created, containing the details on how to join the `rides` and `stations` tables for each year. This table is called a join table and can be generated by examining the contents of each folder. Each folder contains one file with the word `station` in its file name, which holds the station data, while the other files hold ride data. Using this pattern, a join table can be generated:

In [187]:
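# Initiate blank DataFrame to store the mapping of rides files to stations files
join_table = pd.DataFrame()

for index, folder in enumerate(folder_list):
    
    # List all files within the unzipped folder
    folder_contents = os.listdir('data/'+ folder)

    # Stations files contain the word 'station' in the file name
    stations_files = [file for file in folder_contents if 'station' in file.lower()]

    # All other files are rides files
    rides_files = [file for file in folder_contents if file not in stations_files]

    temp_df = pd.DataFrame({'rides_files':rides_files})
    
    # Map every rides file in the folder to the single stations file
    if len(stations_files) == 1:
        temp_df['station_files'] = stations_files[0]
    else:
        print(f"Check station file in folder {index + 1} : {folder}.")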

    join_table = pd.concat(
        [join_table,temp_df],
        axis = 0,
    ).reset_index(drop = True)

    print(f'Processing folder {index+1} of {len(folder_list)}, folder: {folder}', end = '\r')

Processing folder 16 of 16, folder: biximontrealrentals2020-8e67d9

In [189]:
# Visually examine the join_table
join_table.head(15)

| | rides_files | station_files |
|---|---|---|
| 0 | 2021_donnees_ouvertes.csv | 2021_stations.csv |
| 1 | 20220104_donnees_ouvertes.csv | 20220104_stations.csv |
| 2 | 20220105_donnees_ouvertes.csv | 20220105_stations.csv |
| 3 | 20220106_donnees_ouvertes.csv | 20220106_stations.csv |
| 4 | 20220107_donnees_ouvertes.csv | 20220107_stations.csv |
| 5 | 20220108_donnees_ouvertes.csv | 20220108_stations.csv |
| 6 | 202209_deplacements.csv | 202209_stations.csv |
| 7 | 202210_deplacements.csv | 202210_stations.csv |
| 8 | 202211_deplacements.csv | 202211_stations.csv |
| 9 | OD_2014-04.csv | Stations_2014.csv |


From above, it was observed that each ride file is mapped to its corresponding station file, and cases like 2014, where many ride files map to a single station file, were also recorded accurately.

In [179]:
# Visually examine the contents of the first folder
folder_contents = os.listdir('data/'+ folder_list[0])

folder_contents

['2021_donnees_ouvertes.csv', '2021_stations.csv']

## Installing `mysqlclient` on Python Docker Container

The [`SQLAlchemy`](https://www.sqlalchemy.org/) library was used to connect to the MySQL database. `SQLAlchemy` allows the use of many different DBMS through Python, and is the preferred connection when using Pandas with SQL.

On top of `SQLAlchemy`, [`mysqlclient`](https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqldb) is needed as the driver for connecting to MySQL, according to the [documentation](https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqldb).

`SQLAlchemy` was already included in the Docker image for [pyspark-notebook](https://hub.docker.com/r/jupyter/pyspark-notebook). `mysqlclient` was added to the container using `conda` within the Docker container.

In [None]:
# Using terminal, access the terminal within the running pyspark container
docker exec -it documents-pyspark-1 /bin/bash

Explaining the arguments above:
- `-it` combines `-i` (interactive, keeps the input open) and `-t` (allocates a terminal)
- `documents-pyspark-1` is the name of the running Docker container
- `/bin/bash` provides a terminal running Bash

Then, referring to instructions from `conda` [docs](https://anaconda.org/conda-forge/mysqlclient), `mysqlclient` can be installed in the container.

In [None]:
# Install mysqlclient using conda inside the container
conda install -c conda-forge mysqlclient

# exit the terminal within the container
exit

## Connect to MySQL Docker Container

In [62]:
# to create the engine that connects Python and MySQL
import sqlalchemy

# Create the connection engine
engine = sqlalchemy.create_engine(
    "mysql+mysqldb://root:rootroot@mysql:3306"
)

- `mysql+mysqldb` specifies the dialect (`mysql`) and the driver (`mysqldb`, which is provided by `mysqlclient`)
- `root:rootroot` stands for the username:password
- `@mysql:3306` refers to port 3306 in the `mysql` Docker container
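
The connection string can also be assembled from its parts, which is helpful when a password contains characters such as `@` or `/` that would otherwise break the URL. The sketch below uses only the standard library; `build_mysql_url` is a hypothetical helper written for this illustration and is not part of SQLAlchemy, and the second password is made up.

```python
from urllib.parse import quote_plus


def build_mysql_url(user, password, host, port, database=None):
    """Assemble a mysql+mysqldb connection URL, escaping special characters in the credentials."""
    url = f"mysql+mysqldb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}"
    if database:
        url += f"/{database}"
    return url


# Same values as the engine defined above
print(build_mysql_url("root", "rootroot", "mysql", 3306))

# Made-up password containing characters that must be escaped
print(build_mysql_url("root", "p@ss/word", "mysql", 3306, database="velocipede"))
```

The resulting string can be passed to `sqlalchemy.create_engine` in the same way as the hard-coded URL.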

Note that the `with` statement is used when establishing connections to MySQL, as it ensures that the connection is automatically closed once the code inside the `with` block has finished executing.

In [64]:
# https://stackoverflow.com/questions/22689895/list-of-databases-in-sqlalchemy
# Print existing databases
insp = sqlalchemy.inspect(engine)
databases = insp.get_schema_names()

for index, database in enumerate(databases):
    print(f"Database {index + 1} / {len(databases)}: {database}")

Database 1 / 5: information_schema
Database 2 / 5: mysql
Database 3 / 5: performance_schema
Database 4 / 5: sys
Database 5 / 5: test_db


## Create A Database

In [65]:
# Create a database to store bicycle rides data
with engine.connect() as conn:

    # Creates a DATABASE if it doesn't exist
    sql_stmt = sqlalchemy.text("""CREATE DATABASE IF NOT EXISTS velocipede""")
    
    conn.execute(sql_stmt)

In [66]:
# Check if new database was created
inspection = sqlalchemy.inspect(engine)
databases = inspection.get_schema_names()

for index, database in enumerate(databases):
    print(f"Database {index + 1} / {len(databases)}: {database}")

Database 1 / 6: information_schema
Database 2 / 6: mysql
Database 3 / 6: performance_schema
Database 4 / 6: sys
Database 5 / 6: test_db
Database 6 / 6: velocipede


## Create `rides` Table

In [73]:
# https://stackoverflow.com/questions/6473925/sqlalchemy-getting-a-list-of-tables
# Redefine new engine to point to newly created database
engine = sqlalchemy.create_engine(
    "mysql+mysqldb://root:rootroot@mysql:3306/velocipede"
)

# Check tables within 
inspection = sqlalchemy.inspect(engine)
tables = inspection.get_table_names()

print_list(tables)

Passed list is empty.


In [131]:
with engine.connect() as conn:
    sql_stmt = sqlalchemy.text("""DROP TABLE IF EXISTS rides""")
    conn.execute(sql_stmt)

In [132]:
# Create new table
with engine.connect() as conn:

    sql_stmt = sqlalchemy.text(
        """
        CREATE TABLE IF NOT EXISTS rides (
            # columns regarding rides, assume each ride unique given no ride_id provided
            ride_id			BIGINT			NOT NULL AUTO_INCREMENT,
        	start_date 		DATETIME 		NOT NULL,
        	start_stn_code 	VARCHAR(255)	NOT NULL,
        	end_date 		DATETIME 		NOT NULL,
        	end_stn_code 	VARCHAR(255) 	NOT NULL,
            duration_sec	INT 			NOT NULL,
        	is_member 		BOOLEAN,
            company			VARCHAR(255)	NOT NULL,
            timezone        VARCHAR(255)    NOT NULL,

            # columns for data maintenance
            data_source		VARCHAR(255)	NOT NULL,
            date_added		DATETIME 		NOT NULL DEFAULT CURRENT_TIMESTAMP,
            flag			VARCHAR(255),

            # Each ride is unique
            CONSTRAINT ride_uid UNIQUE (ride_id)
            )    
        """
    )

    conn.execute(sql_stmt)

In [133]:
# Check tables within 
inspection = sqlalchemy.inspect(engine)
tables = inspection.get_table_names()

print_list(tables)

Item 1 / 2: rides
Item 2 / 2: stations


In [134]:
columns = inspection.get_columns('rides')

print_list(columns)

Item 1 / 12: {'name': 'ride_id', 'type': BIGINT(), 'default': None, 'comment': None, 'nullable': False, 'autoincrement': True}
Item 2 / 12: {'name': 'start_date', 'type': DATETIME(), 'default': None, 'comment': None, 'nullable': False}
Item 3 / 12: {'name': 'start_stn_code', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 4 / 12: {'name': 'end_date', 'type': DATETIME(), 'default': None, 'comment': None, 'nullable': False}
Item 5 / 12: {'name': 'end_stn_code', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 6 / 12: {'name': 'duration_sec', 'type': INTEGER(), 'default': None, 'comment': None, 'nullable': False, 'autoincrement': False}
Item 7 / 12: {'name': 'is_member', 'type': TINYINT(display_width=1), 'default': None, 'comment': None, 'nullable': True, 'autoincrement': False}
Item 8 / 12: {'name': 'company', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 9 / 12: {'name':

## Create `stations` table

In [79]:
# Create new table
with engine.connect() as conn:

    sql_stmt = sqlalchemy.text(
        """
        CREATE TABLE IF NOT EXISTS stations (
            # columns regarding stations, stn_id is generated for each station record
            stn_id			BIGINT			NOT NULL AUTO_INCREMENT,
        	stn_code 	    VARCHAR(255)	NOT NULL,
            stn_name        VARCHAR(255)	NOT NULL,
            stn_lat         DECIMAL(7,5)    NOT NULL,
            stn_lon         DECIMAL(7,5)    NOT NULL,
            company			VARCHAR(255)	NOT NULL,

            # columns for data maintenance
            data_source		VARCHAR(255)	NOT NULL,
            date_added		DATETIME 		NOT NULL DEFAULT CURRENT_TIMESTAMP,
            flag			VARCHAR(255),

            # Each station record is unique
            CONSTRAINT stn_uid UNIQUE (stn_id)
            )    
        """
    )

    conn.execute(sql_stmt)

In [80]:
# Check tables within 
inspection = sqlalchemy.inspect(engine)
tables = inspection.get_table_names()

print_list(tables)

Item 1 / 2: rides
Item 2 / 2: stations


In [81]:
columns = inspection.get_columns('stations')

print_list(columns)

Item 1 / 9: {'name': 'stn_id', 'type': BIGINT(), 'default': None, 'comment': None, 'nullable': False, 'autoincrement': True}
Item 2 / 9: {'name': 'stn_code', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 3 / 9: {'name': 'stn_name', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 4 / 9: {'name': 'stn_lat', 'type': DECIMAL(precision=7, scale=5), 'default': None, 'comment': None, 'nullable': False}
Item 5 / 9: {'name': 'stn_lon', 'type': DECIMAL(precision=7, scale=5), 'default': None, 'comment': None, 'nullable': False}
Item 6 / 9: {'name': 'company', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 7 / 9: {'name': 'data_source', 'type': VARCHAR(length=255), 'default': None, 'comment': None, 'nullable': False}
Item 8 / 9: {'name': 'date_added', 'type': DATETIME(), 'default': 'CURRENT_TIMESTAMP', 'comment': None, 'nullable': False}
Item 9 / 9: {'name': 'flag', 'type': VARCH

# Populate the Tables

In [86]:
data_file = 'data/biximontrealrentals2014-f040e0/OD_2014-04.csv'

In [83]:
# https://stackoverflow.com/questions/25962114/how-do-i-read-a-large-csv-file-with-pandas
df = pd.read_csv('data/biximontrealrentals2014-f040e0/OD_2014-04.csv')

In [90]:
dtype = {
    'start_station_code' : 'string',
    'end_station_code'   : 'string',
    'duration_sec'       : 'int',
    'is_member'          : 'boolean'
}

In [127]:
df = pd.read_csv(
    data_file,
    dtype = dtype,
    parse_dates = ['start_date','end_date']
)

In [126]:
df.head()

| | start_date | start_station_code | end_date | end_station_code | duration_sec | is_member |
|---|---|---|---|---|---|---|
| 0 | 2014-04-15 00:01 | 6209 | 2014-04-15 00:18 | 6436 | 1061 | True |
| 1 | 2014-04-15 00:01 | 6214 | 2014-04-15 00:11 | 6248 | 615 | True |
| 2 | 2014-04-15 00:01 | 6164 | 2014-04-15 00:18 | 6216 | 1031 | True |
| 3 | 2014-04-15 00:01 | 6214 | 2014-04-15 00:24 | 6082 | 1382 | True |
| 4 | 2014-04-15 00:02 | 6149 | 2014-04-15 00:08 | 6265 | 347 | True |


In [129]:
import pytz

# Localize the naive timestamps to Montreal time
tz = pytz.timezone('Canada/Eastern')
dti = pd.to_datetime(df['start_date'], format = "%Y-%m-%d %H:%M").dt.tz_localize(tz = tz)

dti

0        2014-04-15 00:01:00-04:00
1        2014-04-15 00:01:00-04:00
2        2014-04-15 00:01:00-04:00
3        2014-04-15 00:01:00-04:00
4        2014-04-15 00:02:00-04:00
                    ...           
108259   2014-04-30 23:53:00-04:00
108260   2014-04-30 23:54:00-04:00
108261   2014-04-30 23:55:00-04:00
108262   2014-04-30 23:58:00-04:00
108263   2014-04-30 23:59:00-04:00
Name: start_date, Length: 108264, dtype: datetime64[ns, Canada/Eastern]

In [121]:
dti = dti.dt.tz_convert(tz = pytz.timezone('UTC'))

dti

0        2014-04-15 04:01:00+00:00
1        2014-04-15 04:01:00+00:00
2        2014-04-15 04:01:00+00:00
3        2014-04-15 04:01:00+00:00
4        2014-04-15 04:02:00+00:00
                    ...           
108259   2014-05-01 03:53:00+00:00
108260   2014-05-01 03:54:00+00:00
108261   2014-05-01 03:55:00+00:00
108262   2014-05-01 03:58:00+00:00
108263   2014-05-01 03:59:00+00:00
Name: start_date, Length: 108264, dtype: datetime64[ns, UTC]

In [125]:
dti.dt.tz_convert(tz = None)

0        2014-04-15 04:01:00
1        2014-04-15 04:01:00
2        2014-04-15 04:01:00
3        2014-04-15 04:01:00
4        2014-04-15 04:02:00
                 ...        
108259   2014-05-01 03:53:00
108260   2014-05-01 03:54:00
108261   2014-05-01 03:55:00
108262   2014-05-01 03:58:00
108263   2014-05-01 03:59:00
Name: start_date, Length: 108264, dtype: datetime64[ns]
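
One caveat with this approach: by default, `tz_localize` raises an error for local times that occur twice when clocks are set back in autumn, or that do not exist when clocks move forward in spring. Whether the Bixi files contain such timestamps is not checked here. The sketch below uses made-up timestamps around the clock change on 2 November 2014 and, as one possible policy rather than the one used in this article, marks such rows as `NaT` for later review.

```python
import pandas as pd

# Made-up timestamps around the autumn clock change on 2014-11-02 in Montreal
raw = pd.Series(["2014-11-02 00:30", "2014-11-02 01:30", "2014-11-02 03:00"])
naive = pd.to_datetime(raw, format="%Y-%m-%d %H:%M")

# 01:30 occurs twice that night, so it is ambiguous and becomes NaT instead of raising
localized = naive.dt.tz_localize(
    "Canada/Eastern", ambiguous="NaT", nonexistent="NaT"
)

# Convert to UTC, then drop the timezone information to get naive UTC datetimes
utc_naive = localized.dt.tz_convert("UTC").dt.tz_localize(None)

print(utc_naive)
```

Rows that come back as `NaT` could then be flagged, for example through the `flag` column of the `rides` table, instead of silently being given the wrong hour.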

In [116]:
[zone for zone in pytz.all_timezones if 'UTC' in zone]

['Etc/UTC', 'UTC']

In [100]:
import pytz

In [139]:
# Define data types
dtype = {
    'start_date'         : 'string',
    'start_station_code' : 'string',
    'end_date'           : 'string',
    'end_station_code'   : 'string',
    'duration_sec'       : 'int',
    'is_member'          : 'boolean'
}

# Define list of columns
columns = [
    'start_date',
    'start_station_code',
    'end_date',
    'end_station_code',
    'duration_sec',
    'is_member'
]

montreal_tz = pytz.timezone('Canada/Eastern')
utc_tz = pytz.timezone('UTC')

for index, chunk in enumerate(
    pd.read_csv(
        data_file,
        dtype = dtype,
        usecols = columns,
        chunksize = 10000
    )
):
    
    chunk.rename(
        {
            'start_station_code':'start_stn_code',
            'end_station_code':'end_stn_code'
        },
        axis = 'columns',
        inplace = True
    )

    for col in ['start_date', 'end_date']:
        dti = pd.to_datetime(chunk[col], format = "%Y-%m-%d %H:%M")
        # Localize to Montreal timezone, then convert to UTC
        dti = dti.dt.tz_localize(tz = montreal_tz)
        dti = dti.dt.tz_convert(tz = utc_tz)
        # Drop the timezone information and replace the string column with naive UTC datetimes
        chunk[col] = dti.dt.tz_convert(tz = None)

    # Add columns describing where the data came from
    chunk['company'] = 'Bixi'
    chunk['timezone'] = 'Canada/Eastern'
    chunk['data_source'] = os.path.basename(data_file)

    with engine.connect() as conn:
        chunk.to_sql(
            'rides',
            con = conn,
            schema = 'velocipede',
            if_exists = 'append',
            index = False
        )

    print(f"Completed chunk {index + 1}", end = "\r")

Completed chunk 11

In [142]:
with engine.connect() as conn:
    # Count the rows loaded into the rides table
    df = pd.read_sql("SELECT COUNT(*) FROM rides", conn)

In [143]:
df

| | COUNT(*) |
|---|---|
| 0 | 108264 |
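
The count above matches the 108264 rows read from `OD_2014-04.csv`, which suggests that no rows were lost during the chunked load. This kind of reconciliation can be automated. The sketch below illustrates the idea using the standard-library `sqlite3` and `csv` modules as a stand-in for MySQL and Pandas, so that it runs without a database server. The table is reduced to two columns, and `load_in_chunks` is a hypothetical helper.

```python
import csv
import io
import sqlite3
from itertools import islice


def load_in_chunks(file_obj, conn, chunksize):
    """Insert CSV rows into the rides table chunk by chunk; return the number of rows read."""
    reader = csv.DictReader(file_obj)
    rows_read = 0
    while True:
        # Pull at most `chunksize` rows from the file
        chunk = list(islice(reader, chunksize))
        if not chunk:
            break
        conn.executemany(
            "INSERT INTO rides (start_date, duration_sec) "
            "VALUES (:start_date, :duration_sec)",
            chunk,
        )
        conn.commit()
        rows_read += len(chunk)
    return rows_read


# A few sample rows standing in for a rides CSV file
sample = io.StringIO(
    "start_date,duration_sec\n"
    "2014-04-15 00:01,1061\n"
    "2014-04-15 00:01,615\n"
    "2014-04-15 00:02,347\n"
)

# In-memory SQLite database used here instead of MySQL (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rides (start_date TEXT NOT NULL, duration_sec INTEGER NOT NULL)"
)

rows_read = load_in_chunks(sample, conn, chunksize=2)
rows_in_table = conn.execute("SELECT COUNT(*) FROM rides").fetchone()[0]
conn.close()

print(f"Rows read: {rows_read}, rows in table: {rows_in_table}")
```

A mismatch between the two numbers would be a signal to stop and inspect the load before moving on to the next file.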


In [None]:
# Preview the first chunk only to check its contents
for index, chunk in enumerate(
    pd.read_csv(
        data_file,
        dtype = dtype,
        usecols = columns,
        chunksize = 10000
    )
):

    print(chunk.head())
    break

## Joining the two tables

In [46]:
with engine.connect() as conn:
    query = """SELECT * FROM velocipede.bixi"""
    df = pd.read_sql(query, conn)

In [58]:
df.T

start_date
start_station_code
end_date
end_station_code
duration_sec
is_member
data_source
data_added


In [None]:
CREATE TABLE IF NOT EXISTS rides (
	start_date 		DATETIME 	NOT NULL,
	start_stn_code 	TEXT 		NOT NULL,
	end_date 		DATETIME 	NOT NULL,
	end_stn_code 	TEXT 		NOT NULL,
    duration_sec	INT 		NOT NULL,
	is_member 		BOOLEAN,
    data_source		TEXT		NOT NULL,
    date_added		DATETIME 	NOT NULL DEFAULT CURRENT_TIMESTAMP,
    company			TEXT		NOT NULL
    )
    