# 3. Data Pipeline: ETL for Loading Stock History into an AWS RDS Database


In [None]:
# Load the credentials used to access AWS (S3 and RDS).

In [1]:
import json
import os

# Load AWS / RDS credentials from a JSON file. The location can be overridden
# with the CREDENTIALS_PATH environment variable; otherwise the original
# hard-coded path is used. `with` guarantees the file handle is closed.
credentials_path = os.environ.get("CREDENTIALS_PATH", "/credentials.json")
with open(credentials_path) as f:
    credentials = json.load(f)

file_path = credentials['file_path']
aws_key = credentials['aws_access_key']
aws_secret_key = credentials['aws_secret_key']
rds_host = credentials['rds_host']
rds_user = credentials['rds_user']
rds_password = credentials['rds_password']
rds_database = credentials['rds_database']
rds_charset = credentials['rds_charset']


In [2]:
# Import libraries
# NOTE: a future import must be the first statement in the cell (comments are allowed before it).

from __future__ import print_function
import time

# Import the usual Python libraries

from tqdm.notebook import tqdm, trange  # to be used to track progress in loop iterations
import pandas as pd
import numpy as np

# Import Zip file libraries

from zipfile import ZipFile
from io import BytesIO
import urllib.request as urllib2

# Import the AWS libraries (AWS credentials are passed to boto3 explicitly where clients are built).

import boto3
from boto3.s3.transfer import TransferConfig
from boto3.s3.transfer import S3Transfer
import io
import pyarrow as pa
import pyarrow.parquet as pq

# Declare the local file path to be used for saving CSV outputs.
# A plain top-level assignment already makes this a notebook-wide global.
my_path = file_path



## Extract the Stocks data

After downloading the CSV files, combine them into a single dataframe and transform it into the shape needed for the DB table.

In [3]:
# Assemble stocks history into one dataframe.

def assemble_bulk_history(symbol="MSFT", file_numbers=range(1, 2), base_path=None):
    """Assemble the downloaded stock-history CSV files into one dataframe.

    Reads ``<base_path>/<symbol><n>.csv`` for every ``n`` in ``file_numbers``,
    renames each file's columns positionally to the DB column names,
    concatenates the files, parses ``date`` and sorts the rows chronologically.
    The result is also stored in the notebook-level ``df_history`` global.

    Parameters
    ----------
    symbol : str
        File-name prefix of the history files (default ``"MSFT"``).
    file_numbers : iterable of int
        Numeric suffixes of the files to read (default: only file 1).
    base_path : str, optional
        Directory holding the files; the notebook-level ``my_path`` when omitted.

    Returns
    -------
    pandas.DataFrame
        Columns ``date, open, high, low, close, adj_close, volume``, sorted by
        ``date`` with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If a CSV file does not have exactly seven columns.
    """
    global df_history

    if base_path is None:
        base_path = my_path

    columns = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']

    frames = []
    for file_number in tqdm(file_numbers):
        csv_path = base_path + "/" + symbol + str(file_number) + ".csv"
        data = pd.read_csv(csv_path, low_memory=False)
        # Positional rename maps source headers (e.g. "Adj Close") to DB names;
        # pandas raises ValueError if the column count does not match.
        data.columns = columns
        frames.append(data)

    # pd.concat keeps each column's dtype; round-tripping through a numpy array
    # would coerce every column (prices included) to object.
    if frames:
        history = pd.concat(frames, ignore_index=True)
    else:
        history = pd.DataFrame(columns=columns)

    # Make sure the date column is in datetime format, then order the time
    # series chronologically.
    history['date'] = pd.to_datetime(history['date'])
    df_history = history.sort_values(by='date', ignore_index=True)

    print("history files assembled.")
    print("The shape of the history dataframe is ", df_history.shape)

    return df_history


# For backup and archival, save the final dataframe as a zipped CSV file and a parquet file, and push both to AWS S3.

In [5]:
# Create the low level functional AWS client

def push_data_to_S3(df_history_complete_load, bucket="data-historical", base_path=None,
                    region_name='us-east-1'):
    """Archive the history dataframe to S3 as a zipped CSV file and a parquet file.

    Both files are first written locally under ``base_path`` and then uploaded
    with a low-level boto3 S3 client built from the notebook's AWS credentials.

    Parameters
    ----------
    df_history_complete_load : pandas.DataFrame
        The dataframe to archive.
    bucket : str
        Target S3 bucket (default ``"data-historical"``).
    base_path : str, optional
        Local directory for the intermediate files; the notebook-level
        ``my_path`` when omitted.
    region_name : str
        AWS region for the S3 client (default ``"us-east-1"``).

    Returns
    -------
    str
        ``"Done."`` once both uploads have finished.
    """
    if base_path is None:
        base_path = my_path

    # Create the low level functional AWS client.
    client = boto3.client(
        's3',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret_key,
        region_name=region_name
    )

    # Export the dataframe to a zipped CSV, then push it to S3. pandas writes a
    # zip archive containing one CSV member, so the local file gets a .zip extension.
    compression_opts = dict(method='zip', archive_name='df_full_history_complete_load.csv')
    csv_zip_file = base_path + "/df_full_history_complete_load.zip"
    df_history_complete_load.to_csv(path_or_buf=csv_zip_file, index=False, compression=compression_opts)
    # upload_file(Filename, Bucket, Key): bucket and object key are separate arguments.
    # NOTE(review): the csv_files/ prefix mirrors parquet_files/ — confirm the desired layout.
    client.upload_file(csv_zip_file, bucket, "csv_files/df_full_history_complete_load.zip")

    # Write the parquet file to the local drive, then push it to S3.
    local_file = base_path + "/df_full_history_complete_load.parquet"
    stock_table = pa.Table.from_pandas(df_history_complete_load)
    pq.write_table(stock_table, local_file)
    client.upload_file(local_file, bucket, "parquet_files/df_full_history_complete_load.parquet")

    print("Data saved to S3 in zipped CSV and parquet.")
    S3_push_status = "Done."

    return S3_push_status


In [6]:

# file_path = my_path + "/" + "df_history_complete.zip"
# df_history_complete_load = pd.read_csv (file_path, low_memory=False)
# df_history_complete_load.shape

# Finally, create the MySQL history table in RDS and push the history data into the table.

In [8]:
# Import SQL libraries

def create_and_fill_RDS_table(df_history_complete_load, chunksize=1000):
    """Create the ``data_historical`` MySQL table in RDS and load the dataframe into it.

    The table is created (if missing) with MySQL Connector using an explicit
    schema with a ``key_id`` primary key. Existing rows are then removed and
    the dataframe is appended through SQLAlchemy. This keeps the explicit
    schema (``to_sql(if_exists='replace')`` would drop it) while still
    performing a full reload on every run.

    Parameters
    ----------
    df_history_complete_load : pandas.DataFrame
        History data with at least a ``date`` column plus the price/volume
        columns. It is copied, never modified.
    chunksize : int
        Number of rows written per batch by ``DataFrame.to_sql`` (default 1000).

    Returns
    -------
    str
        ``"Done."`` once the data has been written.
    """
    import mysql.connector
    from urllib.parse import quote_plus

    from sqlalchemy import create_engine

    # DOUBLE instead of FLOAT: MySQL FLOAT is single precision (~7 significant
    # digits), which distorts prices and large volumes.
    create_data_history_table = """
    CREATE TABLE IF NOT EXISTS `data_historical` (
    `key_id` varchar(30) NOT NULL,
    `date` datetime NOT NULL,
    `open` double NULL,
    `high` double NULL,
    `low` double NULL,
    `close` double NULL,
    `adj_close` double NULL,
    `volume` double NULL,
    PRIMARY KEY (`key_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
    """

    # Establish the MySQL connection and create/empty the table; always release
    # the cursor and connection, even when a statement fails.
    connection = mysql.connector.connect(host=rds_host,
                                         user=rds_user,
                                         password=rds_password,
                                         database=rds_database,
                                         charset=rds_charset)
    try:
        mycursor = connection.cursor()
        try:
            mycursor.execute(create_data_history_table)
            # Full reload: clear old rows but keep the schema defined above.
            mycursor.execute("TRUNCATE TABLE `data_historical`")
            connection.commit()
        finally:
            mycursor.close()
    finally:
        connection.close()

    print("The data_historical table is created in RDS.")

    # Push the final dataframe into the SQL DB on AWS RDS.
    df = df_history_complete_load.copy()
    if 'key_id' not in df.columns:
        # key_id is NOT NULL and the primary key, so it must be supplied.
        # NOTE(review): deriving it from the timestamp assumes one row per date
        # (single-ticker history) — confirm before loading multiple symbols.
        df['key_id'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d %H:%M:%S')

    # Set SQLAlchemy database credentials; user and password are URL-escaped so
    # characters such as '@', ':' or '/' do not break the connection URL.
    creds = {'usr': quote_plus(rds_user),
             'pwd': quote_plus(rds_password),
             'hst': rds_host,
             'prt': 3306,
             'dbn': rds_database,
             'chs': rds_charset}

    # MySQL connection string (same charset as the MySQL Connector connection).
    connstr = 'mysql+mysqlconnector://{usr}:{pwd}@{hst}:{prt}/{dbn}?charset={chs}'

    # Create the SQLAlchemy engine and always dispose of its connection pool.
    engine = create_engine(connstr.format(**creds))
    try:
        # Append into the table created above; a fixed positive chunksize avoids
        # the chunksize=0 error the old len(df)/1000 formula hit for small frames.
        df.to_sql(name='data_historical',
                  con=engine,
                  if_exists='append',
                  chunksize=chunksize,
                  index=False)
    finally:
        engine.dispose()

    print("The history data is loaded and the indexes are set.")
    rds_table_status = "Done."

    return rds_table_status


# Run the ETL process.

# Extract + transform: call the function (it was previously only referenced)
# and keep its result, which the load steps below consume.
df_history_complete_load = assemble_bulk_history()
push_data_to_S3(df_history_complete_load)
create_and_fill_RDS_table(df_history_complete_load)
