<h2 align="center">Configuration of Environment</h2>

In [32]:
import sys

#Basic Packages related to Data Analysis
!{sys.executable} -m pip install pandas
!{sys.executable} -m pip install scipy

#Package required for ComtradeAPI
!{sys.executable} -m pip install --upgrade comtradeapicall

#Packages required for Oracle Table Insert
!{sys.executable} -m pip install sqlalchemy
!{sys.executable} -m pip install cx_oracle
!{sys.executable} -m pip install oracledb --no-deps



In [2]:
#Import Required Packages
import pandas

#Import ComTrade Package
import comtradeapicall

#Subscription Key for ComTradeData
subscription_key = '5ac78a346071428abfbc114525a1fb99'

<h2 align="center"> Extraction of GoldTrade Data from ComTrade API </h2>
<br>
<p> Since we are dealing with free version, we could not able to extract all years information at single shot. So, to overcome this issue, we have extracted for each year and then consolidated </p>

In [3]:
#Create Empty DataFrame
tradeData = pandas.DataFrame()

#User Defined Function to iterate all the months
def ref_period(year):
    return ','.join(f"{year}{month:02d}" for month in range(1, 13))

# Loop through the range of years from 2017 to 2023
for year in range(2017, 2024):

    #Chosen SouthAfrica, Kenya and Egypt Reporters
    tradeDataImport = comtradeapicall.getFinalData(
                    subscription_key, #Pass ComTrade API Key
                    typeCode='C',     #Selection of Goods
                    freqCode='M',     #Selection of Monthly Data
                    clCode='HS',      #Selection of HS Specification
                    period = ref_period(year),  #Year & Month Function
                    reporterCode='404,710,818', #SouthAfrica, Kenya & Egypt Reporters
                    cmdCode='7108,710811,710812,710813,710820', #Selection of Gold                                                 Commodities 
                    flowCode='X',       #Selection of Exports
                    partnerCode=None,   #Selection of All Partners
                    partner2Code=None,  #Selection of All 2nd Partners
                    customsCode=None,   #Selection of All Customs
                    motCode=None,       #Selection of All Mode of Transports
                    includeDesc=True    #Setting an Description of each columns
                                                   )

    print(f"Year: {year}, Response: {tradeDataImport}")
    
    if not tradeDataImport.empty:
        # Filter out empty or all-NA columns from tradeDataImport
        tradeDataImport = tradeDataImport.dropna(axis=1, how='all')
        
        # Concatenate the DataFrames
        tradeData = pandas.concat([tradeData, tradeDataImport], ignore_index=True)

# Reset the index
tradeData.reset_index(drop=True, inplace=True)

Year: 2017, Response:     typeCode freqCode  refPeriodId  refYear  refMonth  period  reporterCode  \
0          C        M     20170101     2017         1  201701           404   
1          C        M     20170101     2017         1  201701           404   
2          C        M     20170101     2017         1  201701           404   
3          C        M     20170101     2017         1  201701           404   
4          C        M     20170101     2017         1  201701           404   
..       ...      ...          ...      ...       ...     ...           ...   
919        C        M     20171201     2017        12  201712           818   
920        C        M     20171201     2017        12  201712           818   
921        C        M     20171201     2017        12  201712           818   
922        C        M     20171201     2017        12  201712           818   
923        C        M     20171201     2017        12  201712           818   

    reporterISO reporterDesc 

<h2 align="center">Data Consistency</h2>

In [4]:
#Check the number of rows and columns
tradeData.shape

(4284, 47)

In [5]:
#Source DataFrame details
tradeData.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4284 entries, 0 to 4283
Data columns (total 47 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   typeCode                  4284 non-null   object 
 1   freqCode                  4284 non-null   object 
 2   refPeriodId               4284 non-null   int64  
 3   refYear                   4284 non-null   int64  
 4   refMonth                  4284 non-null   int64  
 5   period                    4284 non-null   object 
 6   reporterCode              4284 non-null   int64  
 7   reporterISO               4284 non-null   object 
 8   reporterDesc              4284 non-null   object 
 9   flowCode                  4284 non-null   object 
 10  flowDesc                  4284 non-null   object 
 11  partnerCode               4284 non-null   int64  
 12  partnerISO                4284 non-null   object 
 13  partnerDesc               4284 non-null   object 
 14  partner2

In [6]:
#Validation of records
tradeData.head()

Unnamed: 0,typeCode,freqCode,refPeriodId,refYear,refMonth,period,reporterCode,reporterISO,reporterDesc,flowCode,...,netWgt,isNetWgtEstimated,grossWgt,isGrossWgtEstimated,cifvalue,fobvalue,primaryValue,legacyEstimationFlag,isReported,isAggregate
0,C,M,20170101,2017,1,201701,404,KEN,Kenya,X,...,20227.0,False,0.0,False,0.0,845549.563,845549.563,0,False,True
1,C,M,20170101,2017,1,201701,404,KEN,Kenya,X,...,20213.0,False,0.0,False,0.0,558894.597,558894.597,0,False,True
2,C,M,20170101,2017,1,201701,404,KEN,Kenya,X,...,14.0,False,0.0,False,0.0,286654.967,286654.967,0,False,True
3,C,M,20170101,2017,1,201701,404,KEN,Kenya,X,...,14.0,False,0.0,False,0.0,286654.967,286654.967,0,False,True
4,C,M,20170101,2017,1,201701,404,KEN,Kenya,X,...,14.0,False,0.0,False,0.0,286654.967,286654.967,0,True,False


In [7]:
#Convert Column Name to UpperCase for Table Insert
tradeData.columns = tradeData.columns.str.upper()

print(tradeData.columns)

Index(['TYPECODE', 'FREQCODE', 'REFPERIODID', 'REFYEAR', 'REFMONTH', 'PERIOD',
       'REPORTERCODE', 'REPORTERISO', 'REPORTERDESC', 'FLOWCODE', 'FLOWDESC',
       'PARTNERCODE', 'PARTNERISO', 'PARTNERDESC', 'PARTNER2CODE',
       'PARTNER2ISO', 'PARTNER2DESC', 'CLASSIFICATIONCODE',
       'CLASSIFICATIONSEARCHCODE', 'ISORIGINALCLASSIFICATION', 'CMDCODE',
       'CMDDESC', 'AGGRLEVEL', 'ISLEAF', 'CUSTOMSCODE', 'CUSTOMSDESC',
       'MOSCODE', 'MOTCODE', 'MOTDESC', 'QTYUNITCODE', 'QTYUNITABBR', 'QTY',
       'ISQTYESTIMATED', 'ALTQTYUNITCODE', 'ALTQTYUNITABBR', 'ALTQTY',
       'ISALTQTYESTIMATED', 'NETWGT', 'ISNETWGTESTIMATED', 'GROSSWGT',
       'ISGROSSWGTESTIMATED', 'CIFVALUE', 'FOBVALUE', 'PRIMARYVALUE',
       'LEGACYESTIMATIONFLAG', 'ISREPORTED', 'ISAGGREGATE'],
      dtype='object')


<h2 align="center"> Setting Up Database Connectivity </h2>

In [8]:
import cx_Oracle
import oracledb

#Initialize Oracle Client to setup Thick Connectivity
cx_Oracle.init_oracle_client(lib_dir="/Users/perinban/Downloads/instantclient_23_3")

In [9]:
from sqlalchemy import create_engine,text

#Create Engine to load data to data lake table
engine = create_engine('oracle://"C##goldTrade":goldTrade@localhost:1521/ORCLCDB')

<h2 align="center">Load Data to OLTP Database server</h2>

In [10]:
# Delete existing data from the Data Lake table
with engine.connect() as connection:
    connection.execute(text("TRUNCATE TABLE gold_trade_dl"))

#Load data to the Data Lake Table
try:
    tradeData.to_sql('gold_trade_dl', con=engine, if_exists='append', index=False, chunksize=50)
    print("Data insertion successful.")
except Exception as e:
    print(f"Error inserting data: {str(e)}")

Data insertion successful.


<h2 align="center"> ComTrade Data Gathering for Validation </h2>

In [11]:
import requests

# Function to fetch JSON data from URL, convert to DataFrame, and rename columns
def fetch_uri_data(url, required_columns,new_column_names):
    response = requests.get(url)
    data = response.json()
    mapping_dict = {}
    
    for item in data['results']:
        key = item[required_columns[0]]  # Use the first column as the key
        value = '|'.join([str(item[col]) for col in required_columns[1:]])

        mapping_dict[key] = value
    
    return mapping_dict

# List of URLs with their descriptions, desired column names, and columns to drop
urls = {
    "ReporterCodes": (
        "https://comtradeapi.un.org/files/v1/app/reference/Reporters.json",
        ["reporterCode","reporterDesc","reporterCodeIsoAlpha3"],
        ["REPORTERCODE","REPORTERDESC","REPORTERISO"]
    ),
    "CustomsCodes": (
        "https://comtradeapi.un.org/files/v1/app/reference/CustomsCodes.json",
        ["id","text"],
        ["CUSTOMSCODE", "CUSTOMSDESC"]
    ),
    "ModeOfTransportCodes": (
        "https://comtradeapi.un.org/files/v1/app/reference/ModeOfTransportCodes.json",
        ["id","text"],
        ["MOTCODE", "MOTDESC"]
    ),
    "PartnerCodes": (
        "https://comtradeapi.un.org/files/v1/app/reference/partnerAreas.json",
        ["PartnerCode","PartnerDesc","PartnerCodeIsoAlpha3"],
        ["PARTNERCODE", "PARTNERDESC", "PARTNERISO"]
    ),
    "CommodityCodes": (
        "https://comtradeapi.un.org/files/v1/app/reference/HS.json",
        ["id","text","parent","isLeaf"],
        ["CMDCODE","CMDDESC","PARENT","ISLEAF"]
    )
}

# Fetching data and processing in a dictionary [Dictionary Comprehension]
dict = {
    name: fetch_uri_data(url, cols, drop_cols) 
    for name, (url, cols, drop_cols) in urls.items()
}

#Access DataFrame
ReporterData = dict["ReporterCodes"]
CustomsData = dict["CustomsCodes"]
MotData = dict["ModeOfTransportCodes"]
Partnerdata = dict["PartnerCodes"]
CommodityData = dict["CommodityCodes"]

for key, value in CommodityData.items():
    position_of_hyphen = value.find("-")
    if position_of_hyphen != -1:  # If a hyphen is found
        CommodityData[key] = value[position_of_hyphen + 1:].strip()

<h2 align="center"> Data Wrangling </h2>

<h4 align="center"> 1. Conversion of Data to Common format either 0 or null <h4>

In [12]:
#Fill Null values with 0s for better handling of data
tradeData['NETWGT'] = tradeData['NETWGT'].fillna(0)
tradeData['PRIMARYVALUE'] = tradeData['PRIMARYVALUE'].fillna(0)

<h4 align="center"> 2. Data Transformation<h4>

In [13]:
#Initialize variables
tradeDataReject = []
rejected_rows = pandas.DataFrame()

#Trim Additional Spaces in All Object columns
tradeData = tradeData.apply(lambda x: x.str.strip() if isinstance(x, str) else x,axis=1)

# Define a function to handle splitting based on the type of value
def split_value(value, index):
    if isinstance(value, list):
        return value[index]
    else:
        return value.split('|')[index]

# Preprocess data to map required values for null values in required columns
tradeData['REPORTERDESC'] = tradeData['REPORTERCODE'].map(lambda code: split_value(Partnerdata.get(code, ''), 0))
tradeData['REPORTERISO'] = tradeData['REPORTERCODE'].map(lambda code: split_value(Partnerdata.get(code, ''), 1))
tradeData['CUSTOMSDESC'] = tradeData['CUSTOMSCODE'].map(lambda code: split_value(CustomsData.get(code, ''), 0))
tradeData['MOTDESC'] = tradeData['MOTCODE'].astype(str).map(lambda code: split_value(MotData.get(code, ''), 0))
tradeData['CMDDESC'] = tradeData['CMDCODE'].astype(str).map(lambda code: split_value(CommodityData.get(code, ''), 0))
tradeData['PARTNERDESC'] = tradeData['PARTNERCODE'].map(lambda code: split_value(Partnerdata.get(code, ''), 0))
tradeData['PARTNERISO'] = tradeData['PARTNERCODE'].map(lambda code: split_value(Partnerdata.get(code, ''), 1))
tradeData['PARTNER2DESC'] = tradeData['PARTNER2CODE'].map(lambda code: split_value(Partnerdata.get(code, ''), 0))
tradeData['PARTNER2ISO'] = tradeData['PARTNER2CODE'].map(lambda code: split_value(Partnerdata.get(code, ''), 1))
tradeData.loc[(tradeData['FLOWCODE'] == 'X'), 'FLOWDESC'] = 'Export'
tradeData.loc[(tradeData['CLASSIFICATIONCODE'].isin(['H0','H1','H2','H3','H4','H5','H6'])), 'CLASSIFICATIONSEARCHCODE'] = 'HS'
tradeData.loc[(tradeData['PRIMARYVALUE'] != tradeData['FOBVALUE'] ) & (tradeData['FOBVALUE'] != 0), 'PRIMARYVALUE'] = tradeData['FOBVALUE']
tradeData.loc[(tradeData['NETWGT'] == 0 ) & (tradeData['QTYUNITCODE'] == 8) & (tradeData['QTY'] != 0 ), 'NETWGT'] = tradeData['QTY']
tradeData.loc[((tradeData['NETWGT'] == 0) | (tradeData['NETWGT'].isnull()) ) & (tradeData['ALTQTYUNITCODE'] == 8) & (tradeData['QTYUNITCODE'] != 8), 'NETWGT'] = tradeData['ALTQTY']
tradeData.loc[(tradeData['NETWGT'] == 0 ) & (tradeData['ALTQTYUNITCODE'] == 15) & (tradeData['QTYUNITCODE'] != 8 ), 'NETWGT'] = tradeData['ALTQTY']/1000

<h4 align="center"> 3. Null Condition Check </h4>

In [14]:
#Null Condition Check in required columns
for col in ['TYPECODE', 'FREQCODE', 'REFPERIODID', 'PERIOD', 'REPORTERCODE','FLOWCODE', 'PARTNERCODE', 'CLASSIFICATIONCODE', 'ISORIGINALCLASSIFICATION', 'CMDCODE', 'ISLEAF', 'CUSTOMSCODE', 'MOSCODE', 'MOTCODE', 'FOBVALUE' ]:
    
    null_chk = tradeData[col].isna() | (tradeData[col] == '')
    
    #Drop the record if null found in mandate columns
    if null_chk.any():
        rejected_rows = tradeData[null_chk].copy()
        rejected_rows['REJECT_REASON'] = f"Null/Blank found for {col}"
        tradeDataReject.append(rejected_rows)
        tradeData.drop(rejected_rows.index, inplace=True)

print(tradeDataReject)

[]


In [15]:
#Function Declaration to reject records based on condition
def dqChecks(tradeData, col, condition, reject_reason):
    condition_chk = tradeData.apply(lambda row: condition(row[col]), axis=1)
    if condition_chk.any():
        rejected_rows = tradeData[condition_chk].copy()
        rejected_rows['REJECT_REASON'] = reject_reason
        return rejected_rows
    return pandas.DataFrame() #Returning Empty DataFrame if no rejects

<h4 align="center"> 4. Data Quality Checks </h4>

In [16]:
#Data Quality Checks Condition Parameters
dq_conditions = [
     ('TYPECODE', lambda x: x != 'C', "Typecode not equal to 'C'"),
     ('FREQCODE', lambda x: x != 'M', "Freqcode not equal to 'M'"),
     ('REPORTERCODE', lambda x: (x != 404) and (x != 710) and (x != 818), "Invalid ReporterCode sent from Source"),
     ('REPORTERISO', lambda x: (x.upper() != 'KEN') and (x.upper() != 'ZAF') and (x.upper() != 'EGY') if isinstance(x, str) else True, "Invalid ReporterISO sent from source"),
     ('REPORTERDESC', lambda x: (x != 'Kenya') and (x != 'South Africa') and (x != 'Egypt') if isinstance(x, str) else True, "Invalid ReporterDesc sent from source"),
    ('FLOWCODE', lambda x: x != 'X', "FlowCode not equal to 'X'"),
    ('FLOWDESC', lambda x: x != 'Export', "Invalid FlowDesc sent from Source"),
    ('PARTNERCODE', lambda x: x in (0, 129, 221, 258, 290, 360, 471, 472, 473, 490, 492, 527, 568, 577, 583, 608, 636, 637, 670, 697, 838, 879, 899), "Invalid Partner Code Sent from Source"),
    ('PARTNERISO', lambda x: x in ("W00", "A49", "E29", "PYF", "F49", "IDN", "R91", "_AC", "A79", "S19", "MCO", "O19", "E19", "F19", "FSM", "PHL", "A59", "VCT", "R20", "X2", "F97", "_X" ), "Invalid PartnerISO sent from Source"),
    ('PARTNERDESC', lambda x: x in ("World", "Caribbean, nes", "Eastern Europe, nes", "French Polynesia", "Northern Africa, nes", "Indonesia", "CACM, nes", "Africa CAMEU region, nes", "LAIA, nes", "Other Asia, nes", "Europe EU, nes", "Oceania, nes", "Other Europe, nes", "Other Africa, nes", "FS Micronesia", "Philippines", "Rest of America, nes", "North America and Central America, nes", "Saint Vincent and the Grenadines", "Europe EFTA, nes", "Free Zones", "Western Asia, nes", "Areas, nes")
     , "Invalid PartnerDesc sent from Source"),
    ('PARTNER2CODE', lambda x: x != 0, "Partner2Code not equal to 0"),
    ('PARTNER2ISO', lambda x: x != 'W00', "Invalid Partner2ISO sent from Source"),
    ('PARTNER2DESC', lambda x: x != 'World', "Invalid Partner2Desc sent from Source"),
    ('CLASSIFICATIONCODE', lambda x: (x != 'H0') and (x != 'H1') and (x != 'H2') and (x != 'H3') and (x != 'H4') and (x != 'H5') and (x != 'H6'), "Invalid ClassificationCode sent from Source"),
    ('CLASSIFICATIONSEARCHCODE', lambda x: x != 'HS', "Invalid ClassificationSearchCode sent from Source"),
    ('ISORIGINALCLASSIFICATION', lambda x: str(x).strip() != 'True', "Source Data does not belong to Original Classification"),
    ('CMDCODE', lambda x: (x != '7108') and (x != '710811') and (x != '710812') and (x != '710813') and (x != '710820'), "Invalid CmdCode sent from Source"),
    ('CMDDESC', lambda x: (x != 'Gold (including gold plated with platinum) unwrought or in semi-manufactured forms, or in powder form') and (x != 'Metals; gold, non-monetary, powder') and (x != 'Metals; gold, non-monetary, unwrought (but not powder)') and (x != 'Metals; gold, semi-manufactured') and (x != 'Gold, monetary'), "Invalid CmdDesc sent from Source"),
     ('ISLEAF', lambda x: x != 1, "Invalid isLeaf sent from Source"),
     ('ISREPORTED', lambda x: str(x).strip() != 'True', "Invalid Record sent from Source"),
    ('MOSCODE', lambda x: x != '0', "Invalid MosCode sent from Source"),
    ('PRIMARYVALUE', lambda x: (x!=tradeData['FOBVALUE']).all(),"PrimaryValue mismatches with FOBValue"),
    ('NETWGT', lambda x:(x== 0) and (tradeData['QTYUNITCODE'] == -1).all() and (tradeData['ALTQTYUNITCODE'] == -1).all(), "Invalid Netweight sent from source")
]

In [17]:
# Apply condition checks
for col, condition, reject_reason in dq_conditions:
    rejected_rows = dqChecks(tradeData, col, condition, reject_reason)
    if not rejected_rows.empty:
        tradeDataReject.append(rejected_rows)
        tradeData.drop(rejected_rows.index, inplace=True)

print(tradeDataReject)

[     TYPECODE FREQCODE  REFPERIODID  REFYEAR  REFMONTH  PERIOD  REPORTERCODE  \
0           C        M     20170101     2017         1  201701           404   
3           C        M     20170101     2017         1  201701           404   
5           C        M     20170101     2017         1  201701           404   
7           C        M     20170101     2017         1  201701           710   
9           C        M     20170101     2017         1  201701           710   
...       ...      ...          ...      ...       ...     ...           ...   
4273        C        M     20231201     2023        12  202312           710   
4274        C        M     20231201     2023        12  202312           710   
4275        C        M     20231201     2023        12  202312           710   
4276        C        M     20231201     2023        12  202312           818   
4280        C        M     20231201     2023        12  202312           818   

     REPORTERISO  REPORTERDESC FLOWCOD

<h4 align="center"> 5. Mean Imputation </h4>

In [18]:
#Check for 0 values for attributes used for visualisation
(tradeData[['PRIMARYVALUE', 'NETWGT']] == 0).sum()

PRIMARYVALUE    0
NETWGT          2
dtype: int64

In [19]:
# Extract month from the PERIOD attribute
tradeData['MONTH'] = tradeData['PERIOD'].str[4:6]

# Function to fill NaNs with the mean
def fill_wit_mean(tradeData, group_cols, cols):
    for col in cols:
        # Compute mean for each group
        mean_val = tradeData.groupby(group_cols)[col].transform('mean')
        # Fill 0s with the mean values
        tradeData.loc[tradeData[col] == 0, col] = mean_val
    return tradeData

# Group by REPORTERCODE and MONTH, and fill NaNs in PRIMARYVALUE and NETWGT with group means
tradeData = fill_wit_mean(tradeData, ['REPORTERCODE', 'MONTH'], ['PRIMARYVALUE', 'NETWGT'])

# Drop MONTH column if no longer needed
tradeData.drop(columns=['MONTH'], inplace=True)

In [20]:
#Check for 0 values for attributes used for visualisation
(tradeData[['PRIMARYVALUE', 'NETWGT']] == 0).sum()

PRIMARYVALUE    0
NETWGT          0
dtype: int64

<h4 align="center"> 6. Removal of Outliers </h4>

In [21]:
#As Data gets varied across and we does not know which month how much countries buy or sell, so, instead of calculating outliers based on primaryvalue and netwgt separately, we proceed with mid_value attribute

#Creation of MID_VAL attribute
tradeData['MID_VAL'] = tradeData['PRIMARYVALUE'] / tradeData['NETWGT']

In [22]:
#Check for existence of Anamolies in Data
tradeData[['PRIMARYVALUE','NETWGT','MID_VAL']].describe()

Unnamed: 0,PRIMARYVALUE,NETWGT,MID_VAL
count,584.0,584.0,584.0
mean,22693490.0,1646.811817,33194.773628
std,45343270.0,6748.35975,22442.155021
min,0.525,0.009,0.1221
25%,22249.6,3.0,3561.427066
50%,641635.1,37.145,38973.1737
75%,18569510.0,902.78,54024.5409
max,262039400.0,101755.0,133706.3


In [23]:
from scipy.stats import zscore
    
# Removing Outliers - Z-Score Method

def remove_outliers_zscore(tradeData, attribute):
    # Compute Z-scores
    tradeData['zscore'] = zscore(tradeData[attribute])
    # Define the threshold
    threshold = 3
    # Identify outliers
    outliers = tradeData[(tradeData['zscore'] < -threshold) | (tradeData['zscore'] > threshold)]
        
    # Prepare the rejected data with reasons
    rejects = outliers.copy()
    rejects['REJECT_REASON'] = f"Removed as part of outliers in {attribute}"
    
    rejects.drop(columns=['zscore','MID_VAL'], inplace=True)
        
    tradeDataReject.append(rejects)
        
    # Remove outliers
    tradeData = tradeData.loc[(tradeData['zscore'] >= -threshold) & (tradeData['zscore'] <= threshold)].copy()
    
    # Drop the Z-score column
    tradeData.drop(columns=['zscore'], inplace=True)
    tradeData.reset_index(drop=True, inplace=True)
        
    return tradeData

#Removed this for better handling of data
#for attribute in ['PRIMARYVALUE','NETWGT']:
#    tradeData = remove_outliers_zscore(tradeData, attribute)

# Remove outliers based on MID_VAL
tradeData = remove_outliers_zscore(tradeData, 'MID_VAL')

In [24]:
#Check for existence of Anamolies in Data
tradeData[['PRIMARYVALUE','NETWGT','MID_VAL']].describe()

Unnamed: 0,PRIMARYVALUE,NETWGT,MID_VAL
count,583.0,583.0,583.0
mean,22732410.0,1649.636485,33022.369638
std,45372440.0,6753.809278,22070.966538
min,0.525,0.009,0.1221
25%,23004.6,3.0,3537.11548
50%,647014.0,37.63,38962.45886
75%,18918760.0,903.92,54024.539989
max,262039400.0,101755.0,69477.077


In [25]:
# Drop the MID_VAL attribute
tradeData.drop(columns=['MID_VAL'], inplace=True)

<h2 align="center"> Validation of Rejects </h2>

In [26]:
# Concatenate all rejected DataFrames into a single DataFrame
if tradeDataReject:
    tradeDataRejects = pandas.concat(tradeDataReject).reset_index(drop=True)
    
    # Group by the 'REJECT_REASON' column and get the count of records for each reason
    print(tradeDataRejects.groupby('REJECT_REASON').size().reset_index(name='count'))
else:
    tradeDataRejects = pandas.DataFrame()

                            REJECT_REASON  count
0   Invalid Partner Code Sent from Source   2295
1         Invalid Record sent from Source    479
2         Invalid isLeaf sent from Source    926
3  Removed as part of outliers in MID_VAL      1


<h2 align="center">Load Data to Rejects Database server</h2>

In [27]:
#Delete Existing data from Data Reject Table
with engine.connect() as connection:
    connection.execute(text("TRUNCATE TABLE gold_trade_data_reject"))

#Load data to the Data Reject Table
try:
    tradeDataRejects.to_sql('gold_trade_data_reject', con=engine, if_exists='append', index=False, chunksize=50)
    print("Data insertion successful.")
except Exception as e:
    print(f"Error inserting data: {str(e)}")

Data insertion successful.


<h4 align="center"> 7. Drop Unwanted Columns </h4>

In [28]:
#Drop unwanted columns
tradeData = tradeData.drop(['TYPECODE','FREQCODE', 'REFPERIODID','REFYEAR','REFMONTH','REPORTERCODE','REPORTERISO','FLOWDESC','PARTNERCODE','PARTNERISO','PARTNER2CODE','PARTNER2ISO','CLASSIFICATIONCODE','CLASSIFICATIONSEARCHCODE','ISORIGINALCLASSIFICATION','CMDDESC','AGGRLEVEL','ISLEAF','CUSTOMSCODE','MOSCODE','MOTCODE','QTYUNITABBR','ISQTYESTIMATED','ALTQTYUNITABBR','ISALTQTYESTIMATED','ISNETWGTESTIMATED','ISGROSSWGTESTIMATED','CIFVALUE','FOBVALUE','LEGACYESTIMATIONFLAG','ISREPORTED','ISAGGREGATE'],axis=1)

#Rename columns based on the given requirements for display
tradeData.rename(columns={'REPORTERDESC':'REPORTER','FLOWCODE':'TRADE_FLOW','PARTNERDESC':'PARTNER','PARTNER2DESC':'2ND_PARTNER','CMDCODE':'COMMODITY_CODE','CUSTOMSDESC':'CUSTOMS_DESC','MOTDESC':'TRANSPORT_MODE','QTYUNITCODE':'QTY_UNIT','ALTQTYUNITCODE':'ALTERNATE_QUANTITY_UNIT','ALTQTY':'ALTERNATE_QUANTITY','NETWGT':'NET_WEIGHT(KG)','GROSSWGT':'GROSS_WEIGHT','PRIMARYVALUE':'TRADE_VALUE(US$)'}, inplace=True)

#Re-order columns in the given order based on requirements
col_order = ['PERIOD','TRADE_FLOW','REPORTER','PARTNER','2ND_PARTNER','CUSTOMS_DESC','TRANSPORT_MODE','COMMODITY_CODE','TRADE_VALUE(US$)','NET_WEIGHT(KG)','GROSS_WEIGHT','QTY_UNIT','QTY','ALTERNATE_QUANTITY_UNIT','ALTERNATE_QUANTITY']

tradeData = tradeData.reindex(columns=col_order)

print(tradeData)

     PERIOD TRADE_FLOW      REPORTER               PARTNER 2ND_PARTNER  \
0    201701          X         Kenya  United Arab Emirates       World   
1    201701          X         Kenya          South Africa       World   
2    201701          X  South Africa               Namibia       World   
3    201701          X  South Africa               Namibia       World   
4    201701          X         Egypt               Lebanon       World   
..      ...        ...           ...                   ...         ...   
578  202312          X         Kenya  United Arab Emirates       World   
579  202312          X         Kenya          South Africa       World   
580  202312          X         Egypt                Canada       World   
581  202312          X         Egypt           Switzerland       World   
582  202312          X         Egypt  United Arab Emirates       World   

                      CUSTOMS_DESC            TRANSPORT_MODE COMMODITY_CODE  \
0    TOTAL customs procedure cod

<h2 align="center">Load Data to OLAP Datawarehouse server</h2>

In [29]:
#Delete Existing data from Data Warehouse Table
with engine.connect() as connection:
    connection.execute(text("TRUNCATE TABLE gold_trade"))

#Load data to the Data Warehouse Table
try:
    tradeData.to_sql('gold_trade', con=engine, if_exists='append', index=False, chunksize=50)
    print("Data insertion successful.")
except Exception as e:
    print(f"Error inserting data: {str(e)}")

Data insertion successful.


<h2 align="center">Load Data to OLAP DataMarts</h2>

In [30]:
#Drop Unwanted columns for Plotting in Tableau
tradeData = tradeData.drop(['TRADE_FLOW','2ND_PARTNER','GROSS_WEIGHT','QTY_UNIT','QTY','ALTERNATE_QUANTITY_UNIT','ALTERNATE_QUANTITY'],axis=1)

#Delete Existing data from Data Mart Table
with engine.connect() as connection:
    connection.execute(text("TRUNCATE TABLE gold_trade_data_mart"))

#Load data to the Data Mart Table
try:
    tradeData.to_sql('gold_trade_data_mart', con=engine, if_exists='append', index=False, chunksize=50)
    print("Data insertion successful.")
except Exception as e:
    print(f"Error inserting data: {str(e)}")

Data insertion successful.


In [31]:
# Clean up: close cursor and connection
connection.close()