In [23]:
import pandas as pd
import os

# Carpeta donde están los Excel
input_dir = os.path.expanduser("~/BigData_UPAO/Proyecto/DATA_CLINICA")
# Carpeta donde guardar los CSV
output_dir = os.path.expanduser("~/BigData_UPAO/Proyecto/DATA_CLINICA_CSV")
os.makedirs(output_dir, exist_ok=True)

distritos = ["Callayuc", "Choros", "Cujillo", "Cutervo", "LaRamada", "Pimpingos",
             "Querocotillo", "SanAndresdeCutervo", "SanLuisdeLucma", "SanJuandeCutervo",
             "SantaCruz", "SantoDomingodelaCapilla", "SantoTomas", "Socota", "ToribioCasanova"]

for distrito in distritos:
    excel_file = os.path.join(input_dir, f"DATA_{distrito}.xlsx")
    csv_file = os.path.join(output_dir, f"DATA_{distrito}.csv")
    
    if os.path.exists(excel_file):
        print(f"Convirtiendo {distrito} a CSV...")
        df = pd.read_excel(excel_file)
        df.to_csv(csv_file, index=False)
    else:
        print(f"No se encontró archivo para {distrito}")


Convirtiendo Callayuc a CSV...
Convirtiendo Choros a CSV...
Convirtiendo Cujillo a CSV...
Convirtiendo Cutervo a CSV...
Convirtiendo LaRamada a CSV...
Convirtiendo Pimpingos a CSV...
Convirtiendo Querocotillo a CSV...
Convirtiendo SanAndresdeCutervo a CSV...
Convirtiendo SanLuisdeLucma a CSV...
Convirtiendo SanJuandeCutervo a CSV...
Convirtiendo SantaCruz a CSV...
Convirtiendo SantoDomingodelaCapilla a CSV...
Convirtiendo SantoTomas a CSV...
Convirtiendo Socota a CSV...
Convirtiendo ToribioCasanova a CSV...


In [None]:
#Conversion de xlsx a csv archivos generales
import pandas as pd
import os
import glob


input_dir = os.path.expanduser("~/BigData_UPAO/Proyecto/DATA_CLINICA")

output_dir = os.path.expanduser("~/BigData_UPAO/Proyecto/DATA_CLINICA_CSV")
os.makedirs(output_dir, exist_ok=True)


excel_files = glob.glob(os.path.join(input_dir, "*.xlsx"))

for excel_file in excel_files:
    
    base_name = os.path.splitext(os.path.basename(excel_file))[0]
    
    csv_file = os.path.join(output_dir, f"{base_name}.csv")
    
    print(f"Convirtiendo {base_name} a CSV...")
    df = pd.read_excel(excel_file)
    df.to_csv(csv_file, index=False)

print("Conversión completada.")


In [31]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("HistoriasClinicas_ETL")
    .getOrCreate()
)

In [32]:
from pyspark.sql.types import StructType, StructField, StringType


schema = StructType([
    StructField("DNI", StringType(), True),
    StructField("Asegurado", StringType(), True),
    StructField("FecNac", StringType(), True),   
    StructField("SEXO", StringType(), True),
    StructField("CPP", StringType(), True),
    StructField("síntomas", StringType(), True),
    StructField("enfermedad detectada", StringType(), True),
    StructField("Distrito", StringType(), True),
])

In [33]:
distritos = [
    "Callayuc", "Choros", "Cujillo", "Cutervo", "LaRamada", "Pimpingos",
    "Querocotillo", "SanAndresdeCutervo", "SanLuisdeLucma", "SanJuandeCutervo",
    "SantaCruz", "SantoDomingodelaCapilla", "SantoTomas", "Socota", "ToribioCasanova"
]

output_path = "hdfs://localhost:9000/datalake/raw/clinic/historias_csv_unificado/"

dfs = []

for distrito in distritos:
    file_path = f"hdfs://localhost:9000/datalake/raw/clinic/historias_csv/ingestion_date=2025-09-29/distrito={distrito}/*.csv"
    df = spark.read.csv(file_path, header=True, schema=schema)
    dfs.append(df)

In [34]:
from functools import reduce
from pyspark.sql import DataFrame

df_raw = reduce(DataFrame.unionByName, dfs)

In [35]:
from pyspark.sql.functions import trim, upper, col, to_date
df_pre = (
    df_raw
    .withColumn("FecNac", to_date(col("FecNac"), "yyyy-MM-dd"))
    .fillna({"síntomas": "DESCONOCIDOS", "enfermedad detectada": "DESCONOCIDA"})
    .dropDuplicates(["DNI"])
    .withColumn("DNI_masked", concat(lit("XXX-"), substring(col("DNI"), 5, 4)))
    .drop("DNI")
)

In [36]:
df_pre.show(5, truncate=False)
df_pre.printSchema()
print("Total de registros:", df_pre.count())

df_pre.write.mode("overwrite").parquet(output_path)


                                                                                

+--------------------------------+----------+----+---------------------------------------------------------+---------------------------------------+--------------------+---------------------------+----------+
|Asegurado                       |FecNac    |SEXO|CPP                                                      |síntomas                               |enfermedad detectada|Distrito                   |DNI_masked|
+--------------------------------+----------+----+---------------------------------------------------------+---------------------------------------+--------------------+---------------------------+----------+
|NULL                            |NULL      |NULL|NULL                                                     |DESCONOCIDOS                           |DESCONOCIDA         |NULL                       |NULL      |
|Sandra Roberto Torres Herrera   |1990-12-21|M   |NULL                                                     |DESCONOCIDOS                           |DESCONOCIDA     

                                                                                

Total de registros: 131981


                                                                                

In [27]:
output_path = "hdfs://localhost:9000/datalake/raw/clinic/historias_preprocesadas/"

df_pre.write.mode("overwrite").parquet(output_path)
print("Parquet preprocesado guardado correctamente.")



Parquet preprocesado guardado correctamente.


                                                                                

In [28]:
df_pre.printSchema()

root
 |-- Asegurado: string (nullable = true)
 |-- FecNac: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- CPP: string (nullable = true)
 |-- síntomas: string (nullable = false)
 |-- enfermedad detectada: string (nullable = false)
 |-- Distrito: string (nullable = true)
 |-- DNI_masked: string (nullable = true)



In [29]:
df_pre.select("síntomas", "enfermedad detectada").show(10, truncate=False)

+-----------------------------------------------------+--------------------+
|síntomas                                             |enfermedad detectada|
+-----------------------------------------------------+--------------------+
|sensibilidad a la luz, náuseas                       |Migraña             |
|dificultad para respirar, pérdida de olfato          |Migraña             |
|dolor de cabeza                                      |Diabetes            |
|DESCONOCIDOS                                         |Diabetes            |
|pérdida de olfato, tos seca, dificultad para respirar|COVID-19            |
|acidez estomacal                                     |Diabetes            |
|fiebre, pérdida de olfato                            |Gripe               |
|DESCONOCIDOS                                         |DESCONOCIDA         |
|sibilancias, dificultad para respirar                |Asma                |
|estornudos, picazón                                  |Alergia             |

In [30]:
spark.stop()