In [0]:

from pyspark.sql import SQLContext, Row

# Definimos la ruta base del archivo
ruta_base = "file:/home/vagrant/"

# Leemos el archivo CSV como RDD, dividimos por comas, y almacenamos en caché
data = sc.textFile(ruta_base + "archivo.csv").map(lambda x: x.split(";")).cache()

# Imprimimos las primeras filas para ver los datos
data.take(5)

In [None]:

# Obtenemos la primera fila del RDD
encabezado = data.first()

# Imprimimos el encabezado
print(encabezado)

In [None]:

from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType, IntegerType

estructura = StructType([
    StructField('documento_anonimizado', StringType(), True),
    StructField('periodo', IntegerType(), True),
    StructField('mes', IntegerType(), True),
    StructField('renaes', StringType(), True),
    StructField('ipress', StringType(), True),
    StructField('region', StringType(), True),
    StructField('departamento', StringType(), True),
    StructField('provincia', StringType(), True),
    StructField('distrito', StringType(), True),
    StructField('ubigeo', StringType(), True),
    StructField('codigo_diagnostico', StringType(), True),
    StructField('diagnosticos', StringType(), True),
    StructField('grupo_diagnosticos', StringType(), True),
    StructField('grupo_cobertura', StringType(), True),
    StructField('sexo', StringType(), True),
    StructField('edad', IntegerType(), True),
    StructField('tipo_seguro', StringType(), True),
    StructField('servicio', StringType(), True),
    StructField('fecha_atencion', DateType(), True),
    StructField('monto_bruto', FloatType(), True),
    StructField('fecha_corte', DateType(), True)
])



In [4]:
%pyspark
# Para formatear las fechas
from datetime import datetime
def parse_date(date_string):
    if date_string:       
        return datetime.strptime(date_string, '%Y%m%d').date()   
    else:      
        return None
        
filas = data.filter(lambda x: x != encabezado)

# Transformamos el RDD, aplicando el casteo de tipos a las columnas
filas = filas.map(lambda x: [
    x[0],
    int(x[1]),
    int(x[2]),
    x[3],
    x[4],
    x[5],
    x[6],
    x[7],
    x[8],
    x[9],
    x[10],
    x[11],
    x[12],
    x[13],
    x[14],
    int(x[15]),
    x[16],
    x[17],
    parse_date(x[18]),
    float(x[19]),
    parse_date(x[20])
])

filas.take(3)




In [5]:
%pyspark
df = spark.createDataFrame(filas, schema=estructura)



In [6]:
%pyspark
df.show(3)






In [7]:
%pyspark
df.printSchema()

In [8]:
%pyspark
#DEPURACION
#1. VERIFICAR EL % DE FILAS CON VALORES NULOS O ESPACIOS VACIOS
from pyspark.sql import functions as F

# Calcular el porcentaje de datos faltantes (nulos o vacíos) para cada columna
porcentaje_faltantes = [(col, (df.filter((F.col(col).isNull()) | (F.col(col) == "") | (F.trim(F.col(col)) == "")).count() / df.count()) * 100) for col in df.columns]

# Convertir el resultado a un DataFrame sin redondeo
df_porcentaje = spark.createDataFrame(porcentaje_faltantes, ["Columna", "Porcentaje_Faltantes"])

# Mostrar el DataFrame con todos los decimales
df_porcentaje.show(21)

In [9]:
%pyspark
from pyspark.sql import functions as F

print(f"Número de filas original: {df.count()}")

# Crear una condición combinada para identificar filas con valores nulos o vacíos
condicion = None
for col in df.columns:
    nueva_condicion = (F.col(col).isNull()) | (F.trim(F.col(col)) == "")  # Verificar nulos o espacios vacíos
    if condicion is None:
        condicion = nueva_condicion
    else:
        condicion = condicion | nueva_condicion

# Filtrar las filas que NO cumplen con la condición (es decir, eliminar las filas con nulos o vacíos)
df = df.filter(~condicion)

# Mostrar el número de filas después de eliminar las filas con nulos o vacíos
print(f"Número de filas después de la limpieza: {df.count()}")

In [10]:
%pyspark
print(f"Número de filas original sin vacios: {df.count()}")
df = df[df['GRUPO_DIAGNOSTICOS'] == 'ENFERMEDAD RARA O HUERFANA']
print(f"Número de filas original con la ENFERMEDAD RARA: {df.count()}")
#print(df['GRUPO_DIAGNOSTICOS'].unique())

In [11]:
%pyspark
#1. DEPURACIÓN DE FORMATO Y CONSISTENCIA DE LAS FECHAS
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date

# Depuración de formato y consistencia de las fechas
df = df.withColumn("FECHA_ATENCION_VALID", to_date(col("FECHA_ATENCION"), "yyyyMMdd").isNotNull())
df = df.withColumn("FECHA_CORTE_VALID", to_date(col("FECHA_CORTE"), "yyyyMMdd").isNotNull())

# Creación de una tabla resumen con la cantidad de fechas mal formateadas
df_summary = df.agg(
    F.sum(F.when(~col("FECHA_ATENCION_VALID"), 1).otherwise(0)).alias("Fechas_ATENCION_mal_formateadas"),
    F.sum(F.when(~col("FECHA_CORTE_VALID"), 1).otherwise(0)).alias("Fechas_CORTE_mal_formateadas"),
    F.count("*").alias("Total_filas")
)

df_summary.show()

In [12]:
%pyspark
print(f"Número de filas original sin vacios y solo con ENFERMEDAD: {df.count()}")

#2. DEPURACION Verificar valores inconsistentes de las edades
df = df.filter((col("EDAD") >= 0) & (col("EDAD") <= 110))
print(f"Número de filas después de la limpieza de edad: {df.count()}")

#3. Verificar que MONTO_BRUTO sea un valor positivo
df=df.filter(col("MONTO_BRUTO") >= 0)
print(f"Número de filas después de la limpieza del monto: {df.count()}")

#4. DEPURACION Verificar valores inconsistentes de los meses
df=df.filter((col("MES") > 0) | (col("MES") <= 12))
print(f"Número de filas después de la limpieza del mes: {df.count()}")

#4. Verificar valores inconsistentes en la columna SEXO
df.select("SEXO").distinct().show()

#5. Verificar valores inconsistentes en la columna TIPO_SEGURO
df.select("TIPO_SEGURO").distinct().show()

#5. Verificar valores inconsistentes en la columna TIPO_SEGURO
df.select("REGION").distinct().show(24)

In [13]:
%pyspark
#6. Eliminar filas duplicadas, si es que lo hay
filas_antes = df.count()
df = df.dropDuplicates()
filas_despues = df.count()

print(f"Filas antes de eliminar duplicados: {filas_antes}")
print(f"Filas después de eliminar duplicados: {filas_despues}")
print(f"Cantidad de duplicados eliminados: {filas_antes - filas_despues}")



In [14]:
%pyspark
#Código de verificación

from pyspark.sql import functions as F

# Calcular el porcentaje de datos faltantes (nulos o vacíos) para cada columna
porcentaje_faltantes = [(col, (df.filter((F.col(col).isNull()) | (F.col(col) == "") | (F.trim(F.col(col)) == "")).count() / df.count()) * 100) for col in df.columns]

# Convertir el resultado a un DataFrame sin redondeo
df_porcentaje = spark.createDataFrame(porcentaje_faltantes, ["Columna", "Porcentaje_Faltantes"])

# Mostrar el DataFrame con todos los decimales
df_porcentaje.show(21)

In [15]:
%pyspark
from unidecode import unidecode
from pyspark.sql.functions import udf,col
# Función UDF para limpiar caracteres especiales
def clean_string(s):
    return unidecode(s) if s else s

clean_string_udf = udf(clean_string, StringType())

# Aplicar la función UDF a las columnas que necesitan limpieza
columnas_a_limpiar = ['ipress', 'region', 'departamento', 'provincia', 'diagnosticos']

for columna in columnas_a_limpiar:
    df = df.withColumn(columna, clean_string_udf(col(columna)))

df.show(5)

In [16]:
%pyspark
#CREACION DE TABLAS ADICIONALES
from pyspark.sql import SQLContext, Row

# Definimos la ruta base del archivo
ruta_base2 = "file:/home/vagrant/"

# Leemos el archivo CSV como RDD, dividimos por comas, y almacenamos en caché
data2 = sc.textFile(ruta_base + "tabla1.csv").map(lambda x: x.split(";")).cache()

# Imprimimos las primeras filas para ver los datos
data2.take(5)

In [17]:
%pyspark
# Obtenemos la primera fila del RDD
encabezado2 = data2.first()

# Imprimimos el encabezado
print(encabezado2)

In [18]:
%pyspark
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType, IntegerType

estructura2 = StructType([
    StructField('FECHA_CORTE', DateType(), True),
    StructField('DEPARTAMENTO', StringType(), True),
    StructField('PROVINCIA', StringType(), True),
    StructField('DISTRITO', StringType(), True),
    StructField('UBIGEO', StringType(), True),
    StructField('RED', StringType(), True),
    StructField('IPRESS', StringType(), True),
    StructField('ID_PACIENTE', StringType(), True),
    StructField('EDAD_PACIENTE', IntegerType(), True),
    StructField('SEXO_PACIENTE', StringType(), True),
    StructField('EDAD_MEDICO', IntegerType(), True),
    StructField('ID_MEDICO', StringType(), True),
    StructField('COD_DIAG', StringType(), True),
    StructField('DIAGNOSTICO', StringType(), True),
    StructField('AREA_HOSPITALARIA', StringType(), True),
    StructField('SERVICIO_HOSPITALARIO', StringType(), True),
    StructField('ACTIVIDAD_HOSPITALARIA', StringType(), True),
    StructField('FECHA_MUESTRA', DateType(), True),
    StructField('FEC_RESULTADO_1', DateType(), True),
    StructField('DIFERIMIENTO_1', IntegerType(), True),
    StructField('PROCEDIMIENTO_1', StringType(), True),
    StructField('RESULTADO_1', FloatType(), True),
    StructField('UNIDADES_1', StringType(), True),
    StructField('FEC_RESULTADO_2', DateType(), True),
    StructField('PROCEDIMIENTO_2', StringType(), True),
    StructField('RESULTADO_2', FloatType(), True),
    StructField('UNIDADES_2', StringType(), True),
    StructField('DIFERIMIENTO_2', IntegerType(), True)
])


In [19]:
%pyspark
from datetime import datetime

def parse_date2(date_string):
    if date_string:
        return datetime.strptime(date_string, '%Y%m%d').date()
    else:
        return None

filas2 = data2.filter(lambda x: x != encabezado2)

# Transformamos el RDD, aplicando el casteo de tipos a las columnas
filas2 = filas2.map(lambda x: [
    parse_date2(x[0]),  # FECHA_CORTE
    x[1],  # DEPARTAMENTO
    x[2],  # PROVINCIA
    x[3],  # DISTRITO
    x[4],  # UBIGEO
    x[5],  # RED
    x[6],  # IPRESS
    x[7],  # ID_PACIENTE
    int(x[8]),  # EDAD_PACIENTE
    x[9],  # SEXO_PACIENTE
    int(x[10]),  # EDAD_MEDICO
    x[11],  # ID_MEDICO
    x[12],  # COD_DIAG
    x[13],  # DIAGNOSTICO
    x[14],  # AREA_HOSPITALARIA
    x[15],  # SERVICIO_HOSPITALARIO
    x[16],  # ACTIVIDAD_HOSPITALARIA
    parse_date2(x[17]),  # FECHA_MUESTRA
    parse_date2(x[18]),  # FEC_RESULTADO_1
    int(x[19]),  # DIFERIMIENTO_1
    x[20],  # PROCEDIMIENTO_1
    float(x[21]),  # RESULTADO_1
    x[22],  # UNIDADES_1
    parse_date2(x[23]),  # FEC_RESULTADO_2
    x[24],  # PROCEDIMIENTO_2
    float(x[25]),  # RESULTADO_2
    x[26],  # UNIDADES_2
    int(x[27])  # DIFERIMIENTO_2
])

filas2.take(4)

In [20]:
%pyspark
df2 = spark.createDataFrame(filas2, schema=estructura2)
df2.show(5)

df2.printSchema()

In [21]:
%pyspark
# Seleccionamos las columnas Diagnóstico, procedimiento_1 y resultado_1 desde el RDD
data_rdd2 = data2.map(lambda row: (row[13], row[20], row[21]))

# Mostramos las primeras filas de los resultados
data_rdd2.take(5)  # Aquí puedes ajustar el número de filas que deseas visualizar

In [22]:
%pyspark
# Obtenemos el encabezado (primera fila) del RDD
encabezado2 = data2.first()


# Eliminamos la primera fila (encabezado) y seleccionamos solo las columnas necesarias
data_rdd2 = data2.filter(lambda row: row != encabezado2).map(lambda row: (row[8], row[13], row[20], row[21]))  # Usamos los índices de las columnas necesarias

# Convertimos el RDD a una lista de tuplas
data_list = data_rdd2.collect()

# Definir las columnas utilizando solo los índices necesarios para 'Diagnóstico', 'Procedimiento_1' y 'Resultado_1'
columns = ['EDAD','DIAGNOSTICO', 'PROCEDIMIENTO_1', 'RESULTADO_1']

# Creamos el DataFrame usando los nombres de las columnas correctos
df_tabla1 = spark.createDataFrame(data_list, columns)

# Mostramos el DataFrame
df_tabla1.show(5)

In [23]:
%pyspark
# Obtenemos el encabezado (primera fila) del RDD
encabezado2 = data2.first()


# Eliminamos la primera fila (encabezado) y seleccionamos solo las columnas necesarias
data_rdd2 = data2.filter(lambda row: row != encabezado2).map(lambda row: (row[1], row[14], row[15], row [16]))  # Usamos los índices de las columnas necesarias

# Convertimos el RDD a una lista de tuplas
data_list = data_rdd2.collect()

columns = ['DEPARTAMENTO', 'AREA_HOSPITALARIA', 'SERVICIO_HOSPITALARIO', 'ACTIVIDAD_HOSPITALARIA']

# Creamos el DataFrame usando los nombres de las columnas correctos
df_tabla2 = spark.createDataFrame(data_list, columns)

# Mostramos el DataFrame
df_tabla2.show(5)


In [24]:
%pyspark
#Creacion de vistas temporales

df1 = df.createTempView('Vista01')
df2 = df.createTempView('Vista02')
df3 = df.createTempView('Vista03')
df4 = df.createTempView('Vista04')
df5 = df.createTempView('Vista05')
df6 = df.createTempView('Vista06')
df7 = df.createTempView('Vista07')
df8 = df.createTempView('Vista08')


In [25]:
%pyspark
#CONSULTAS
#Indicador 1: Costo Promedio de Atención de Diagnóstico por sexo
#Costo Promedio de Atención de Diagnóstico por Sexo y Edad (segmentado por ERH)
rpta1 = spark.sql("""
SELECT 
    SEXO,
    EDAD,
    AVG(MONTO_BRUTO) AS Costo_Promedio
FROM 
    Vista01 
WHERE 
    grupo_cobertura = 'ERH'
GROUP BY 
    SEXO, 
    EDAD
ORDER BY 
    SEXO, 
    EDAD
""")

#Mostrar resultado
rpta1.show()

In [26]:
%pyspark
#Indicador 1: Costo Promedio de Atención de Diagnóstico por sexo
#Distribución de Edades por Sexo

rpta2 = spark.sql("""
SELECT
    sexo,
    edad,
    COUNT(*) AS numero_pacientes
FROM
    Vista02
GROUP BY
    sexo,
    edad
ORDER BY
    sexo,
    edad
""")
#Mostrar resultado
rpta2.show()


In [27]:
%pyspark
#Indicador 2: Porcentaje de Diagnósticos por Sexo
#Distribución de Edades por Sexo #DUDA
rpta3 = spark.sql("""
SELECT
    grupo_cobertura,
    (COUNT(*) / (SELECT COUNT(*) FROM Vista08)) * 100 AS porcentaje_atenciones
FROM
    Vista08
WHERE
    grupo_diagnosticos = 'ENFERMEDAD RARA O HUERFANA'  -- Asegurarse de filtrar solo las ERH
GROUP BY
    grupo_cobertura
ORDER BY
    porcentaje_atenciones DESC  -- Ordenar para ver los grupos de cobertura más frecuentes
""")

# Mostrar resultado
rpta3.show()

In [28]:
%pyspark
#Indicador 2: Porcentaje de Diagnósticos por Sexo
# Top 10 Diagnósticos Más Comunes

rpta4 = spark.sql("""
SELECT
    diagnosticos,
    COUNT(*) AS numero_diagnosticos
FROM
    Vista08
GROUP BY
    diagnosticos
ORDER BY
    numero_diagnosticos DESC
LIMIT 10
""")

#Mostrar resultado
rpta4.show()

In [29]:
%pyspark
#Indicador 3: Costo promedio de diagnóstico por tipo de seguro 
# Consulta: Evolución del Costo Promedio Mensual por Tipo de Seguro

rpta5 = spark.sql("""
SELECT
    periodo,
    mes,
    tipo_seguro,
    AVG(monto_bruto) AS costo_promedio
FROM
    Vista08
GROUP BY
    periodo,
    mes,
    tipo_seguro
ORDER BY
    periodo,
    mes,
    tipo_seguro
""")

#Mostrar resultado
rpta5.show()

In [30]:
%pyspark
#Indicador 3: Costo promedio de diagnóstico por tipo de seguro 
#9 Monto Bruto promedio por region y sexo

rpta6 = spark.sql("""
SELECT region, sexo, AVG(monto_bruto) AS promedio_monto_bruto FROM Vista08 GROUP BY region, sexo
""")

#Mostrar resultado
rpta6.show()

In [31]:
%pyspark
#Indicador 4: Porcentaje de Atención por Región 
# Consulta: Porcentaje de Atenciones de ERH por Grupo de Cobertura #DUDAR

rpta7 = spark.sql("""
SELECT
    grupo_cobertura,
    (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM Vista08)) AS porcentaje_atenciones
FROM
    Vista08
WHERE
    grupo_diagnosticos = 'ENFERMEDAD RARA O HUERFANA'  -- Asegurarse de filtrar solo las ERH
GROUP BY
    grupo_cobertura
ORDER BY
    porcentaje_atenciones DESC  -- Ordenar para ver los grupos de cobertura más frecuentes
""")
# Mostrar resultado
rpta7.show()

In [32]:
%pyspark
#Indicador 4: Porcentaje de Atención por Región 
# Consulta: Cantidad de atenciones por mes
rpta8 = spark.sql("""
SELECT mes, COUNT(*) AS total_atenciones FROM Vista08 GROUP BY mes ORDER BY mes
""")

rpta8 = spark.sql("""
SELECT mes, COUNT(*) AS total_atenciones FROM Vista08 GROUP BY mes ORDER BY mes
""")

#Mostrar resultado
rpta8.show()

In [33]:
%pyspark
# INDICADOR 1
from pyspark.sql import functions as F
from pyspark.sql.functions import sum as _sum, count, round, col

# INDICADOR 1: Agrupación por GRUPO_COBERTURA, SEXO y DIAGNOSTICOS con orden alfabético
resultado1 = df.groupBy("SEXO", "DIAGNOSTICOS") \
    .agg(
        count("*").alias("TOTAL_ATENCIONES"),
        round((_sum("MONTO_BRUTO") / count("*")), 2).alias("COSTO_PROMEDIO_ATENCION")
    ) \
    .orderBy("DIAGNOSTICOS")

print("COSTO PROMEDIO Y TOTAL DE ATENCIONES POR SEXO:")
resultado1.show(truncate=False)


In [34]:
%pyspark
#INDICADOR 2
from pyspark.sql import functions as F

# Calcular el número de diagnósticos por sexo
diagnosticos_por_sexo = df.groupBy("SEXO").agg(
    F.count("DIAGNOSTICOS").alias("TOTAL_DIAGNOSTICOS")
)

# Calcular el total general de diagnósticos
total_diagnosticos = df.select(F.count("DIAGNOSTICOS").alias("TOTAL_GENERAL")).collect()[0]["TOTAL_GENERAL"]

# Calcular el porcentaje de diagnósticos por sexo
porcentaje_diagnosticos = diagnosticos_por_sexo.withColumn(
    "PORCENTAJE_DIAGNOSTICOS(%)",
    F.round((F.col("TOTAL_DIAGNOSTICOS") / total_diagnosticos) * 100, 2)
)

# Mostrar el resultado ordenado por SEXO
porcentaje_diagnosticos.orderBy("SEXO").show(truncate=False)


In [35]:
%pyspark
#INDICADOR 3
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as _sum, count, round

# Calcular el costo promedio y el total de atenciones agrupando por tipo de seguro y diagnóstico
resultado3 = df.groupBy("TIPO_SEGURO", "DIAGNOSTICOS").agg(
    count("*").alias("TOTAL_ATENCIONES"),  # Total de atenciones
    round(_sum("MONTO_BRUTO") / count("*"), 2).alias("COSTO_PROMEDIO")  # Costo promedio usando _sum
)

# Mostrar el resultado ordenado por tipo de seguro
resultado3.orderBy("TIPO_SEGURO").show(truncate=False)



In [36]:
%pyspark
#INDICADOR 4
# Contamos el número total de atenciones en todas las regiones
from pyspark.sql import functions as F
from pyspark.sql.functions import col

total_atenciones = df.count()

# Contamos el número de atenciones por cada región
atenciones_por_region = df.groupBy("REGION").count().withColumnRenamed("count", "TOTAL_ATENCIONES")

# Calculamos la tasa de atención por región y redondeamos en una sola operación
tasa_atencion_region = atenciones_por_region.withColumn(
    "Tasa de Atención (%)", 
    F.round((col("TOTAL_ATENCIONES") / total_atenciones) * 100, 2)
)

# Mostramos el resultado
tasa_atencion_region.show(24)


In [37]:
%pyspark
#ESTADISTICA DESCRIPTIVA
from pyspark.sql.functions import round

# Calcular las estadísticas numéricas
numerical_stats = df.select("EDAD", "MONTO_BRUTO") \
    .summary("count", "mean", "stddev", "min", "max")

# Redondear a 2 decimales
numerical_stats_rounded = numerical_stats.select(
    "summary",
    round("EDAD", 2).alias("EDAD"),
    round("MONTO_BRUTO", 2).alias("MONTO_BRUTO")
)

# Mostrar el resultado
numerical_stats_rounded.show()


In [38]:
%pyspark
#CORRELACION ENTRE DATOS
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.stat import Correlation

# Preparar datos para correlación
df = df.withColumn("EDAD", col("EDAD").cast("integer"))
df = df.withColumn("MONTO_BRUTO", col("MONTO_BRUTO").cast("float"))

vector_cols = ["EDAD", "MONTO_BRUTO"]
assembler = VectorAssembler(inputCols=vector_cols, outputCol="features")
df_vector = assembler.transform(df)


# Calcular matriz de correlación
matrix = Correlation.corr(df_vector, "features").head()[0]
correlation_matrix = matrix.toArray().tolist()

for row in correlation_matrix:
    print(row)

In [39]:
%pyspark
#ANALISIS ESTADISTICO
# KPI 1: Análisis detallado 
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, round, avg, stddev, sum as _sum, when

# Calcular métricas detalladas
resultado1_detallado = df.groupBy("SEXO", "DIAGNOSTICOS") \
    .agg(
        count("*").alias("TOTAL_ATENCIONES"),  # Total de atenciones
        round(avg("MONTO_BRUTO"), 2).alias("COSTO_PROMEDIO_ATENCION"),  # Promedio de MONTO_BRUTO
        # Si el total de atenciones es 1, la desviación estándar será 0
        round(when(count("*") == 1, 0).otherwise(stddev("MONTO_BRUTO")), 2).alias("DESVIACION_ESTANDAR_MONTO"),  
        round(avg("EDAD"), 2).alias("PROMEDIO_EDAD")  # Promedio de la edad
    ) \
    .orderBy("DIAGNOSTICOS")

# Mostrar resultados
print("ANÁLISIS DETALLADO: COSTO PROMEDIO, DESVIACIÓN ESTÁNDAR Y PROMEDIO DE EDAD POR SEXO Y DIAGNÓSTICO:")
resultado1_detallado.show(truncate=False)


In [40]:
%pyspark
#INDICADOR 2: ANALISIS DETALLADO
#INDICADOR 2: SEXO, TOTAL DE DIAGNOSTICOS, PORCENTAJE, EDAD(PROMEDIO), MONTO_BRUTO(PROMEDIO Y DES.EST)
from pyspark.sql import functions as F

# Calcular el número de diagnósticos, edad promedio, monto promedio y desviación estándar por sexo
diagnosticos_detallado = df.groupBy("SEXO").agg(
    F.count("DIAGNOSTICOS").alias("TOTAL_DIAGNOSTICOS"),               # Total de diagnósticos
    F.round(F.avg("EDAD"), 2).alias("EDAD_PROMEDIO"),                  # Promedio de edad
    F.round(F.avg("MONTO_BRUTO"), 2).alias("PROMEDIO_MONTO_BRUTO"),    # Promedio del monto bruto
    F.round(F.stddev("MONTO_BRUTO"), 2).alias("DESVIACION_MONTO_BRUTO")# Desviación estándar del monto bruto
)

# Calcular el total general de diagnósticos
total_diagnosticos = df.select(F.count("DIAGNOSTICOS").alias("TOTAL_GENERAL")).collect()[0]["TOTAL_GENERAL"]

# Calcular el porcentaje de diagnósticos por sexo
diagnosticos_con_porcentaje = diagnosticos_detallado.withColumn(
    "PORCENTAJE_DIAGNOSTICOS(%)",
    F.round((F.col("TOTAL_DIAGNOSTICOS") / total_diagnosticos) * 100, 2)
)

# Mostrar el resultado ordenado por SEXO
diagnosticos_con_porcentaje.orderBy("SEXO").show(truncate=False)



In [41]:
%pyspark
#INDICADOR 3: ANALISIS DETALLADO
#INDICADOR 3: TIPO DE SEGURO, DIAGNOSTICO, MONTO(PROMEDIO Y DESV.), TOTAL DE ATENCIONES, EDAD(PROMEDIO)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as _sum, count, round, avg, stddev, when

# Calcular el costo promedio, la desviación estándar y el promedio de edad agrupando por tipo de seguro y diagnóstico
resultado_detallado = df.groupBy("TIPO_SEGURO", "DIAGNOSTICOS").agg(
    count("*").alias("TOTAL_ATENCIONES"),  # Total de atenciones
    round(avg("MONTO_BRUTO"), 2).alias("PROMEDIO_MONTO_BRUTO"),  # Promedio del MONTO_BRUTO
    # Si el total de atenciones es 1, la desviación estándar será 0
    round(when(count("*") == 1, 0).otherwise(stddev("MONTO_BRUTO")), 2).alias("DESVIACION_MONTO_BRUTO"),  
    round(avg("EDAD"), 2).alias("PROMEDIO_EDAD")  # Promedio de la edad
)

# Mostrar el resultado ordenado por tipo de seguro y diagnóstico
resultado_detallado.orderBy("TIPO_SEGURO", "DIAGNOSTICOS").show(truncate=False)



In [42]:
%pyspark
#INDICADOR 4: ANALISIS DETALLADO
#INDICADOR 4: REGION, TOTAL DE ATENCIONES, PORCENTAJE, EDAD(PROMEDIO), MONTO_BRUTO(PROMEDIO Y DES. ESTA)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, avg, stddev, round, count, when

# Contar el número total de atenciones en todas las regiones
total_atenciones = df.count()

# Agrupamos por región, calculamos las métricas y ajustamos la desviación estándar si hay solo una atención
analisis_region = df.groupBy("REGION").agg(
    count("*").alias("TOTAL_ATENCIONES"),
    round(avg("EDAD"), 2).alias("PROMEDIO_EDAD"),
    round(avg("MONTO_BRUTO"), 2).alias("PROMEDIO_MONTO_BRUTO"),
    round(when(count("*") == 1, 0).otherwise(stddev("MONTO_BRUTO")), 2).alias("DESVIACION_MONTO_BRUTO")
).withColumn(
    "PORCENTAJE_ATENCIONES(%)", 
    round((col("TOTAL_ATENCIONES") / total_atenciones) * 100, 2)
).orderBy(col("PORCENTAJE_ATENCIONES(%)").desc())

# Mostrar el resultado
analisis_region.show(truncate=False)


In [43]:
%pyspark
#GRAFICOS
#INDICADOR 1
import matplotlib.pyplot as plt
# Convertir el DataFrame de Spark en un DataFrame de pandas
data_kpi = resultado1.orderBy("COSTO_PROMEDIO_ATENCION", ascending=False).toPandas()

# Seleccionar los 20 diagnósticos con mayor costo promedio
top_data = data_kpi.head(20)

# Crear el gráfico de barras horizontales, diferenciando por SEXO
plt.figure(figsize=(12, 8))

# Colorear las barras según el SEXO (azul para masculino, rosa para femenino)
for sexo in top_data['SEXO'].unique():
    subset = top_data[top_data['SEXO'] == sexo]
    plt.barh(subset["DIAGNOSTICOS"], subset["COSTO_PROMEDIO_ATENCION"], label=sexo, alpha=0.7)

# Añadir etiquetas y título
plt.xlabel("Costo Promedio de Atención")
plt.ylabel("Diagnósticos")
plt.title("Top 20 Diagnósticos con Mayor Costo Promedio de Atención por Sexo")

# Invertir el eje Y para que el diagnóstico con mayor costo esté arriba
plt.gca().invert_yaxis()

# Añadir una leyenda para SEXO
plt.legend(title="Sexo")

plt.tight_layout()
plt.show()


In [44]:
%pyspark
# INDICADOR 2:
import matplotlib.pyplot as plt

# Convertir el DataFrame de PySpark a Pandas
data_sexo = porcentaje_diagnosticos.orderBy("SEXO").toPandas()

# Crear el gráfico de barras
plt.figure(figsize=(10, 6))  # Ajusté el tamaño para mejor visualización
plt.bar(data_sexo["SEXO"], data_sexo["PORCENTAJE_DIAGNOSTICOS(%)"], color="salmon")

# Etiquetas y título
plt.xlabel("Sexo")
plt.ylabel("Porcentaje de Diagnósticos (%)")
plt.title("Porcentaje de Diagnósticos por Sexo")

# Mostrar el gráfico
plt.xticks(rotation=0)  # No es necesario rotar ya que solo son dos categorías
plt.tight_layout()
plt.show()


In [45]:
%pyspark
# Convertir el DataFrame de Spark en un DataFrame de pandas
data_kpi3_1 = resultado3.orderBy("COSTO_PROMEDIO", ascending=False).toPandas()

# Seleccionar los 20 diagnósticos con mayor costo promedio
top_data3_1 = data_kpi3_1.head(20)

# Crear el gráfico de barras agrupadas por TIPO_SEGURO para cada DIAGNOSTICO
plt.figure(figsize=(14, 8))

# Crear barras agrupadas por TIPO_SEGURO para cada DIAGNOSTICO
for tipo_seguro in top_data3_1["TIPO_SEGURO"].unique():
    subset = top_data3_1[top_data3_1["TIPO_SEGURO"] == tipo_seguro]
    plt.barh(subset["DIAGNOSTICOS"], subset["COSTO_PROMEDIO"], label=tipo_seguro, alpha=0.7)

# Añadir etiquetas y título
plt.xlabel("Costo Promedio")
plt.ylabel("Diagnósticos")
plt.title("Top 20 Diagnósticos con Mayor Costo Promedio por Tipo de Seguro")

# Invertir el eje Y para que el diagnóstico con mayor costo esté arriba
plt.gca().invert_yaxis()

# Añadir una leyenda para los tipos de seguro
plt.legend(title="Tipo de Seguro")

plt.tight_layout()
plt.show()

In [46]:
%pyspark
# INDICADOR 4:
# Mostrar tasa de atención por "REGION"
data_region = tasa_atencion_region.orderBy("Tasa de Atención (%)").toPandas()
plt.figure(figsize=(10, 6))
plt.bar(data_region["REGION"], data_region["Tasa de Atención (%)"], color="violet")
plt.xlabel("Región")
plt.ylabel("Tasa de Atención (%)")
plt.title("Tasa de Atención por Región")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [47]:
%pyspark
#ANALISIS ESTADISTICO INTEGRADO
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, round, avg, stddev, sum as _sum, when, countDistinct

analisis_integrado = df.groupBy("TIPO_SEGURO").agg(
    count("*").alias("TOTAL_ATENCIONES"),
    countDistinct("DIAGNOSTICOS").alias("TOTAL_DIAGNOSTICOS"),
    count(when(col("SEXO") == "FEMENINO", True)).alias("TOTAL_ATENCIONES_FEMENINO"),
    count(when(col("SEXO") == "MASCULINO", True)).alias("TOTAL_ATENCIONES_MASCULINO"),
    round(avg("MONTO_BRUTO"), 2).alias("PROMEDIO_MONTO_BRUTO"),
    round(when(count("*") == 1, 0).otherwise(stddev("MONTO_BRUTO")), 2).alias("DESVIACION_ESTANDAR_MONTO"),
    round(avg("EDAD"), 2).alias("PROMEDIO_EDAD")
).orderBy("TIPO_SEGURO")

# Mostrar resultados
print("ANÁLISIS INTEGRADO:")
analisis_integrado.show(truncate=False)

In [48]:
%pyspark
#METRICAS DE RENDIMIENTO
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, round, avg, stddev
 
# Calcula las métricas de rendimiento global
performance_metrics = df.agg(
    countDistinct("DIAGNOSTICOS").alias("Total Diagnósticos"),  # Total de diagnósticos únicos
    round(avg("MONTO_BRUTO"), 2).alias("Promedio Monto Bruto"),  # Promedio del monto bruto
    round(stddev("MONTO_BRUTO"), 2).alias("Desviación Estándar Monto"),  # Desviación estándar del monto bruto
    round(avg("EDAD"), 2).alias("Promedio Edad"),  # Promedio de la edad
    count("*").alias("Total Atenciones") #Total de atenciones
).collect()[0].asDict()
 
print("\n=== Métricas de Rendimiento Global ===")
for field, value in performance_metrics.items(): #Iterar sobre el diccionario
    print(f"{field}: {value}")

In [49]:
%pyspark
#MODELO PREDICTIVO
# 1. Distribución de la variable objetivo
if 'MONTO_BRUTO' in df.columns:
    df.groupBy('MONTO_BRUTO').count().show()
 




In [50]:
%pyspark
 
# Lista de columnas a eliminar
columns_to_drop = ['PERIODO','DOCUMENTO_ANONIMIZADO', 'RENAES', 'DISTRITO', 'UBIGEO', 'SERVICIO', 'CODIGO_DIAGNOSTICO']
 
# Eliminar las columnas del DataFrame
df = df.drop(*columns_to_drop)
print(df.columns)

 

In [51]:
%pyspark
#2. Análisis estadístico de columnas numéricas
df.describe().show()

# 4. Verificar correlaciones entre columnas numéricas
numeric_columns= [col_name for col_name, dtype in df.dtypes if dtype in ['int', 'double', 'float']]
if numeric_columns:
    assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features_numeric")
    df_assembled = assembler.transform(df)
    correlation_matrix = Correlation.corr(df_assembled, "features_numeric").head()[0]
    print(f"Matriz de correlaciones (ordenada por columnas numércias):")
    print (correlation_matrix)



In [52]:
%pyspark
print(df.columns)

 #4. Evaluación de categorías en columnas categóricas
categorical_columns = [col_name for col_name, dtype in df.dtypes if dtype == 'string']
for col_name in categorical_columns:
    unique_count = df.select(col_name).distinct().count()
    print(f"Columna categórica {col_name}' tiene {unique_count} valores únicos.")

 


 





 

In [53]:
%pyspark
#5. Recomendación preliminar del modelo basado en análisis de datos
if len(numeric_columns) > 3 and df.select('MONTO_BRUTO').distinct().count() == 2:
    print("Sugerencia: Modelos de clasificación binaria como Logistic Regression o Random Forest son adecuados.")
elif len(numeric_columns) > 3:
  print("Sugerencia: Modelos de regresión como Linear Regression podrían ser útiles.")
else:
  print("Sugerencia: Realiza más análisis para identificar relaciones complejas y considerar modelos no lineales.")


In [54]:
%pyspark
print(df.columns)


In [55]:
%pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Asumiendo que 'df' ya está cargado y contiene las siguientes columnas
categorical_columns = ['ipress', 'region', 'departamento', 'provincia','diagnosticos','sexo','tipo_seguro']
numeric_columns = ['EDAD']
target_column = 'MONTO_BRUTO'

# Transformar variables categóricas a numéricas
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Crear vector de características
feature_columns = [col + "_index" for col in categorical_columns] + numeric_columns
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Definir pipeline
pipeline = Pipeline(stages=indexers + [assembler])
df_transformed = pipeline.fit(df).transform(df)
df_transformed = df_transformed.select("features", target_column)

# Dividir en datos de entrenamiento y prueba
train_data, test_data = df_transformed.randomSplit([0.7, 0.3], seed=1234)

# Definir modelos
models = {
    "Linear Regression": LinearRegression(featuresCol="features", labelCol=target_column),
    "Decision Tree": DecisionTreeRegressor(featuresCol="features", labelCol=target_column, maxBins=450),
    "Random Forest": RandomForestRegressor(featuresCol="features", labelCol=target_column, numTrees=100, maxBins=450),
    "Gradient Boosting": GBTRegressor(featuresCol="features", labelCol=target_column, maxBins=450)
}

# Evaluadores
rmse_evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="r2")

# Entrenar y evaluar modelos
results = {}
for name, model in models.items():
    trained_model = model.fit(train_data)
    predictions = trained_model.transform(test_data)
    rmse = rmse_evaluator.evaluate(predictions)
    r2 = r2_evaluator.evaluate(predictions)
    results[name] = {"RMSE": rmse, "R²": r2}

# Mostrar resultados de los cuatro modelos
for name, metrics in results.items():
    print(f"{name} -> RMSE: {metrics['RMSE']:.4f}, R²: {metrics['R²']:.4f}")

# Identificar el mejor modelo
best_model = min(results, key=lambda x: results[x]["RMSE"])
print(f"\nEl mejor modelo es {best_model} con RMSE: {results[best_model]['RMSE']:.4f} y R²: {results[best_model]['R²']:.4f}")


In [56]:
%pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Eliminar columnas con un solo valor único
df = df.drop('grupo_diagnosticos', 'periodo', 'grupo_cobertura', 'FECHA_CORTE_VALID', 'FECHA_ATENCION_VALID', 'fecha_corte', 'fecha_atencion')

# 1. Indexar variables categóricas
categorical_columns = ['ipress','region', 'departamento', 'provincia', 'diagnosticos', 'sexo', 'tipo_seguro']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep") for col in categorical_columns]

# 2. Crear el vector de características
feature_columns = [col + "_index" for col in categorical_columns] + ['EDAD', 'mes']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# 3. Escalar las características
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# 4. Definir la variable objetivo
df = df.withColumnRenamed("MONTO_BRUTO", "label")

# 5. Construir el pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler])
df_transformed = pipeline.fit(df).transform(df)

# 6. Seleccionar solo las columnas necesarias
df_transformed = df_transformed.select("scaledFeatures", "label")

# 7. Dividir en entrenamiento y prueba
train_data, test_data = df_transformed.randomSplit([0.7, 0.3], seed=1234)

# 8. Modelos de regresión con ajuste de hiperparámetros
models = {
    "Regresión Lineal": LinearRegression(featuresCol="scaledFeatures", labelCol="label", regParam=0.1, elasticNetParam=0.8),
    "Árbol de Decisión": DecisionTreeRegressor(featuresCol="scaledFeatures", labelCol="label", maxDepth=8, maxBins=500),
    "Random Forest": RandomForestRegressor(featuresCol="scaledFeatures", labelCol="label", numTrees=200, maxDepth=5, maxBins=500),
    "Gradient Boosting": GBTRegressor(featuresCol="scaledFeatures", labelCol="label", maxIter=100, maxDepth=5, maxBins=500)
}

# Evaluador de regresión
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# 9. Entrenamiento y evaluación
for name, model in models.items():
    print(f"Entrenando {name}...")
    model_fit = model.fit(train_data)
    predictions = model_fit.transform(test_data)
    
    # Evaluar RMSE y R²
    rmse = evaluator.evaluate(predictions)
    evaluator.setMetricName("r2")
    r2 = evaluator.evaluate(predictions)
    
    print(f"{name} -> RMSE: {rmse}, R²: {r2}")



In [57]:
%pyspark
#7. PERO POR LO MENOS ESTO SI TENEMOS QUE HACER









In [58]:
%pyspark
#8. MODELO QUE VA A PREDECIR
# Y ACA RECIEN TENEMOS QUE HACER EL MODELO PREDICTIVO



In [59]:
%pyspark







In [60]:
%pyspark




In [64]:
%pyspark

