In [1]:
import os
import sqlite3
import pandas as pd
import numpy as np
import os
from pathlib import Path
import glob
import re
from io import BytesIO
import bz2
from datetime import datetime, time
from sql_queries import create_table_queries, drop_table_queries, planes_table_insert,  carriers_table_insert, airports_table_insert, on_time_table_insert
import boto3

In [2]:
def timeFloatToDatetime(timeFloat):
    """
    Convert an HHMM-style number (e.g. 1435.0) into a `datetime.time`.

    Values rejected by `pd.notna` (None, NaN, NA) are handed back unchanged.
    The hour part is wrapped modulo 24, so 2400 becomes 00:00, and any
    fractional part of the minutes is truncated.
    """
    if not pd.notna(timeFloat):
        return timeFloat
    # divmod on the raw number gives the same pair as (x // 100, x % 100)
    hundreds, remainder = divmod(timeFloat, 100)
    return time(hour=int(hundreds) % 24, minute=int(remainder))

In [3]:
def initialise_db(db_filename="airline2.db"):
    """
    Create a fresh, empty SQLite database file and open a connection to it.

    Any existing file at `db_filename` is deleted first, so every run starts
    from an empty database.

    Arguments:
        db_filename (str): path of the database file. Defaults to
            "airline2.db" in the current working directory.

    Returns:
        (cursor, connection) tuple for the new database.
    """
    # remove a database left over from a previous run, if any
    try:
        os.remove(db_filename)
    except FileNotFoundError:
        pass

    # sqlite3.connect creates the file when it does not exist
    conn = sqlite3.connect(db_filename)
    cur = conn.cursor()

    return cur, conn

In [4]:
def drop_tables(cur, conn):
    """
    Drop every table by running the statements held in `drop_table_queries`.

    Statements run in list order, and each one is committed as soon as it
    has executed.

    Arguments:
        cur: cursor that executes the DROP statements.
        conn: connection used to commit after each statement.
    """
    for drop_stmt in drop_table_queries:
        cur.execute(drop_stmt)
        conn.commit()

In [5]:
def create_tables(cur, conn):
    """
    Create every table defined by the statements in `create_table_queries`.

    Statements run in list order, and each one is committed immediately
    after it executes.

    Arguments:
        cur: cursor that executes the CREATE statements.
        conn: connection used to commit after each statement.
    """
    for create_stmt in create_table_queries:
        cur.execute(create_stmt)
        conn.commit()

In [6]:
def process_csv_file(file_name, cur, insert_query):
    """
    Load a CSV file and insert every row into the database with `insert_query`.

    Rows are handed to `executemany` as plain tuples built column by column,
    so each column keeps its own type. (`DataFrame.values` would upcast a mix
    of int and float columns to float, turning e.g. 2004 into 2004.0.)
    Missing values arrive as NaN, which SQLite binds as NULL.

    The caller is responsible for committing.

    Arguments:
        file_name (str): path to the CSV file.
        cur: database cursor used for the inserts.
        insert_query (str): parameterised INSERT statement with one
            placeholder per CSV column.
    """
    df = pd.read_csv(file_name)
    cur.executemany(insert_query, df.itertuples(index=False, name=None))

In [7]:
def process_bz2_file(file_name, cur, insert_query, chunksize=100000):
    """
    Stream a bz2-compressed CSV file into the database with `insert_query`.

    The file is decompressed on the fly and parsed in chunks of `chunksize`
    rows. Neither the decompressed bytes nor the full DataFrame (nor a list
    of every row) has to fit in memory at once. Each chunk's rows are passed
    to `executemany` as tuples, so every column keeps its own type.

    The caller is responsible for committing.

    Arguments:
        file_name (str): path to the .csv.bz2 file.
        cur: database cursor used for the inserts.
        insert_query (str): parameterised INSERT statement with one
            placeholder per CSV column.
        chunksize (int): number of rows parsed and inserted per batch.
    """
    with bz2.open(file_name, "rb") as compressed:
        for chunk in pd.read_csv(compressed, encoding='latin_1', chunksize=chunksize):
            cur.executemany(insert_query, chunk.itertuples(index=False, name=None))

In [8]:
def _list_data_files(data_directory):
    """Return sorted absolute paths of all non-hidden files under `data_directory` (recursive)."""
    return sorted(
        os.path.abspath(os.path.join(root, name))
        for root, _dirs, names in os.walk(data_directory)
        for name in names
        if not name.startswith('.')  # skip dotfiles such as .DS_Store, as glob('*') did
    )


def process_data(cur, conn, data_directory=None):
    """
    Description: This function is responsible for listing the files in a directory,
    and then executing the ingest process for each file according to the function
    that performs the transformation to save it to the database.

    Reference tables (plane-data.csv, carriers.csv, airports.csv) are loaded
    with `process_csv_file`. The yearly on-time files 2000.csv.bz2 to
    2005.csv.bz2 are loaded with `process_bz2_file`. Any other file is
    ignored. Each loaded file is committed separately.

    Arguments:
        cur: the cursor object.
        conn: connection to the database.
        data_directory (str, optional): directory searched recursively for the
            input files. Defaults to the dataverse_files folder under the
            user's home directory.

    Returns:
        None
    """
    if data_directory is None:
        data_directory = os.path.join(
            str(Path.home()),
            "Desktop/University/data-science-and-business-analytics/programming-for-data-science/dataverse_files/",
        )

    # (file-name suffix, insert query, label used in the progress message)
    csv_sources = (
        ('plane-data.csv', planes_table_insert, 'Planes'),
        ('carriers.csv', carriers_table_insert, 'Carriers'),
        ('airports.csv', airports_table_insert, 'Airports'),
    )
    # yearly flight files; dots are escaped so they only match a literal '.'
    flight_file_pattern = re.compile(r'200[0-5]\.csv\.bz2$')

    for datafile in _list_data_files(data_directory):
        for suffix, insert_query, label in csv_sources:
            if datafile.endswith(suffix):
                process_csv_file(datafile, cur, insert_query)
                conn.commit()
                print(label + ' file upload successful')
                break
        else:
            # no reference-table suffix matched: check for a yearly flight file
            if flight_file_pattern.search(datafile):
                process_bz2_file(datafile, cur, on_time_table_insert)
                conn.commit()
                print("Flight file upload successful")

In [9]:
def aws_connect():
    """
    Description: Build the boto3 interfaces to the AWS S3 bucket that stores
    the database file.

    Both interfaces use the credentials returned by `get_aws_access_keys` and
    the 'eu-central-1' region.

    Arguments:
        None

    Returns:
        Tuple (client, resource): the low-level S3 client and the high-level
        object-oriented S3 resource.
    """
    keys = get_aws_access_keys()
    connection_kwargs = {
        'aws_access_key_id': keys['AWSAccessKeyId'],
        'aws_secret_access_key': keys['AWSSecretKey'],
        'region_name': 'eu-central-1',
    }

    # the client is created before the resource, left to right
    return boto3.client('s3', **connection_kwargs), boto3.resource('s3', **connection_kwargs)

In [10]:
def get_aws_access_keys(file_name=None):
    """
    Description: This function retrieves access keys for the AWS S3 bucket where
    the database file is to be stored.

    The key file holds one `Name=value` pair per line (e.g. AWSAccessKeyId=...,
    AWSSecretKey=...). Whitespace around names and values is stripped and
    blank lines are ignored. Only the first `=` on a line separates name from
    value, so a value may itself contain `=`.

    Arguments:
        file_name (str, optional): path of the key file. Defaults to
            rootkey.csv in the st2195_assignment_3 folder under the user's
            home directory.

    Returns:
        Dictionary containing values for AWSAccessKeyId and AWSSecretKey
        (one entry per `Name=value` line in the file).

    Raises:
        ValueError: if a non-blank line contains no `=`.
    """
    if file_name is None:
        file_name = str(Path.home()) + "/Desktop/University/data-science-and-business-analytics/programming-for-data-science/st2195_assignment_3/rootkey.csv"

    key_dictionary = {}
    with open(file_name, "r") as file:
        for line_number, line in enumerate(file, 1):
            if not line.strip():
                continue
            name, separator, value = line.partition("=")
            if not separator:
                # the offending line is not echoed because it may contain a secret
                raise ValueError("Malformed line %d in AWS key file %s" % (line_number, file_name))
            key_dictionary[name.strip()] = value.strip()

    return key_dictionary

In [11]:
def upload_dbfile(filename, bucket='flights-db', key='db-file'):
    """
    Description: This function is responsible for uploading a given database file
    to the designated AWS S3 bucket. Once successfully uploaded, the local file
    is deleted. If S3 rejects the upload, the error is printed and the local
    file is kept.

    Arguments:
        filename (string): name of the database file (format-agnostic)
        bucket (string): destination S3 bucket. Defaults to 'flights-db'.
        key (string): object key under which the file is stored. Defaults to 'db-file'.

    Returns:
        bool: True if the file was uploaded and the local copy removed,
        False if the upload failed with a ClientError.
    """
    # generate AWS interface objects; only the low-level client is needed here
    client, _resource = aws_connect()

    try:
        with open(filename, 'rb') as data:
            client.upload_fileobj(data, bucket, key)
    except client.exceptions.ClientError as e:
        # botocore's ClientError is exposed on the client, so no extra import is needed
        print("Error uploading database file: %s" % e)
        return False

    # delete the local file only after the upload has succeeded
    os.remove(filename)
    return True
        print("Error uploading database file")

In [12]:
# initialise a new db, load all source files, then ship the db file to S3
cur, conn = initialise_db()
try:
    create_tables(cur, conn)
    process_data(cur, conn)
finally:
    # always release the SQLite file, even if loading fails part-way
    conn.close()

# must match the file name created by initialise_db()
upload_dbfile('airline2.db')

26 files found in /Users/bastienwinant/Desktop/University/data-science-and-business-analytics/programming-for-data-science/dataverse_files/
9/26 files processed.
10/26 files processed.
12/26 files processed.
19/26 files processed.
23/26 files processed.
24/26 files processed.
