In [1]:
# import libraries
import requests
import tarfile
import pandas as pd
import mysql.connector as msql

In [2]:

def download_file(url):
    response = requests.get(url)
    with open("tolldata.tgz", "wb") as file:
        file.write(response.content)

def unzip_data(zipfile):
    with tarfile.open(zipfile, "r:gz") as tar:
        tar.extractall()
        
def extract_from_csv(file):
    column_names = ['Rowid', 'Timestamp', 'Anonymized Vehicle number', 'Vehicle type', 'Number of axles','Vehicle code']
    df = pd.read_csv(file, names= column_names)
    df.drop(['Number of axles','Vehicle code'], axis=1, inplace=True)
    return df

def extract_from_tsv(file):
    column_names = ['Rowid', 'Timestamp', 'Anonymized Vehicle number', 'Vehicle type', 'Number of axles','Tollplaza id', 'Tollplaza code']
    df = pd.read_table(file, names = column_names)
    df.drop(['Rowid', 'Timestamp', 'Anonymized Vehicle number', 'Vehicle type'], axis=1, inplace=True)
    return df
    
def extract_from_fixed_width_file(file):
    df = pd.read_fwf(file, delimiter=' ', header=None)
    df = df[[9,10]]
    df.rename(columns = {9:'Type of Payment code', 10:'Vehicle Code'}, inplace = True)
    return df

def combine_dataframe(df1,df2,df3):
    df = pd.concat([df1, df2, df3], axis=1)
    return df
    
def transform_data(df):
    # change vehicle type to uppercase 
    df["Vehicle type"] = df["Vehicle type"].apply(lambda x: x.upper())
    
    # rename the columns
    df.rename(columns={'Rowid':'row_id','Timestamp':'timestamp','Anonymized Vehicle number':'anonymized_vehicle_number','Vehicle type':'vehicle_type',
                        'Number of axles':'number_of_axles','Tollplaza id':'tollplaza_id','Tollplaza code':'tollplaza_code',
                        'Type of Payment code':'type_of_payment_code','Vehicle Code':'vehicle_code'}, inplace=True)
    
    # convert to the correct datatype
    df[['anonymized_vehicle_number','tollplaza_id']] = df[['anonymized_vehicle_number','tollplaza_id']].astype(str)
    df['timestamp'] = df.timestamp.astype('datetime64[ns]')
    
    return df
    
def load_data_to_mysql_db(df):
    try:
        # create MySQL connection
        conn = msql.connect(host="localhost",user ="root",password="<password>")

        # create database
        cursor = conn.cursor()
        cursor.execute("CREATE DATABASE IF NOT EXISTS traffic_data")
        print("Database was created")


        # connect to database
        try:
            db_config = {
            "host": "localhost",
            "user": "root",
            "password": "<password>",
            "database": "traffic_data"
            }

            # Create a MySQL connection
            conn = msql.connect(**db_config)
            cursor = conn.cursor()
            print("Connect to DB successfully")

            # create table
            try:
                cursor.execute("CREATE TABLE IF NOT EXISTS toll_data \
                               (row_id int(10) PRIMARY KEY, timestamp datetime, anonymized_vehicle_number varchar(25),\
                                vehicle_type varchar(10), number_of_axles int(10), tollplaza_id varchar(10), \
                                tollplaza_code varchar(10), type_of_payment_code varchar(10), vehicle_code varchar(10))")
                print("Table was created")
              
                # loop through the dataframe
                print("Inserting data into table...")
                for i,row in df.iterrows(): 
                    sql = "INSERT INTO traffic_data.toll_data VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
                    
                    # insert rows into table
                    cursor.execute(sql, tuple(row))
                    conn.commit() # commit to save changes
                print(f"{i+1} rows were inserted")
                              
            except msql.Error as err:
                print(f"Error: {err}")

        except msql.Error as err:
            print(f"Error: {err}")

    except msql.Error as err:
        print(f"Error: {err}")
    
    
    finally:
        if 'conn' in locals() and conn.is_connected():
            cursor.close()
            conn.close()
            print("MySQL connection closed.")


### 1. Download file

In [3]:
url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz"

download_file(url)

### 2. Unzip file

In [4]:
file = "tolldata.tgz"

unzip_data(file)

### 3. Extraction

In [5]:
df1 = extract_from_csv('vehicle-data.csv')
df2 = extract_from_tsv('tollplaza-data.tsv')
df3 = extract_from_fixed_width_file('payment-data.txt')

### 4. Combine all 3 dataframe

In [6]:
combined_df = combine_dataframe(df1,df2,df3)

### 5. Transformation

In [7]:
transformed_df = transform_data(combined_df)

### 6. Load

In [11]:
load_data_to_mysql_db(transformed_df.head(50))

Database was created
Connect to DB successfully
Table was created
Inserting data into table...
50 rows were inserted
MySQL connection closed.
