<a href="https://colab.research.google.com/github/anderson666r/Riesgo-cardiaco/blob/main/Analisis_patologias.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# C√≥digo fuente

A continuaci√≥n se explica paso a paso el c√≥digo implementado, estructurado en tres bloques: carga y preparaci√≥n de datos, an√°lisis con Spark (DataFrames y RDDs), y simulaci√≥n del flujo en tiempo real.

Instalaci√≥n y carga del entorno

In [None]:
!pip install -q pyspark==3.5.1

Se instala la versi√≥n requerida de PySpark en el entorno de Google Colab.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
from pathlib import Path
import shutil, time
from datetime import datetime
import threading, csv, os

Importaci√≥n de librer√≠as necesarias para Spark y manejo de archivos en el entorno.

# Carga de datos y creaci√≥n de la sesi√≥n Spark

In [None]:
spark = SparkSession.builder.appName("HeartFailure-Tarea3").getOrCreate()
from google.colab import files
uploaded = files.upload()

Se crea la sesi√≥n de Spark y se carga el archivo local heart.csv en Colab.

In [None]:
DATA_PATH = "/content/heart.csv"
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

Se lee el archivo CSV como un DataFrame de Spark, infiriendo autom√°ticamente los tipos de datos.

# Exploraci√≥n inicial y limpieza

In [None]:
df.count()
df.printSchema()
df.show(5, truncate=False)
df.summary().show()

Estas l√≠neas permiten conocer la cantidad de registros, tipos de datos, una vista previa de los datos y estad√≠sticas generales.

In [None]:
nulls = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
nulls.show(truncate=False)

Se genera una tabla con el conteo de valores nulos por columna.

# Transformaciones cl√≠nicas

In [None]:
# Clasificaci√≥n por grupo de edad cl√≠nica
df = df.withColumn("age_group",
F.when(F.col("Age") < 14, "Ni√±o")
.when((F.col("Age") >= 14) & (F.col("Age") <= 26), "Joven")
.when((F.col("Age") >= 27) & (F.col("Age") <= 59), "Adulto")
.otherwise("Mayor")
)


# Indicadores cl√≠nicos de riesgo
df = df.withColumn("bp_risk",
F.when((F.col("RestingBP") < 90) | (F.col("RestingBP") > 120), 1).otherwise(0))


df = df.withColumn("chol_risk",
F.when(F.col("Cholesterol") >= 200, 1).otherwise(0))

Se agregan columnas que clasifican a los pacientes por edad y marcan condiciones de riesgo en presi√≥n arterial y colesterol.

# An√°lisis con DataFrames

In [None]:
# Conteo por grupo de edad y enfermedad card√≠aca
df.groupBy("age_group", "HeartDisease").count().orderBy("age_group", "HeartDisease").show()


# Riesgo por presi√≥n y colesterol
df.groupBy("age_group").agg(
F.sum("bp_risk").alias("con_riesgo_presion"),
F.sum("chol_risk").alias("con_riesgo_colesterol")
).orderBy("age_group").show()


# Combinaci√≥n de factores cl√≠nicos
df.groupBy("age_group", "HeartDisease", "FastingBS", "bp_risk", "chol_risk")\
.count().orderBy("age_group").show(truncate=False)

Se realizan distintos agrupamientos para analizar la distribuci√≥n de los riesgos cl√≠nicos por grupo de edad.

# An√°lisis con RDDs

In [None]:
rdd = df.select("Age", "HeartDisease", "MaxHR", "bp_risk", "chol_risk").rdd


def en_riesgo(fila):
return (
fila["HeartDisease"] == 1 and
(fila["bp_risk"] == 1 or fila["chol_risk"] == 1) and
(fila["MaxHR"] < 60 or fila["MaxHR"] > 180)
)


rdd_filtrado = rdd.filter(en_riesgo)
print("Pacientes con riesgo m√∫ltiple:", rdd_filtrado.count())

Se convierte el DataFrame a RDD para aplicar un filtro m√°s flexible que identifique pacientes con m√∫ltiples riesgos simult√°neos.

# Simulaci√≥n de flujo de datos (Kafka)

In [None]:
import pandas as pd
import os


# Divisi√≥n del CSV en lotes
streaming_dir = "/content/streaming_input"
os.makedirs(streaming_dir, exist_ok=True)


batch_size = 100
for i in range((len(df)//batch_size)+1):
batch_df = df.iloc[i*batch_size:(i+1)*batch_size]
batch_df.to_csv(f"{streaming_dir}/batch_{i}.csv", index=False)

Se divide el dataset original en archivos peque√±os para simular la llegada progresiva de datos como lo har√≠a Kafka.

# Proceso simulado de Spark Streaming

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pathlib import Path
import time


spark = SparkSession.builder.appName("HeartStreamingSim").getOrCreate()
streaming_dir = Path("/content/streaming_input")
archivos = sorted(streaming_dir.glob("batch_*.csv"))
df_acumulado = None


for archivo in archivos:
df_lote = spark.read.csv(str(archivo), header=True, inferSchema=True)
df_acumulado = df_lote if df_acumulado is None else df_acumulado.union(df_lote)


df_filtrado = df_acumulado.withColumn("chol_risk", F.when(F.col("Cholesterol") >= 200, 1).otherwise(0))
print(f"
üü¢ Procesando: {archivo.name}")
print("üîπ Total acumulado:", df_filtrado.count())
print(" - Colesterol alto (‚â•200):", df_filtrado.filter("chol_risk == 1").count())
print(" - Con enfermedad card√≠aca:", df_filtrado.filter("HeartDisease == 1").count())
time.sleep(3)

Este bloque lee cada archivo como si fuera un lote de datos reci√©n llegado, acumula los datos previos y muestra en consola estad√≠sticas acumuladas. El sleep(3) simula un retardo realista entre eventos.