In [None]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
from pyspark.sql import functions as F, DataFrame
import datetime as dt
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta
from pyspark.sql.window import Window
import pyspark.sql.types as t
from decimal import Decimal
from pyspark.sql.functions import regexp_replace

In [None]:
from dataproc_sdk.dataproc_sdk_datiopysparksession.datiopysparksession import DatioPysparkSession
datioSparkSession = DatioPysparkSession().get_or_create()

from dataproc_sdk.dataproc_sdk_datiopysparksession import datiopysparksession
dataproc = datiopysparksession.DatioPysparkSession().get_or_create()

from dataproc_sdk.dataproc_sdk_schema.datioschema import DatioSchema
from dataproc_sdk.dataproc_sdk_datiofilesystem.datiofilesystem import DatioFileSystem

In [None]:
# para evitar problemas de tipología de datos
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

In [None]:
# para coger todas las columnas del fichero independientemente que hay fotos que o las tengan
spark.conf.set('spark.sql.parquet.mergeSchema', 'true')

# Configuracion

### Paths in

In [None]:
# limites fijados por el inversor, ya tratados en nuestro proceso (notebook previo de procesamiento del launchpad)
path_limites = '/data/sandboxes/dslb/data/Joystick/TITULIZACIONES/limites/current/'

In [None]:
# base de datos de partida la salida de titulizaciones de joystick
path_titulizaciones = '/data/sandboxes/dslb/data/Joystick/mrr/joystick_mrr'

In [None]:
# adaptar los sectores que tenemos a nivel sector-subsector
path_catalogo_sector_project = '/data/sandboxes/dslb/data/Joystick/TITULIZACIONES/catalogos/sectores_proyecto.csv'

In [None]:
# tabla para el cost income
path_ci = "/data/master/xivg/data/t_xivg_cost_income"
# '/data/master/finance/investments_cost/xivg/data/t_xivg_cost_income/' (OLD)

### Paths out

In [None]:
root_path = '/data/sandboxes/dslb/data/Joystick/TITULIZACIONES/cartera_optima/closing_date='+format(date.today().strftime("%Y-%m-%d"))
root_path

In [None]:
path_facilities = root_path + '/facilities'

In [None]:
path_limites_only = root_path + '/limites'

In [None]:
path_constantes = root_path + '/constants'

## Diccionarios
Se genera un diccionario para los valores de rating

In [None]:
# non_ig: marca limites peores a rating BB+ (numérico: 18) (BB+1 escala larga)
# rating que tomamos depende del tipo de titulizacion que se quiera realizar
non_ig_limit ={
    'corporate_loan': {'categoria':'gf_ma_expanded_master_scale_id', 'valor':'ma_expanded_master_scale_number'}, # rating regulatorio para tipologia corporate
    'project_finance': {'categoria':'g_lmscl_internal_ratg_type', 'valor':'ind_rating'} # rating interno para tipologia project
}

# Funciones

In [None]:
# calculamos la fecha más reciente de la ruta tomando como campo de particion el pasado como parámetro
def last_partition (p_path:str, campo:str):
    
    datio_path = DatioFileSystem().get().qualify(p_path)
    fs = datio_path.fileSystem()
    path = datio_path.path()
    path_list = fs.listStatus(path)
    paths = [path.getPath().toString() for path in path_list] #listado de todos los paths de la ruta pasada
    
    l_fechas = [element.split(campo+'=')[1] for element in paths if campo in element] #listado de todas las fechas
    return max(l_fechas) # fecha mayor

# Orígenes de Datos

## 1. Límites
información con los límites del inversor activos en la foto actual
- Ya filtrados para los que aplican a la titulizacion en curso (Activos, Project/Corporate y STS)

In [None]:
limites =dataproc.read().parquet(path_limites)

In [None]:
limites.show(5,False)

### Tipo Titulizacion
pueden ser Corporate/Project

In [None]:
tipo_titulizacion = limites.where(F.col('limit_type')=='portfolio_type').select('corporate_loan_flag','project_finance_flag')
corporate_flag = tipo_titulizacion.select('corporate_loan_flag').collect()[0].corporate_loan_flag
project_flag = tipo_titulizacion.select('project_finance_flag').collect()[0].project_finance_flag

if(corporate_flag==1):
    tipo_titulizacion = 'corporate_loan'
    print('titulizacion de corporate loan')
if(project_flag==1):
    tipo_titulizacion = 'project_finance'
    print('titulizacion de project finance')   

### Fecha Launchpad
para saber al fecha de carga que estamos usando

In [None]:
fecha_launchpad = limites.select('closing_date').distinct().collect()[0].closing_date
print('fecha del launchpad:', fecha_launchpad)

## 2. Tabla Titulizaciones
información que se comparte como punto de partida para generar titulizaciones

In [None]:
fecha_titus = last_partition (path_titulizaciones, 'clan_date')
fecha_titus

In [None]:
titus = dataproc.read().parquet(path_titulizaciones
                               ).where(F.col('clan_date')==fecha_titus)

In [None]:
# titus.printSchema()

In [None]:
titus.count()

In [None]:
titus.show(5,False)

In [None]:
key_t = ['delta_file_id','delta_file_band_id','branch_id']
if (titus.groupBy(*key_t).agg(F.count('delta_file_id').alias('n')).where(F.col('n')>1).count()>1):
    print('Hay duplicados a nivel facility')

In [None]:
# SI HUBIERA DUPLICADOS a nivel key_t, si hay duplicados a nivel 'delta_file_id','delta_file_band_id', unificamos registros a este nivel
# si queremos ultimo status por operacion
# key_fac = ['delta_file_id','delta_file_band_id','branch_id']
# window = Window.partitionBy(*key_fac).orderBy(F.col("clan_date").desc())

# titus = dataproc.read().parquet(path_titulizaciones
#                                ).withColumn("rn", F.row_number().over(window)
#                                ).where(F.col("rn") == 1).drop('rn')  

# analizar fechas usadas de las 2 bases de datos origen
# titus.select('clan_date','basemoto_date','ifrs9_date').distinct().orderBy('clan_date','basemoto_date','ifrs9_date').show(50,False)

# si leemos csv
# titus = spark.read.option('header','True').option('delimiter',',').csv(path_titulizaciones_csv)

In [None]:
# columnas disponibles
# sorted(titus.columns)

**PDTE: datos de garante bei pdte de una incidencia debido al descenso del volumen con esta tipologia**

In [None]:
cols_bei=['non_bei_guaranteed_amount','bei_guaranteed_amount']
titus.select(*cols_bei).show(5,False)

In [None]:
# sorted(titus.columns)

## 3. Catálogo Sector Proyecto
Se carga el catálogo con la relación Sector - Subsector

In [None]:
sector_proj = spark.read.option('header','True').option('delimiter',';').csv(path_catalogo_sector_project
                        ).withColumn('project_sector_desc', F.trim('project_sector_desc'))

In [None]:
sector_proj.show(5,False)

In [None]:
print('Número de sectores de proyecto: ',sector_proj.select('project_sector_desc').distinct().count())

## 4. Cost Income
costes de explotación

In [None]:
path_ci

In [None]:
fecha_income = last_partition (path_ci, 'gf_cutoff_date')

In [None]:
df = dataproc.read().parquet(path_ci).show(5,False)

In [None]:
area = 'GF'

df_ic = dataproc.read().parquet(path_ci).where(F.col('gf_cutoff_date')==fecha_income
                                              ).where(F.col('gf_business_area_id')==area
                                              ).select('gf_customer_contract_control_per','gf_head_office_desc')
df_ic.orderBy('gf_customer_contract_control_per').show(5,False)

In [None]:
# +-----------------------------+------------------+
# |customer_contract_control_per|head_office_des_id|
# +-----------------------------+------------------+
# |0.003649668                  |COLOMBIA          |
# |0.047890240                  |MEXICO            |
# |0.048922224                  |ARGENTINA         |
# |0.056122127                  |PERU              |
# |0.075001987                  |VENEZUELA         |

# Generación Columnas
Nuevas columnas para resolver los límites marcados por el inversor

## 1. Subsector proyecto
Añadimos una columna para indicar el subsector

In [None]:
# tenemos mas sectores en las titulizaciones que en el catálogo de relación sector-subsector
print('Número de sectores de proyecto en titulizaciones: ',titus.select('project_sector_desc').distinct().count())

In [None]:
# sectores incluidos en la tabla de titulizaciones
# titus.groupBy('project_sector_desc').count().orderBy('project_sector_desc').show(10,False)

In [None]:
titus_1 = titus.withColumn('project_sector_desc', F.trim('project_sector_desc')
                          ).join(sector_proj,['project_sector_desc'],'left').fillna('No Informado')

In [None]:
titus_1.show(5,False)

In [None]:
# listado con la tipologia sector-subsector generado en la tabla de titulizaciones
# titus_1.groupBy('project_sector_desc','project_subsector_desc'
#              ).count().orderBy('project_sector_desc','project_subsector_desc'
#              ).show(100,False)

In [None]:
# si queremos el sector-subsector de la actividad del cliente en lugar del proyecto
# titus.groupBy('g_asset_allocation_sector_desc','g_asset_allocation_subsec_desc'
#              ).count().orderBy('g_asset_allocation_sector_desc','g_asset_allocation_subsec_desc'
#              ).show(100,False)

## 2. Marca ICO
flag para indicar si es un tipo de préstamo con financiación ICO

In [None]:
# deal_purpose_type="ICO España"
titus_2 = titus_1.withColumn('ico_flag', F.when(F.trim(F.col('deal_purpose_type'))=="ICO España",1).otherwise(0))
titus_2.groupBy('deal_purpose_type','ico_flag').count().show(20,False)                        

## 3. Marca Non-IG
flag para indicar que el rating es BB+1 o peor
- depende del tipo de titulizacion:Corporate/Project el rating que se toma es distinto

In [None]:
col_rating_categ = non_ig_limit[tipo_titulizacion]['categoria']
col_rating_categ

In [None]:
col_rating_pos = non_ig_limit[tipo_titulizacion]['valor']
col_rating_pos

In [None]:
n_rating = [x[col_rating_pos] for x in titus.select(col_rating_pos).where(F.col(col_rating_categ)=='BB+1').distinct().collect()][0]
print('BB+1 en valor numérico:',n_rating)

In [None]:
# titus.select('ind_rating','g_lmscl_internal_ratg_type').distinct().orderBy('ind_rating').show(100,False)

In [None]:
# los triple b tampoco tienen que ir marcados porque su rating es mejor que BB+1
titus_3 = titus_2.withColumn('non_ig_flag', F.when(((F.col(col_rating_pos)>=n_rating)&(~(F.col(col_rating_categ).like('BBB%')))),1
                                                   ).otherwise(0))

In [None]:
titus_3.groupBy('non_ig_flag').count().show()

In [None]:
# investment grade: sería el opuesto a non_ig_flag, pero hay ciertos rating que no vienen reflejados, así usamos diccionario
titus_3.where(F.col('non_ig_flag')==1).groupBy(col_rating_pos,col_rating_categ
                                              ).count().orderBy(col_rating_pos,col_rating_categ).show(50,False)

## 4. Marca Proyecto en construccion
flag para indicar si el proyecto está en construcción

In [None]:
titus_4 = titus_3.withColumn('building_project_flag', F.when(F.trim(F.col('gf_pf_project_const_type'))=='S',1).otherwise(0))
titus_4.groupBy('building_project_flag','gf_pf_project_const_type').count().show(20,False)

## 5. Marca Workout
flag para indicar si está en foco de mora
- watch_list_clasification_type (si es 1 o 2 -> esta en foco workout)

In [None]:
titus_5 = titus_4.withColumn('workout_flag', F.when(F.trim(F.col('watch_list_clasification_type'))!=0,1).otherwise(0))
titus_5.groupBy('workout_flag','watch_list_clasification_type').count().show(20,False)

## 6. Flag cumple pago STS
convertir columna booleana a formato flag
- (*) problemas en MicroStrategy si no cambiamos a este formato y mejor para hacer conteos

In [None]:
titus_6 = titus_5.withColumn('sts_payment_flag',F.when(F.col('sts_payment_condition')=='true', 1).otherwise(0))
titus_6.groupBy('sts_payment_flag','sts_payment_condition').count().orderBy('sts_payment_flag','sts_payment_condition').show(50,False)

## 7. Flag condicion RW en STS
convertir columna booleana a formato flag
- (*) problemas en MicroStrategy si no cambiamos a este formato y mejor para hacer conteos

In [None]:
titus_7 = titus_6.withColumn('sts_sm_rw_flag',F.when(F.col('sts_sm_rw_condition')=='true', 1).otherwise(0))
titus_7.groupBy('sts_sm_rw_flag','sts_sm_rw_condition').count().orderBy('sts_sm_rw_flag','sts_sm_rw_condition').show(50,False)

## 8. Flag ESG
Añadimos marca al tablon de datos

**usando marca generada en titulizaciones**

In [None]:
titus_8 = titus_7.withColumn('esg_linked_flag',F.when(F.col('esg_linked')==1,1).otherwise(0))
titus_8.groupBy('esg_linked_flag','esg_linked').count().orderBy('esg_linked_flag').show(50,False)

## 9. Flag BEI
generamos flag para saber si el garante el el Banco Europeo Inversiones BEI

In [None]:
titus_9 = titus_8.withColumn('bei_flag',F.when(((F.col('bei_guaranteed_amount')!=0)&(F.col('bei_guaranteed_amount').isNotNull())),1).otherwise(0))
titus_9.groupBy('bei_flag','bei_guaranteed_amount').count().orderBy('bei_flag').show(50,False)

## 10. Fecha de los datos
dejamos una columna formato fecha de los datos cogidos

In [None]:
mascara = "%Y-%m-%d"
fecha = str(fecha_titus)[0:4] + '-' + str(fecha_titus)[4:6] + '-' + str(fecha_titus)[6:8]
fecha_datos = datetime.strptime(fecha, mascara).date()

In [None]:
titus_10 = titus_9.withColumn('data_date',F.lit(fecha_datos))
titus_10.select('data_date').distinct().show()

# Constantes

In [None]:
constantes = ['tax_rate', # disponible Datio
              'coupon','crr_b','crr_c','crr_d','crr_e','crr_upper_A', # pasados en el launchpad
             'lgd', # por launchpad si es project finance y dato desde Datio si es tipo corporate loan
             'detach', # launchpad
             'ratio_cet1', # disponible Datio(joystick)
             'attachment_point', # launchpad
             'ci_ratio',] # disponible Datio (joystick?), ponderado por producto y geografia]

In [None]:
# inicializamos lista de constantes Datio 
collect_df=[]

## Incluidas en Limites
Recogemos las constantes incluidas en el launchpad de los limites

In [None]:
df_constantes = limites.where(F.col('limit_type')=='constant_type'
                             ).select(F.col('concept1_desc').alias('constant_type'),
                                      F.col('limit_value').alias('constant_value'))
df_constantes.show()

## 1. Tasa impositiva
porcentaje de impuestos a pagar por los préstamos

In [None]:
# Usando Rubik
from rubik.load.rorc import Values as RORCvalues
get_rorc_values = RORCvalues(path="/data", dataproc=dataproc)
tax_rate = get_rorc_values.TaxRate()
tax_rate

In [None]:
collect_df.append({"constant_type" : 'tax_rate', "constant_value" : tax_rate})

## 2. Ratio CET1
porcentaje sobre los activos ponderado por riesgo

In [None]:
# Usando Rubik
from rubik.load.rorc import Values as RORCvalues
get_rorc_values = RORCvalues(path="/data", dataproc=dataproc)
ratio_cet1 = get_rorc_values.CET1()
ratio_cet1

In [None]:
collect_df.append({"constant_type" : 'ratio_cet1', "constant_value" : ratio_cet1})

## 3. LGD
Loss given default (porcentaje de pérdida condicional)
- Si es coporate loan: se calcula el dato a posteriori de saber la cartera a titulizar

In [None]:
# dependiendo de la tipología de la titulización

# (corporate_loan) lgd dentro de titulizaciones: el valor de la LGD ponderada de la cartera subyacente a titulizar
# adj_lgd_ma_mitig_per : viene a nivel operación, hay que dejar valor ponderado de todos los préstamos que entren en la cartera óptima a titulizar

# (project finance) : se coge el marcado en el launchpad aunque esté marcado tb por variable a titulizar gf_pf_final_lgd_amount 

In [None]:
lgd = titus.agg(F.avg(F.col("adj_lgd_ma_mitig_per")).cast('float').alias('lgd')
               ).collect()[0].lgd
lgd

In [None]:
collect_df.append({"constant_type" : 'lgd', "constant_value" : lgd})

## 4. CI RATIO
Ratio cost-to-income (entra en la fórmula del RORC, son los gastos de explotación medios del área, expresados en porcentaje del margen bruto)

In [None]:
ci_ratio = df_ic.where(F.trim(F.col('gf_head_office_desc'))=='ESPAÑA'
                      ).select(F.col('gf_customer_contract_control_per').cast('float')
                              ).collect()[0].gf_customer_contract_control_per
ci_ratio

In [None]:
collect_df.append({"constant_type" : 'ci_ratio', "constant_value" : ci_ratio})

# Persistimos en Sbx

## Tabla de constantes
Incluimos en esta tabla:
- las constantes incluidas en el launchpad de limites
- las constantes generadas

In [None]:
df = spark.createDataFrame(collect_df)

In [None]:
df_constantes_t = df.union(df_constantes).withColumn('closing_date', F.lit(fecha_launchpad)) #incluimos la fecha de la particion del launchpad usada

In [None]:
df_constantes_t.show()

In [None]:
df_constantes_t.write.parquet(path_constantes, mode='overwrite')

In [None]:
path_constantes

## Tabla de limites
Incluimos en esta tabla exclusivamente limites

In [None]:
limites.where(F.col('limit_type')!='constant_type').write.parquet(path_limites_only, mode='overwrite')

In [None]:
path_limites_only

## Tabla de facilities
Incluimos todas las operaciones disponibles a carterizar

In [None]:
titus_10.write.parquet(path_facilities,mode='overwrite')

In [None]:
path_facilities

# TEST

In [None]:
path = '/data/sandboxes/dslb/data/Joystick/TITULIZACIONES/cartera_optima/'
part = 'closing_date'
f = last_partition(path,part)

In [None]:
path_l = path + part + '='+ str(f) + '/facilities'
path_l

In [None]:
# si quiere leer otra partición de diferente dia
# path_facilities = '/data/sandboxes/dslb/data/Joystick/TITULIZACIONES/cartera_optima/closing_date=2024-08-02/facilities'

In [None]:
facilities = dataproc.read().parquet(path_l)
facilities.show(5,False)

In [None]:
# marca según el rating interno
# facilities.groupBy('non_ig_flag').count().show()

In [None]:
# sorted(facilities.columns)