# Importar Librerías

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType, StructField, DateType, StringType
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql import DataFrame

In [25]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import abs

In [26]:
from pyspark.sql.functions import when

from pyspark.sql.functions import expr
from pyspark.sql.functions import datediff
from pyspark.sql.functions import array
from pyspark.sql.types import DoubleType, DateType
from pyspark.sql.functions import when, col, abs

In [27]:
from pyspark.sql.functions import col, countDistinct, collect_list, array_contains
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, DateType, StringType

In [None]:
import pandas as pd

In [31]:
import datetime

In [None]:
spark = SparkSession.builder.appName("Pandas to PySpark conversion").getOrCreate()

In [None]:
cols_df1 = [
    'CLIENTEAFILIADOBENEFICIARIO',
    'AUTORIZACION',
    'FECHASERVICIO',
    'TIPOATENCION',
    'PRESTADOR',
    'TipoPrestador',
    'ESPECIALIDAD',
    'PRESTACION',
    'TIPOPRESTACION',
    'SERVICIOSALUD',
    'MONTOUNITARIO',
    'CANTIDAD',
    'MONTORECLAMADO',
    'MONTOAJUSTADO',
    'MONTODIFERENCIAAFILIADO',
    'MONTOCOPAGO',
    'AUTORIZACIONRELACIONADA',
    'MONTOASEGURADORA',
    'TipoInstitucionPrestador'
    ]

cols_df2 = [
    'CLIENTEAFILIADOBENEFICIARIO',
    'AUTORIZACION',
    'FECHASERVICIO',
    'TIPOATENCION',
    'PRESTADOR',
    'TipoPrestador',
    'ESPECIALIDAD',
    'PRESTACION',
    'TIPOPRESTACION',
    'SERVICIOSALUD',
    'MONTOAJUSTADO',
    'MONTOCOPAGO',
    'AUTORIZACIONRELACIONADA',
    'MONTOASEGURADORA',
    'MONTOEXCEPCION',
    'MONTOCONCESION',
    'MONTOAUTORIZADO'
    ]

n = 100000

In [32]:
df1 = spark.read.parquet('../Datos/dataAutorizaciones20221208.parquet',
                         inferSchema = True).select([col(c) for c in cols_df1]).limit(n)



In [None]:
df1.show(3)

In [33]:
df2 = spark.read.parquet('../Datos/dataDistribucionesAutorizaciones20221208.parquet',
                         inferSchema = True).select([col(c) for c in cols_df2]).limit(n)

In [None]:
df2.show(3)

In [None]:
rows_df1 = df1.count()
rows_df2 = df2.count()

In [None]:
print(rows_df1)
print(rows_df2)

In [None]:
# CUPS frecuentes en emergencias
cups_df = pd.read_excel(
    '../Datos/CUPS frecuencias y otros (002).xlsx',engine='openpyxl'
)
# Convertir el dataframe de Pandas 'cups_df' a un dataframe de PySpark
cups_df = spark.createDataFrame(cups_df)

In [None]:
# CUPS cantidades esperadas
valores_referencia_cups_df = pd.read_excel(
    '../Datos/CUPS frecuencias y otros (002).xlsx', 
    sheet_name=1,engine='openpyxl'
)
# Convertir el dataframe de Pandas 'valores_referencia_cups_df' a un dataframe de PySpark
valores_referencia_cups_df = spark.createDataFrame(valores_referencia_cups_df)

In [None]:
# Indicador de dias de habitación

# Read File
salas_df = pd.read_excel(
    '../Datos/salas.xlsx', engine='openpyxl'
)

salas_list = salas_df['DESCRIPCION'].tolist()

# Convertir el dataframe de Pandas 'salas_df' a un dataframe de PySpark
salas_df = spark.createDataFrame(salas_df)

# Convertir la lista 'salas_list' a un objeto de PySpark equivalente (RDD)
#salas_list = spark.sparkContext.parallelize(salas_list)

In [None]:
type(salas_list)

In [39]:
# Unir Datasets

def consolidate_dataframes(df1, df2):
    # Encuentra intersección de autorizaciones
    autorizaciones = df1.select("AUTORIZACION").distinct().intersect(df2.select("AUTORIZACION").distinct())
    
    
    cols_df1 = [
    'CLIENTEAFILIADOBENEFICIARIO',
    'AUTORIZACION',
    'FECHASERVICIO',
    'TIPOATENCION',
    'PRESTADOR',
    'TipoPrestador',
    'ESPECIALIDAD',
    'PRESTACION',
    'TIPOPRESTACION',
    'SERVICIOSALUD',
    'MONTOUNITARIO',
    'CANTIDAD',
    'MONTORECLAMADO',
    'MONTOAJUSTADO',
    'MONTODIFERENCIAAFILIADO',
    'MONTOCOPAGO',
    'AUTORIZACIONRELACIONADA',
    'MONTOASEGURADORA',
    'TipoInstitucionPrestador'
    ]
    
    cols_df2 = [
    'CLIENTEAFILIADOBENEFICIARIO',
    'AUTORIZACION',
    'FECHASERVICIO',
    'TIPOATENCION',
    'PRESTADOR',
    'TipoPrestador',
    'ESPECIALIDAD',
    'PRESTACION',
    'TIPOPRESTACION',
    'SERVICIOSALUD',
    'MONTOAJUSTADO',
    'MONTOCOPAGO',
    'AUTORIZACIONRELACIONADA',
    'MONTOASEGURADORA',
    'MONTOEXCEPCION',
    'MONTOCONCESION',
    'MONTOAUTORIZADO'
    ]




    # Filtrar y seleccionar columnas relevantes
    filtered_df1 = df1.join(autorizaciones, on="AUTORIZACION", how="inner").select(cols_df1)
    filtered_df2 = df2.join(autorizaciones, on="AUTORIZACION", how="inner").select(cols_df2)

    # Agrupar montos en el dataset de distribución
    groupby_columns = [
        'AUTORIZACION',
        'CLIENTEAFILIADOBENEFICIARIO',
        'PRESTACION',
        'PRESTADOR',
        'FECHASERVICIO'
    ]

    aggregated_df2 = filtered_df2.groupBy(groupby_columns).agg(
        _sum('MONTOASEGURADORA'),
        _sum('MONTOEXCEPCION'),
        _sum('MONTOCONCESION'),
        _sum('MONTOAUTORIZADO')
    )

    # Consolidar las dos bases de datos
    consolidated_df = filtered_df1.join(
        aggregated_df2,
        on=groupby_columns,
        how='inner'
    )

    # Eliminar columnas
    consolidated_df = consolidated_df.drop('MONTOASEGURADORA')

    # Renombrar columnas    
    consolidated_df = consolidated_df.withColumnRenamed('sum(MONTOASEGURADORA)', 'MONTOASEGURADORA') \
        .withColumnRenamed('sum(MONTOEXCEPCION)', 'MONTOEXCEPCION') \
        .withColumnRenamed('sum(MONTOCONCESION)', 'MONTOCONCESION') \
        .withColumnRenamed('sum(MONTOAUTORIZADO)', 'MONTOAUTORIZADO') \
        .withColumnRenamed('MONTODIFERENCIAAFILIADO', 'MONTOUSUARIO') \
        .withColumnRenamed('TipoPrestador', 'TIPOPRESTADOR')

    return consolidated_df


# Uso de la función
df = consolidate_dataframes(df1, df2)



In [40]:
type(df)

pyspark.sql.dataframe.DataFrame

In [41]:
df.show(1)

+------------+---------------------------+--------------------+--------------------+--------------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|       FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MONTOUSUARIO|MONTOCOPAGO|AUTORIZACIONRELACIONADA|TipoInstitucionPrestador|MONTOASEGURADORA|MONTOEXCEPCION|MONTOCONCESION|MONTOAUTORIZADO|
+------------+---------------------------+--------------------+--------------------+--------------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+------

In [None]:
del(df1, df2)

In [14]:
def get_shape(dataframe):
    row_count = dataframe.count()
    column_count = len(dataframe.columns)
    return (row_count, column_count)

# Uso de la función get_shape con un DataFrame
shape = get_shape(df)



In [15]:
shape = get_shape(df)
print(shape)

(17614, 22)


In [None]:
df.filter(df['AUTORIZACION'] == 46857903).show(1)


# Preprocesamiento

In [42]:
def clean_and_format_data(df):
    # Eliminar espacios al final y al comienzo de Prestacion
    df = df.withColumn('PRESTACION', F.trim(F.col('PRESTACION')))
    
    # Dar formato a Fechas de Servicio
    df = df.withColumn('FECHASERVICIO', F.to_date(F.col('FECHASERVICIO')))
    
    # Obtener mes de fecha de servicio
    df = df.withColumn('FECHASERVICIO_MONTH', F.date_format(F.col('FECHASERVICIO'), 'MMMM'))
    df = df.withColumn('FECHASERVICIO_MONTH_NUMBER', F.month(F.col('FECHASERVICIO')))
    
    return df

# Uso de la función
df = clean_and_format_data(df)

In [43]:
df.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MONTOUSUARIO|MONTOCOPAGO|AUTORIZACIONRELACIONADA|TipoInstitucionPrestador|MONTOASEGURADORA|MONTOEXCEPCION|MONTOCONCESION|MONTOAUTORIZADO|FECHASERVICIO_MONTH|FECHASERVICIO_MONTH_NUMBER|
+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+--------------

In [None]:
shape = get_shape(df)
print(shape)

In [None]:
df.filter(df['AUTORIZACION'] == 46857903).show(1)

In [46]:
dftest = df.alias("dftest")
print(type(dftest))

<class 'pyspark.sql.dataframe.DataFrame'>


In [44]:
def diferencia_montos(df):
    df = df.withColumn('DIFERENCIA_MONTO_RECLAMADO_AUTORIZADO', df['MONTORECLAMADO'] - df['MONTOAUTORIZADO'])
    df = df.withColumn('DIFERENCIA_MONTO_RECLAMADO_AUTORIZADO_USUARIO', df['MONTORECLAMADO'] - (df['MONTOAUTORIZADO'] + df['MONTOUSUARIO']))
    df = df.withColumn('MONTOAJUSTADO_ABS', abs(df['MONTOAJUSTADO']))
    df = df.withColumn('MONTOCONCESION_ABS', abs(df['MONTOCONCESION']))
    df = df.withColumn('DIFERENCIA_ABSOLUTA_MONTO_RECLAMADO_AUTORIZADO', abs(df['DIFERENCIA_MONTO_RECLAMADO_AUTORIZADO']))
    return df



In [47]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MONTOUSUARIO|MONTOCOPAGO|AUTORIZACIONRELACIONADA|TipoInstitucionPrestador|MONTOASEGURADORA|MONTOEXCEPCION|MONTOCONCESION|MONTOAUTORIZADO|FECHASERVICIO_MONTH|FECHASERVICIO_MONTH_NUMBER|
+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+--------------

In [48]:
dftest = diferencia_montos(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [49]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MONTOUSUARIO|MONTOCOPAGO|AUTORIZACIONRELACIONADA|TipoInstitucionPrestador|MONTOASEGURADORA|MONTOEXCEPCION|MONTOCONCESION|MONTOAUTORIZADO|FECHASERVICIO_MONTH|FECHAS

In [50]:
def calcular_fechas_1(df, salas_list):
    # Calcular Dias de habitación
    df = df.withColumn('DIAS_HABITACION', F.when(F.col('PRESTACION').isin(salas_list), F.col('CANTIDAD')).otherwise(None))
    
    # Calcular la fecha de fin de servicio
    def calculate_end_date(start_date, dias_habitacion):
        if dias_habitacion is None:
            return None
        else:
            end_date = start_date + datetime.timedelta(days=dias_habitacion)
            return end_date

    # Convertir la función de Python a una función de PySpark
    calculate_end_date_udf = F.udf(calculate_end_date, DateType())

    # Aplicar la función UDF a las columnas correspondientes y crear una nueva columna "FECHASERVICIO_FIN"
    df = df.withColumn('FECHASERVICIO_FIN', calculate_end_date_udf(F.col('FECHASERVICIO'), F.col('CANTIDAD')))

    return df

In [51]:
dftest = calcular_fechas_1(dftest, salas_list)

In [52]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MONTOUSUARIO|MONTOCOPAGO|AUTORIZACIONRELACIONADA|TipoInstitucionPrestador|MONTOASEGURADORA|MONTOEXCEPCION|MONTOCONCESION|MONTOAUT

In [None]:
shape = get_shape(dftest)
print(shape)

In [53]:
def tipo_especialidad(df, especialidades_claves):
    df = df.withColumn('TIPO_ESPECIALIDAD', F.when(F.col('ESPECIALIDAD').isin(especialidades_claves), F.col('ESPECIALIDAD')).otherwise('OTROS'))
    return df

# Uso de la función
especialidades_claves = [
    'CENTROS MEDICOS',
    'ATENCION FARMACEUTICA', 
    'CTROS. DIAGNOSTICOS',
    'MEDICINA FISICA Y REHABILITACION', 
    'LABORATORIO CLINICO DE HEMATOLOGIA Y BAN',
]
dftest = tipo_especialidad(dftest, especialidades_claves)



In [54]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MONTOUSUARIO|MONTOCOPAGO|AUTORIZACIONRELACIONADA|TipoInstitucionPrestador|MONTOASEGURADORA|MONTOEXCEPCION|MONTO

In [None]:
shape = get_shape(dftest)
print(shape)

In [55]:
def alertas_montos_1(df):
    df = df.withColumn('ERROR_MONTO_RECLAMADO', when(col('DIFERENCIA_MONTO_RECLAMADO_AUTORIZADO_USUARIO') != 0, 1).otherwise(0)) \
           .withColumn('TIENE_MONTO_EXCEPCION', when(abs(col('MONTOEXCEPCION')) > 0, 1).otherwise(0)) \
           .withColumn('ERROR_MONTO_AUTORIZADO2', when(abs(col('MONTOAUTORIZADO') - col('MONTOASEGURADORA') - col('MONTOCONCESION') - col('MONTOEXCEPCION')) > 0.1, 1).otherwise(0)) \
           .withColumn('ERROR_CONSECION', when(col('MONTOCONCESION_ABS') > 0.2 * col('MONTOAUTORIZADO'), 1).otherwise(0)) \
           .withColumn('ERROR_MONTO_AUTORIZADO1', when(col('MONTOAUTORIZADO') > col('MONTORECLAMADO'), 1).otherwise(0))
    
    return df

# Uso de la función
dftest = alertas_montos_1(dftest)


In [56]:
# Uso de la función
dftest = alertas_montos_1(dftest)

In [57]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICIOSALUD|MONTOUNITARIO|CANTIDAD|MONTORECLAMADO|MONTOAJUSTADO|MON

In [None]:
shape = get_shape(dftest)
print(shape)

In [58]:
def alertas_dias_habitacion(df):
    # Alertas para Reglas de Negocio, relacionada a los dias de habitación
    df = df.withColumn('LIM_2_DIAS_HABITACION_EXCEDIDO', when(col('DIAS_HABITACION') > 2, 1).otherwise(0)) \
           .withColumn('LIM_6_DIAS_HABITACION_EXCEDIDO', when(col('DIAS_HABITACION') > 6, 1).otherwise(0))
    
    return df

In [59]:
dftest = alertas_dias_habitacion(dftest)
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPECIALIDAD|      TIPOPRESTACION|       SERVICI

In [None]:
shape = get_shape(dftest)
print(shape)

In [60]:
def indicadores_de_atencion(df):
    df = df.withColumn('ATENCION_AMBULATORIA', when(col('TIPOATENCION') == 'AMBULATORIA', 1).otherwise(0)) \
           .withColumn('ATENCION_HOSPITALARIA', when(col('TIPOATENCION') == 'HOSPITALARIA', 1).otherwise(0))
    
    return df

In [61]:
dftest = indicadores_de_atencion(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [62]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+
|AUTORIZACION|CLIENTEAFILIADOBENEFICIARIO|          PRESTACION|           PRESTADOR|FECHASERVICIO|TIPOATENCION|       TIPOPRESTADOR|   ESPEC

In [63]:
def indicadores_de_prestador(df):
    df = df.withColumn('CENTROS_MEDICOS', when(col('TIPO_ESPECIALIDAD') == 'CENTROS MEDICOS', 1).otherwise(0)) \
           .withColumn('CENTRO_DIAGNOSTICO', when(col('TIPO_ESPECIALIDAD') == 'CTROS. DIAGNOSTICOS', 1).otherwise(0)) \
           .withColumn('LABORATORIO_CLINICO', when(col('TIPO_ESPECIALIDAD') == 'LABORATORIO CLINICO DE HEMATOLOGIA Y BAN', 1).otherwise(0)) \
           .withColumn('ATENCION_FARMACEUTICA', when(col('TIPO_ESPECIALIDAD') == 'ATENCION FARMACEUTICA', 1).otherwise(0)) \
           .withColumn('OTRA_ESPECIALIDAD', when(col('TIPO_ESPECIALIDAD') == 'OTROS', 1).otherwise(0)) \
           .withColumn('MEDICO_ESPECIALISTA', when(col('TIPOPRESTADOR') == 'MEDICOS Y/O ESPECIALISTAS', 1).otherwise(0)) \
           .withColumn('ES_CONSULTA', when(col('PRESTACION').contains('CONSULTA'), 1).otherwise(0))
    
    return df

In [64]:
dftest = indicadores_de_prestador(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [65]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+---------------+------------------+-------------------+---------------------+-----------------+-------------------+-----------+
|AUTORIZACION

In [66]:
def indicadores_de_procedimiento(df):
    df = df.withColumn('OTROS_SERVICIOS', when(col('PRESTACION') == 'OTROS SERVICIOS NO CUBIERTOS', 1).otherwise(0)) \
           .withColumn('MEDICINAS_INTERNAMIENTO', when(col('PRESTACION') == 'Medicinas Durante Internamiento todo tipo', 1).otherwise(0)) \
           .withColumn('MATERIALES_DESECHABLES', when(col('PRESTACION') == 'MATERIALES DESECHABLES', 1).otherwise(0))
    
    return df

def metricas(df):
    df = df.withColumn('MONTO_USUARIO_PER_CAPITA', col('MONTOUSUARIO') / col('CANTIDAD')) \
           .withColumn('DIFERENCIA_RECLAMADO_AUTORIZADO', col('MONTORECLAMADO') - col('MONTOAUTORIZADO'))
    
    return df

In [67]:
dftest = indicadores_de_procedimiento(dftest)
dftest = metricas(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [68]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+---------------+------------------+-------------------+---------------------+-----------------+-------------------+-----------+--------------

In [None]:
type(valores_referencia_cups_df)

In [69]:
def calcular_prestaciones_excedidas(consolidated_df, valores_referencia_cups_df):
    # Combinar los DataFrames en función de la columna 'PRESTACION' en consolidated_df
    # y la columna 'cupsNombre' en valores_referencia_cups_df
    merged_df = consolidated_df.join(
        valores_referencia_cups_df.select(['cupsNombre', 'cupsMaxVecesDia']),
        consolidated_df['PRESTACION'] == valores_referencia_cups_df['cupsNombre'],
        how='left'
    )

    # Crear una nueva columna 'LIM_CANTIDAD_PRESTACION_EXCEDIDO' que es 1 si 'CANTIDAD'
    # excede 'cupsMaxVecesDia' y 0 en caso contrario
    merged_df = merged_df.withColumn(
        'LIM_CANTIDAD_PRESTACION_EXCEDIDO',
        when(col('CANTIDAD') > col('cupsMaxVecesDia'), 1).otherwise(0)
    )

    # Eliminar las columnas innecesarias del resultado
    result_df = merged_df.drop('cupsMaxVecesDia', 'cupsNombre')
    
    return result_df

In [70]:
dftest = calcular_prestaciones_excedidas(dftest, valores_referencia_cups_df)

In [None]:
shape = get_shape(dftest)
print(shape)

In [71]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+---------------+------------------+-------------------+---------------------+-----------------+-------------------+-----------+--------------

In [72]:
# Visita al mismo especialista mayor a 2 por mes
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def misma_especialidad_diff_especialista(spark_df, lim_especialista=2):
    # Filtrar por pacientes que hayan visitado a diferentes especialistas en la misma especialidad más de dos veces al mes
    cant_especialistas_diferentes = spark_df.filter(
        (spark_df['MEDICO_ESPECIALISTA'] == 1) & (spark_df['ES_CONSULTA'] == 1)
    ).groupBy(
        'CLIENTEAFILIADOBENEFICIARIO', 'ESPECIALIDAD', 'FECHASERVICIO_MONTH_NUMBER', 'FECHASERVICIO_MONTH'
    ).agg(
        F.countDistinct("PRESTADOR").alias("CANT_ESPECIALISTAS_DIFERENTES"),
        F.collect_set("AUTORIZACION").alias("AUTORIZACIONES"),
    ).orderBy("CANT_ESPECIALISTAS_DIFERENTES", ascending=False)
    
    # Filtrar por pacientes que hayan visitado a más de dos especialistas diferentes en un mes
    cant_especialistas_diferentes = cant_especialistas_diferentes.filter(
        cant_especialistas_diferentes["CANT_ESPECIALISTAS_DIFERENTES"] > lim_especialista
    )
    
    # Explode para tener una autorización por fila
    cant_especialistas_diferentes = cant_especialistas_diferentes.selectExpr("*", "explode(AUTORIZACIONES) as AUTORIZACION_EXPLODED")
    autorizaciones_especialistas_diferentes = cant_especialistas_diferentes.select("AUTORIZACION_EXPLODED").distinct()
    
    # Crear nueva columna en spark_df
    spark_df = spark_df.join(autorizaciones_especialistas_diferentes, spark_df["AUTORIZACION"] == autorizaciones_especialistas_diferentes["AUTORIZACION_EXPLODED"], "left")
    spark_df = spark_df.withColumn(
        "MISMA_ESPECIALIDAD_DIFF_ESPECIALISTA",
        (
            (spark_df["MEDICO_ESPECIALISTA"] == 1) &
            (spark_df["ES_CONSULTA"] == 1) &
            F.when(spark_df["AUTORIZACION_EXPLODED"].isNotNull(), True).otherwise(False)
        ).cast("integer")
    ).drop("AUTORIZACION_EXPLODED")

    return spark_df


In [73]:
dftest = misma_especialidad_diff_especialista(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [74]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+---------------+------------------+-------------------+---------------------+-----------------+-------------------+-----------+--------------

In [80]:
# Visita al mismo especialista mayor a 2 por mes
from pyspark.sql import functions as F

def visita_mismo_especialista(spark_df, lim_consultas_mes=2):
    
    # Agregar una columna con el año y el mes de 'FECHASERVICIO'
    spark_df = spark_df.withColumn(
        'FECHASERVICIO_YEAR_MONTH', 
        F.concat(F.year("FECHASERVICIO"), F.lit('-'), F.month("FECHASERVICIO"))
    )
    
    # Agrupar por 'CLIENTEAFILIADOBENEFICIARIO', 'PRESTADOR' y 'FECHASERVICIO_YEAR_MONTH'
    cant_visitas_especialista = spark_df.filter(
        (spark_df['MEDICO_ESPECIALISTA'] == 1) &
        (spark_df['ES_CONSULTA'] == 1)
    ).groupBy(
        'CLIENTEAFILIADOBENEFICIARIO', 'PRESTADOR', 'FECHASERVICIO_YEAR_MONTH'
    ).agg(
        F.countDistinct("FECHASERVICIO").alias("NUM_VISITAS"),
        F.collect_set("AUTORIZACION").alias("AUTORIZACIONES"),
    )
    
    # Filtrar por el límite máximo de visitas permitido por mes
    cant_visitas_especialista = cant_visitas_especialista.filter(
        cant_visitas_especialista["NUM_VISITAS"] > lim_consultas_mes
    )
    
    # Expandir la columna 'AUTORIZACIONES'
    cant_visitas_especialista = cant_visitas_especialista.selectExpr("*", "explode(AUTORIZACIONES) as AUTORIZACION_EXPLODED")
    autorizaciones_visitas_especialista = cant_visitas_especialista.select("AUTORIZACION_EXPLODED").distinct()
    
    # Agregar el indicador 'LIM_CONSULTAS_EXCEDIDO_MES'
    spark_df = spark_df.join(autorizaciones_visitas_especialista, spark_df["AUTORIZACION"] == autorizaciones_visitas_especialista["AUTORIZACION_EXPLODED"], "left")
    spark_df = spark_df.withColumn(
        "LIM_CONSULTAS_EXCEDIDO_MES",
        (
            (spark_df["MEDICO_ESPECIALISTA"] == 1) &
            (spark_df["ES_CONSULTA"] == 1) &
            F.when(spark_df["AUTORIZACION_EXPLODED"].isNotNull(), True).otherwise(False)
        ).cast("integer")
    ).drop("AUTORIZACION_EXPLODED").drop("FECHASERVICIO_YEAR_MONTH")

    return spark_df


In [81]:
dftest = visita_mismo_especialista(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [82]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+---------------+------------------+-------------------+---------------------+-----------------+-------------------+-----------+--------------

In [83]:
## Límite Emergencias excedido en el mes

from pyspark.sql import functions as F

def limite_emergencias_excedido(spark_df):
    # Crear la columna "ES_EMERGENCIA"
    spark_df = spark_df.withColumn(
        "ES_EMERGENCIA",
        F.when(F.col("SERVICIOSALUD").contains("EMERGENCIA"), 1).otherwise(0)
    )
    
    # Contar la cantidad de autorizaciones de emergencias por mes y cliente
    freq_emergencias_mes = spark_df.filter(
        spark_df["ES_EMERGENCIA"] == 1
    ).groupBy(
        "CLIENTEAFILIADOBENEFICIARIO", "FECHASERVICIO_MONTH_NUMBER", "FECHASERVICIO_MONTH"
    ).agg(
        F.countDistinct("AUTORIZACION").alias("nunique"),
        F.collect_set("AUTORIZACION").alias("unique"),
    )
    
    # Desagregar la columna "AUTORIZACIONES" y filtrar las autorizaciones que exceden el límite de emergencias
    freq_emergencias_mes = freq_emergencias_mes.selectExpr("*", "explode(unique) as AUTORIZACION_EXPLODED")
    freq_emergencias_mes = freq_emergencias_mes.filter(freq_emergencias_mes["nunique"] > 2)
    
    autorizaciones_emergencias_excedida = freq_emergencias_mes.select("AUTORIZACION_EXPLODED").distinct()
    
    # Crear la columna "LIM_EMERGENCIAS_EXCEDIDO_MES"
    spark_df = spark_df.join(autorizaciones_emergencias_excedida, spark_df["AUTORIZACION"] == autorizaciones_emergencias_excedida["AUTORIZACION_EXPLODED"], "left")
    spark_df = spark_df.withColumn(
        "LIM_EMERGENCIAS_EXCEDIDO_MES",
        (
            (spark_df["ES_EMERGENCIA"] == 1) &
            F.when(spark_df["AUTORIZACION_EXPLODED"].isNotNull(), True).otherwise(False)
        ).cast("integer")
    ).drop("AUTORIZACION_EXPLODED")

    return spark_df


In [84]:
dftest = limite_emergencias_excedido(dftest)

In [None]:
shape = get_shape(dftest)
print(shape)

In [85]:
dftest.show(1)

+------------+---------------------------+--------------------+--------------------+-------------+------------+--------------------+---------------+--------------------+--------------------+-------------+--------+--------------+-------------+------------+-----------+-----------------------+------------------------+----------------+--------------+--------------+---------------+-------------------+--------------------------+-------------------------------------+---------------------------------------------+-----------------+------------------+----------------------------------------------+---------------+-----------------+-----------------+---------------------+---------------------+-----------------------+---------------+-----------------------+------------------------------+------------------------------+--------------------+---------------------+---------------+------------------+-------------------+---------------------+-----------------+-------------------+-----------+--------------

In [86]:
# Consultas que no generan medicamento pt1

def df_autorizaciones_cond_consulta(df):
    # Filtrar las filas con MEDICO_ESPECIALISTA == 1 y ES_CONSULTA == 1
    consultas_df = df.filter((F.col('MEDICO_ESPECIALISTA') == 1) & (F.col('ES_CONSULTA') == 1))
    
    # Obtener los valores únicos de la columna 'AUTORIZACION' y convertirlos en un conjunto (set)
    autorizaciones_cond_consulta = set(
        row['AUTORIZACION'] for row in consultas_df.select('AUTORIZACION').distinct().collect()
    )
    
    return autorizaciones_cond_consulta

# Llamar a la función y guardar el resultado en la variable autorizaciones_cond_consulta
autorizaciones_cond_consulta = df_autorizaciones_cond_consulta(dftest)


In [None]:
type(autorizaciones_cond_consulta)

In [87]:
# Consultas que no generan medicamento pt2


from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import datetime

def generar_nuevas_consultas(spark_df, autorizaciones):
    # Filtrar el DataFrame utilizando las autorizaciones del conjunto (set) proporcionado
    consultas_atipicas_df = spark_df.filter(spark_df['AUTORIZACION'].isin(autorizaciones))
    consultas_sample = consultas_atipicas_df.select('CLIENTEAFILIADOBENEFICIARIO', 'FECHASERVICIO').na.drop()

    # Ajustar la fecha en 16 días
    consultas_sample = consultas_sample.withColumn(
        'FECHASERVICIO', F.expr("date_add(FECHASERVICIO, -16)")
    )

    consultas_sample_rdd = consultas_sample.rdd.flatMap(lambda x: [(x[0], x[1] + datetime.timedelta(days=i)) for i in range(32)])

    new_consultas_dates_schema = StructType([
        StructField("CLIENTEAFILIADOBENEFICIARIO", StringType(), True),
        StructField("FECHASERVICIO", TimestampType(), True),
    ])

    new_consultas_dates_df = consultas_sample_rdd.toDF(schema=new_consultas_dates_schema)

    return new_consultas_dates_df





In [88]:
# Llamar a la función y guardar el resultado en la variable new_consultas_dates_df
new_consultas_dates_df = generar_nuevas_consultas(dftest, autorizaciones_cond_consulta)

In [89]:
type(new_consultas_dates_df)

pyspark.sql.dataframe.DataFrame

In [90]:
# Consultas que no generan medicamento pt3

from pyspark.sql import functions as F

def agregar_columna_consultas_sin_lab_med(consolidated_df, new_consultas_dates_df):
    # Combinar consolidated_df y new_consultas_dates_df usando CLIENTEAFILIADOBENEFICIARIO y FECHASERVICIO
    consultas_tracking_df = consolidated_df.join(
        new_consultas_dates_df,
        on=['CLIENTEAFILIADOBENEFICIARIO', 'FECHASERVICIO'],
        how='inner'
    )

    # Agrupar por cliente y contar el número de prestaciones
    sample = consultas_tracking_df.groupBy(['CLIENTEAFILIADOBENEFICIARIO']).agg(
        F.count("PRESTACION").alias("NUM_PRESTACIONES")
    )

    # Filtrar los usuarios con una única prestación
    usuarios = sample.filter(sample['NUM_PRESTACIONES'] == 1).select('CLIENTEAFILIADOBENEFICIARIO')

    # Filtrar el DataFrame por los usuarios y excluir las filas con la especialidad 'PSICOLOGIA'
    sample_consultas_tracking_df = consultas_tracking_df.join(usuarios, on='CLIENTEAFILIADOBENEFICIARIO', how='inner')
    sample_consultas_tracking_df = sample_consultas_tracking_df.filter(sample_consultas_tracking_df['ESPECIALIDAD'] != 'PSICOLOGIA')

    # Obtener las autorizaciones únicas
    autorizaciones_cond_consulta_sin_lab_meds = sample_consultas_tracking_df.select("AUTORIZACION").distinct()

    # Agregar la columna 'CONSULTAS_SIN_LAB_MED' al DataFrame original
    consolidated_df = consolidated_df.join(autorizaciones_cond_consulta_sin_lab_meds, on="AUTORIZACION", how="left")
    consolidated_df = consolidated_df.withColumn(
        "CONSULTAS_SIN_LAB_MED",
        F.when(consolidated_df["AUTORIZACION"].isNotNull(), 1).otherwise(0)
    )

    return consolidated_df



In [91]:
# Llamar a la función para aplicar los cambios en el DataFrame original
dftest = agregar_columna_consultas_sin_lab_med(dftest, new_consultas_dates_df)


In [94]:
type(dftest)

pyspark.sql.dataframe.DataFrame

In [97]:
shape = get_shape(dftest)
print(shape)

ConnectionRefusedError: [WinError 10061] No se puede establecer una conexión ya que el equipo de destino denegó expresamente dicha conexión

In [None]:
dftest.filter(dftest['CONSULTAS_SIN_LAB_MED'] == 1).show()

In [None]:
# Distribución de medicamentos > 20% pt1

from pyspark.sql import functions as F
from pyspark.sql.window import Window


def calcular_porcentaje_tipo_prestacion(consolidated_df):
    # Asignar el valor de la columna AUTORIZACION a AUTORIZACIONRELACIONADA donde AUTORIZACION está en autorizaciones_padres_list
    autorizaciones_padres_list = consolidated_df.select("AUTORIZACIONRELACIONADA").distinct().na.drop().rdd.flatMap(lambda x: x).collect()
    consolidated_df = consolidated_df.withColumn("AUTORIZACIONRELACIONADA", F.when(consolidated_df["AUTORIZACION"].isin(autorizaciones_padres_list), consolidated_df["AUTORIZACION"]).otherwise(consolidated_df["AUTORIZACIONRELACIONADA"]))

    # Agrupar y calcular las métricas para cada grupo
    eventos_agrupados_df = consolidated_df.groupBy("AUTORIZACIONRELACIONADA", "TIPOPRESTACION") \
        .agg(
            F.countDistinct("AUTORIZACION").alias("NUM_AUTORIZACIONES"),
            F.count("PRESTACION").alias("NUM_PROCEDIMIENTOS"),
            F.sum("MONTORECLAMADO").alias("MONTO_RECLAMADO"),
            F.sum("MONTOAUTORIZADO").alias("MONTO_AUTORIZADO")
        ).orderBy("MONTO_RECLAMADO", ascending=False)

    # Crear una tabla dinámica con MONTO_RECLAMADO como valor
    eventos_agrupados_pivot_df = eventos_agrupados_df.groupBy("AUTORIZACIONRELACIONADA") \
        .pivot("TIPOPRESTACION") \
        .agg(F.sum("MONTO_RECLAMADO").alias("MONTO_RECLAMADO")) \
        .na.fill(0)

    # Calcular el total de MONTO_RECLAMADO para cada AUTORIZACIONRELACIONADA
    total_columns = [column for column in eventos_agrupados_pivot_df.columns if column != "AUTORIZACIONRELACIONADA"]
    eventos_agrupados_pivot_df = eventos_agrupados_pivot_df.withColumn("TOTAL", sum(F.col(column) for column in total_columns))

    # Calcular el porcentaje de MONTO_RECLAMADO por TIPOPRESTACION
    eventos_agrupados_pivot_percent_df = eventos_agrupados_pivot_df.select("AUTORIZACIONRELACIONADA", "TOTAL")
    for column in total_columns:
        eventos_agrupados_pivot_percent_df = eventos_agrupados_pivot_percent_df.withColumn(column, (F.col(column) / F.col("TOTAL")))

    return eventos_agrupados_pivot_percent_df.drop("TOTAL")





In [None]:
# Llamar a la función y guardar el resultado en la variable eventos_agrupados_pivot_percent_df
eventos_agrupados_pivot_percent_df = calcular_porcentaje_tipo_prestacion(dftest)


In [None]:
eventos_agrupados_pivot_percent_df.show()

In [None]:
print(dftest.count(), len(dftest.columns))

In [None]:
# Distribución de medicamentos > 20% pt2

from functools import reduce
from pyspark.sql import DataFrame

def calcular_lim_dist_meds_excedido(df, eventos_agrupados_pivot_percent_df):
    # Preparar las variables para el proceso de "melt"
    value_vars = eventos_agrupados_pivot_percent_df.columns
    id_vars = value_vars[0]
    value_vars = value_vars[1:]

    # Aplicar el proceso de "melt" en el DataFrame
    melted_dataframes = [eventos_agrupados_pivot_percent_df.select(id_vars, F.lit(c).alias("TIPOPRESTACION"), F.col(c).alias("% Total Monto del Evento")) for c in value_vars]
    eventos_melt = reduce(DataFrame.unionAll, melted_dataframes)

    # Filtrar el DataFrame
    eventos_melt = eventos_melt.filter(F.col('TIPOPRESTACION') != '').filter(F.col('% Total Monto del Evento') > 0)

    # Aplicar la máscara y obtener los outliers
    eventos_melt_mask = (F.col("% Total Monto del Evento") > 0.2) & (F.col("TIPOPRESTACION").contains('MEDICAMENTOS'))
    eventos_melt_outliers = eventos_melt.filter(eventos_melt_mask)

    # Obtener las autorizaciones únicas
    autorizaciones_padres_dist_meds = [row['AUTORIZACIONRELACIONADA'] for row in eventos_melt_outliers.select('AUTORIZACIONRELACIONADA').distinct().collect()]

    # Calcular la columna LIM_DIST_MEDS_EXCEDIDO y agregarla al DataFrame
    df = df.withColumn('LIM_DIST_MEDS_EXCEDIDO', F.when(F.col('AUTORIZACIONRELACIONADA').isin(autorizaciones_padres_dist_meds), 1).otherwise(0))

    return df

In [None]:
dftest = calcular_lim_dist_meds_excedido(dftest, eventos_agrupados_pivot_percent_df)

In [None]:
spark.stop()