# ALLOCATORE ECL (rcp_allocatore_ecl)

### 1. Scopo e Ambito 

Il processo si inserisce all'interno del processo di calcolo di proiezione IFRS9 e si occupa di riallocare le misure di rischio calcolate nel motore di proiezione ECL sulle tranche di dettaglio. Infatti, il calcolo eseguito su cluster di rischio omogenei all'interno delle proiezioni ECL, se da una parte permette di ridurre il carico computazionale ( particolarmente rilevante considerata la dimensione del portafoglio e la metodologia di applicazione delle matrici di transizione), dall'altra rende indisponibile una serie di dimensioni e di informazioni di reportistica di dettaglio che potrebbero essere necessarie ai fini di un'analisi su particolari segmenti del portafoglio.
Di eseguito riportiamo lo schema dei motori di Proiezioni IFRS9 all'interno del quale l'allocatore ECL si va a inserire.
Per una descrizione degli altri componenti si rimanda alla specifica documentazione.

![Proiezioni IFRS9](asset/ifrs9.png "Proiezioni IFRS9")

### 2. Input e Fonti Alimentanti

ciao


### 3. Background e Technicalities

Poichè l'applicativo prevede il processamento di un numero elevato di dati che rendono difficile l'implementazione attraverso tecniche "standard" di programmazione, si è previsto l'utilizzo di una tecnologia presente all'interno della Google Cloud Platform ovvero Apache Spark ( eseguito su un servizio Google chiamato Dataproc).
Spark è una piattaforma per il calcolo distribuito ad elevata scalabilità che permette di suddividere i dati e i calcoli su molti  cluster ognuno dei quali contiene dei nodi rendendo molto semplice il processimento di dataset molto grandi, perchè ogni nodo (assimibilabile a un'unità computazionale) si limita a processare una piccola parte del database.


In [1]:
from pyspark.sql.session import SparkSession
from pyspark.sql.session import SparkSession
from pyspark.sql import dataframe
from pyspark.sql.functions import first, col, when, lit, coalesce, broadcast, format_string, round
from time import time
import ipywidgets as widgets
from IPython.display import display
layout = widgets.Layout( height='40px', width = '300px') #set width and height

In [2]:


# Spark session import and initialization
 #   .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar')\
  #  .config('spark.submit.pyFiles', 'gs://bkt-isp-ddl00-appl-svil-001/spark-bigquery-support-0.30.1.zip')\
#.config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') 
spark = SparkSession.builder \
    .appName('ECL Bonis') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar')\
    .getOrCreate()

spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "PocECLDataset")
spark.conf.set("materializationDataset", "ds_proiezione_credito")
spark.conf.set("materializationDataset", "ds_proiezioni_ecl_bonis_staging")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Il primo passo prevede quindi l'importazione delle librerie pyspark (l'interfaccia Python per l'utilizzo di Spark) e l'inizializzazione di una sessione Spark che permette di accedere ai servizi di calcolo distribuito.

### 4. Setting e Parametri di Calcolo

Di seguito l'inizializzazione dei parametri di setting (progetto, dataset, tabelle) necessari al calcolo, che nel processo in produzione saranno inseriti in specifico file di configurazione

In [3]:
# Settaggio dei parametri relativi al progetto Cloud e ai Dataset Bigquery
project_id = "prj-isp-ddl00-appl-svil-001"
dataset = "ds_proiezione_credito"
dataset_feeding = "ds_feeding_proiezione_credito"
dataset_staging = "ds_proiezioni_ecl_bonis_staging"
dataset_ecl ="PocECLDataset"
dataset_staging2 = "ds_proiezioni_sas"

In [4]:
# nome tabelle di input
ifrs9_join_rwa = "TE_BDRI0_IFRS9_JOIN_RWA_MIPBRM"
etl_mip_brm = "TE_BDRI0_ETL_MIP_BRM_RRM"
ecl_out = "TE_IFRS9_PRO_ECL_OUT"
ecl_out_mig = "TE_IFRS9_PRO_ECL_OUT_MIG"
ecl_scarti  = "TE_IFRS9_PRO_ECL_SCARTI"
info_addizionali_t = "TE_IFRS9_INFO_ADDIZIONALI"

Di seguito i parametri di lancio del motore, tali parametri saranno inputati dall'utente tramite interfaccia grafica messa a disposizione.

In [5]:
#Crea l'interfaccia per l'acquisizione delle versioni e dei parametri

def create_int_widget(description, default_value):
    w = widgets.IntText(
    value= default_value,
    description=description,
    disabled=False,
    align_items='stretch', 
    style = {'description_width': 'initial'},
    layout = layout)
    return w

title_w = widgets.HTML(value = "<b>Selezione versioni da interfaccia:</b>")
etl_mip_brm_w = create_int_widget('etl_mip_brm_version:', 2)
ecl_out_w = create_int_widget('ecl_out_version:', 26)
info_addizionali_w = create_int_widget('info_addizionali_version', 0)
anni_proiez_w = create_int_widget('anni_proiez:', 3)
display(title_w)
display(etl_mip_brm_w)
display(ecl_out_w)
display(info_addizionali_w)
display(anni_proiez_w)

HTML(value='<b>Selezione versioni da interfaccia:</b>')

IntText(value=2, description='etl_mip_brm_version:', layout=Layout(height='40px', width='300px'), style=Descri…

IntText(value=26, description='ecl_out_version:', layout=Layout(height='40px', width='300px'), style=Descripti…

IntText(value=0, description='info_addizionali_version', layout=Layout(height='40px', width='300px'), style=De…

IntText(value=3, description='anni_proiez:', layout=Layout(height='40px', width='300px'), style=DescriptionSty…

In [6]:
#legge parametri interfaccia
etl_mip_brm_version = etl_mip_brm_w.value
ecl_out_version = ecl_out_w.value
info_addizionali_version = info_addizionali_w.value
anni_proiez = anni_proiez_w.value

A fini di sviluppo sono stati utilizzati dati reali relativi a settembre 2021, il primo passaggio di calcolo prevederà dunque il filtro delle versioni alla data di riferimento, nonchè la gestione di valori NULL che potrebbero creare dei problemi all'interno dei processi di calcolo. La gestione dei NULL non è prevista all'interno del notebook, ma è implementata nel codice

In [7]:
CHIAVE_INFO_ADDIZIONALI = "Tranche"
INFO_ADDIZIONALI = 1

In [8]:
#download tabella etl_mip_brm
q = f"select * from `{project_id}.{dataset_feeding}.{etl_mip_brm}` where ID_VERSIONE = {etl_mip_brm_version}"
print(q)
df_etl_mip_brm = spark.read.format("bigquery").load(q)
df_etl_mip_brm.count()

select * from `prj-isp-ddl00-appl-svil-001.ds_feeding_proiezione_credito.TE_BDRI0_ETL_MIP_BRM_RRM` where ID_VERSIONE = 2


                                                                                

11450960

In [9]:
#download_tabella join_rwa
q = f"select * from `{project_id}.{dataset_feeding}.{ifrs9_join_rwa}` where ID_VERSIONE = {etl_mip_brm_version}"
print(q)
df_join_rwa = spark.read.format("bigquery").load(q)
df_join_rwa.count()

select * from `prj-isp-ddl00-appl-svil-001.ds_feeding_proiezione_credito.TE_BDRI0_IFRS9_JOIN_RWA_MIPBRM` where ID_VERSIONE = 2


11450960

In [10]:
#download ECL_OUT
q = f"select * from `{project_id}.{dataset}.{ecl_out}` where ID_VERSIONE = {ecl_out_version}"

df_ecl_out = spark.read.format("bigquery").load(q)
df_ecl_out.count()

1752484

In [11]:
#download ECL_OUT_MIG
q = f"select * from `{project_id}.{dataset}.{ecl_out_mig}` where ID_VERSIONE = {ecl_out_version}"

df_ecl_out_mig = spark.read.format("bigquery").load(q)
df_ecl_out_mig.count()

1752484

In [12]:
#download ECL_SCARTI
q = f"select * from `{project_id}.{dataset}.{ecl_scarti}` where ID_VERSIONE = {ecl_out_version}"
df_ecl_scarti = spark.read.format("bigquery").load(q)
df_ecl_scarti.count()
# numero righe database

505

In [13]:
#download INFO_ADDIZIONALI
if INFO_ADDIZIONALI == 1:
    q = f"select * from `{project_id}.{dataset}.{info_addizionali_t}`"
    df_info_addizionali = spark.read.format("bigquery").load(q)
    df_info_addizionali.count()
    # numero righe database

### 5 Processo di Calcolo

##### 5.1 Aggancio del campo COD_ROW_ID_MIP

TE_BDRI0_ETL_MIP_BRM_RRM è la tabella di partenza che costituisce il database dei motori a consuntivo e che contiene tutte le informazioni rilevanti, output dell'ETL IFRS9.
Questa tabella contiene le tranche di dettaglio (con chiave EXP_ID per il rapporto e COD_CRM per la garanzia); per poter raccordare le informazioni proiettate dal motore di proiezione ECL è dunque necessario recuperare l'identificativo del cluster su cui questo motore opera (COD_ROW_ID_MIP).
La tabella TE_BDRI0_IFRS9_JOIN_RWA_MIP_BRM contiene la mappatura delle chiavi (COD_EXP,COD_CRM) e COD_ROW_ID_MIP.


In [27]:
start=time()
#aggancio campo COD_ROW_ID_MIP
df_etl_mip_brm = df_etl_mip_brm.alias("a").join(df_join_rwa, [df_etl_mip_brm.COD_EXP == df_join_rwa.COD_EXP, df_etl_mip_brm.COD_CRM == df_join_rwa.COD_CRM], 'left')\
.select("a.*", df_join_rwa.COD_ROW_ID_MIP)
#uncomment to see results
#print(df_etl_mip_brm.head())
print(df_etl_mip_brm.count())
end=time()
elapsed = end-start
print(f"{elapsed} seconds")
elapsed_minutes = elapsed/60
print(f"{elapsed_minutes} minutes")

AnalysisException: Column COD_EXP#774, COD_CRM#775 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.

L'output precedente mostra la prima riga del database, come è possibile vedere ai campi dell'ETL_MIP_BRM_RRM è stato quindi aggiunto l'identificativo del cluster COD_ROW_ID_MIP

##### 5.2 Recupero del Campo EAD_TOT_CLUSTER

Per poter riallocare proporzionalmente all'EAD di ogni singola tranche l'ECL calcolata a livello di cluster si renderà necessario recuperare per ogni tranche l'EAD Totale del Cluster di Riferimento.
Tale informazione viene recuperando utilizzando l'identificativo COD_ROW_ID_MIP.

In [42]:
start=time()
#seleziono campo EAD ed accodo ead ECL_OUT ed ECL_SCARTI
df_ead_mip=df_ecl_out.select("COD_ROW_ID_MIP",col("VAL_EAD_PIT_T0").alias("VAL_EAD_TOT_CLUSTER"))
df_ead_scarti=df_ecl_scarti.select("COD_ROW_ID_MIP",col("VAL_EAD_PIT_T0").alias("VAL_EAD_TOT_CLUSTER"))
df_ead = df_ead_mip.union(df_ead_scarti)
#join tra etl_mip_brm e ecl_out, ecl_out prima della join seleziona i campi COD_ROW_ID_MIP e VAL_EAD_PIT_T0 (rinominato in VAL_EAD_TOT_CLUSTER)
df_etl_mip_brm = df_etl_mip_brm.alias("a").join(broadcast(df_ead),\
                                     df_etl_mip_brm.COD_ROW_ID_MIP == df_ead.COD_ROW_ID_MIP, 'left')\
                                    .select("a.*", df_ead.VAL_EAD_TOT_CLUSTER)
print(df_etl_mip_brm.count())
end=time()
elapsed = end-start
print(f"{elapsed} seconds")
elapsed_minutes = elapsed/60
print(f"{elapsed_minutes} minutes")



11450960
18.792069673538208 seconds
0.3132011612256368 minutes


                                                                                

##### 5.3.1 ECL_SCARTI_OUT

La tabella ECL_SCARTI contiene i record non proiettati dal processo di ECL. Questi record non vengono processati nel motore delle proiezioni ECL principalmente per due motivi:
- contengono matrici OTHER
- mancano delle informazioni di staging corrispondenti


Prima di procedere alla riallocazione è dunque necessario uniformare il tracciato della ECL_SCARTI a quello dell'ECL_OUT e dell'ECL_OUT_MIG.
Il ratio secondo cui si esegue questa operazione è riportare le metriche a consuntivo sugli anni successivi, lasciando fissi lo staging e gli importi.
In particolare il database conterrà l'esposizione (EAD), l'ECL Lifetime, l'ECL 1Y, le probabilità di migrazione (PRB_MIGR), la PD PIT e Lifetime allocate in relazione allo staging. Inoltre il database conterrà i campi di ECL_FINALE, LGD indipendentemente dallo staging.

In [43]:
# selezione delle colonne effettivamente utilizzate
cols_sel = ["VAL_ECL_1Y_T0", "COD_STAGING_T0_MIP","VAL_ECL_LT_T0","VAL_EAD_PIT_T0",\
            "VAL_PD_PIT_1Y","VAL_PD_LT_T0","VAL_LGD_PIT_T0","VAL_ECL_FINAL_T0_MIP"]
df_ecl_scarti_sel = df_ecl_scarti.select("COD_ROW_ID_MIP",*cols_sel)
df_ecl_scarti_sel.head()


Row(COD_ROW_ID_MIP='00000000000000000000000000291225', VAL_ECL_1Y_T0=5.6757643806, COD_STAGING_T0_MIP='1', VAL_ECL_LT_T0=5.6757643806, VAL_EAD_PIT_T0=424419.68, VAL_PD_PIT_1Y=8.6e-05, VAL_PD_LT_T0=None, VAL_LGD_PIT_T0=None, VAL_ECL_FINAL_T0_MIP=5.6757643806)

I campi SXS1 vengono popolati se lo staging di partenza (COD_STAGING_T0_MIP) è uguale a 1, I campi SXS2 vengono popolati se lo staging di partenza (COD_STAGING_T0_MIP) è uguale a 2, i campi SXS3 essendo il portafoglio interamente bonis sono sempre valorizzati a 0.
Non essendoci migrazioni di staging, i campi PRB_MIGR sono sempre pari a 1 oppure 0, ovvero lo staging rimane fisso.

In [44]:
#funzione per calcolare i campi dell'ecl out
#I campi SXS1 vengono popolati se COD_STAGING_T0_MIP = 1, i campi SXS2 vengono popolati se COD_STAGING_T0_MIP = 2
#inseriamo i campi in un dizionario per evitare duplicazioni di codice


def field_s1_s2(df,field_new, field_old, i, sy = None):
    if sy is not None:
        df = df.withColumn(f"{field_new}{sy}_T{i}",when(
        col("COD_STAGING_T0_MIP") == f"{sy}", col(f"{field_old}")).otherwise(0))
    else:
        df = df.withColumn(f"{field_new}{i}",col(f"{field_old}")) 
    return df

#campi S1 e S2
fields = {
  "VAL_ECL_1Y_G_SXS": "VAL_ECL_1Y_T0",
  "VAL_ECL_LT_G_SXS": "VAL_ECL_LT_T0",
  "VAL_EAD_PIT_G_SXS": "VAL_EAD_PIT_T0",
  "VAL_PD_LT_G_SXS": "VAL_PD_LT_T0",
  "VAL_PD_PIT_G_SXS": "VAL_PD_PIT_1Y"
    
}

for t in range(anni_proiez+1):
    for sy in range(1,3):
        for f in fields:
            df_ecl_scarti_sel = field_s1_s2(df_ecl_scarti_sel,f,fields[f],t,sy)


#campi che non dipendono dallo staging

fields = {
    "VAL_ECL_FINALE_G_T": "VAL_ECL_FINAL_T0_MIP",
    "VAL_LGD_PIT_G_T": "VAL_LGD_PIT_T0"
}

for t in range(anni_proiez+1):
    for f in fields:
        df_ecl_scarti_sel = field_s1_s2(df_ecl_scarti_sel,f,fields[f],t)
    #campi S3, sempre uguali a 0 perchè portafoglio
    df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_EAD_PIT_G_SXS3_T{t}",lit(0))
    df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_1Y_LT_G_SXS3_T{t}",lit(0))
    df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"PRB_MIGR_G_SX_S3_T{t}",lit(0))
    #campo ECL_LT_EBA_SXS2
    df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_LT_EBA_G_SXS2_T{t}",when(
        col("COD_STAGING_T0_MIP") == "2", col(f"VAL_ECL_LT_T0")).otherwise(0))
    #campi PRB_MIGR
    for sy in range(1,3):
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"PRB_MIGR_G_SX_S{sy}_T{t}",when(
            col("COD_STAGING_T0_MIP") == f"{sy}", lit(1)).otherwise(0))

df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn("CASO_STEP",lit("ECL_SCARTI"))      
df_ecl_scarti_out = df_ecl_scarti_sel.drop(*cols_sel)
cols_out = df_ecl_scarti_out.columns


Si accodano i database provenienti dall'ecl_out e dall'ecl_scarti out in modo da recuperare agilmente i valori nelle join successive

In [45]:
df_ecl_out_sel = df_ecl_out.select(cols_out)
df_out = df_ecl_out_sel.union(df_ecl_scarti_out)

##### 5.3.2 ECL_SCARTI_OUT_MIG

La stessa logica viene utilizzata in maniera analoga per calcolare l'ECL_SCARTI_OUT_MIG. I campi che prevedono migrazioni di staging (S1S2, S2S1, S3S2 etc saranno sempre uguali a 0, essendo lo staging fisso. I campi S1S1, S2S2 sono valorizzati in base allo staging a consuntivo (COD_STAGING_T0_MIP). I campi S3S3 comprendendo il portafoglio solo bonis sono sempre valorizzati a 0.

In [46]:
# selezione delle colonne effettivamente utilizzate
df_ecl_scarti_sel = df_ecl_scarti.select("COD_ROW_ID_MIP",*cols_sel)

fields = {
  "VAL_ECL_1Y_G": ["VAL_ECL_1Y_T0",3,2],
  "VAL_ECL_LT_G": ["VAL_ECL_LT_T0",3,2],
  "VAL_EAD_PIT_G": ["VAL_EAD_PIT_T0",3,3],
  "VAL_PD_LT_G": ["VAL_PD_LT_T0",2,3],
  "VAL_PD_PIT_G": ["VAL_PD_PIT_1Y",2,3]
}


def field_s1_s2(df,field_new, field_old, i, sx,sy):
    cond = (col("COD_STAGING_T0_MIP") == f"{sy}") & (col("COD_STAGING_T0_MIP") == f"{sx}" )
    df = df.withColumn(f"{field_new}_S{sx}S{sy}_T{i}", when(cond, col(f"{field_old}")).otherwise(0))
    return df

for t in range(anni_proiez+1):
    for f in fields:
        sx_f = fields[f][1]
        sy_f = fields[f][2]
        for sx in range(1,sx_f+1):
            for sy in range(1,sy_f+1):
                df_ecl_scarti_sel = field_s1_s2(df_ecl_scarti_sel,f,fields[f][0],t,sx,sy)        
        
        #campi VAL_ECL_1Y_LT_G
        #sempre valorizzati a 0 perchè non ci sono default
        for sx in range(1,4):
            df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_1Y_LT_G_S{sx}S3_T{t}",lit(0))
            
        #campi VAL_ECL_LT_EBA_G_S2S2_Tx valorizzati quando COD_STAGING_T0_MIP = 2
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_LT_EBA_G_S2S2_T{t}", 
                                                         when(col("COD_STAGING_T0_MIP") == "2", col("VAL_ECL_LT_T0")).otherwise(0))
        
        #campi PRB_MIGR, valorizzati a 1 quando sx=sy=COD_STAGING_MIP
        

        for sx in range(1,4):
            for sy in range(1,4):
                cond = (col("COD_STAGING_T0_MIP") == f"{sy}") & (col("COD_STAGING_T0_MIP") == f"{sx}" )
                df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"PRB_MIGR_G_S{sx}S{sy}_T{t}", when(cond, lit(1)).otherwise(0))
        
        #campi VAL_PD VAL_LT S3 S3
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_PD_LT_G_S3S3_T{t}", when(cond,col("VAL_PD_PIT_1Y")).otherwise(0))
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_PD_PIT_G_S3S3_T{t}", when(cond,col("VAL_PD_LT_T0")).otherwise(0))
        
                                
df_ecl_scarti_out_mig = df_ecl_scarti_sel.drop(*cols_sel)
cols_out_mig = df_ecl_scarti_out_mig.columns

Si accodano i database provenienti dall'ECL_OUT_MIG e dalla SCARTI_OUT_MIG per procedere più agevolmente nelle join successive

In [47]:
df_ecl_out_mig_sel = df_ecl_out_mig.select(cols_out_mig)
df_out_mig = df_ecl_out_mig_sel.union(df_ecl_scarti_out_mig)
print(df_out_mig.count())
print(df_ecl_scarti_out_mig.head())

1752989
Row(COD_ROW_ID_MIP='00000000000000000000000000291225', VAL_ECL_1Y_G_S1S1_T0=5.6757643806, VAL_ECL_1Y_G_S1S2_T0=0.0, VAL_ECL_1Y_G_S2S1_T0=0.0, VAL_ECL_1Y_G_S2S2_T0=0.0, VAL_ECL_1Y_G_S3S1_T0=0.0, VAL_ECL_1Y_G_S3S2_T0=0.0, VAL_ECL_1Y_LT_G_S1S3_T0=0, VAL_ECL_1Y_LT_G_S2S3_T0=0, VAL_ECL_1Y_LT_G_S3S3_T0=0, VAL_ECL_LT_EBA_G_S2S2_T0=0.0, PRB_MIGR_G_S1S1_T0=1, PRB_MIGR_G_S1S2_T0=0, PRB_MIGR_G_S1S3_T0=0, PRB_MIGR_G_S2S1_T0=0, PRB_MIGR_G_S2S2_T0=0, PRB_MIGR_G_S2S3_T0=0, PRB_MIGR_G_S3S1_T0=0, PRB_MIGR_G_S3S2_T0=0, PRB_MIGR_G_S3S3_T0=0, VAL_PD_LT_G_S3S3_T0=0.0, VAL_PD_PIT_G_S3S3_T0=0.0, VAL_ECL_LT_G_S1S1_T0=5.6757643806, VAL_ECL_LT_G_S1S2_T0=0.0, VAL_ECL_LT_G_S2S1_T0=0.0, VAL_ECL_LT_G_S2S2_T0=0.0, VAL_ECL_LT_G_S3S1_T0=0.0, VAL_ECL_LT_G_S3S2_T0=0.0, VAL_EAD_PIT_G_S1S1_T0=424419.68, VAL_EAD_PIT_G_S1S2_T0=0.0, VAL_EAD_PIT_G_S1S3_T0=0.0, VAL_EAD_PIT_G_S2S1_T0=0.0, VAL_EAD_PIT_G_S2S2_T0=0.0, VAL_EAD_PIT_G_S2S3_T0=0.0, VAL_EAD_PIT_G_S3S1_T0=0.0, VAL_EAD_PIT_G_S3S2_T0=0.0, VAL_EAD_PIT_G_S3S3_T0=0.0

##### 5.4 Step2 - Allocazione Misure ECL_OUT

Si procede quindi ad allocare le misure calcolate sui cluster nel motore dell'ECL (tabella ECL_OUT a cui è stata accodato l'ECL_SCARTI opportunamente modificata) sulla base dati di dettaglio TE_BDRI0_ETL_MIP_BRM_RRM contenente le tranche. 
La logica implementata sarà la seguente:
- per gli importi questi saranno allocati sulla base dell'esposizione della tranche (**IMP_EAD_PIT**) rispetto a quella totale del cluster (**VAL_EAD_TOT_CLUSTER**)
- per i parametri il parametro sarà semplicemente riportato nella tranche corrispondente

Nel codice sottostante è possibile vedere i campi importo e i parametri

In [48]:
fields_importi = [
    "VAL_ECL_FINALE_G_T",
    "VAL_ECL_1Y_G_SXS1_T",
    "VAL_ECL_1Y_G_SXS2_T",
    "VAL_ECL_LT_G_SXS1_T",
    "VAL_ECL_LT_G_SXS2_T",
    "VAL_ECL_LT_EBA_G_SxS2_T",
    "VAL_EAD_PIT_G_SXS1_T",
    "VAL_EAD_PIT_G_SXS2_T",
    "VAL_EAD_PIT_G_SXS3_T",
    "VAL_ECL_1Y_LT_G_SXS3_T" 
]

fields_parametri = [
    "PRB_MIGR_G_SX_S1_T",
    "PRB_MIGR_G_SX_S2_T",
    "PRB_MIGR_G_SX_S3_T",
    "VAL_PD_LT_G_SXS1_T", 
    "VAL_PD_LT_G_SXS2_T",
    "VAL_PD_PIT_G_SXS1_T",
    "VAL_PD_PIT_G_SXS2_T",
    "VAL_LGD_PIT_G_T",
]





In [49]:
df_allocatore_out = df_etl_mip_brm.alias("a").join(df_out,on="COD_ROW_ID_MIP",how="left"). \
select("a.COD_EXP","a.COD_CRM","a.COD_SNDG",*[col(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_parametri], \
       *[(col(f"{field}{i}") * col("IMP_EAD_PIT")/col("VAL_EAD_TOT_CLUSTER")).alias(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_importi], col("CASO_STEP"))

##### Check

Di seguito filtriamo un COD_ROW_ID_MIP specifico a titolo di esempio. Come è possibile osservare all'unico cod_row_id_mip dell'ECL_OUT corrispondono tre tranche nell'ETL_BRM_RRM_MIP

In [23]:
#uncomment to execute
print(df_allocatore_out.count())
cod_row_id_mip ="00000000000000000000000001494697"
df_allocatore_out.filter(df_allocatore_out.COD_ROW_ID_MIP == "00000000000000000000000001494697").head(3)
df_out.filter(df_out.COD_ROW_ID_MIP == "00000000000000000000000001494697").head()

                                                                                

11450960


                                                                                

Row(COD_ROW_ID_MIP='00000000000000000000000001494697', VAL_ECL_1Y_G_SXS1_T0=37.41632871, VAL_ECL_LT_G_SXS1_T0=1466.8854122, VAL_EAD_PIT_G_SXS1_T0=398749.18, VAL_PD_LT_G_SXS1_T0=0.0816060738, VAL_PD_PIT_G_SXS1_T0=0.0011520819, VAL_ECL_1Y_G_SXS2_T0=0.0, VAL_ECL_LT_G_SXS2_T0=0.0, VAL_EAD_PIT_G_SXS2_T0=0.0, VAL_PD_LT_G_SXS2_T0=0.0, VAL_PD_PIT_G_SXS2_T0=0.0, VAL_ECL_1Y_G_SXS1_T1=37.743976532, VAL_ECL_LT_G_SXS1_T1=1360.6609063, VAL_EAD_PIT_G_SXS1_T1=396989.47805, VAL_PD_LT_G_SXS1_T1=0.0767074377, VAL_PD_PIT_G_SXS1_T1=0.0011307924, VAL_ECL_1Y_G_SXS2_T1=1.192325409, VAL_ECL_LT_G_SXS2_T1=7.2075742308, VAL_EAD_PIT_G_SXS2_T1=1307.8114428, VAL_PD_LT_G_SXS2_T1=0.0003439715, VAL_PD_PIT_G_SXS2_T1=3.57215e-05, VAL_ECL_1Y_G_SXS1_T2=35.749366331, VAL_ECL_LT_G_SXS1_T2=1258.6355421, VAL_EAD_PIT_G_SXS1_T2=396537.05039, VAL_PD_LT_G_SXS1_T2=0.0722782971, VAL_PD_PIT_G_SXS1_T2=0.0010637914, VAL_ECL_1Y_G_SXS2_T2=1.5711195743, VAL_ECL_LT_G_SXS2_T2=7.7876890834, VAL_EAD_PIT_G_SXS2_T2=1305.2723177, VAL_PD_LT_G_SXS

Verifica ECL_FINALE

df_etl_mip_brm_out.groupby().sum("VAL_ECL_FINALE_G_T0").show()
df_out.groupby().sum("VAL_ECL_FINALE_G_T0").show()

In [24]:
df_allocatore_out.groupby().sum("VAL_ECL_FINALE_G_T0").show()
df_out.groupby().sum("VAL_ECL_FINALE_G_T0").show()

                                                                                

+------------------------+
|sum(VAL_ECL_FINALE_G_T0)|
+------------------------+
|    2.0717411895564346E9|
+------------------------+

+------------------------+
|sum(VAL_ECL_FINALE_G_T0)|
+------------------------+
|    2.0717411895571227E9|
+------------------------+



Verifica EAD

In [25]:
df_allocatore_out.groupby().sum("VAL_EAD_PIT_G_SXS1_T3").show()
df_out.groupby().sum("VAL_EAD_PIT_G_SXS1_T3").show()

                                                                                

+--------------------------+
|sum(VAL_EAD_PIT_G_SXS1_T3)|
+--------------------------+
|      3.889186708630942...|
+--------------------------+

+--------------------------+
|sum(VAL_EAD_PIT_G_SXS1_T3)|
+--------------------------+
|      3.889186708632342E11|
+--------------------------+



In [None]:
#ead iniziale 
df_ecl_out.groupby().sum("VAL_EAD_PIT_T0").union(df_ecl_scarti.groupby().sum("VAL_EAD_PIT_T0")).groupby().sum("sum(VAL_EAD_PIT_T0)").show()

In [None]:
df_allocatore_out = df_allocatore_out.withColumn("EAD_T3",col("VAL_EAD_PIT_G_SXS1_T3")+ col("VAL_EAD_PIT_G_SXS2_T3")+col("VAL_EAD_PIT_G_SXS3_T3"))
df_allocatore_out = df_allocatore_out.withColumn("EAD_T2",col("VAL_EAD_PIT_G_SXS1_T2")+ col("VAL_EAD_PIT_G_SXS2_T2")+col("VAL_EAD_PIT_G_SXS3_T2"))
df_allocatore_out = df_allocatore_out.withColumn("EAD_T1",col("VAL_EAD_PIT_G_SXS1_T1")+ col("VAL_EAD_PIT_G_SXS2_T1")+col("VAL_EAD_PIT_G_SXS3_T1"))
df_allocatore_out = df_allocatore_out.withColumn("EAD_T0",col("VAL_EAD_PIT_G_SXS1_T0")+ col("VAL_EAD_PIT_G_SXS2_T0")+col("VAL_EAD_PIT_G_SXS3_T0"))
df_pivot = df_allocatore_out.groupby().sum("EAD_T0","EAD_T1","EAD_T2","EAD_T3")
df_pivot.select(*[round(col(c),0).alias(c) for c in df_pivot.columns]).show()

In [None]:
df_allocatore_out.filter(col("VAL_ECL_FINALE_G_T0").isNull()).head()

In [None]:
df_out.filter(df_out.COD_ROW_ID_MIP == "00000000000000000000000000000978").head()

##### 5.5 Step 2 - Allocazione Misure ECL_OUT_MIG

SI procede ad allocare le misure provenienti dall'ECL_OUT_MIG con le stesse logiche utilizzate per l'ECL_OUT

In [50]:
fields_importi = [
"VAL_ECL_1Y_G_S1S1_T",
"VAL_ECL_1Y_G_S2S1_T",
"VAL_ECL_1Y_G_S1S2_T",
"VAL_ECL_1Y_G_S2S2_T",
"VAL_ECL_1Y_G_S3S1_T",
"VAL_ECL_1Y_G_S3S2_T",
"VAL_ECL_1Y_LT_G_S1S3_T",
"VAL_ECL_1Y_LT_G_S2S3_T",
"VAL_ECL_1Y_LT_G_S3S3_T",
"VAL_ECL_LT_G_S1S1_T",
"VAL_ECL_LT_G_S2S1_T",
"VAL_ECL_LT_G_S1S2_T",
"VAL_ECL_LT_G_S2S2_T",
"VAL_ECL_LT_G_S3S1_T",
"VAL_ECL_LT_G_S3S2_T",
"VAL_ECL_LT_EBA_G_S2S2_T",
"VAL_EAD_PIT_G_S1S1_T",
"VAL_EAD_PIT_G_S2S1_T",
"VAL_EAD_PIT_G_S1S2_T",
"VAL_EAD_PIT_G_S2S2_T",
"VAL_EAD_PIT_G_S1S3_T",
"VAL_EAD_PIT_G_S2S3_T",
"VAL_EAD_PIT_G_S3S1_T",
"VAL_EAD_PIT_G_S3S2_T",
"VAL_EAD_PIT_G_S3S3_T"
]

fields_parametri = [
"PRB_MIGR_G_S1S1_T",
"PRB_MIGR_G_S2S1_T",
"PRB_MIGR_G_S3S1_T",
"PRB_MIGR_G_S1S2_T",
"PRB_MIGR_G_S2S2_T",
"PRB_MIGR_G_S3S2_T",
"PRB_MIGR_G_S1S3_T",
"PRB_MIGR_G_S2S3_T",
"PRB_MIGR_G_S3S3_T",
"VAL_PD_LT_G_S1S1_T",
"VAL_PD_LT_G_S2S1_T",
"VAL_PD_LT_G_S1S2_T",
"VAL_PD_LT_G_S2S2_T",
"VAL_PD_LT_G_S2S3_T",
"VAL_PD_LT_G_S1S3_T",
"VAL_PD_PIT_G_S1S1_T",
"VAL_PD_PIT_G_S2S1_T",
"VAL_PD_PIT_G_S1S2_T",
"VAL_PD_PIT_G_S2S2_T",
"VAL_PD_PIT_G_S1S3_T",
"VAL_PD_PIT_G_S2S3_T"
]

print(fields_importi)

['VAL_ECL_1Y_G_S1S1_T', 'VAL_ECL_1Y_G_S2S1_T', 'VAL_ECL_1Y_G_S1S2_T', 'VAL_ECL_1Y_G_S2S2_T', 'VAL_ECL_1Y_G_S3S1_T', 'VAL_ECL_1Y_G_S3S2_T', 'VAL_ECL_1Y_LT_G_S1S3_T', 'VAL_ECL_1Y_LT_G_S2S3_T', 'VAL_ECL_1Y_LT_G_S3S3_T', 'VAL_ECL_LT_G_S1S1_T', 'VAL_ECL_LT_G_S2S1_T', 'VAL_ECL_LT_G_S1S2_T', 'VAL_ECL_LT_G_S2S2_T', 'VAL_ECL_LT_G_S3S1_T', 'VAL_ECL_LT_G_S3S2_T', 'VAL_ECL_LT_EBA_G_S2S2_T', 'VAL_EAD_PIT_G_S1S1_T', 'VAL_EAD_PIT_G_S2S1_T', 'VAL_EAD_PIT_G_S1S2_T', 'VAL_EAD_PIT_G_S2S2_T', 'VAL_EAD_PIT_G_S1S3_T', 'VAL_EAD_PIT_G_S2S3_T', 'VAL_EAD_PIT_G_S3S1_T', 'VAL_EAD_PIT_G_S3S2_T', 'VAL_EAD_PIT_G_S3S3_T']


In [None]:
df_allocatore_out_mig = df_etl_mip_brm.alias("c").join(df_out_mig,on="COD_ROW_ID_MIP",how="left"). \
select("c.COD_EXP,c.COD_CRM,c.COD_SNDG",*[col(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_parametri], \
       *[(col(f"{field}{i}") * col("IMP_EAD_PIT")/col("VAL_EAD_TOT_CLUSTER")).alias(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_importi])

##### Check risultati

In [None]:
#verifica ECL
def ecl_1y(df,i):
    df = df.withColumn(f"ECL_1Y_S1{i}",col(f"VAL_ECL_1Y_G_S1S1_T{i}")+ col(f"VAL_ECL_1Y_G_S2S1_T{i}"))
    return df
                                          
def ecl_lt(df,i):
    df = df.withColumn(f"ECL_LT_S2{i}",col(f"VAL_ECL_LT_G_S1S2_T{i}")+ col(f"VAL_ECL_LT_G_S2S2_T{i}"))
    return df

for i in range(anni_proiez+1):
    df_allocatore_out_mig =ecl_1y(df_allocatore_out_mig,i)
    df_allocatore_out_mig =ecl_lt(df_allocatore_out_mig,i)
    df_out_mig =ecl_1y(df_out_mig,i)
    df_out_mig =ecl_lt(df_out_mig,i)

    
                                          

df_pivot_out = df_allocatore_out.groupby().sum(
    *[f"VAL_ECL_1Y_G_SXS1_T{i}" for i in range (anni_proiez+1)],
#    *[f"VAL_ECL_1Y_G_SXS2_T{i}" for i in range (anni_proiez+1)],
#    *[f"VAL_ECL_LT_G_SXS1_T{i}" for i in range (anni_proiez+1)],
    *[f"VAL_ECL_LT_G_SXS2_T{i}" for i in range (anni_proiez+1)],
    )

df_pivot_ecl_out = df_out.groupby().sum(
    *[f"VAL_ECL_1Y_G_SXS1_T{i}" for i in range (anni_proiez+1)],
#    *[f"VAL_ECL_1Y_G_SXS2_T{i}" for i in range (anni_proiez+1)],
#    *[f"VAL_ECL_LT_G_SXS1_T{i}" for i in range (anni_proiez+1)],
    *[f"VAL_ECL_LT_G_SXS2_T{i}" for i in range (anni_proiez+1)],
    )
                    
                                          
    
df_pivot = df_allocatore_out_mig.groupby().sum(
    *[f"ECL_1Y_S1{i}" for i in range(anni_proiez+1)],
    *[f"ECL_LT_S2{i}" for i in range(anni_proiez+1)],
    )

df_pivot_ecl_out_mig = df_out_mig.groupby().sum(
    *[f"ECL_1Y_S1{i}" for i in range(anni_proiez+1)],
    *[f"ECL_LT_S2{i}" for i in range(anni_proiez+1)],
    )
         
print(ecl_out)                                               
df_pivot_ecl_out.select(*[round(col(c),0).alias(c) for c in df_pivot_ecl_out.columns]).show()
print(ecl_out_mig)     
df_pivot_ecl_out_mig.select(*[round(col(c),0).alias(c) for c in df_pivot_ecl_out_mig.columns]).show() 
print("allocatore_out_mig")
df_pivot.select(*[round(col(c),0).alias(c) for c in df_pivot.columns]).show()
print("allocatore_out")
df_pivot_out.select(*[round(col(c),0).alias(c) for c in df_pivot_out.columns]).show()

In [None]:
#verifica EAD
def ead(df,i):
    df = df_allocatore_out_mig.withColumn(f"EAD_T{i}",col(f"VAL_EAD_PIT_G_S1S1_T{i}")+ col(f"VAL_EAD_PIT_G_S1S2_T{i}")+col(f"VAL_EAD_PIT_G_S1S3_T{i}")\
                                                + col(f"VAL_EAD_PIT_G_S2S1_T{i}")+ col(f"VAL_EAD_PIT_G_S2S2_T{i}")+col(f"VAL_EAD_PIT_G_S2S3_T{i}")\
                                                + col(f"VAL_EAD_PIT_G_S3S1_T{i}")+ col(f"VAL_EAD_PIT_G_S3S2_T{i}")+col(f"VAL_EAD_PIT_G_S3S3_T{i}"))
    return df

for i in range(anni_proiez+1):
    df_allocatore_out_mig =ead(df_allocatore_out_mig,i)
    
df_pivot = df_allocatore_out_mig.groupby().sum("EAD_T0","EAD_T1","EAD_T2","EAD_T3")
df_pivot.select(*[round(col(c),0).alias(c) for c in df_pivot.columns]).show()

##### 5.6 Recupero Informazioni addizionali

Al fine di aumentare la flessibilità dell'intero processo IFRS9 e permettere all'utente finale di inserire ulteriori informazioni rispetto a quelle già presenti nel database dell'ETL_IFRS9 (TE_BRM_RRM_MIP_IFRS9) è stata introdotta la possibilità di definire, a carico dell'utente, una tabella (TE_IFR9_INFO_ADDIZIONALI) che consente all’utente di arricchire la base dati di nuove colonne, in base alle proprie esigenze, sulla base di un identificativo di tranche o di controparte.

La tabella avrà la seguente struttura.

- DAT_REPORT
- ID_VERSIONE
- COD_SNDG
- COD_EXP
- COD_CRM
- NOM_INFO_ADDIZIONALI
- COD_INFO_ADDIZIONALI

Nella Colonna COD_INFO_ADDIZIONALI andranno concatenate a cura dell’utente con un “|” le informazioni che si desidera aggiungere al database. 
Nella Colonna NOM_INFO_ADDIZIONALI andranno concatenate a cura dell’utente con un “|” il nome delle colonne che si è inserito.
La tabella recupererà il campo i due campi andando in join per:
- **COD_EXP**, **COD_CRM** se il parametro da interfaccia **CHIAVE_INFO_ADDIZIONALI** = “Tranche” 
- per **COD_SNDG** se il parametro da interfaccia **CHIAVE_INFO_ADDIZIONALI** = “Controparte” 


In [53]:
if INFO_ADDIZIONALI == 1:
    if CHIAVE_INFO_ADDIZIONALI == "Tranche":
        df_allocatore_out = df_allocatore_out.alias("out").join(df_info_addizionali,[df_allocatore_out.COD_EXP == df_info_addizionali.COD_EXP, df_allocatore_out.COD_CRM == df_info_addizionali.COD_CRM] \
                                            ,how = 'left').select("out.*", df_info_addizionali.NOM_INFO_ADDIZIONALI, df_info_addizionali.COD_INFO_ADDIZIONALI)
    elif CHIAVE_INFO_ADDIZIONALI == "Controparte":
        df_allocatore_out = df_allocatore_out = df_allocatore_out.alias("out").join(df_info_addizionali, on ="COD_SNDG",how = 'left') \
        .select("out.*", df_info_addizionali.NOM_INFO_ADDIZIONALI, df_info_addizionali.COD_INFO_ADDIZIONALI)
    else:
        print("Parametro CHIAVE_INFO_ADDIZIONALI non corretto")
else:
    print("tabella INFO_ADDIZIONALI non selezionata")

df_allocatore_out.head(1)

                                                                                

[Row(COD_EXP='EXAC0000000100000017', COD_CRM='CRGU0000000101457890', COD_SNDG='0000000107416751', PRB_MIGR_G_SX_S1_T0=1.0, PRB_MIGR_G_SX_S2_T0=0.0, PRB_MIGR_G_SX_S3_T0=0.0, VAL_PD_LT_G_SXS1_T0=0.0973842426, VAL_PD_LT_G_SXS2_T0=0.0, VAL_PD_PIT_G_SXS1_T0=0.000923562, VAL_PD_PIT_G_SXS2_T0=0.0, VAL_LGD_PIT_G_T0=0.2719796665, PRB_MIGR_G_SX_S1_T1=0.9963142118, PRB_MIGR_G_SX_S2_T1=0.0027885722, PRB_MIGR_G_SX_S3_T1=0.0008972161, VAL_PD_LT_G_SXS1_T1=0.0723957487, VAL_PD_LT_G_SXS2_T1=0.0013570958, VAL_PD_PIT_G_SXS1_T1=0.0007809802, VAL_PD_PIT_G_SXS2_T1=0.0003658948, VAL_LGD_PIT_G_T1=0.2703880409, PRB_MIGR_G_SX_S1_T2=0.9933564689, PRB_MIGR_G_SX_S2_T2=0.0046514253, PRB_MIGR_G_SX_S3_T2=0.0019921058, VAL_PD_LT_G_SXS1_T2=0.0637958987, VAL_PD_LT_G_SXS2_T2=0.002103044, VAL_PD_PIT_G_SXS1_T2=0.0006570503, VAL_PD_PIT_G_SXS2_T2=0.0004930276, VAL_LGD_PIT_G_T2=0.2651783107, PRB_MIGR_G_SX_S1_T3=0.9941794159, PRB_MIGR_G_SX_S2_T3=0.0027330194, PRB_MIGR_G_SX_S3_T3=0.0030875646, VAL_PD_LT_G_SXS1_T3=0.0580316851, 

##### 5.7 Creazione Vista Finale

Al fine di non duplicare le informazioni all'interno del database si è deciso che le tabelle ALLOCATORE_ECL_OUT e ALLOCATORE_ECL_OUT_MIG conterranno esclusivamente i codici identificati più le metriche riallocate.
Le altre informazioni presenti all'interno TE_BDRI0_ETL_MIP_BRM_RRM verranno comunque 

### 6 CODICE

Per soddisfare i necessari requisiti di modularità e di standard qualitativi del codice, questo viene riorganizzato nell'implementazione e "incapsulato" in specifiche funzioni che riflettono gli specifici passi del processo.

In [13]:
# recupero info preliminari (COD_ROW_ID_MIP, EAD_TOT_CLUSTER)

def preprocessing_1(df_etl_mip_brm, df_join_rwa):
    #aggancio campo COD_ROW_ID_MIP
    df_etl_mip_brm = df_etl_mip_brm.alias("a").join(df_join_rwa, [df_etl_mip_brm.COD_EXP == df_join_rwa.COD_EXP, df_etl_mip_brm.COD_CRM == df_join_rwa.COD_CRM], 'left')\
    .select("a.*", df_join_rwa.COD_ROW_ID_MIP)
    return df_etl_mip_brm
    
def preprocessing_2(df_etl_mip_brm,df_ecl_out,df_ecl_scarti):
    #seleziono campo EAD ed accodo ead ECL_OUT ed ECL_SCARTI
    df_ead_mip=df_ecl_out.select("COD_ROW_ID_MIP",col("VAL_EAD_PIT_T0").alias("VAL_EAD_TOT_CLUSTER"))
    df_ead_scarti=df_ecl_scarti.select("COD_ROW_ID_MIP",col("VAL_EAD_PIT_T0").alias("VAL_EAD_TOT_CLUSTER"))
    df_ead = df_ead_mip.union(df_ead_scarti)
    #join tra etl_mip_brm e ecl_out, ecl_out prima della join seleziona i campi COD_ROW_ID_MIP e VAL_EAD_PIT_T0 (rinominato in VAL_EAD_TOT_CLUSTER)
    df_etl_mip_brm = df_etl_mip_brm.alias("a").join(broadcast(df_ead),\
                                         df_etl_mip_brm.COD_ROW_ID_MIP == df_ead.COD_ROW_ID_MIP, 'left')\
                                        .select("a.*", df_ead.VAL_EAD_TOT_CLUSTER)
    return df_etl_mip_brm

In [14]:
#ecl scarti_out

def ecl_scarti_out(df_ecl_scarti,anni_proiez):
    # selezione delle colonne effettivamente utilizzate
    cols_sel = ["VAL_ECL_1Y_T0", "COD_STAGING_T0_MIP","VAL_ECL_LT_T0","VAL_EAD_PIT_T0",\
                "VAL_PD_PIT_1Y","VAL_PD_LT_T0","VAL_LGD_PIT_T0","VAL_ECL_FINAL_T0_MIP"]
    df_ecl_scarti_sel = df_ecl_scarti.select("COD_ROW_ID_MIP",*cols_sel)
    
    #funzione per calcolare i campi dell'ecl out
    #I campi SXS1 vengono popolati se COD_STAGING_T0_MIP = 1, i campi SXS2 vengono popolati se COD_STAGING_T0_MIP = 2
    #inseriamo i campi in un dizionario per evitare duplicazioni di codice

    def field_s1_s2(df,field_new, field_old, i, sy = None):
        if sy is not None:
            df = df.withColumn(f"{field_new}{sy}_T{i}",when(
            col("COD_STAGING_T0_MIP") == f"{sy}", col(f"{field_old}")).otherwise(0))
        else:
            df = df.withColumn(f"{field_new}{i}",col(f"{field_old}")) 
        return df

    #campi S1 e S2
    fields = {
      "VAL_ECL_1Y_G_SXS": "VAL_ECL_1Y_T0",
      "VAL_ECL_LT_G_SXS": "VAL_ECL_LT_T0",
      "VAL_EAD_PIT_G_SXS": "VAL_EAD_PIT_T0",
      "VAL_PD_LT_G_SXS": "VAL_PD_LT_T0",
      "VAL_PD_PIT_G_SXS": "VAL_PD_PIT_1Y"

    }

    for t in range(anni_proiez+1):
        for sy in range(1,3):
            for f in fields:
                df_ecl_scarti_sel = field_s1_s2(df_ecl_scarti_sel,f,fields[f],t,sy)


    #campi che non dipendono dallo staging

    fields = {
        "VAL_ECL_FINALE_G_T": "VAL_ECL_FINAL_T0_MIP",
        "VAL_LGD_PIT_G_T": "VAL_LGD_PIT_T0"
    }

    for t in range(anni_proiez+1):
        for f in fields:
            df_ecl_scarti_sel = field_s1_s2(df_ecl_scarti_sel,f,fields[f],t)
        #campi S3, sempre uguali a 0 perchè portafoglio
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_EAD_PIT_G_SXS3_T{t}",lit(0))
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_1Y_LT_G_SXS3_T{t}",lit(0))
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"PRB_MIGR_G_SX_S3_T{t}",lit(0))
        #campo ECL_LT_EBA_SXS2
        df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_LT_EBA_G_SXS2_T{t}",when(
            col("COD_STAGING_T0_MIP") == "2", col(f"VAL_ECL_LT_T0")).otherwise(0))
        #campi PRB_MIGR
        for sy in range(1,3):
            df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"PRB_MIGR_G_SX_S{sy}_T{t}",when(
                col("COD_STAGING_T0_MIP") == f"{sy}", lit(1)).otherwise(0))

    df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn("CASO_STEP",lit("ECL_SCARTI"))      
    df_ecl_scarti_out = df_ecl_scarti_sel.drop(*cols_sel)
    
    return df_ecl_scarti_out

In [21]:
#ecl_scarti_out_mig
def ecl_scarti_out_mig(df_ecl_scarti,anni_proiez):
    # selezione delle colonne effettivamente utilizzate
    cols_sel = ["VAL_ECL_1Y_T0", "COD_STAGING_T0_MIP","VAL_ECL_LT_T0","VAL_EAD_PIT_T0",\
                "VAL_PD_PIT_1Y","VAL_PD_LT_T0","VAL_LGD_PIT_T0","VAL_ECL_FINAL_T0_MIP"]
    df_ecl_scarti_sel = df_ecl_scarti.select("COD_ROW_ID_MIP",*cols_sel)

    fields = {
      "VAL_ECL_1Y_G": ["VAL_ECL_1Y_T0",3,2],
      "VAL_ECL_LT_G": ["VAL_ECL_LT_T0",3,2],
      "VAL_EAD_PIT_G": ["VAL_EAD_PIT_T0",3,3],
      "VAL_PD_LT_G": ["VAL_PD_LT_T0",2,3],
      "VAL_PD_PIT_G": ["VAL_PD_PIT_1Y",2,3]
    }


    def field_s1_s2(df,field_new, field_old, i, sx,sy):
        cond = (col("COD_STAGING_T0_MIP") == f"{sy}") & (col("COD_STAGING_T0_MIP") == f"{sx}" )
        df = df.withColumn(f"{field_new}_S{sx}S{sy}_T{i}", when(cond, col(f"{field_old}")).otherwise(0))
        return df

    for t in range(anni_proiez+1):
        for f in fields:
            sx_f = fields[f][1]
            sy_f = fields[f][2]
            for sx in range(1,sx_f+1):
                for sy in range(1,sy_f+1):
                    df_ecl_scarti_sel = field_s1_s2(df_ecl_scarti_sel,f,fields[f][0],t,sx,sy)        

            #campi VAL_ECL_1Y_LT_G
            #sempre valorizzati a 0 perchè non ci sono default
            for sx in range(1,4):
                df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_1Y_LT_G_S{sx}S3_T{t}",lit(0))

            #campi VAL_ECL_LT_EBA_G_S2S2_Tx valorizzati quando COD_STAGING_T0_MIP = 2
            df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_ECL_LT_EBA_G_S2S2_T{t}", 
                                                             when(col("COD_STAGING_T0_MIP") == "2", col("VAL_ECL_LT_T0")).otherwise(0))

            #campi PRB_MIGR, valorizzati a 1 quando sx=sy=COD_STAGING_MIP


            for sx in range(1,4):
                for sy in range(1,4):
                    cond = (col("COD_STAGING_T0_MIP") == f"{sy}") & (col("COD_STAGING_T0_MIP") == f"{sx}" )
                    df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"PRB_MIGR_G_S{sx}S{sy}_T{t}", when(cond, lit(1)).otherwise(0))

            #campi VAL_PD VAL_LT S3 S3
            df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_PD_LT_G_S3S3_T{t}", when(cond,col("VAL_PD_PIT_1Y")).otherwise(0))
            df_ecl_scarti_sel = df_ecl_scarti_sel.withColumn(f"VAL_PD_PIT_G_S3S3_T{t}", when(cond,col("VAL_PD_LT_T0")).otherwise(0))

    df_ecl_scarti_out_mig = df_ecl_scarti_sel.drop(*cols_sel)
    return df_ecl_scarti_out_mig


In [22]:
#ALLOCATORE ECL OUT

def allocatore_ecl_out(df_etl_mip_brm,df_out,anni_proiez):

    fields_importi = [
        "VAL_ECL_FINALE_G_T",
        "VAL_ECL_1Y_G_SXS1_T",
        "VAL_ECL_1Y_G_SXS2_T",
        "VAL_ECL_LT_G_SXS1_T",
        "VAL_ECL_LT_G_SXS2_T",
        "VAL_ECL_LT_EBA_G_SxS2_T",
        "VAL_EAD_PIT_G_SXS1_T",
        "VAL_EAD_PIT_G_SXS2_T",
        "VAL_EAD_PIT_G_SXS3_T",
        "VAL_ECL_1Y_LT_G_SXS3_T" 
    ]

    fields_parametri = [
        "PRB_MIGR_G_SX_S1_T",
        "PRB_MIGR_G_SX_S2_T",
        "PRB_MIGR_G_SX_S3_T",
        "VAL_PD_LT_G_SXS1_T", 
        "VAL_PD_LT_G_SXS2_T",
        "VAL_PD_PIT_G_SXS1_T",
        "VAL_PD_PIT_G_SXS2_T",
        "VAL_LGD_PIT_G_T",
    ]
    
    df_allocatore_out = df_etl_mip_brm.alias("a").join(df_out,on="COD_ROW_ID_MIP",how="left"). \
    select("a.COD_EXP","a.COD_CRM","a.COD_SNDG",*[col(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_parametri], \
    *[(col(f"{field}{i}") * col("IMP_EAD_PIT")/col("VAL_EAD_TOT_CLUSTER")).alias(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_importi], col("CASO_STEP"))
    
    return df_allocatore_out




In [23]:
#ALLOCATORE_ECL_OUT_MIG

def allocatore_ecl_out_mig(df_etl_mip_brm,df_out_mig,anni_proiez):
    fields_importi = [
    "VAL_ECL_1Y_G_S1S1_T",
    "VAL_ECL_1Y_G_S2S1_T",
    "VAL_ECL_1Y_G_S1S2_T",
    "VAL_ECL_1Y_G_S2S2_T",
    "VAL_ECL_1Y_G_S3S1_T",
    "VAL_ECL_1Y_G_S3S2_T",
    "VAL_ECL_1Y_LT_G_S1S3_T",
    "VAL_ECL_1Y_LT_G_S2S3_T",
    "VAL_ECL_1Y_LT_G_S3S3_T",
    "VAL_ECL_LT_G_S1S1_T",
    "VAL_ECL_LT_G_S2S1_T",
    "VAL_ECL_LT_G_S1S2_T",
    "VAL_ECL_LT_G_S2S2_T",
    "VAL_ECL_LT_G_S3S1_T",
    "VAL_ECL_LT_G_S3S2_T",
    "VAL_ECL_LT_EBA_G_S2S2_T",
    "VAL_EAD_PIT_G_S1S1_T",
    "VAL_EAD_PIT_G_S2S1_T",
    "VAL_EAD_PIT_G_S1S2_T",
    "VAL_EAD_PIT_G_S2S2_T",
    "VAL_EAD_PIT_G_S1S3_T",
    "VAL_EAD_PIT_G_S2S3_T",
    "VAL_EAD_PIT_G_S3S1_T",
    "VAL_EAD_PIT_G_S3S2_T",
    "VAL_EAD_PIT_G_S3S3_T"
    ]

    fields_parametri = [
    "PRB_MIGR_G_S1S1_T",
    "PRB_MIGR_G_S2S1_T",
    "PRB_MIGR_G_S3S1_T",
    "PRB_MIGR_G_S1S2_T",
    "PRB_MIGR_G_S2S2_T",
    "PRB_MIGR_G_S3S2_T",
    "PRB_MIGR_G_S1S3_T",
    "PRB_MIGR_G_S2S3_T",
    "PRB_MIGR_G_S3S3_T",
    "VAL_PD_LT_G_S1S1_T",
    "VAL_PD_LT_G_S2S1_T",
    "VAL_PD_LT_G_S1S2_T",
    "VAL_PD_LT_G_S2S2_T",
    "VAL_PD_LT_G_S2S3_T",
    "VAL_PD_LT_G_S1S3_T",
    "VAL_PD_PIT_G_S1S1_T",
    "VAL_PD_PIT_G_S2S1_T",
    "VAL_PD_PIT_G_S1S2_T",
    "VAL_PD_PIT_G_S2S2_T",
    "VAL_PD_PIT_G_S1S3_T",
    "VAL_PD_PIT_G_S2S3_T"
    ]
    
    df_allocatore_out_mig = df_etl_mip_brm.alias("c").join(df_out_mig,on="COD_ROW_ID_MIP",how="left"). \
    select("c.COD_EXP,c.COD_CRM,c.COD_SNDG",*[col(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_parametri], \
    *[(col(f"{field}{i}") * col("IMP_EAD_PIT")/col("VAL_EAD_TOT_CLUSTER")).alias(f"{field}{i}") for i in range(anni_proiez+1) for field in fields_importi])
    
    return df_allocatore_out_mig
    

In [24]:
def info_addizionali(df_allocatore_out,df_info_addizionali, CHIAVE_INFO_ADDIZIONALI):
    if CHIAVE_INFO_ADDIZIONALI == "Tranche":
        df_allocatore_out = df_allocatore_out.alias("out").join(df_info_addizionali,[df_allocatore_out.COD_EXP == df_info_addizionali.COD_EXP, df_allocatore_out.COD_CRM == df_info_addizionali.COD_CRM] \
                                            ,how = 'left').select("out.*", df_info_addizionali.NOM_INFO_ADDIZIONALI, df_info_addizionali.COD_INFO_ADDIZIONALI)
    elif CHIAVE_INFO_ADDIZIONALI == "Controparte":
        df_allocatore_out = df_allocatore_out = df_allocatore_out.alias("out").join(df_info_addizionali, on ="COD_SNDG",how = 'left') \
        .select("out.*", df_info_addizionali.NOM_INFO_ADDIZIONALI, df_info_addizionali.COD_INFO_ADDIZIONALI)
    else:
        print("Parametro CHIAVE_INFO_ADDIZIONALI non corretto")
        
    return df_allocatore_out
    

In [19]:
###execution
start=time()
#preprocessing
df_etl_mip_brm = preprocessing_1(df_etl_mip_brm,df_join_rwa)
df_etl_mip_brm = preprocessing_2(df_etl_mip_brm,df_ecl_out,df_ecl_scarti)

df_etl_mip_brm.count()
df_etl_mip_brm.head()

22/04/06 13:25:13 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Row(DAT_RIF=datetime.date(2021, 9, 30), DAT_REPORT=datetime.date(2021, 9, 30), ID_VERSIONE=2, COD_PART_COL='P202109_V00002', COD_ABI='1025', COD_APPROACH='IRB', COD_APPROCCIO_FINALE='A-IRB', COD_AREA_RISCHIO_APPLIC='MEDIUM', COD_AREA_RISCHIO_ORIG='MEDIUM', COD_BU_RND='FAM', COD_BU_TDB='BDT', COD_CARTO_GARC=None, COD_CONFRONTO_DATE=None, COD_COUNTRY_EBA='ITALY', COD_COUNTRY_EBA_FINALE='ITALY', COD_CPTY_COUNTRY='IT', COD_CPTY_COUNTRY_FINALE='IT', COD_CPTY_COUNTRY_HQ='IT', COD_CPTY_COUNTRY_HQ_FINALE='IT', COD_CPTY_COUNTRY_HQ_GB='086', COD_CPTY_COUNTRY_HQ_GB_FINALE='086', COD_CRM='NOT_COVERED', COD_CRM_RWA=None, COD_DIVISA_ORIG='EUR', COD_DIVISA_ORIG_FINALE='EUR', COD_EBA_ASSET_CLASS='RETAIL_OTHER_SME', COD_ENTITY_COUNTRY='IT', COD_ENTITY_DIVISA='EUR', COD_EXP='EXAC0000000100000210', COD_EXP_RWA='EXAC0000000100000210', COD_FIN_BOOK='101', COD_FIN_BOOK_STG='101', COD_FT_EXP_PREPROC='PT440', COD_INDEXED_LTV_RANGE='.', FLG_INTERCOMPANY='0', COD_LGD_GRID_KEY_ECL_MIP='|R4||Other|IT-CT', COD_MAC

In [20]:
df_ecl_scarti_out = ecl_scarti_out(df_ecl_scarti,anni_proiez)
df_ecl_scarti_out_mig = ecl_scarti_out_mig(df_ecl_scarti,anni_proiez)

#accodamento tabelle ecl_out scarti_out
cols_out = df_ecl_scarti_out.columns
df_ecl_out_sel = df_ecl_out.select(cols_out)
df_out = df_ecl_out_sel.union(df_ecl_scarti_out)

#accodamento tabelle ecl_out_mig scarti_out_mig
cols_out_mig = df_ecl_scarti_out_mig.columns
df_ecl_out_mig_sel = df_ecl_out_mig.select(cols_out_mig)
df_out_mig = df_ecl_out_mig_sel.union(df_ecl_scarti_out_mig)

df_out.count()
df_out_mig.count()

1752989

In [None]:
#allocazione
df_allocatore_out = allocatore_ecl_out(df_etl_mip_brm,df_out,anni_proiez)
df_allocatore_out_mig = allocatore_ecl_out_mig(df_etl_mip_brm,df_out_mig,anni_proiez)

#info_addizionali
if INFO_ADDIZIONALI == 1:
    df_allocatore_out = info_addizionali(df_allocatore_out,df_info_addizionali,CHIAVE_INFO_ADDIZIONALI)

    
print(df_allocatore_out.count())
print(df_allocatore_out.head())

end=time()
elapsed = end-start
print(f"{elapsed} seconds")
elapsed_minutes = elapsed/60
print(f"{elapsed_minutes} minutes")

In [28]:
#download tabella etl_mip_brm
q = f"select * from `{project_id}.{dataset_feeding}.{etl_mip_brm}` where ID_VERSIONE = {etl_mip_brm_version}"
print(q)
df_etl_mip_brm = spark.read.format("bigquery").load(q)
df_etl_mip_brm.count()

select * from `prj-isp-ddl00-appl-svil-001.ds_feeding_proiezione_credito.TE_BDRI0_ETL_MIP_BRM_RRM` where ID_VERSIONE = 2


11450960