#### Import all the required libraries

1. configparser: helps to read from the configuration file and extract the information needed to create a connection to the MySQL database<br/><br/>


2. sqlalchemy: helps to create a connection to the MySQL database of our interest.<br/><br/>


3. pandas: helps to read the tables in the MySQL database of our interest to a DataFrame.<br/><br/>


4. pandas_gbq: helps to read big query tables into DataFrame and vice-versa.

In [1]:
import configparser

from sqlalchemy import create_engine

import pandas as pd

import pandas_gbq

from datetime import datetime

create an instance of ConfigParser and use the instance to read the 'configuration file.cfg' file and get the information such as hostname, port, username, dbname and password in the 'mysql_config' section needed to make a connection to the MySQL database

In [2]:
parser = configparser.ConfigParser()

parser.read('configuration file.cfg')

hostname = parser.get('mysql_config', 'hostname')

port = parser.get('mysql_config', 'port')

username = parser.get('mysql_config', 'username')

dbname = parser.get('mysql_config', 'database')

password = parser.get('mysql_config', 'password')

read the latest_timestamp table in BigQuery into a DataFrame called latest_timestamp_bq_df

In the first run of the cell below, the DataFrame will be empty because nothing has been stored in the latest_timestamp table in BigQuery. Subsequent run of the cell will have DataFrame with a single value in the orderTimestamp column.

In [3]:
query_string = 'SELECT * FROM `dummy-surveillance-project.demo.latest_timestamp` '

latest_timestamp_bq_df = pandas_gbq.read_gbq(query_string, project_id = 'dummy-surveillance-project')

Downloading: 100%|[32m██████████[0m|


the cell below will access values at index 0 of the 'orderTimestamp' column in the 'latest_timestamp_bq_df' DataFrame.

However on the first run because there is no value in the 'orderTimestamp' column in the 'latest_timestamp_bq_df' DataFrame, it will result in an IndexError. To handle this, the code uses a try-except block, and assigns a default value of '1900-01-01 00:00:00' to 'latest_timestamp_value' if an IndexError occurs.

In [4]:
try:
    
    latest_timestamp_value = latest_timestamp_bq_df['orderTimestamp'][0].strftime('%Y-%m-%d %H:%M:%S')
    
except IndexError:
    
    latest_timestamp_value = '1900-01-01 00:00:00'

1. create a connection called sqlEngine to the MySQL database named 'demo'<br/><br/>


2. read the table orders into a DataFrame named orders_mysql_df with the condition that only data records with orderTimestamp greater than the latest_timestamp should be read

In [5]:
sqlEngine = create_engine(f'mysql://{username}:{password}@{hostname}:{port}/{dbname}')

orders_mysql_df = pd.read_sql(f"SELECT * FROM orders WHERE orderTimestamp > '{latest_timestamp_value}';", con = sqlEngine)

1. create a BigQuery table schema called bq_table_schema which is used to serve as the data structure of the table to be created in the BigQuery <br/><br/>

2. create a BigQuery table schema called bq_table_schema2 which is used to validate data structure of the data to be inserted in the latest_timestamp table in BigQuery

In [6]:
bq_table_schema = [
    
    {'name':'orderId', 'type':'INTEGER'},
    
    {'name':'orderValue', 'type':'FLOAT'},
    
    {'name':'Timestamp', 'type':'TIMESTAMP'}
    
]

bq_table_schema2 = [{'name':'Timestamp', 'type':'TIMESTAMP'}]

store the orders_mysql_df DataFrame to 'demo.orders' (demo is the dataset name and orders is the table's name to be created in BigQuery). It also checks if the table exist before and if it does, it appends the data to the existing table.

In [7]:
orders_mysql_df.to_gbq('demo.orders', if_exists = 'append', table_schema = bq_table_schema, 
          
          project_id = 'dummy-surveillance-project')

100%|██████████| 1/1 [00:00<?, ?it/s]


1. read the table orders into a DataFrame named order_Timestamp_max_mysql_df with a single value which is the maximum value in the orderTimestamp column in table orders<br/><br/> 

2. dispose the sqlEngine connection

In [8]:
order_Timestamp_max_mysql_df = pd.read_sql('SELECT MAX(orderTimestamp) orderTimestamp FROM orders', con = sqlEngine)

sqlEngine.dispose()

store the order_Timestamp_max_mysql_df DataFrame to 'demo.latest_timestamp' (demo is the dataset name and latest_timestamp is the table's name to be created in BigQuery). It also checks if the table exist before and if it does, it overwrite the table.

In [9]:
order_Timestamp_max_mysql_df.to_gbq('demo.latest_timestamp', if_exists = 'replace', table_schema = bq_table_schema2, 
           
           project_id = 'dummy-surveillance-project')

100%|██████████| 1/1 [00:00<00:00, 460.46it/s]
