# EXTRACT 
## Fetching Data

In [1]:
""" Load in modules we will need """
import os
import gc
import pymssql
import numpy as np 
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
pd.set_option('display.float_format', lambda x: '%.3f' % x)

#### "Reading" from Bronze Layer 

In [2]:
"""
Retrieve all the csv file names from a given path

Parameters: 
path (string)- Path we are looking through

Returns: a list of .csv files
"""
def grab_files(path: str) -> list:
    return [file for file in os.listdir(path) if file.endswith(".csv")]

"""
Creates DataFrames for all data of a specific entity in "Bronze layer"

Parameters: 
path (string) - The "container" for the data 

Returns: a list of .csv files
"""
def gen_dfs(ospath = 'BRONZE_LAYER', path:str =''):    
    dfs = []
    try:
        data_dir = f'{os.environ[ospath]}{path + "/" if path != "" else path}'
        files = grab_files(data_dir)
        for f in files:
            df = pd.read_csv(data_dir+f)
            df.name = f.split('__')[1]
            dfs.append(df)
    except FileNotFoundError:
        print("Error with pathing")
    except:
        print('Error Occurred while extracting data from the file')
    return dfs        #create the dataframe


# TRANSFORM
## Cleaning Functions

In [3]:
"""
Will Attempt to Drop all declared columns from a DataFrame

Parameters: 
df - a singular Data Frame
cols - a list of strings containing column attributes to be dropped

Returns: Nothing
"""
def drop_dfcols(df, cols:list):
    try:
        df.drop(columns = cols, inplace = True)
    except KeyError:
        print(f'fail to remove{cols}')

"""
Will Attempt to Drop all declared columns from a List of DataFrame

Parameters: 
df - a List of Data Frame
cols - a list of strings containing column attributes to be dropped 

Returns: Nothing
"""
def drop_dfs_col(dfs, cols: list):
    try:
        for df in dfs:
            drop_dfcols(df,cols) 
    except KeyError:
        print('no columns left')
    


In [4]:
"""
Remove duplicate records from the DataFrame
"""
def rm_dupe(df):
    df.drop_duplicates(inplace= True)

"""
Drops the columns that have null vals
or we fill them will default values if we want
"""
def rm_nulls(df, col, default_val = None):
    if default_val:
        df[col].fillna(default_val, inplace = True)
    else:
        df.dropna(subset =[col], inplace= True) 

"""
Checks to see if any column of a DataFrame contains Nulls
-best to do this after dropping useless columns [faster]
"""
def cols_with_null(df):
    for col in df.columns:
        print(f'{col} : {df[col].isnull().values.any()}')

"""
Parameters:
dfs: List of DataFrames 
Return: Merged DataFrames 
"""
def merge_dfs(dfs: list):
    name_attr = dfs[0].name if hasattr(dfs[0], "name") else None
    dfs = pd.concat(dfs, ignore_index=True)        #put all the data into one table
    if name_attr:
        dfs.name = name_attr
    return dfs

In [28]:
"""
Checks to see if a val is of type : etype

Parameters:
etype: Type 

Returns:
Boolean if it is that type
"""
def is_type(val,  etype):
    try:
        etype(val)
        return True
    except (ValueError, TypeError):
        return False
    
"""
Removes all the records that have an invalid type

Parameters:
header_df : DataFrame
col : column name to check
etype: Type 

Returns:
DataFrame with records that have valid type
"""
def rm_inval_type(df, col, etype):
    return df[df[col].apply(lambda val: is_type(val,etype))]

In [5]:
def IQR_outlier(df, col: str):
    name_attr = df.name if hasattr(df, "name") else None
    Q1 = df[col].quantile(0.25)   
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    # non_outliers = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
    df_rm_outliers = df[~((df[col] < lower_bound) | (df[col] > upper_bound))]
    if name_attr:
        df_rm_outliers.name = name_attr
    return df_rm_outliers

def zscore(df, col:str, deviance:int):
    col_mean = df[col].mean()
    col_std = df[col].std()
    return df[abs((df[col] - col_mean)/ col_std) <= deviance]

# LOAD
### Save into Currated layer [Silver]

In [6]:
"""
Will file "modded_{filename}.csv"
in curated-layer directory.
If path does not exist, will create it for you.

Parameters: 
df - a Data Frame

Returns: Nothing
"""
def df2csv(df):
    year = df.name[-4:]
    type = df.name[:-5]
    path = f'../data_set/CBP_AMS/curated-layer/{year}/{type}/'          
    if not os.path.exists(path):
        os.makedirs(path)
    df.to_csv(path + f'modded_{df.name}.csv', sep='|' , index=False) 

##### For Current Use Case : We can reduce data overhead

In [7]:
rm_billgen = ['house_bol_number','sub_house_bol_number', 'bill_type_code',
              'manifest_number', 'trade_update_date', 'run_date']

rm_container = ['seal_number_1', 'seal_number_2',
       'equipment_description_code','container_type', 'load_status']

rm_header  = ['carrier_code', 'vessel_country_code', 'vessel_name',
       'foreign_port_of_lading_qualifier', 'manifest_quantity', 'manifest_unit', 
       'measurement', 'measurement_unit', 'record_status_indicator',
       'place_of_receipt', 'port_of_destination', 'foreign_port_of_destination_qualifier', 
       'foreign_port_of_destination', 'conveyance_id_qualifier', 'conveyance_id', 
       'in_bond_entry_type', 'mode_of_transportation', 'secondary_notify_party_1',
       'secondary_notify_party_2', 'secondary_notify_party_3', 'secondary_notify_party_4', 
       'secondary_notify_party_5', 'secondary_notify_party_6', 'secondary_notify_party_7',
       'secondary_notify_party_8', 'secondary_notify_party_9', 'secondary_notify_party_10']

rm_tariff = ['description_sequence_number','harmonized_number', 'harmonized_weight_unit','harmonized_weight']


# ----------HEADER--------------

# Cleaning Header Data

In [29]:
"""
Convert the weight column to a universal weight unit measurement

Parameters:
header_df: DataFrame

Returns:
None
"""
def universal_unit(header_df):
    unit_weight = {
        'Kilograms': 1.0,
        'Pounds': 0.453592, 
        'Metric Ton': 1000.0, 
        'Long Ton': 1016.04691, 
        'Measurement Ton': 1.01604691
    }
    # Convert weight to kg based on weight_unit
    for unit, scale in unit_weight.items():
        header_df.loc[header_df['weight_unit'] == unit, 'weight'] *= scale
        header_df['weight_unit'] = 'Kilograms'

"""
removes the records that have dates that are not in the scope of this year
"""
def date_outlier(header_df):
    year = header_df.name[-4:]

## Load in HEADER Data

In [9]:
# Currated layer only requires : container |  header  |  tariff   |   billgen   
dfs = gen_dfs(path='header')
dfs[0]

  df = pd.read_csv(data_dir+f)
  df = pd.read_csv(data_dir+f)
  df = pd.read_csv(data_dir+f)
  df = pd.read_csv(data_dir+f)


Unnamed: 0,identifier,carrier_code,vessel_country_code,vessel_name,port_of_unlading,estimated_arrival_date,foreign_port_of_lading_qualifier,foreign_port_of_lading,manifest_quantity,manifest_unit,...,secondary_notify_party_2,secondary_notify_party_3,secondary_notify_party_4,secondary_notify_party_5,secondary_notify_party_6,secondary_notify_party_7,secondary_notify_party_8,secondary_notify_party_9,secondary_notify_party_10,actual_arrival_date
0,201901010,OOLU,FR,CMA CGM FIGARO,"Oakland, California",2017-10-08,Schedule K Foreign Port,"Tanjung Priok,Indonesia",834,PCS,...,,,,,,,,,,2017-10-10
1,201901011,AEHS,JP,MOL COMMITMENT,"Los Angeles, California",2018-01-28,Schedule K Foreign Port,"Singapore,Singapore",60,PKG,...,,,,,,,,,,2018-01-30
2,201901012,EXDO,PA,EVER LUCKY,"Norfolk, Virginia",2018-03-28,Schedule K Foreign Port,"Rotterdam,Netherlands",3350,PKG,...,,,,,,,,,,2018-03-30
3,201901013,EXDO,PA,EVER LUCKY,"Norfolk, Virginia",2018-03-28,Schedule K Foreign Port,"Bremerhaven,Federal Republic of Germany",642,PKG,...,,,,,,,,,,2018-03-30
4,201901014,EXDO,PA,EVER LUCKY,"Norfolk, Virginia",2018-03-28,Schedule K Foreign Port,"Bremerhaven,Federal Republic of Germany",2743,CTN,...,,,,,,,,,,2018-03-30
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4999995,2019040621908,FTNV,HK,MOL BRILLIANCE,"Long Beach, California",2019-04-01,Schedule K Foreign Port,"Hong Kong,Hong Kong",55,CTN,...,,,,,,,,,,2019-04-05
4999996,2019040621909,ONEY,HK,MOL BRILLIANCE,"Long Beach, California",2019-04-04,Schedule K Foreign Port,"Hong Kong,Hong Kong",1260,PKG,...,,,,,,,,,,2019-04-05
4999997,2019040621910,HYSL,HK,MOL BRILLIANCE,"Long Beach, California",2019-04-02,Schedule K Foreign Port,"Yantian,China (Mainland)",825,CTN,...,,,,,,,,,,2019-04-05
4999998,2019040621911,HEIC,PA,BANGKOK BRIDGE,"Long Beach, California",2019-04-03,Schedule K Foreign Port,"Kobe,Japan",590,CTN,...,,,,,,,,,,2019-04-05


In [10]:
# Drop columns we will not be using
drop_dfs_col(dfs, rm_header)

In [12]:
# Merge the dataframes into one 
dfs = merge_dfs(dfs)

In [13]:
# Drop the duplicate records
rm_dupe(dfs)

In [14]:
# Clean the weight unit to a singular unit ( Kilogram )
universal_unit(dfs)

In [15]:
# Remove out the Outliers
dfs = IQR_outlier(dfs, 'weight')

In [16]:
# Check to make sure there are no null values 😀
cols_with_null(dfs)
# Header does not need to deal with any Nulls 🎉

identifier : False
port_of_unlading : False
estimated_arrival_date : False
foreign_port_of_lading : False
weight : False
weight_unit : False
actual_arrival_date : False


In [17]:
dfs['estimated_arrival_date'].sort_values()

16564623    1959-11-10
16393523    1959-11-10
18272573    2000-12-07
18276281    2000-12-07
18276282    2000-12-07
               ...    
17626392    2020-05-01
13845421    2021-12-29
13851311    2021-12-29
13851304    2021-12-29
13845425    2021-12-29
Name: estimated_arrival_date, Length: 17619371, dtype: object

In [25]:
# check the type of column
rm_inval_type(dfs, 'weight', int)

Unnamed: 0,identifier,port_of_unlading,estimated_arrival_date,foreign_port_of_lading,weight,weight_unit,actual_arrival_date
0,201901010,"Oakland, California",2017-10-08,"Tanjung Priok,Indonesia",9904,Kilograms,2017-10-10
1,201901011,"Los Angeles, California",2018-01-28,"Singapore,Singapore",857,Kilograms,2018-01-30
2,201901012,"Norfolk, Virginia",2018-03-28,"Rotterdam,Netherlands",44599,Kilograms,2018-03-30
3,201901013,"Norfolk, Virginia",2018-03-28,"Bremerhaven,Federal Republic of Germany",9258,Kilograms,2018-03-30
4,201901014,"Norfolk, Virginia",2018-03-28,"Bremerhaven,Federal Republic of Germany",34760,Kilograms,2018-03-30
...,...,...,...,...,...,...,...
19336851,2019123164866,"Oakland, California",2019-08-09,"Shanghai ,China (Mainland)",19950,Kilograms,2019-08-09
19336852,2019123164867,"Houston, Texas",2019-12-10,"Tampico,Mexico",1794,Kilograms,2019-11-16
19336853,2019123164868,"Tacoma, Washington",2019-12-06,"Vung Tau,Vietnam",7517,Kilograms,2019-12-14
19336854,2019123164869,"Los Angeles, California",2019-11-30,"Xiamen,China (Mainland)",19260,Kilograms,2019-12-01


2019


In [None]:
# Cleaned up Header Data -> Store into "Silver Layer" 
df2csv(dfs)

#### Function to Export Modified Data 

In [None]:
"""
Establish a connection to the MySQL database

Returns: 
"""
def link_db():
    try:
        return pymssql.connect(server='DESKTOP-N9SA336')
    except:
        return None


In [None]:
conn = link_db()
if conn:
    cursor = conn.cursor() # Cursors are database level objects that let your query a database multiple times - Think of it as a pointer to a row
    cursor.execute("SELECT * FROM INFORMATION_SCHEMA.TABLES")
    rows = cursor.fetchall()
    [print(row) for row in rows]