In [1]:
### RUN EVERY TIME: COMTRADE DATASETS EXTRACTING

In [2]:
### RUN EVERY TIME: INITIALIZATION

import pandas as pd
import numpy as np
import gc
import os
from datetime import datetime
import time
import networkx as nx

In [3]:
### RUN EVERY TIME: VERSION CONTROL

from platform import python_version

### Printing the versions of the interpreter and of the core libraries (one line per component):
for str_iter_component, str_iter_version in (('python', python_version()), ('numpy', np.__version__), ('pandas', pd.__version__)):
    print(str_iter_component + ' version: ', str_iter_version)

python version:  3.7.4
numpy version:  1.17.2
pandas version:  0.25.3


In [15]:
### CONSTANTS INITIALIZATION

### Common constants:
All = slice(None) ### Full-range slicer to select all the values of a MultiIndex level
### Boundaries of the period to calculate factors for:
str_date_factor_start = '1994-12-31' ### Start date for efficacy measures
str_date_factor_end = '2023-06-30' ### End date for efficacy measures
### Business-month-end dates within the boundaries (range for source data filtering):
idx_test_monthly_range = pd.date_range(start = str_date_factor_start, end = str_date_factor_end, freq = 'BM')
### Business days within the boundaries (range for source data filtering):
idx_test_daily_range = pd.date_range(start = str_date_factor_start, end = str_date_factor_end, freq = 'B')
### Paths of the csv files to save results to (one file per factor):
str_pagerank_exp_path = 'Data_Files/Test_Files/pagerank_exp.csv'
str_pagerank_imp_path = 'Data_Files/Test_Files/pagerank_imp.csv'
str_herfindahl_comm_path = 'Data_Files/Test_Files/herfindahl_comm.csv'
str_exclusivity_exp_path = 'Data_Files/Test_Files/exclusivity_exp.csv'

In [5]:
### DEFINING WEIGHTED AVERAGE CALCULATOR

def weighted_average(ser_data, ser_weight = None, int_min_count = 0):
    """
    Calculates the simple or the weighted average of a series, missing observations are ignored.

    Parameters:
        ser_data: pd.Series of values to be averaged.
        ser_weight: optional pd.Series of weights, looked up by the index labels of the non-missing data values;
            a simple average is calculated when the weights are not passed.
        int_min_count: the average is calculated only if the number of non-missing data values is strictly greater than this threshold.

    Returns:
        The average value or np.nan when there are not enough observations or the weights sum up to zero.
    """
    ### Default output (np.nan is used as the np.NaN alias is removed since NumPy 2.0):
    num_result = np.nan
    ### Missing observations are dropped once, the filtered series is reused below:
    ser_valid = ser_data.dropna()
    ### Checking for data presence:
    if (len(ser_valid) > int_min_count):
        ### Checking for weights dataset:
        if ser_weight is None:
            ### Calculating of simple average:
            num_result = np.nanmean(ser_valid.values)
        else:
            ### Weights filtering (by the labels of the non-missing data values):
            list_weight = ser_weight.loc[ser_valid.index].values
            ### Checking for weights presence (missing weights are treated as zeros):
            if np.nansum(list_weight):
                ### Weighted average calculating (pairs with a missing weight are excluded both from the numerator and the denominator):
                num_result = np.nansum(ser_valid.values * list_weight) / np.nansum(list_weight)
    ### Results output:
    return num_result

In [6]:
### TO REPLACE WITH SQL REQUEST: EXTRACTION OF ACTUAL ISON MEMBERS LIST (TO BE IGNORED IN PRODUCT CODE)

def ison_membership_converting(str_path_universe, date_end, bool_daily = False, int_backfill_months = 0):
    """
    Loads the universe membership history from the MS Excel source file and converts it to a series of market labels.

    The function reads the module-level list_na_excel_values (strings to be treated as missing values),
    so this list has to be defined before the first call.

    Parameters:
        str_path_universe: path to the MS Excel workbook; the 'Switchers' sheet is read, its first two columns form the index (country and date).
        date_end: the last date of the business-month-end history built for each country.
        bool_daily: if True, the result is converted to the business daily frequency by forward filling within each country.
        int_backfill_months: number of months to be backfilled before the first date of each region for the countries
            that are members of the region at that date (0 turns the backfilling off).

    Returns:
        pd.Series named 'Market' with the ('Date', 'Country') index; region codes 50 / 57 / 504 are translated to 'DM' / 'EM' / 'FM',
        missing source codes (encoded by 0) are translated to NaN.
    """
    ### Defining business-month-end reindexation on country level:
    def country_modify(ser_raw_country, date_end):
        ### The last value of each month is taken and labelled with the business month end date:
        ser_res_country = ser_raw_country.droplevel(0).resample('MS').last().resample('BM').last()
        ### History is extended to the end date and gaps are forward filled:
        range_country = pd.date_range(ser_res_country.index[0], date_end, freq = 'BM')
        return ser_res_country.reindex(range_country).ffill()
    ### Markets encoding table (np.nan is used as the np.NaN alias is removed since NumPy 2.0):
    dict_markets = {50 : 'DM', 57 : 'EM', 504 : 'FM', 0: np.nan}
    ### Loading source file:
    df_raw_universe = pd.read_excel(engine = 'openpyxl', io = str_path_universe, sheet_name = 'Switchers', header = 0, parse_dates = True, index_col = [0, 1],
                                 na_values = list_na_excel_values, keep_default_na = False)
    ### Converting source file:
    df_raw_universe.index.names = ['Country', 'Date']
    ### Missing region codes are encoded by 0 (a new series is created instead of the in-place modifying of the data table column):
    ser_raw_universe = df_raw_universe['Region'].fillna(0)
    ser_raw_universe.name = 'Market'
    ### By country reindexation and translation:
    ser_res_universe = ser_raw_universe.groupby('Country').apply(country_modify, date_end)
    ser_res_universe.index.names = ['Country', 'Date']
    ser_res_universe = ser_res_universe.replace(dict_markets).reorder_levels([1, 0]).sort_index()
    ### Expanding membership for primary regions members by backfilling:
    if int_backfill_months:
        ### List of regions:
        list_region = list(ser_res_universe.dropna().unique())
        ### Initialising of collection of series with backfilled data for each region:
        list_ison_backfill = []
        ### Regions looping:
        for iter_region in list_region:
            ### Defining start of region date (the date part of the first index entry of the region members):
            date_first_valid = ser_res_universe.loc[ser_res_universe == iter_region].first_valid_index()[0]
            ### Creating dates index to backfilling (the start of region date itself is excluded):
            idx_date_backfill = pd.date_range(end = date_first_valid, periods = int_backfill_months + 1, freq = 'BM')[: -1]
            ### Creating primary countries index to backfilling:
            idx_region_backfill = ser_res_universe.loc[ser_res_universe == iter_region].loc[date_first_valid, All].index.get_level_values('Country')
            ### Creating full index:
            idx_ison_backfill = pd.MultiIndex.from_product([idx_date_backfill, idx_region_backfill])
            ### Series with backfilled data:
            list_ison_backfill.append(pd.Series(iter_region, index = idx_ison_backfill))
        ### Combination of backfilled series and original ISON data (non-missing original values have the priority):
        ser_res_universe = ser_res_universe.combine_first(pd.concat(list_ison_backfill, axis = 0)).sort_index()
        ser_res_universe.index.names = ['Date', 'Country']
    ### Converting to daily frequency:
    if bool_daily:
        ser_res_universe = ser_res_universe.reset_index('Country').groupby('Country').resample('B').ffill()['Market'].swaplevel().sort_index()
    ### Results output:
    ser_res_universe.name = 'Market'
    return ser_res_universe

### Strings to be treated as missing values when MS Excel files are read:
list_na_excel_values = [
    '', '#N/A', '#N/A N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN', '-nan', '1.#IND', '1.#QNAN',
    'N/A', 'NULL', 'NaN', 'n/a', 'nan', 'null',
    '#N/A Requesting Data...', '#N/A Invalid Security', '#N/A Field Not Applicable', '---'
]
### Universe source file:
str_path_universe = 'Data_Files/Source_Files/acadian_universe.xlsx'
### ISON membership history (monthly, built up to the passed end date):
ser_ison_membership = ison_membership_converting(str_path_universe, pd.Timestamp('2022-12-31'))
### Sorted list of the unique countries of the membership history:
idx_ison_countries = ser_ison_membership.index.get_level_values('Country').unique()
list_ison_countries = sorted(idx_ison_countries)

In [None]:
### TO REPLACE WITH SQL REQUEST: IMITATION OF SQL TABLE OF BILATERAL EXPORT FLOWS

gc.collect()
### Augmented bilateral export:
str_path_export_bilateral = 'Data_Files/Source_Files/comtrade_export_bilateral.h5'
### Export key:
str_key_unc_export = 'export_augmented'
### Checked EBOPS service IDs list (df_serv_to_gics['GICS Group Code']):
list_services = ['206', '210', '214', '218', '219', '223', '227', '231', '232', '237', '240', '246', '247', '250', '251', '254', '255', '256', '257', '258', '263',
                 '264', '269', '272', '273', '288', '289', '292', '293', '294', '310', '391', '431', '500', '888', '891', '892', '894', '950']

### Container for the filtered chunks:
list_export_chunks = []
### Source dataset is read and filtered by chunks of 1000000 rows:
for num_iter_number, ser_iter_chunk in enumerate(pd.read_hdf(str_path_export_bilateral, key = str_key_unc_export, chunksize = 1000000)):
    gc.collect()
    print(num_iter_number, ': Extraction started')
    ### Only positive flows are kept, values are converted to 32-bit integers:
    ### NOTE(review): int32 is limited by 2147483647 - confirm that no single flow exceeds it in the source units
    ser_iter_chunk = ser_iter_chunk[ser_iter_chunk > 0.0].astype('int32')
    df_iter_chunk = ser_iter_chunk.reset_index()
    ### Dropping self-directed flows, 'World' aggregates and services that are out of the checked list:
    df_iter_chunk = df_iter_chunk[(df_iter_chunk['Reporter'] != df_iter_chunk['Partner']) &
                                  ((df_iter_chunk['Type'] == 'Goods') | df_iter_chunk['Commodity_ID'].isin(list_services)) &
                                  (df_iter_chunk['Reporter'] != 'World') & (df_iter_chunk['Partner'] != 'World')]\
                               .drop('Type', axis = 1)
    print(num_iter_number, ': Filtering performed')
    ### Only the columns axis is squeezed: squeeze() with no axis turns a single-row chunk into a scalar, so the sort_index() call fails:
    ser_iter_chunk = df_iter_chunk.set_index(['Date', 'Reporter', 'Partner', 'Commodity_ID']).squeeze(axis = 1).sort_index()
    del df_iter_chunk
    gc.collect()
    list_export_chunks.append(ser_iter_chunk)
    print(num_iter_number, ': Chunk added to container')
### Combining chunks to the single series of export flows:
ser_bilateral_export = pd.concat(list_export_chunks, axis = 0, sort = False).sort_index()
ser_bilateral_export.name = 'Export'
del list_export_chunks
gc.collect()

In [None]:
### TO REPLACE WITH SQL REQUEST: IMITATION OF SQL TABLE OF BILATERAL IMPORT FLOWS

gc.collect()
### Augmented bilateral import:
str_path_import_bilateral = 'Data_Files/Source_Files/comtrade_import_bilateral.h5'
### Import key:
str_key_unc_import = 'import_augmented'
### Checked EBOPS service IDs list (df_serv_to_gics['GICS Group Code']):
list_services = ['206', '210', '214', '218', '219', '223', '227', '231', '232', '237', '240', '246', '247', '250', '251', '254', '255', '256', '257', '258', '263',
                 '264', '269', '272', '273', '288', '289', '292', '293', '294', '310', '391', '431', '500', '888', '891', '892', '894', '950']

### Container for the filtered chunks:
list_import_chunks = []
### Source dataset is read and filtered by chunks of 1000000 rows:
for num_iter_number, ser_iter_chunk in enumerate(pd.read_hdf(str_path_import_bilateral, key = str_key_unc_import, chunksize = 1000000)):
    gc.collect()
    print(num_iter_number, ': Extraction started')
    ### Only positive flows are kept, values are converted to 32-bit integers:
    ### NOTE(review): int32 is limited by 2147483647 - confirm that no single flow exceeds it in the source units
    ser_iter_chunk = ser_iter_chunk[ser_iter_chunk > 0.0].astype('int32')
    df_iter_chunk = ser_iter_chunk.reset_index()
    ### Dropping self-directed flows, 'World' aggregates and services that are out of the checked list:
    df_iter_chunk = df_iter_chunk[(df_iter_chunk['Reporter'] != df_iter_chunk['Partner']) &
                                  ((df_iter_chunk['Type'] == 'Goods') | df_iter_chunk['Commodity_ID'].isin(list_services)) &
                                  (df_iter_chunk['Reporter'] != 'World') & (df_iter_chunk['Partner'] != 'World')]\
                               .drop('Type', axis = 1)
    print(num_iter_number, ': Filtering performed')
    ### Only the columns axis is squeezed: squeeze() with no axis turns a single-row chunk into a scalar, so the sort_index() call fails:
    ser_iter_chunk = df_iter_chunk.set_index(['Date', 'Reporter', 'Partner', 'Commodity_ID']).squeeze(axis = 1).sort_index()
    del df_iter_chunk
    gc.collect()
    list_import_chunks.append(ser_iter_chunk)
    print(num_iter_number, ': Chunk added to container')
### Combining chunks to the single series of import flows:
ser_bilateral_import = pd.concat(list_import_chunks, axis = 0, sort = False).sort_index()
ser_bilateral_import.name = 'Import'
del list_import_chunks
gc.collect()

In [9]:
### DEFINING EXPORT PAGERANK FACTOR CALCULATION FUNCTION

#gc.collect()
def get_pagerank_exp_factor(iter_date):
    """
    Calculates the export pagerank factor of the ISON countries for a single date and appends it to the csv file.

    For each commodity the directed graph of export flows (reporter -> partner) is built and the flow-weighted pagerank is calculated.
    Pageranks of a country are averaged across commodities with the weights equal to the country total trade (export + import) by commodity.
    The function reads the module-level ser_bilateral_export, ser_bilateral_import, list_ison_countries and str_pagerank_exp_path.

    Parameters:
        iter_date: factor date (pd.Timestamp); flows are requested for the last business year end that strictly precedes
            the date lagged by 8 business month ends (for example, 2021-12-31 for 2023-06-30).

    Returns:
        pd.Series named 'Pagerank_Exp' and indexed by 'Reporter'.

    Side effects:
        The result is appended to the str_pagerank_exp_path csv file (';' separator), the header is written only if the file does not exist.
    """
    ### Lag in months:
    int_lag_mo = 9
    ### Calculating of the date to request Flows:
    date_to_request = iter_date - pd.tseries.offsets.BMonthEnd(int_lag_mo - 1) - pd.tseries.offsets.BYearEnd(1)
    ### Defining of pagerank calculation function:
    def get_pagerank(df_group):
        nx_graph = nx.from_pandas_edgelist(df_group, 'Reporter', 'Partner', edge_attr = 'Export', create_using = nx.DiGraph)
        ### Flows are stored as the 'Export' edge attribute, so the weight key has to be passed explicitly:
        ### with the default 'weight' key no attribute is found and all the edges are weighted equally
        dict_pagerank = nx.pagerank(nx_graph, weight = 'Export')
        ser_pagerank = pd.Series(dict_pagerank)
        return ser_pagerank
    ### Imitation of SQL Requests to load needed cross-sections:
    ser_iter_export = ser_bilateral_export[date_to_request]
    ser_iter_import = ser_bilateral_import[date_to_request]
    ### Performing of the export pagerank calculation for each commodity:
    ser_pagerank = ser_iter_export.reset_index().groupby('Commodity_ID').apply(get_pagerank)
    ### Only the ISON countries are kept as reporters:
    ser_pagerank = ser_pagerank.swaplevel().sort_index()[list_ison_countries]
    ser_pagerank.index.names = ['Reporter', 'Commodity_ID']
    ser_pagerank.name = 'Pagerank'
    ### Calculating of country trade by commodity:
    ser_comm_export = ser_iter_export[list_ison_countries].groupby(['Reporter', 'Commodity_ID']).sum()
    ser_comm_import = ser_iter_import[list_ison_countries].groupby(['Reporter', 'Commodity_ID']).sum()
    ser_comm_trade = pd.concat([ser_comm_export, ser_comm_import], axis = 1).fillna(0.0).sum(axis = 1)
    ser_comm_trade.name = 'Trade'
    ### Aggregated pagerank calculation (commodities with no trade found get zero weight):
    df_pagerank = ser_pagerank.to_frame().join(ser_comm_trade / 1000, how = 'left').fillna(0.0)
    ser_iter_factor = df_pagerank.groupby('Reporter').apply(lambda df_group: weighted_average(df_group['Pagerank'], df_group['Trade']))
    ser_iter_factor.name = 'Pagerank_Exp'
    ### Add to csv file (should be substituted by SQL query):
    ser_iter_factor_csv = pd.concat({iter_date: ser_iter_factor}, names = ['Date'])
    ser_iter_factor_csv.to_csv(str_pagerank_exp_path, mode = 'a', header = not os.path.exists(str_pagerank_exp_path), sep = ';')
    ### Results output:
    return ser_iter_factor

In [10]:
### DEFINING IMPORT PAGERANK FACTOR CALCULATION FUNCTION

#gc.collect()
def get_pagerank_imp_factor(iter_date):
    """
    Calculates the import pagerank factor of the ISON countries for a single date and appends it to the csv file.

    For each commodity the directed graph of import flows (reporter -> partner) is built and the flow-weighted pagerank is calculated.
    Pageranks of a country are averaged across commodities with the weights equal to the country total trade (export + import) by commodity.
    The function reads the module-level ser_bilateral_export, ser_bilateral_import, list_ison_countries and str_pagerank_imp_path.

    Parameters:
        iter_date: factor date (pd.Timestamp); flows are requested for the last business year end that strictly precedes
            the date lagged by 8 business month ends (for example, 2021-12-31 for 2023-06-30).

    Returns:
        pd.Series named 'Pagerank_Imp' and indexed by 'Reporter'.

    Side effects:
        The result is appended to the str_pagerank_imp_path csv file (';' separator), the header is written only if the file does not exist.
    """
    ### Lag in months:
    int_lag_mo = 9
    ### Calculating of the date to request Flows:
    date_to_request = iter_date - pd.tseries.offsets.BMonthEnd(int_lag_mo - 1) - pd.tseries.offsets.BYearEnd(1)
    ### Defining of pagerank calculation function:
    def get_pagerank(df_group):
        nx_graph = nx.from_pandas_edgelist(df_group, 'Reporter', 'Partner', edge_attr = 'Import', create_using = nx.DiGraph)
        ### Flows are stored as the 'Import' edge attribute, so the weight key has to be passed explicitly:
        ### with the default 'weight' key no attribute is found and all the edges are weighted equally
        dict_pagerank = nx.pagerank(nx_graph, weight = 'Import')
        ser_pagerank = pd.Series(dict_pagerank)
        return ser_pagerank
    ### Imitation of SQL Requests to load needed cross-sections:
    ser_iter_export = ser_bilateral_export[date_to_request]
    ser_iter_import = ser_bilateral_import[date_to_request]
    ### Performing of the import pagerank calculation for each commodity:
    ser_pagerank = ser_iter_import.reset_index().groupby('Commodity_ID').apply(get_pagerank)
    ### Only the ISON countries are kept as reporters:
    ser_pagerank = ser_pagerank.swaplevel().sort_index()[list_ison_countries]
    ser_pagerank.index.names = ['Reporter', 'Commodity_ID']
    ser_pagerank.name = 'Pagerank'
    ### Calculating of country trade by commodity:
    ser_comm_export = ser_iter_export[list_ison_countries].groupby(['Reporter', 'Commodity_ID']).sum()
    ser_comm_import = ser_iter_import[list_ison_countries].groupby(['Reporter', 'Commodity_ID']).sum()
    ser_comm_trade = pd.concat([ser_comm_export, ser_comm_import], axis = 1).fillna(0.0).sum(axis = 1)
    ser_comm_trade.name = 'Trade'
    ### Aggregated pagerank calculation (commodities with no trade found get zero weight):
    df_pagerank = ser_pagerank.to_frame().join(ser_comm_trade / 1000, how = 'left').fillna(0.0)
    ser_iter_factor = df_pagerank.groupby('Reporter').apply(lambda df_group: weighted_average(df_group['Pagerank'], df_group['Trade']))
    ser_iter_factor.name = 'Pagerank_Imp'
    ### Add to csv file (should be substituted by SQL query):
    ser_iter_factor_csv = pd.concat({iter_date: ser_iter_factor}, names = ['Date'])
    ser_iter_factor_csv.to_csv(str_pagerank_imp_path, mode = 'a', header = not os.path.exists(str_pagerank_imp_path), sep = ';')
    ### Results output:
    return ser_iter_factor

In [11]:
### DEFINING HERFINDAHL INDEX BY EXPORT WEIGHTS OF COMMODITIES FACTOR CALCULATION FUNCTION

#gc.collect()
def get_herfindahl_comm_factor(iter_date):
    """
    Calculates the commodity concentration factor of the ISON countries export for a single date and appends it to the csv file.

    For each reporter the export shares of commodities are calculated and the factor value is the inverse of the square root
    of the sum of squared shares, so the higher values correspond to the more diversified export.
    The function reads the module-level ser_bilateral_export, list_ison_countries and str_herfindahl_comm_path.

    Parameters:
        iter_date: factor date (pd.Timestamp); flows are requested for the last business year end that strictly precedes
            the date lagged by 8 business month ends (for example, 2021-12-31 for 2023-06-30).

    Returns:
        pd.Series named 'Herfindahl_Comm' and indexed by 'Reporter'.

    Side effects:
        The result is appended to the str_herfindahl_comm_path csv file (';' separator), the header is written only if the file does not exist.
    """
    ### Lag in months:
    int_lag_mo = 9
    ### Calculating of the date to request Flows:
    date_to_request = iter_date - pd.tseries.offsets.BMonthEnd(int_lag_mo - 1) - pd.tseries.offsets.BYearEnd(1)
    ### Defining Herfindahl index calculation:
    def get_herfindahl(ser_group):
        if (ser_group.count() > 0):
            ### Export shares of commodities:
            ser_norm = ser_group / ser_group.sum()
            ### Inverse of the square root of the sum of squared shares:
            flo_herfindahl = 1 / ((ser_norm ** 2).sum() ** (1 / 2))
        else:
            ### No data to calculate (np.nan is used as the np.NaN alias is removed since NumPy 2.0):
            flo_herfindahl = np.nan
        return flo_herfindahl
    ### Imitation of SQL Requests to load needed cross-sections:
    ser_iter_export = ser_bilateral_export[date_to_request]
    ### Calculating of country export by commodity:
    ser_comm_export = ser_iter_export[list_ison_countries].groupby(['Reporter', 'Commodity_ID']).sum()
    ### Herfindahl index for each reporter (commodities export weights):
    ser_iter_factor = ser_comm_export.groupby('Reporter').apply(get_herfindahl)
    ser_iter_factor.name = 'Herfindahl_Comm'
    ### Add to csv file (should be substituted by SQL query):
    ser_iter_factor_csv = pd.concat({iter_date: ser_iter_factor}, names = ['Date'])
    ser_iter_factor_csv.to_csv(str_herfindahl_comm_path, mode = 'a', header = not os.path.exists(str_herfindahl_comm_path), sep = ';')
    ### Results output:
    return ser_iter_factor

In [12]:
### DEFINING EXPORT EXCLUSIVITY FACTOR CALCULATION FUNCTION

#gc.collect()
def get_exclusivity_exp_factor(iter_date):
    """
    Calculates the export exclusivity factor of the ISON countries for a single date and appends it to the csv file.

    For each commodity the importance of a reporter for a partner is the share of the reporter in the total export of the ISON reporters
    to this partner. Importances are averaged with the export weights: across partners within each commodity first and across commodities next.
    The function reads the module-level ser_bilateral_export, list_ison_countries and str_exclusivity_exp_path.

    Parameters:
        iter_date: factor date (pd.Timestamp); flows are requested for the last business year end that strictly precedes
            the date lagged by 8 business month ends (for example, 2021-12-31 for 2023-06-30).

    Returns:
        pd.Series named 'Exclusivity_Exp' and indexed by 'Reporter'.

    Side effects:
        The result is appended to the str_exclusivity_exp_path csv file (';' separator), the header is written only if the file does not exist.
    """
    ### Lag in months:
    int_lag_mo = 9
    ### Calculating of the date to request Flows:
    date_to_request = iter_date - pd.tseries.offsets.BMonthEnd(int_lag_mo - 1) - pd.tseries.offsets.BYearEnd(1)
    ### Defining of average importance calculation for the single commodity flows:
    def calculate_importance(ser_group):
        ser_group = ser_group.droplevel('Commodity_ID')
        ser_group.name = 'Export'
        ### Total inflow of the commodity to each partner:
        ser_sum_flow = ser_group.groupby('Partner').sum()
        ser_sum_flow.name = 'Partner_Sum'
        df_group = ser_group.to_frame().join(ser_sum_flow)
        ### Reporter share in the partner inflow: row-wise division gives the same values as the by reporter calculation
        ### and equals to 1.0 for the single reporter case, so no special branch is needed
        df_group['Importance'] = df_group['Export'] / df_group['Partner_Sum']
        ### Export-weighted average importance of each reporter:
        ser_result = df_group.groupby('Reporter', group_keys = False).apply(lambda df_country: weighted_average(df_country['Importance'], df_country['Export'] / 1000))
        return ser_result
    ### Imitation of SQL Requests to load needed cross-sections:
    ser_iter_export = ser_bilateral_export[date_to_request]
    ### Calculating of country export by commodity:
    ser_comm_export = ser_iter_export[list_ison_countries].groupby(['Reporter', 'Commodity_ID']).sum()
    ser_comm_export.name = 'Total_Export'
    ### Average Importance of Exporter by Commodity calculation:
    ser_importance = ser_iter_export[list_ison_countries].groupby('Commodity_ID').apply(calculate_importance)
    ser_importance = ser_importance.reorder_levels(['Reporter', 'Commodity_ID']).sort_index()
    ser_importance.name = 'Importance'
    ### Exclusivity of Exporter calculation:
    df_exclusivity = ser_importance.to_frame().join(ser_comm_export / 1000, how = 'left').fillna(0.0)
    ser_iter_factor = df_exclusivity.groupby('Reporter').apply(lambda df_group: weighted_average(df_group['Importance'], df_group['Total_Export']))
    ser_iter_factor.name = 'Exclusivity_Exp'
    ### Add to csv file (should be substituted by SQL query):
    ser_iter_factor_csv = pd.concat({iter_date: ser_iter_factor}, names = ['Date'])
    ser_iter_factor_csv.to_csv(str_exclusivity_exp_path, mode = 'a', header = not os.path.exists(str_exclusivity_exp_path), sep = ';')
    ### Results output:
    return ser_iter_factor

In [16]:
### TESTING: PERFORMING FACTOR FOR DATE RANGE

### Removing csv files of all the factors before loop running (factor functions append results to them):
for str_iter_path in (str_pagerank_exp_path, str_pagerank_imp_path, str_herfindahl_comm_path, str_exclusivity_exp_path):
    if os.path.exists(str_iter_path):
        os.remove(str_iter_path)
### Local testing parameters:
int_interval = 10 ### Interval of progress displaying
date_start = datetime.utcnow() ### Start time of calculations
date_control = datetime.utcnow() ### Control time to display
### Dates to calculate factors for (a slice of idx_test_monthly_range can be taken for a shorter test):
idx_test_date_range = idx_test_monthly_range
### Test performing:
print('Start time:', date_start)
for iter_num, iter_date in enumerate(idx_test_date_range):
    ### Progress printing at each interval boundary (nothing to report at the very first date):
    if (iter_num % int_interval == 0):
        if (iter_num > 0):
            print('Counter marker:', iter_num, '/', len(idx_test_date_range))
            timedelta_interval = datetime.utcnow() - date_control
            print('Time interval since last marker:', datetime.utcnow() - date_control)
            print('Average interval for single date:', str(timedelta_interval / int_interval))
        ### Control time resetting:
        date_control = datetime.utcnow()

    ### Pagerank by Export Flows factor calculating:
    ser_pagerank_exp_factor = get_pagerank_exp_factor(iter_date)
    ### Pagerank by Import Flows factor calculating:
    ser_pagerank_imp_factor = get_pagerank_imp_factor(iter_date)
    ### Herfindahl Index by Export Weights of Commodities factor calculating:
    ser_herfindahl_comm_factor = get_herfindahl_comm_factor(iter_date)
    ### Export Exclusivity factor calculating:
    ser_exclusivity_exp_factor = get_exclusivity_exp_factor(iter_date)

### Finish time of calculations:
date_finish = datetime.utcnow()
### Overall statistics printing:
print('Finish time:', date_finish)
print('Full interval:', date_finish - date_start)
print('Average interval for single date:', str((date_finish - date_start) / len(idx_test_date_range)))

Start time: 2023-07-17 14:54:59.519242
Counter marker: 10 / 342
Time interval since last marker: 0:03:31.075790
Average interval for single date: 0:00:21.107579
Counter marker: 20 / 342
Time interval since last marker: 0:03:48.766760
Average interval for single date: 0:00:22.876676
Counter marker: 30 / 342
Time interval since last marker: 0:03:53.381596
Average interval for single date: 0:00:23.338160
Counter marker: 40 / 342
Time interval since last marker: 0:03:56.475820
Average interval for single date: 0:00:23.647582
Counter marker: 50 / 342
Time interval since last marker: 0:03:59.823285
Average interval for single date: 0:00:23.982328
Counter marker: 60 / 342
Time interval since last marker: 0:03:57.236553
Average interval for single date: 0:00:23.723655
Counter marker: 70 / 342
Time interval since last marker: 0:03:56.856960
Average interval for single date: 0:00:23.685696
Counter marker: 80 / 342
Time interval since last marker: 0:04:04.597473
Average interval for single date: 