### 1. Start spark and hive sessions

In [1]:
import datetime
import os 
import pandas as pd
import tqdm
import numpy as np
import random
from pyspark.sql.functions import coalesce, collect_list, regexp_replace, udf #sum as sqlsum,
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

os.environ['PYSPARK_PYTHON'] = '/var/lib/cdsw-pushdown/anaconda36/envs/pyspark37/bin/python'

time_stamp = datetime.datetime.now().strftime("%d-%b-%Y_(%H:%M:%S)")
spark = SparkSession\
    .builder\
    .appName(f"APP_created__{time_stamp}")\
    .config('spark.executor.instances','4')\
    .config('spark.executor.cores', '4')\
    .config('spark.executor.memory', '12g')\
    .config('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT',1)\
    .config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT',1)\
    .config('spark.yarn.executor.memoryOverhead', '10000')\
    .config('spark.driver.memory', '96g')\
    .enableHiveSupport()\
    .getOrCreate()
#    
###
#    .config('spark.driver.maxResultSize', '12g')\
#    .config('spark.rpc.message.maxSize', '512')\

In [2]:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

### 2. Run sql queries to get raw data from primary source tables
- mastertable_hive contains transaction details
- csamapping_hive contains static client attributes like industry
- csa_mf_kpi_data_hive contains the info when which client bought what product
- all data is aggregated on monthly level in the format YYYYMM

In [3]:
mastertable_hive = hive.executeQuery("""
    SELECT mas.customer_gpkenn, mas.customer_kdnr, SUBSTRING (mas.cda_date_payment, 1, 6), 
    mas.currency AS currency, COUNT(mas.currency) AS currency_count, SUM(mas.amount_eur) as amount_eur
    FROM ez_013859_csap.csa_mastertable mas 
    WHERE mas.currency IS NOT NULL AND mas.cda_date_payment IS NOT NULL AND mas.customer_gpkenn IS NOT NULL  
    GROUP BY mas.customer_gpkenn, mas.customer_kdnr, SUBSTRING (mas.cda_date_payment, 1, 6), mas.currency
""")

In [4]:
csamapping_hive = hive.executeQuery("""
    SELECT global_party_id as customer_gpkenn, kdnr, gp_bonirati, gp_wzbran2_, gp_rckdgrp_
    FROM ez_013859_csap.CSA_MAPPING
""")

In [5]:
csa_mf_kpi_data_hive = hive.executeQuery(
"""
SELECT SUBSTRING(mf.as_of_date, 1, 6), mf.client_id, mf.rep_amount, md.product_desk_desc_de , md.product_desc_de
FROM ez_sb_cc.csa_mf_kpi_recommender mf
INNER JOIN ez_sb_cc.cc_md_product_013859_recommender md ON md.sub_product_id = mf.sub_product_id
WHERE mf.kpi_id = 'RECOGREMTD'
""")

In [6]:
taggerdata_hive = spark.read.orc(
    os.path.join("/data/ez_013859_csap/22000_SIGNAL_CHANGING_INDUSTRIES", "in_and_out_per_industry_and_period"))
taggerdata_hive = taggerdata_hive.select(regexp_replace('month_list', '\-+', '').alias('dateYYYYMM_tagger'),"node_identifier", "label", "amount")

In [7]:
deposits_hive = hive.executeQuery("""
    SELECT SUBSTRING(buchungsdatum, 1, 6), buchungsdatum, kontonummergeneralisiert, isowaehrungscode, kundennummer, valutasaldobetragoriginalwaehrung
    FROM ez_013859_csap.rmt_l3_kkkonten_angereichert 
    WHERE isowaehrungscode ='EUR' OR isowaehrungscode ='USD' OR isowaehrungscode ='GBP' OR isowaehrungscode ='CHF'
""")

In [8]:
csamapping_hive = hive.executeQuery("""
    SELECT global_party_id as customer_gpkenn, kdnr, gp_bonirati, gp_wzbran2_, gp_rckdgrp_
    FROM ez_013859_csap.CSA_MAPPING
""")

### Map product_desc to manually defined target_product
- move to pandas
- change dtypes and rename columns
- drop irrelevant products

In [9]:
csa_mf_kpi_data = csa_mf_kpi_data_hive.toPandas()

In [10]:
csa_mf_kpi_data = csa_mf_kpi_data.rename(columns={"_c0":"dateYYYYMM"})

In [11]:
csa_mf_kpi_data = csa_mf_kpi_data.astype({'dateYYYYMM': int,'client_id': int,'rep_amount': float,'dateYYYYMM': int,})

In [12]:
blacklisted_products = ["Import-Inkasso (Local)","VMM-DEPOT","Vermittlungsprovision"]
csa_mf_kpi_data = csa_mf_kpi_data[~csa_mf_kpi_data['product_desc_de'].isin(blacklisted_products)]

In [13]:
product_mapping_target = pd.read_csv('Produkte_Target_Mapping_complete.csv')
product_mapping_target_dict = product_mapping_target.set_index('product_desc_de').to_dict(orient='index')
product_mapping_target_dict[np.nan] = {'target_product': "exclude"} #nan vs None seems to be version dependent
product_mapping_target_dict[None] = {'target_product': "exclude"}

def map_products(product, product_mapping_target_dict):
        return product_mapping_target_dict[product]['target_product']
map_products_vectorized = np.vectorize(map_products)

csa_mf_kpi_data['target_product'] = \
map_products_vectorized(csa_mf_kpi_data['product_desc_de'],product_mapping_target_dict)

In [14]:
csa_mf_kpi_data = csa_mf_kpi_data[csa_mf_kpi_data["target_product"]!='exclude']
csa_mf_kpi_data['rep_amount'] = csa_mf_kpi_data['rep_amount'].astype(float)

### groupby pivot sum to get each product as column on gpkenn level
- move to spark
- map client id with gpkenn via csamapping
- drop client column
- do pivot
- export back to pandas

In [15]:
mfkpi_target_spark = spark.createDataFrame(csa_mf_kpi_data) 

In [16]:
mfkpi_target_spark = mfkpi_target_spark.join(csamapping_hive,\
    mfkpi_target_spark.client_id == csamapping_hive.kdnr,how='inner').drop("kdnr")

In [17]:
mfkpi_target_spark_pivot = mfkpi_target_spark\
    .groupBy("customer_gpkenn","dateYYYYMM").pivot("target_product").sum("rep_amount").fillna(0)

In [18]:
mfkpi_target = mfkpi_target_spark_pivot.toPandas()

In [19]:
mfkpi_target = mfkpi_target.astype({'customer_gpkenn': int})

In [20]:
mfkpi_target.info()
mfkpi_target

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2517843 entries, 0 to 2517842
Data columns (total 24 columns):
 #   Column                            Dtype  
---  ------                            -----  
 0   customer_gpkenn                   int64  
 1   dateYYYYMM                        int64  
 2   Asset Management                  float64
 3   Aval                              float64
 4   Betriebliche Altersvorsorge       float64
 5   Bond-Emissionen                   float64
 6   Bürgschaften und Garantien        float64
 7   Cash Pooling                      float64
 8   Commerz Real - Immobilienleasing  float64
 9   Commerz Real - Mobilienleasing    float64
 10  Export Dokumentengeschäft         float64
 11  Export- und Handelsfinanzierung   float64
 12  Forderungsmanagement              float64
 13  Geldmarktkredit                   float64
 14  Global Payment Plus               float64
 15  Import Dokumentengeschäft         float64
 16  KK-Kredit                         fl

Unnamed: 0,customer_gpkenn,dateYYYYMM,Asset Management,Aval,Betriebliche Altersvorsorge,Bond-Emissionen,Bürgschaften und Garantien,Cash Pooling,Commerz Real - Immobilienleasing,Commerz Real - Mobilienleasing,...,Global Payment Plus,Import Dokumentengeschäft,KK-Kredit,Kapitalanlagen,Rohstoffmanagement,Sichteinlagen,Termin-Einlage,Unternehmensfinanzierung,Währungsmanagement,Zinsmanagement
0,963175874181822,201910,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,-280.28,0.0,3829.91,503.21,0.0
1,1951385874144400,202010,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,99.0,0.0,0.00,0.0,0.00,0.00,0.0
2,756585874080214,202104,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,-4.50,0.0,0.00,0.00,0.0
3,8479968177094845,201901,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,-76.38,0.0,0.00,0.00,0.0
4,6834135874105039,202112,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,-244.65,0.0,5043.58,0.00,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2517838,1161485874222113,202106,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,-8.33,0.0,0.00,0.00,0.0
2517839,8322625874070053,202206,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,9.01,0.0,0.00,0.00,0.0
2517840,8242265874060829,202001,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,6.09,0.0,0.00,0.00,0.0
2517841,2858865874174239,202005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.00,0.0,240.16,0.00,0.0


### Fill all missing dates in which no products were bought
- create entirely empty dataframe for all clients and dates
- join via combine_first()
- turn amount into binaries

In [21]:
all_dates = mfkpi_target["dateYYYYMM"].drop_duplicates().sort_values().tolist()
all_gpkenns = mfkpi_target["customer_gpkenn"].drop_duplicates().sort_values().tolist()
gpkenn_date_combinations = [(f"{gpkenn}_{date}") for gpkenn in all_gpkenns for date in all_dates] 
empty_target = pd.DataFrame(None,index=gpkenn_date_combinations,columns=list(mfkpi_target.columns)[2:]).reset_index()
empty_target[['customer_gpkenn','dateYYYYMM']] = empty_target['index'].str.split('_',1,expand=True).astype(int)
empty_target = empty_target[['customer_gpkenn','dateYYYYMM',*mfkpi_target.columns[2:]]]

In [22]:
empty_target = empty_target.set_index(['customer_gpkenn','dateYYYYMM'])
mfkpi_target = mfkpi_target.set_index(['customer_gpkenn','dateYYYYMM'])
mfpki_fulltarget = empty_target.combine_first(mfkpi_target).reset_index()
empty_target = empty_target.reset_index()
mfkpi_target = mfkpi_target.reset_index()
mfkpi_fulltarget = mfpki_fulltarget.fillna(0)
mfkpi_fulltarget = mfkpi_fulltarget[[*mfkpi_target.columns[2:]]].astype(bool).astype(int)
mfkpi_fulltarget = pd.concat([empty_target[['customer_gpkenn','dateYYYYMM']],mfkpi_fulltarget],axis='columns')
mfkpi_fulltarget.info()
mfkpi_fulltarget

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2912280 entries, 0 to 2912279
Data columns (total 24 columns):
 #   Column                            Dtype
---  ------                            -----
 0   customer_gpkenn                   int64
 1   dateYYYYMM                        int64
 2   Asset Management                  int64
 3   Aval                              int64
 4   Betriebliche Altersvorsorge       int64
 5   Bond-Emissionen                   int64
 6   Bürgschaften und Garantien        int64
 7   Cash Pooling                      int64
 8   Commerz Real - Immobilienleasing  int64
 9   Commerz Real - Mobilienleasing    int64
 10  Export Dokumentengeschäft         int64
 11  Export- und Handelsfinanzierung   int64
 12  Forderungsmanagement              int64
 13  Geldmarktkredit                   int64
 14  Global Payment Plus               int64
 15  Import Dokumentengeschäft         int64
 16  KK-Kredit                         int64
 17  Kapitalanlagen             

Unnamed: 0,customer_gpkenn,dateYYYYMM,Asset Management,Aval,Betriebliche Altersvorsorge,Bond-Emissionen,Bürgschaften und Garantien,Cash Pooling,Commerz Real - Immobilienleasing,Commerz Real - Mobilienleasing,...,Global Payment Plus,Import Dokumentengeschäft,KK-Kredit,Kapitalanlagen,Rohstoffmanagement,Sichteinlagen,Termin-Einlage,Unternehmensfinanzierung,Währungsmanagement,Zinsmanagement
0,680371091945,201901,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,680371091945,201902,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,680371091945,201903,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,680371091945,201904,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,680371091945,201905,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2912275,9999926475141309,202202,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,1,0,0,0
2912276,9999926475141309,202203,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,1,0,0,0
2912277,9999926475141309,202204,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,1,0,0,0
2912278,9999926475141309,202205,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,1,0,0,0


### Get all features into same gpkenn, dateYYYYMM format

In [23]:
gpkenn_date_columns = empty_target[['customer_gpkenn','dateYYYYMM']]

In [24]:
csamapping = csamapping_hive.toPandas()
csamapping = csamapping.astype({'customer_gpkenn': int})
csamapping = csamapping.drop('kdnr',axis="columns").drop_duplicates()
csamapping_formatted = gpkenn_date_columns.merge(csamapping, how='left', on="customer_gpkenn")
csamapping_formatted.info()
csamapping_formatted

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2912280 entries, 0 to 2912279
Data columns (total 5 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   customer_gpkenn  int64 
 1   dateYYYYMM       int64 
 2   gp_bonirati      object
 3   gp_wzbran2_      object
 4   gp_rckdgrp_      object
dtypes: int64(2), object(3)
memory usage: 133.3+ MB


Unnamed: 0,customer_gpkenn,dateYYYYMM,gp_bonirati,gp_wzbran2_,gp_rckdgrp_
0,680371091945,201901,,"Sonstige freiberufliche, wissenschaftliche und...",Corporates Inland (ohne Umsatzangabe)
1,680371091945,201902,,"Sonstige freiberufliche, wissenschaftliche und...",Corporates Inland (ohne Umsatzangabe)
2,680371091945,201903,,"Sonstige freiberufliche, wissenschaftliche und...",Corporates Inland (ohne Umsatzangabe)
3,680371091945,201904,,"Sonstige freiberufliche, wissenschaftliche und...",Corporates Inland (ohne Umsatzangabe)
4,680371091945,201905,,"Sonstige freiberufliche, wissenschaftliche und...",Corporates Inland (ohne Umsatzangabe)
...,...,...,...,...,...
2912275,9999926475141309,202202,,Mit Finanz- und Versicherungsdienstleistungen ...,Finanzierungsgesellschaften: Leasing- / Beteil...
2912276,9999926475141309,202203,,Mit Finanz- und Versicherungsdienstleistungen ...,Finanzierungsgesellschaften: Leasing- / Beteil...
2912277,9999926475141309,202204,,Mit Finanz- und Versicherungsdienstleistungen ...,Finanzierungsgesellschaften: Leasing- / Beteil...
2912278,9999926475141309,202205,,Mit Finanz- und Versicherungsdienstleistungen ...,Finanzierungsgesellschaften: Leasing- / Beteil...


In [25]:
mastertable_hive = mastertable_hive.withColumnRenamed("_c2","dateYYYYMM")
mastertable_transactionsums_by_currency_hive = mastertable_hive\
    .select("customer_gpkenn","dateYYYYMM","currency","amount_eur").dropna()\
    .groupBy("customer_gpkenn","dateYYYYMM").pivot("currency").sum("amount_eur").fillna(0)
mastertable_transactionsums_by_currency_hive = mastertable_transactionsums_by_currency_hive.selectExpr(
    "customer_gpkenn","dateYYYYMM","EUR","USD","GBP", 
    "AED + AUD + BHD + CAD + CHF + CNH + CNY + CZK + DKK + EUR + HKD + HRK + HUF + ISK + JPY + MAD + MXN + NOK + NZD + PLN + QAR + RON + RUB + SAR + SEK + SGD + THB + TND + TRY + USD + ZAR as Other_Currencies")
mastertable_transactionsums_by_currency = mastertable_transactionsums_by_currency_hive.toPandas()
mastertable_transactionsums_by_currency = mastertable_transactionsums_by_currency.astype(\
    {'customer_gpkenn': int,'dateYYYYMM': int,'EUR': float,'USD': float,'GBP': float,'Other_Currencies': float})
empty_mastertable_transactionsums_by_currency = gpkenn_date_columns.copy()
cur_columns = ["EUR","USD","GBP", "Other_Currencies"]
empty_mastertable_transactionsums_by_currency[cur_columns] = None
mastertable_transactionsums_by_currency = mastertable_transactionsums_by_currency\
    [mastertable_transactionsums_by_currency['dateYYYYMM'].isin(all_dates)]
mastertable_transactionsums_by_currency = mastertable_transactionsums_by_currency\
    [mastertable_transactionsums_by_currency['customer_gpkenn'].isin(all_gpkenns)]
empty_mastertable_transactionsums_by_currency = \
    empty_mastertable_transactionsums_by_currency.set_index(['customer_gpkenn','dateYYYYMM'])
mastertable_transactionsums_by_currency = \
    mastertable_transactionsums_by_currency.set_index(['customer_gpkenn','dateYYYYMM'])
mastertable_transactionsums_by_currency_formatted = \
    empty_mastertable_transactionsums_by_currency.combine_first(mastertable_transactionsums_by_currency).reset_index()
mastertable_transactionsums_by_currency_formatted = \
    mastertable_transactionsums_by_currency_formatted.fillna(0)
mastertable_transactionsums_by_currency_formatted.info()
mastertable_transactionsums_by_currency_formatted

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2912280 entries, 0 to 2912279
Data columns (total 6 columns):
 #   Column            Dtype  
---  ------            -----  
 0   customer_gpkenn   int64  
 1   dateYYYYMM        int64  
 2   EUR               float64
 3   USD               float64
 4   GBP               float64
 5   Other_Currencies  float64
dtypes: float64(4), int64(2)
memory usage: 133.3 MB


Unnamed: 0,customer_gpkenn,dateYYYYMM,EUR,USD,GBP,Other_Currencies
0,680371091945,201901,0.0,0.0,0.0,0.0
1,680371091945,201902,0.0,0.0,0.0,0.0
2,680371091945,201903,0.0,0.0,0.0,0.0
3,680371091945,201904,0.0,0.0,0.0,0.0
4,680371091945,201905,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...
2912275,9999926475141309,202202,0.0,0.0,0.0,0.0
2912276,9999926475141309,202203,0.0,0.0,0.0,0.0
2912277,9999926475141309,202204,0.0,0.0,0.0,0.0
2912278,9999926475141309,202205,0.0,0.0,0.0,0.0


In [26]:
scaled_market_features = pd.read_csv('all_scaled_market_features.csv',index_col=0)
scaled_market_features = scaled_market_features.reset_index(drop=False).rename(columns={"index":"dateYYYYMM"})
scaled_market_features_formatted = gpkenn_date_columns.merge(scaled_market_features, how='left', on="dateYYYYMM")
scaled_market_features_formatted.info()
scaled_market_features_formatted

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2912280 entries, 0 to 2912279
Data columns (total 11 columns):
 #   Column           Dtype  
---  ------           -----  
 0   customer_gpkenn  int64  
 1   dateYYYYMM       int64  
 2   DAX              float64
 3   EUR_USD          float64
 4   EURIBOR_ON       float64
 5   EURIBOR_3M       float64
 6   EURIBOR_12M      float64
 7   LIBOR_USD_ON     float64
 8   LIBOR_USD_3M     float64
 9   LIBOR_USD_12M    float64
 10  Oil              float64
dtypes: float64(9), int64(2)
memory usage: 266.6 MB


Unnamed: 0,customer_gpkenn,dateYYYYMM,DAX,EUR_USD,EURIBOR_ON,EURIBOR_3M,EURIBOR_12M,LIBOR_USD_ON,LIBOR_USD_3M,LIBOR_USD_12M,Oil
0,680371091945,201901,0.265591,0.671255,0.169355,0.134328,0.124204,0.963255,0.754515,0.612560,0.354360
1,680371091945,201902,0.265591,0.627530,0.169355,0.134328,0.127389,0.956465,0.722571,0.576453,0.390681
2,680371091945,201903,0.267272,0.568016,0.169355,0.134328,0.127389,0.962492,0.719039,0.565013,0.423237
3,680371091945,201904,0.404774,0.548583,0.169355,0.134328,0.127389,0.941013,0.705534,0.549856,0.481331
4,680371091945,201905,0.301059,0.526721,0.169355,0.134328,0.121019,0.945668,0.697834,0.538505,0.450504
...,...,...,...,...,...,...,...,...,...,...,...
2912275,9999926475141309,202202,0.760632,0.546964,0.000000,0.024876,0.054140,0.218109,0.273941,0.386687,0.763964
2912276,9999926475141309,202203,0.752900,0.539271,0.000000,0.039801,0.085987,0.262293,0.363525,0.495345,0.935497
2912277,9999926475141309,202204,0.699613,0.289879,0.000000,0.064677,0.165605,0.280992,0.414135,0.569186,0.867128
2912278,9999926475141309,202205,0.748361,0.382186,0.000000,0.094527,0.254777,0.401688,0.481398,0.620018,0.946180


In [27]:
deposits_hive = deposits_hive\
    .selectExpr("cast(kundennummer as string) kundennummer","_c0 as dateYYYYMM","valutasaldobetragoriginalwaehrung")
deposits_hive = deposits_hive.join(csamapping_hive,\
    deposits_hive.kundennummer == csamapping_hive.kdnr,how='inner')\
    .drop("kundennummer","kdnr","gp_bonirati", "gp_wzbran2_", "gp_rckdgrp_") 
deposits_hive = deposits_hive.groupBy("dateYYYYMM","customer_gpkenn").avg("valutasaldobetragoriginalwaehrung")
deposits_hive = deposits_hive\
    .withColumnRenamed("avg(valutasaldobetragoriginalwaehrung)","Valutasaldo_avg")
deposits = deposits_hive.toPandas()

In [28]:
deposits = deposits.drop_duplicates()
deposits = deposits.astype({'customer_gpkenn': int,'dateYYYYMM': int,'Valutasaldo_avg': float})
deposits = deposits[deposits['dateYYYYMM'].isin(all_dates)]
deposits = deposits[deposits['customer_gpkenn'].isin(all_gpkenns)]
deposits = deposits.reset_index(drop=True)
empty_deposits = gpkenn_date_columns.copy()
deposits_columns = ["Valutasaldo_avg"]
empty_deposits[deposits_columns] = None
empty_deposits = empty_deposits.set_index(['customer_gpkenn','dateYYYYMM'])
deposits = deposits.set_index(['customer_gpkenn','dateYYYYMM'])
deposits_formatted = empty_deposits.combine_first(deposits).reset_index()
deposits_formatted = deposits_formatted.fillna(0)
deposits_formatted['Valutasaldo_avg'] = deposits_formatted['Valutasaldo_avg'].astype(float)
deposits_formatted.info()
deposits_formatted

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2912280 entries, 0 to 2912279
Data columns (total 3 columns):
 #   Column           Dtype  
---  ------           -----  
 0   customer_gpkenn  int64  
 1   dateYYYYMM       int64  
 2   Valutasaldo_avg  float64
dtypes: float64(1), int64(2)
memory usage: 66.7 MB


Unnamed: 0,customer_gpkenn,dateYYYYMM,Valutasaldo_avg
0,680371091945,201901,0.000000
1,680371091945,201902,0.000000
2,680371091945,201903,0.000000
3,680371091945,201904,0.000000
4,680371091945,201905,0.000000
...,...,...,...
2912275,9999926475141309,202202,63865.888000
2912276,9999926475141309,202203,18336.024565
2912277,9999926475141309,202204,3834.458947
2912278,9999926475141309,202205,3625.863409


In [29]:
taggerdata = taggerdata_hive.toPandas()
taggerdata = taggerdata.dropna()
taggerdata = taggerdata.astype({'node_identifier': int,'dateYYYYMM_tagger': int,'amount':float})
def map_tagger_label_to_descriptive_name(label:str) ->str:
    label_mapping = {"-1":"Untagged","0":"Landwirtschaft","1":"Forstwirtschaft","10":"Metallerzeugung und -bearbeitung","11":"Herst. von elektrischen Ausruestungen","12":"Maschinenbau","13":"Herst. von Kraftwagen","14":"Moebel","15":"Wasserversorgung und Abwasserentsorgung","16":"Abfallentsorgung und Rueckgewinnung","17":"Bauindustrie und Handwerk","18":"Handel mit Kraftfahrzeugen","19":"Gesundheitswesen","2":"Bergbau","20":"Personen- und Gueterbefoerderung / Lagerei","21":"Beherbergung und Gastronomie","22":"Unterhaltungsmedien wie Film","23":"Beratungsunternehmen","24":"Mit Finanz- und VersicherungsDienstl. verbundene Taetigkeiten","25":"Versicherungen","26":"Grundstuecks- und Wohnungswesen","27":"Werbung und Marktforschung","28":"Vermittlung und Ueberlassung von Arbeitskraeften","29":"Reisebueros und Reiseveranstalter","3":"Energieversorgung","30":"Gebaeudebetreuung","31":"Oeffentliche Verwaltung","32":"Erziehung und Unterricht","33":"Sozialwesen","4":"Textilien und Bekleidung","5":"Herst. von Papier","6":"Verlagswesen","7":"Chemische Erzeugnisse","8":"Pharmazeutische Erzeugnisse","9":"Glas und Glaswaren"}    
    return label_mapping[str(label)]
map_tagger_label_to_descriptive_name_vectorized = np.vectorize(map_tagger_label_to_descriptive_name)
taggerdata['label_desc'] = map_tagger_label_to_descriptive_name_vectorized(taggerdata['label'])
taggerdata = taggerdata.drop("label",axis=1)
taggerdata = taggerdata.rename(columns={"dateYYYYMM_tagger":"dateYYYYMM","node_identifier":"customer_gpkenn"})
taggerdata = taggerdata[taggerdata['dateYYYYMM'].isin(all_dates)]
taggerdata = taggerdata[taggerdata['customer_gpkenn'].isin(all_gpkenns)]
taggerdata_pivot = pd.pivot_table(taggerdata,index=['customer_gpkenn','dateYYYYMM'], columns='label_desc', values='amount',aggfunc=sum,fill_value=0).reset_index()
empty_tagger = gpkenn_date_columns.copy()
tagger_columns = taggerdata_pivot.columns[2:]
empty_tagger[tagger_columns] = None
empty_tagger = empty_tagger.set_index(['customer_gpkenn','dateYYYYMM'])
taggerdata_pivot = taggerdata_pivot.set_index(['customer_gpkenn','dateYYYYMM'])
taggerdata_pivot = taggerdata_pivot.astype(float)
tagger_formatted = empty_tagger.combine_first(taggerdata_pivot).reset_index()
tagger_formatted = tagger_formatted.fillna(0)
tagger_formatted.info()
tagger_formatted

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2912280 entries, 0 to 2912279
Data columns (total 37 columns):
 #   Column                                                         Dtype  
---  ------                                                         -----  
 0   customer_gpkenn                                                int64  
 1   dateYYYYMM                                                     int64  
 2   Abfallentsorgung und Rueckgewinnung                            float64
 3   Bauindustrie und Handwerk                                      float64
 4   Beherbergung und Gastronomie                                   float64
 5   Beratungsunternehmen                                           float64
 6   Bergbau                                                        float64
 7   Chemische Erzeugnisse                                          float64
 8   Energieversorgung                                              float64
 9   Erziehung und Unterricht                      

Unnamed: 0,customer_gpkenn,dateYYYYMM,Abfallentsorgung und Rueckgewinnung,Bauindustrie und Handwerk,Beherbergung und Gastronomie,Beratungsunternehmen,Bergbau,Chemische Erzeugnisse,Energieversorgung,Erziehung und Unterricht,...,Reisebueros und Reiseveranstalter,Sozialwesen,Textilien und Bekleidung,Untagged,Unterhaltungsmedien wie Film,Verlagswesen,Vermittlung und Ueberlassung von Arbeitskraeften,Versicherungen,Wasserversorgung und Abwasserentsorgung,Werbung und Marktforschung
0,680371091945,201901,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,680371091945,201902,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,680371091945,201903,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,680371091945,201904,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,680371091945,201905,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2912275,9999926475141309,202202,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2912276,9999926475141309,202203,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2912277,9999926475141309,202204,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2912278,9999926475141309,202205,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


### Merge All Features

In [30]:
for df in [csamapping_formatted,mastertable_transactionsums_by_currency_formatted,scaled_market_features_formatted,\
    deposits_formatted, tagger_formatted]:
    assert df[['customer_gpkenn', 'dateYYYYMM']].equals(mfkpi_fulltarget[['customer_gpkenn', 'dateYYYYMM']])
print("All frames have matching ids and dates.")

All frames have matching ids and dates.


In [31]:
raw_data = pd.concat([mfkpi_fulltarget,\
csamapping_formatted[csamapping_formatted.columns[2:]],\
scaled_market_features_formatted[scaled_market_features_formatted.columns[2:]],\
mastertable_transactionsums_by_currency_formatted[mastertable_transactionsums_by_currency_formatted.columns[2:]],\
tagger_formatted[tagger_formatted.columns[2:]],\
deposits_formatted[deposits_formatted.columns[2:]]\
], axis=1)
raw_data.info()
raw_data

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2912280 entries, 0 to 2912279
Data columns (total 76 columns):
 #   Column                                                         Dtype  
---  ------                                                         -----  
 0   customer_gpkenn                                                int64  
 1   dateYYYYMM                                                     int64  
 2   Asset Management                                               int64  
 3   Aval                                                           int64  
 4   Betriebliche Altersvorsorge                                    int64  
 5   Bond-Emissionen                                                int64  
 6   Bürgschaften und Garantien                                     int64  
 7   Cash Pooling                                                   int64  
 8   Commerz Real - Immobilienleasing                               int64  
 9   Commerz Real - Mobilienleasing                

Unnamed: 0,customer_gpkenn,dateYYYYMM,Asset Management,Aval,Betriebliche Altersvorsorge,Bond-Emissionen,Bürgschaften und Garantien,Cash Pooling,Commerz Real - Immobilienleasing,Commerz Real - Mobilienleasing,...,Sozialwesen,Textilien und Bekleidung,Untagged,Unterhaltungsmedien wie Film,Verlagswesen,Vermittlung und Ueberlassung von Arbeitskraeften,Versicherungen,Wasserversorgung und Abwasserentsorgung,Werbung und Marktforschung,Valutasaldo_avg
0,680371091945,201901,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000
1,680371091945,201902,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000
2,680371091945,201903,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000
3,680371091945,201904,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000
4,680371091945,201905,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2912275,9999926475141309,202202,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,63865.888000
2912276,9999926475141309,202203,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,18336.024565
2912277,9999926475141309,202204,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3834.458947
2912278,9999926475141309,202205,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3625.863409


### Add target_delta_sums per client
- See how many newm products each client bought in the last 18 months of the dataset
- Use the new column "targets_delta_sum" to balance dataset

In [32]:
def get_delta_products_sum_for_target_dates(one_client:pd.DataFrame,all_target_dates = [202101, 202102, 202103, 202104, 202105, 202106, 202107, 202108, 202109, 202110, 202111, 202112, 202201, 202202, 202203, 202204, 202205, 202206],all_products = [
        'Asset Management', 'Aval', 'Betriebliche Altersvorsorge',
        'Bond-Emissionen', 'Bürgschaften und Garantien', 'Cash Pooling',
        'Commerz Real - Immobilienleasing', 'Commerz Real - Mobilienleasing',
        'Export Dokumentengeschäft', 'Export- und Handelsfinanzierung',
        'Forderungsmanagement', 'Geldmarktkredit', 'Global Payment Plus',
        'Import Dokumentengeschäft', 'KK-Kredit', 'Kapitalanlagen',
        'Rohstoffmanagement', 'Sichteinlagen', 'Termin-Einlage',
        'Unternehmensfinanzierung', 'Währungsmanagement', 'Zinsmanagement'
    ]) ->int:
    one_client[all_products] = one_client[all_products].diff().clip(0).fillna(0).astype(int)
    one_client = one_client[one_client['dateYYYYMM'].isin(all_target_dates)]
    return one_client[all_products].sum().sum()

def create_client_targets_delta_sums_column(products_and_features_table:pd.DataFrame,months_in_dataset=42):
    products_and_features_table_copy = products_and_features_table.copy().set_index('customer_gpkenn')
    client_targets_delta_sums = []
    pd.options.mode.chained_assignment = None
    for gpkenn in tqdm.tqdm(products_and_features_table.customer_gpkenn.unique()):
        client_targets_delta_sums.append(months_in_dataset*[get_delta_products_sum_for_target_dates(products_and_features_table_copy.loc[gpkenn])])
    client_targets_delta_sums_col = [item for sublist in client_targets_delta_sums for item in sublist]
    return client_targets_delta_sums_col

raw_data['targets_delta_sum'] = create_client_targets_delta_sums_column(raw_data)
raw_data.info()
raw_data

100%|████████████████████████████████████| 69340/69340 [07:56<00:00, 145.47it/s]


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2912280 entries, 0 to 2912279
Data columns (total 77 columns):
 #   Column                                                         Dtype  
---  ------                                                         -----  
 0   customer_gpkenn                                                int64  
 1   dateYYYYMM                                                     int64  
 2   Asset Management                                               int64  
 3   Aval                                                           int64  
 4   Betriebliche Altersvorsorge                                    int64  
 5   Bond-Emissionen                                                int64  
 6   Bürgschaften und Garantien                                     int64  
 7   Cash Pooling                                                   int64  
 8   Commerz Real - Immobilienleasing                               int64  
 9   Commerz Real - Mobilienleasing                

Unnamed: 0,customer_gpkenn,dateYYYYMM,Asset Management,Aval,Betriebliche Altersvorsorge,Bond-Emissionen,Bürgschaften und Garantien,Cash Pooling,Commerz Real - Immobilienleasing,Commerz Real - Mobilienleasing,...,Textilien und Bekleidung,Untagged,Unterhaltungsmedien wie Film,Verlagswesen,Vermittlung und Ueberlassung von Arbeitskraeften,Versicherungen,Wasserversorgung und Abwasserentsorgung,Werbung und Marktforschung,Valutasaldo_avg,targets_delta_sum
0,680371091945,201901,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,1
1,680371091945,201902,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,1
2,680371091945,201903,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,1
3,680371091945,201904,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,1
4,680371091945,201905,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2912275,9999926475141309,202202,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,63865.888000,0
2912276,9999926475141309,202203,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,18336.024565,0
2912277,9999926475141309,202204,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3834.458947,0
2912278,9999926475141309,202205,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3625.863409,0


### Exclude idle clients via targets_delta_sum

In [11]:
all_idle_client_ids = list(raw_data[raw_data['targets_delta_sum'] == 0].customer_gpkenn.unique())
random.shuffle(all_idle_client_ids)
raw_data = raw_data[~raw_data["customer_gpkenn"].isin(all_idle_client_ids[10000:])].reset_index(drop=True)
raw_data

Unnamed: 0,customer_gpkenn,dateYYYYMM,Asset Management,Aval,Betriebliche Altersvorsorge,Bond-Emissionen,Bürgschaften und Garantien,Cash Pooling,Commerz Real - Immobilienleasing,Commerz Real - Mobilienleasing,...,Textilien und Bekleidung,Untagged,Unterhaltungsmedien wie Film,Verlagswesen,Vermittlung und Ueberlassung von Arbeitskraeften,Versicherungen,Wasserversorgung und Abwasserentsorgung,Werbung und Marktforschung,Valutasaldo_avg,targets_delta_sum
0,680371091945,201901,0,0,0,0,0,0,0,0,...,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0,0.000000e+00,1
1,680371091945,201902,0,0,0,0,0,0,0,0,...,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0,0.000000e+00,1
2,680371091945,201903,0,0,0,0,0,0,0,0,...,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0,0.000000e+00,1
3,680371091945,201904,0,0,0,0,0,0,0,0,...,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0,0.000000e+00,1
4,680371091945,201905,0,0,0,0,0,0,0,0,...,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0,0.000000e+00,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2912233,9998578770093910,202202,0,0,0,0,0,0,0,0,...,0.0,-172648.07,0.0,0.0,0.0,0.0,0.0,0.0,6.919436e+05,1
2912234,9998578770093910,202203,0,0,0,0,0,0,0,0,...,0.0,-53892.40,0.0,0.0,0.0,0.0,0.0,0.0,5.283468e+05,1
2912235,9998578770093910,202204,0,0,0,0,0,0,0,0,...,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0,1.177092e+06,1
2912236,9998578770093910,202205,0,0,0,0,0,0,0,0,...,0.0,-231407.62,0.0,0.0,0.0,0.0,0.0,0.0,8.213924e+05,1


### Export

In [5]:
raw_data.to_csv("raw_data_1728678rows_x_77columns.csv",index=False,encoding="utf-8-sig")
raw_data.to_pickle("raw_data_1728678rows_x_77columns.pkl")