In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import re

In [None]:
%run /users/sahayk/risk_ml_modeling_framework/model_evaluation

In [None]:
def formatKSReport(ks_report):
  ks_report.columns = ['decile', 'min', 'max', 'event', 'non_event', 'total', 'event_rate', 'cumulative_event', 'cumulative_non_event', 'ks', 'max_ks', 'l_cumulative_event', 'cumulative_total', 'l_cumulative_total', 'area', 'cumulative_area', 'area_b', 'pi', 'gini', 'auc']
  ks_report['max_ks'] = np.where(ks_report['max_ks'] == '<----', '<', ks_report['max_ks'])
  ks_report['decile'] = ks_report['decile'] + 5
  ks_report['cumulative_event'] = ks_report['cumulative_event']/100
  ks_report['cumulative_non_event'] = ks_report['cumulative_non_event']/100
  ks_report['ks'] = ks_report['ks']/100
  return ks_report

In [None]:
sic4_desc = spark.sql('select csic4 as sic4, sic4_description from workarea.sic4_desc')

wblinkage = spark.sql('select duns_nbr as wb_duns, phy_ctry_code as wb_country_code, phy_ctry_nme as wb_country_name, cast(ltrim(rtrim(us_1987_sics_1)) as int) as wb_sic4, load_year as wb_load_year, load_month as wb_load_month from workarea.wblinkage where load_year in (2015, 2016, 2017)')

wbusunlinked = spark.sql('select duns_nbr as wb_duns, phy_ctry_code as wb_country_code, phy_ctry_nme as wb_country_name, cast(ltrim(rtrim(sic_base_1)) as int) as wb_sic4, load_year as wb_load_year, load_month as wb_load_month from workarea.wbusunlinked where load_year in (2015, 2016, 2017)')

wbglobalunlinked = spark.sql('select duns_nbr as wb_duns, phy_ctry_code as wb_country_code, phy_ctry_nme as wb_country_name, cast(ltrim(rtrim(us_1987_sics_1)) as int) as wb_sic4, load_year as wb_load_year, load_month as wb_load_month from workarea.wbglobalunlinked where load_year in (2015, 2016, 2017)')

wb = wblinkage.union(wbusunlinked).union(wbglobalunlinked)

trade_2016 = spark.sql('select supp_duns_nbr as supplier_duns, rel_duns_nbr as buyer_duns, load_year, load_month from workarea.gt_purc_pstg_data_type3 where load_year in (2014, 2015, 2016)')

trade_2016 = trade_2016.join(wb, (col('supplier_duns') == col('wb_duns')) & (col('load_year') == col('wb_load_year')) & (col('load_month') == col('wb_load_month')), how = 'left')
trade_2016 = trade_2016.withColumnRenamed('wb_country_code', 'supplier_country_code')
trade_2016 = trade_2016.withColumnRenamed('wb_country_name', 'supplier_country_name')
trade_2016 = trade_2016.withColumnRenamed('wb_sic4', 'supplier_sic4')
trade_2016 = trade_2016.drop(*['wb_duns', 'wb_load_year', 'wb_load_month'])
trade_2016 = trade_2016.join(wb, (col('buyer_duns') == col('wb_duns')) & (col('load_year') == col('wb_load_year')) & (col('load_month') == col('wb_load_month')), how = 'left')
trade_2016 = trade_2016.withColumnRenamed('wb_country_code', 'buyer_country_code')
trade_2016 = trade_2016.withColumnRenamed('wb_country_name', 'buyer_country_name')
trade_2016 = trade_2016.withColumnRenamed('wb_sic4', 'buyer_sic4')
trade_2016 = trade_2016.drop(*['wb_duns', 'wb_load_year', 'wb_load_month'])

trade_2016_timeperiod = trade_2016.groupBy(['load_year', 'load_month']).count().orderBy(['load_year', 'load_month'])
trade_2016_timeperiod.registerTempTable('tb')
trade_2016_timeperiod2 = spark.sql("""
select load_year, load_month, row_number() over (order by load_year desc, load_month desc) as id
from tb 
""")
trade_2016 = trade_2016.join(trade_2016_timeperiod2, on = ['load_year', 'load_month'], how = 'left')

trade_2016 = trade_2016.where('id >= 4 AND id <= 28')

trade_2016 = trade_2016.withColumn('buyer_append_year', lit(2016))
trade_2016 = trade_2016.withColumn('buyer_append_month', lit(9))

trade_2017 = spark.sql('select supp_duns_nbr as supplier_duns, rel_duns_nbr as buyer_duns, load_year, load_month from workarea.gt_purc_pstg_data_type3 where load_year in (2015, 2016, 2017)')

trade_2017 = trade_2017.join(wb, (col('supplier_duns') == col('wb_duns')) & (col('load_year') == col('wb_load_year')) & (col('load_month') == col('wb_load_month')), how = 'left')
trade_2017 = trade_2017.withColumnRenamed('wb_country_code', 'supplier_country_code')
trade_2017 = trade_2017.withColumnRenamed('wb_country_name', 'supplier_country_name')
trade_2017 = trade_2017.withColumnRenamed('wb_sic4', 'supplier_sic4')
trade_2017 = trade_2017.drop(*['wb_duns', 'wb_load_year', 'wb_load_month'])
trade_2017 = trade_2017.join(wb, (col('buyer_duns') == col('wb_duns')) & (col('load_year') == col('wb_load_year')) & (col('load_month') == col('wb_load_month')), how = 'left')
trade_2017 = trade_2017.withColumnRenamed('wb_country_code', 'buyer_country_code')
trade_2017 = trade_2017.withColumnRenamed('wb_country_name', 'buyer_country_name')
trade_2017 = trade_2017.withColumnRenamed('wb_sic4', 'buyer_sic4')
trade_2017 = trade_2017.drop(*['wb_duns', 'wb_load_year', 'wb_load_month'])

trade_2017_timeperiod = trade_2017.groupBy(['load_year', 'load_month']).count().orderBy(['load_year', 'load_month'])
trade_2017_timeperiod.registerTempTable('tb')
trade_2017_timeperiod2 = spark.sql("""
select load_year, load_month, row_number() over (order by load_year desc, load_month desc) as id
from tb 
""")
trade_2017 = trade_2017.join(trade_2017_timeperiod2, on = ['load_year', 'load_month'], how = 'left')
trade_2017 = trade_2017.where('id >= 4 AND id <= 28')

trade_2017 = trade_2017.withColumn('buyer_append_year', lit(2017))
trade_2017 = trade_2017.withColumn('buyer_append_month', lit(9))

trade = trade_2016.union(trade_2017)

trade_final = trade[['supplier_duns', 'buyer_duns', 'supplier_country_code', 'supplier_country_name', 'supplier_sic4', 'buyer_country_code', 'buyer_country_name', 'buyer_sic4', 'buyer_append_year', 'buyer_append_month']]

trade_final = trade_final.withColumn('trade_match_ind', lit(1))

train = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_train')

train2 = train.join(trade_final, ((col('duns') == col('buyer_duns')) & (col('append_year') == col('buyer_append_year')) & (col('append_month') == col('buyer_append_month'))), how = 'left')

train2.write.saveAsTable('workarea.us_export_propensity_analytic_dataset_train_importer_derived_attribute_trade_data')

val = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_val')
val2 = val.join(trade_final, ((col('duns') == col('buyer_duns')) & (col('append_year') == col('buyer_append_year')) & (col('append_month') == col('buyer_append_month'))), how = 'left')
val2.write.saveAsTable('workarea.us_export_propensity_analytic_dataset_val_importer_derived_attribute_trade_data')

test = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_test')
test2 = test.join(trade_final, ((col('duns') == col('buyer_duns')) & (col('append_year') == col('buyer_append_year')) & (col('append_month') == col('buyer_append_month'))), how = 'left')
test2.write.saveAsTable('workarea.us_export_propensity_analytic_dataset_test_importer_derived_attribute_trade_data')

train = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_train_importer_derived_attribute_trade_data')
val = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_val_importer_derived_attribute_trade_data')
test = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_test_importer_derived_attribute_trade_data')

train = train.withColumn('foreign_trade_buyer_ind', lit(None))
train = train.withColumn('foreign_trade_buyer_ind', when(col('supplier_country_code') != 805, 1).otherwise(col('foreign_trade_buyer_ind')))
train = train.withColumn('foreign_trade_buyer_ind', when(col('supplier_country_code') == 805, 0).otherwise(col('foreign_trade_buyer_ind')))

val = val.withColumn('foreign_trade_buyer_ind', lit(None))
val = val.withColumn('foreign_trade_buyer_ind', when(col('supplier_country_code') != 805, 1).otherwise(col('foreign_trade_buyer_ind')))
val = val.withColumn('foreign_trade_buyer_ind', when(col('supplier_country_code') == 805, 0).otherwise(col('foreign_trade_buyer_ind')))

test = test.withColumn('foreign_trade_buyer_ind', lit(None))
test = test.withColumn('foreign_trade_buyer_ind', when(col('supplier_country_code') != 805, 1).otherwise(col('foreign_trade_buyer_ind')))
test = test.withColumn('foreign_trade_buyer_ind', when(col('supplier_country_code') == 805, 0).otherwise(col('foreign_trade_buyer_ind')))

train_grp = train.groupBy(['duns', 'append_year', 'append_month', 'export', 'foreign_trade_buyer_ind']).count().orderBy(['duns', 'append_year', 'append_month', 'export','foreign_trade_buyer_ind'])
val_grp = val.groupBy(['duns', 'append_year', 'append_month', 'export', 'foreign_trade_buyer_ind']).count().orderBy(['duns', 'append_year', 'append_month', 'export','foreign_trade_buyer_ind'])
test_grp = test.groupBy(['duns', 'append_year', 'append_month', 'export', 'foreign_trade_buyer_ind']).count().orderBy(['duns', 'append_year', 'append_month', 'export','foreign_trade_buyer_ind'])

train_grp2 = train_grp.groupBy(['duns','append_year', 'append_month', 'export']).agg(max('foreign_trade_buyer_ind').alias('foreign_trade_buyer_ind'))
val_grp2 = val_grp.groupBy(['duns','append_year', 'append_month', 'export']).agg(max('foreign_trade_buyer_ind').alias('foreign_trade_buyer_ind'))
test_grp2 = test_grp.groupBy(['duns','append_year', 'append_month', 'export']).agg(max('foreign_trade_buyer_ind').alias('foreign_trade_buyer_ind'))

train_grp2.write.mode('overwrite').saveAsTable('workarea.us_export_propensity_analytic_dataset_train_importer_derived_attribute')
val_grp2.write.mode('overwrite').saveAsTable('workarea.us_export_propensity_analytic_dataset_val_importer_derived_attribute')
test_grp2.write.mode('overwrite').saveAsTable('workarea.us_export_propensity_analytic_dataset_test_importer_derived_attribute')

shipping = spark.sql('select shipper_duns, shipper_country_nme, shipper_sic4, consignee_duns, consignee_country_nme, consignee_sic4, est_arrival_tme from workarea.jzhang_shipments_all_cm_final6_temp1')

shipping = shipping.withColumn('est_arrival_year', year(col('est_arrival_tme')))
shipping = shipping.withColumn('est_arrival_month', month(col('est_arrival_tme')))

shipping_2016 = shipping.where('est_arrival_year in (2015, 2016, 2017)')
shipping_2017 = shipping.where('est_arrival_year in (2016, 2017, 2018)')

shipping_2016 = shipping_2016.withColumn('consignee_append_year', lit(2016))
shipping_2016 = shipping_2016.withColumn('consignee_append_month', lit(9))
shipping_2017 = shipping_2017.withColumn('consignee_append_year', lit(2017))
shipping_2017 = shipping_2017.withColumn('consignee_append_month', lit(9))

shipping_final = shipping_2016.union(shipping_2017)

shipping_final = shipping_final.withColumn('shipping_match_ind', lit(1))

train = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_train')
train2 = train.join(shipping_final, (col('duns') == col('consignee_duns')) & (col('append_year') == col('consignee_append_year')) & (col('append_month') == col('consignee_append_month')), how = 'left')
train3 = train2.join(sic4_desc, col('shipper_sic4') == col('sic4'), how = 'left')
train3 = train3.drop('sic4')
train3 = train3.withColumnRenamed('sic4_description', 'shipper_sic4_description')
train4 = train3.join(sic4_desc, col('consignee_sic4') == col('sic4'), how = 'left')
train4 = train4.drop('sic4')
train4 = train4.withColumnRenamed('sic4_description', 'consignee_sic4_description')
train4 = train4.withColumn('foreign_shipment_buyer_ind', lit(None))
train4 = train4.withColumn('foreign_shipment_buyer_ind', when(~col('shipper_country_nme').isin(["US", "UNITED STATES"]), 1).otherwise(col('foreign_shipment_buyer_ind')))
train4 = train4.withColumn('foreign_shipment_buyer_ind', when(col('shipper_country_nme').isin(["US", "UNITED STATES"]), 0).otherwise(col('foreign_shipment_buyer_ind')))

train_grp = train4.groupBy(['duns', 'append_year', 'append_month', 'export', 'foreign_shipment_buyer_ind']).count().orderBy(['duns', 'append_year', 'append_month', 'export', 'foreign_shipment_buyer_ind'])
train_grp2 = train_grp.groupBy(['duns', 'append_year', 'append_month', 'export']).agg(max('foreign_shipment_buyer_ind').alias('foreign_shipment_buyer_ind')).orderBy(['duns', 'append_year', 'append_month', 'export'])

val = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_val')
val2 = val.join(shipping_final, (col('duns') == col('consignee_duns')) & (col('append_year') == col('consignee_append_year')) & (col('append_month') == col('consignee_append_month')), how = 'left')
val3 = val2.join(sic4_desc, col('shipper_sic4') == col('sic4'), how = 'left')
val3 = val3.drop('sic4')
val3 = val3.withColumnRenamed('sic4_description', 'shipper_sic4_description')
val4 = val3.join(sic4_desc, col('consignee_sic4') == col('sic4'), how = 'left')
val4 = val4.drop('sic4')
val4 = val4.withColumnRenamed('sic4_description', 'consignee_sic4_description')
val4 = val4.withColumn('foreign_shipment_buyer_ind', lit(None))
val4 = val4.withColumn('foreign_shipment_buyer_ind', when(~col('shipper_country_nme').isin(["US", "UNITED STATES"]), 1).otherwise(col('foreign_shipment_buyer_ind')))
val4 = val4.withColumn('foreign_shipment_buyer_ind', when(col('shipper_country_nme').isin(["US", "UNITED STATES"]), 0).otherwise(col('foreign_shipment_buyer_ind')))

val_grp = val4.groupBy(['duns', 'append_year', 'append_month', 'export', 'foreign_shipment_buyer_ind']).count().orderBy(['duns', 'append_year', 'append_month', 'export', 'foreign_shipment_buyer_ind'])
val_grp2 = val_grp.groupBy(['duns', 'append_year', 'append_month', 'export']).agg(max('foreign_shipment_buyer_ind').alias('foreign_shipment_buyer_ind')).orderBy(['duns', 'append_year', 'append_month', 'export'])

test = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_test')
test2 = test.join(shipping_final, (col('duns') == col('consignee_duns')) & (col('append_year') == col('consignee_append_year')) & (col('append_month') == col('consignee_append_month')), how = 'left')
test3 = test2.join(sic4_desc, col('shipper_sic4') == col('sic4'), how = 'left')
test3 = test3.drop('sic4')
test3 = test3.withColumnRenamed('sic4_description', 'shipper_sic4_description')
test4 = test3.join(sic4_desc, col('consignee_sic4') == col('sic4'), how = 'left')
test4 = test4.drop('sic4')
test4 = test4.withColumnRenamed('sic4_description', 'consignee_sic4_description')
test4 = test4.withColumn('foreign_shipment_buyer_ind', lit(None))
test4 = test4.withColumn('foreign_shipment_buyer_ind', when(~col('shipper_country_nme').isin(["US", "UNITED STATES"]), 1).otherwise(col('foreign_shipment_buyer_ind')))
test4 = test4.withColumn('foreign_shipment_buyer_ind', when(col('shipper_country_nme').isin(["US", "UNITED STATES"]), 0).otherwise(col('foreign_shipment_buyer_ind')))

test_grp = test4.groupBy(['duns', 'append_year', 'append_month', 'export', 'foreign_shipment_buyer_ind']).count().orderBy(['duns', 'append_year', 'append_month', 'export', 'foreign_shipment_buyer_ind'])
test_grp2 = test_grp.groupBy(['duns', 'append_year', 'append_month', 'export']).agg(max('foreign_shipment_buyer_ind').alias('foreign_shipment_buyer_ind')).orderBy(['duns', 'append_year', 'append_month', 'export'])

train_grp2.write.mode('overwrite').saveAsTable('workarea.us_export_propensity_analytic_dataset_train_importer_derived_attribute2')
val_grp2.write.mode('overwrite').saveAsTable('workarea.us_export_propensity_analytic_dataset_val_importer_derived_attribute2')
test_grp2.write.mode('overwrite').saveAsTable('workarea.us_export_propensity_analytic_dataset_test_importer_derived_attribute2')

trade_train=spark.sql('select * from workarea.us_export_propensity_analytic_dataset_train_importer_derived_attribute')
trade_val=spark.sql('select * from workarea.us_export_propensity_analytic_dataset_val_importer_derived_attribute')
shipment=spark.sql('select * from workarea.us_export_propensity_analytic_dataset_train_importer_derived_attribute2')

In [None]:
train_original = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_train')
val_original = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_val')
test_original = spark.sql('select * from workarea.us_export_propensity_analytic_dataset_test')
train_original2 = train_original.join(trade_train, on = ['duns', 'append_year', 'append_month', 'export'], how = 'left')
val_original2 = val_original.join(trade_val, on = ['duns', 'append_year', 'append_month', 'export'], how = 'left')
train_original3 = train_original2.toPandas()
val_original3 = val_original2.toPandas()
train_original4 = train_original3[~train_original3['foreign_trade_buyer_ind'].isna()]
val_original4 = val_original3[~val_original3['foreign_trade_buyer_ind'].isna()]
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(random_state=0).fit(train_original4[['foreign_trade_buyer_ind']], train_original4[['export']])
train_original4['predicted_export'] = clf.predict_proba(train_original4[['foreign_trade_buyer_ind']])[:,1]
val_original4['predicted_export'] = clf.predict_proba(val_original4[['foreign_trade_buyer_ind']])[:,1]
train_original4['weight'] = 1
val_original4['weight'] = 1

In [None]:
ks = metrics(None, None, None, None, None, None)
ks_report_train = ks.KS_train(train_original4['export'], train_original4['predicted_export'], train_original4['weight'], bins = 10)
ks_report_val = ks.KS_train(val_original4['export'], val_original4['predicted_export'], val_original4['weight'], bins = 10)
ks_report_train = formatKSReport(ks_report_train)
ks_report_val = formatKSReport(ks_report_val)

In [None]:
display(ks_report_train)

In [None]:
display(ks_report_val)