# Notebook de Recaudos

Esta notebook procesa datos de recaudos, realizando diversas transformaciones y uniones para consolidar la información necesaria para los reportes finales.

In [None]:
# Importamos las librerías necesarias para el procesamiento de datos.
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType
from IPython.core.display import HTML
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql.functions import coalesce, col, last

# Ajuste para mostrar correctamente los datos en la notebook
display(HTML("<style>pre { white-space: pre !important; }</style>"))

## Variables de Entorno

Definimos las variables de entorno necesarias para el procesamiento de los datos.

In [None]:
cutoff_date = '2024-03-04' # 5 días posteriores al último mes
fecha_procesar = '2024-02'

## Carga de Datos

Cargamos los datos necesarios desde las rutas especificadas.

In [None]:
df_recaudos = spark.read.parquet("/data/master/pmol/data/t_pmol_daily_clct_consolidated/cutoff_date=" + fecha_procesar + '*')
df_convenios = spark.read.parquet("/data/master/psan/data/t_psan_collection_agreements/cutoff_date=" + cutoff_date)
df_people = spark.read.parquet("/data/master/pbtq/data/t_pbtq_people_daily_information/cutoff_date=" + cutoff_date)
df_hierarchies = spark.read.parquet("/data/master/pcog/data/t_pcog_branch_hierarchies_daily/cutoff_date=" + cutoff_date)
df_tcom_seg = spark.read.parquet("/data/master/psag/data/t_psag_mthly_info_cust_seg_tcom/cutoff_date=2024-01-31")

## Selección de Columnas

Seleccionamos las columnas relevantes de cada DataFrame para reducir el tamaño de los datos y enfocarnos solo en la información necesaria.

In [None]:
# Seleccionamos las columnas de interés del DataFrame de personas.
df_people = df_people.select('customer_id', 'personal_type', 'personal_id', 'last_name', 'second_last_name', 'customer_name', 'card_holder_name')

# Seleccionamos las columnas de interés del DataFrame de convenios.
df_convenios = df_convenios.select('entity_id', 'agreement_type', 'agreement_id', 'covenant_class_id', 'covenant_short_name', 'raise_money_cmsn_debit_type', 'current_contract_id', 'commission_account_id')

# Filtramos y seleccionamos las columnas de interés del DataFrame de jerarquías.
df_hierarchies = df_hierarchies.filter(df_hierarchies['entity_id'] == 9012).select('branch_id', 'branch_name', 'level50_territorial_desc', 'level60_operarea_desc')

# Seleccionamos las columnas de interés del DataFrame de segmentación TCOM.
df_tcom_seg = df_tcom_seg.select('customer_id', 'main_branchsf_id')

# Renombramos la columna 'customer_id' a 'customer_tcom_id' para evitar conflictos en futuras uniones.
df_tcom_seg = df_tcom_seg.withColumnRenamed('customer_id', 'customer_tcom_id')

## Uniones de DataFrames

Realizamos las uniones necesarias entre los DataFrames para consolidar la información.

In [None]:
# Unimos los datos de segmentación TCOM con las jerarquías de sucursales.
df_tcom_hierarchies = df_tcom_seg.join(df_hierarchies, df_hierarchies.branch_id == df_tcom_seg.main_branchsf_id, 'left')

# Renombramos columnas en el DataFrame de recaudos para evitar conflictos en futuras uniones.
df_recaudos = df_recaudos.withColumnRenamed('agreement_id', 'agreement_contrato_id')
df_recaudos = df_recaudos.withColumnRenamed('covenant_class_id', 'covenant_class_contrato_id')

# Unimos los recaudos con los convenios utilizando las columnas renombradas.
df_join = df_recaudos.join(df_convenios, (df_convenios.agreement_id == df_recaudos.agreement_contrato_id) & (df_convenios.covenant_class_id == df_recaudos.covenant_class_contrato_id))

# Unimos el resultado anterior con los datos de personas.
df_joined = df_join.join(df_people, df_people.customer_id == df_join.ben_customer_id, 'left')

# Finalmente, unimos el DataFrame resultante con los datos de jerarquías TCOM.
df_joined = df_joined.join(df_tcom_hierarchies, df_tcom_hierarchies.customer_tcom_id == df_joined.ben_customer_id, 'left')

## Transformación de Datos

Realizamos las transformaciones necesarias en los datos.

In [None]:
# Convertimos las columnas de importes a tipo decimal para asegurar precisión en los cálculos.
df_joined = df_joined.withColumn('monto_soles', (F.col('local_currency_amount')).cast('decimal(23,2)'))
df_joined = df_joined.withColumn('comision_empresa_soles', (F.col('local_currency_company_fee_amount')).cast('decimal(23,2)'))
df_joined = df_joined.withColumn('comision_cliente_soles', (F.col('local_currency_customer_fee_amount')).cast('decimal(23,2)'))

## Agregación de Datos

Agrupamos y agregamos los datos para obtener las métricas finales.

In [None]:
# Agrupamos los datos por diversas dimensiones y calculamos métricas agregadas como sumas de montos y comisiones, y el conteo de operaciones.
df_final = df_joined.groupBy('agreement_id', 'covenant_class_id', 'ben_customer_id', 'card_holder_name', 'covenant_short_name', 'level50_territorial_desc', 'level60_operarea_desc', 'branch_name', 'payment_channel_type')\
                    .agg(F.sum('monto_soles').cast("decimal(23,2)").alias('monto_pagado'),
                         F.sum('comision_empresa_soles').cast("decimal(23,2)").alias('comision_empresa'),
                         F.sum('comision_cliente_soles').cast("decimal(23,2)").alias('comision_cliente'),
                         F.count('*').alias('numero_operaciones'))

## Almacenamiento de Datos

Guardamos los datos finales en el almacenamiento especificado.

In [None]:
# Guardamos el DataFrame final en formato Parquet, coalesciendo a un único archivo.
df_final.coalesce(1).write.parquet(path="/intelligence/inpdpm/analytic/users/PRUEBA/workspace/OfRe/reportes/" + fecha_procesar, mode="overwrite")

## Resumen de Transformaciones y Funciones Usadas

- **select**: Selecciona columnas específicas de un DataFrame, lo que ayuda a reducir el tamaño de los datos y enfocarse solo en la información necesaria.
- **withColumnRenamed**: Renombra una columna existente en el DataFrame, útil para evitar conflictos de nombres en futuras uniones.
- **join**: Realiza una unión entre dos DataFrames basándose en una condición específica, permitiendo combinar información de múltiples fuentes.
- **withColumn**: Crea o reemplaza una columna en el DataFrame. En este caso, se usa para convertir importes a tipo decimal.
- **groupBy**: Agrupa los datos por una o más columnas, permitiendo realizar operaciones de agregación en cada grupo.
- **agg**: Realiza operaciones de agregación, como sumas y conteos, en los datos agrupados.
- **coalesce**: Reduce el número de particiones en el DataFrame, útil para escribir datos en un solo archivo.
- **write.parquet**: Guarda el DataFrame en formato Parquet, que es eficiente en términos de almacenamiento y rendimiento para grandes volúmenes de datos.