In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crear una sesión de Spark
spark = SparkSession.builder.appName("CruceTotal").getOrCreate()

# Función para cargar, filtrar y limpiar los datos de múltiples directorios
def cargar_y_filtrar(directorios, columna):
    df_unido = None
    for dir in directorios:
        # Leer los archivos Parquet en el directorio actual y seleccionar la columna especificada
        df = spark.read.parquet(dir).select(columna)
        # Filtrar los registros donde la columna especificada no sea igual a 1
        df = df.filter(col(columna) != 1)
        # Eliminar duplicados
        df = df.distinct()
        # Unir los DataFrames
        if df_unido is None:
            df_unido = df
        else:
            df_unido = df_unido.union(df)
    # Eliminar duplicados nuevamente después de unir todos los DataFrames
    df_unido = df_unido.distinct()   
    return df_unido

# Función para realizar un inner join y contar los registros coincidentes
def contar_cruce(df1, columna1, df2, columna2, nombre1, nombre2):
    cruce = df1.join(df2, df1[columna1] == df2[columna2], "inner")
    cuenta = cruce.count()
    print(f"Registros coincidentes entre {nombre1} y {nombre2}: {cuenta}")
    return cuenta

# Listar todos los directorios para cada fuente
dirs_segcovid = [
    "/rawdata/segcovid/segcovid_parquet_2020",
    "/rawdata/segcovid/segcovid_parquet_2021",
    "/rawdata/segcovid/segcovid_parquet_2022",
    "/rawdata/segcovid/segcovid_parquet_2023"
]

dirs_sivigila = [
    "/rawdata/sivigila/sivigila_parquet_2008",
    "/rawdata/sivigila/sivigila_parquet_2009",
    "/rawdata/sivigila/sivigila_parquet_2010",
    "/rawdata/sivigila/sivigila_parquet_2011",
    "/rawdata/sivigila/sivigila_parquet_2012",
    "/rawdata/sivigila/sivigila_parquet_2013",
    "/rawdata/sivigila/sivigila_parquet_2014",
    "/rawdata/sivigila/sivigila_parquet_2015",
    "/rawdata/sivigila/sivigila_parquet_2016",
    "/rawdata/sivigila/sivigila_parquet_2017",
    "/rawdata/sivigila/sivigila_parquet_2018",
    "/rawdata/sivigila/sivigila_parquet_2019",
    "/rawdata/sivigila/sivigila_parquet_2020",
    "/rawdata/sivigila/sivigila_parquet_2021",
    "/rawdata/sivigila/sivigila_parquet_2022"
]

dirs_vacunas = [
    "/rawdata/vacunascovid/vacunascovid_ver2_parquet_2021",
    "/rawdata/vacunascovid/vacunascovid_ver2_parquet_2022"
]

dirs_defunciones = [
    "/rawdata/vitales/defunciones/defunciones_parquet_2008_a_2016",
    "/rawdata/vitales/defunciones/defunciones_parquet_2017_a_2023"
]

dirs_nacimientos = [
    "/rawdata/vitales/vitales_parquet_2014",
    "/rawdata/vitales/vitales_parquet_2015",
    "/rawdata/vitales/vitales_parquet_2016",
    "/rawdata/vitales/vitales_parquet_2017",
    "/rawdata/vitales/vitales_parquet_2018",
    "/rawdata/vitales/vitales_parquet_2019",
    "/rawdata/vitales/vitales_parquet_2020",
    "/rawdata/vitales/vitales_parquet_2021",
    "/rawdata/vitales/vitales_parquet_2022",
    "/rawdata/vitales/vitales_parquet_2023"
]

dirs_rips = [
    "/rawdata/rips/rips_parquet_2009",
    "/rawdata/rips/rips_parquet_2010",
    "/rawdata/rips/rips_parquet_2011",
    "/rawdata/rips/rips_parquet_2012",
    "/rawdata/rips/rips_parquet_2013",
    "/rawdata/rips/rips_parquet_2014",
    "/rawdata/rips/rips_parquet_2015",
    "/rawdata/rips/rips_parquet_2016",
    "/rawdata/rips/rips_parquet_2017",
    "/rawdata/rips/rips_parquet_2018",
    "/rawdata/rips/rips_parquet_2019",
    "/rawdata/rips/rips_parquet_2020",
    "/rawdata/rips/rips_parquet_2021",
    "/rawdata/rips/rips_parquet_2022"
]

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/27 23:59:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Cargar y filtrar los datos de cada fuente
df_rips = cargar_y_filtrar(dirs_rips, "personaid")
df_nacimientos = cargar_y_filtrar(dirs_nacimientos, "PersonaID")
df_defunciones = cargar_y_filtrar(dirs_defunciones, "PersonaID")
df_vacunas = cargar_y_filtrar(dirs_vacunas, "PersonaBasicaID")
df_sivigila = cargar_y_filtrar(dirs_sivigila, "PersonaBasicaID")
df_segcovid = cargar_y_filtrar(dirs_segcovid, "PersonaBasicaID")

# Contar registros de cada DataFrame
total_rips = df_rips.count()
total_nacimientos = df_nacimientos.count()
total_defunciones = df_defunciones.count()
total_vacunas = df_vacunas.count()
total_sivigila = df_sivigila.count()
total_segcovid = df_segcovid.count()

# Imprimir los resultados
print(f"Total de registros en RIPS: {total_rips}")
print(f"Total de registros en Nacimientos: {total_nacimientos}")
print(f"Total de registros en Defunciones: {total_defunciones}")
print(f"Total de registros en Vacunas: {total_vacunas}")
print(f"Total de registros en Sivigila: {total_sivigila}")
print(f"Total de registros en Segcovid: {total_segcovid}")



Total de registros en RIPS: 73357469
Total de registros en Nacimientos: 4510549
Total de registros en Defunciones: 2497077
Total de registros en Vacunas: 39869037
Total de registros en Sivigila: 6082619
Total de registros en Segcovid: 5633747


                                                                                

In [3]:
# Realizar cruces entre cada par de fuentes de datos
contar_cruce(df_nacimientos, "PersonaID", df_defunciones, "PersonaID", "Nacimientos", "Defunciones")
contar_cruce(df_nacimientos, "PersonaID", df_vacunas, "PersonaBasicaID", "Nacimientos", "Vacunas")
contar_cruce(df_nacimientos, "PersonaID", df_sivigila, "PersonaBasicaID", "Nacimientos", "Sivigila")
contar_cruce(df_nacimientos, "PersonaID", df_segcovid, "PersonaBasicaID", "Nacimientos", "Segcovid")
contar_cruce(df_nacimientos, "PersonaID", df_rips, "personaid", "Nacimientos", "RIPS")

contar_cruce(df_defunciones, "PersonaID", df_vacunas, "PersonaBasicaID", "Defunciones", "Vacunas")
contar_cruce(df_defunciones, "PersonaID", df_sivigila, "PersonaBasicaID", "Defunciones", "Sivigila")
contar_cruce(df_defunciones, "PersonaID", df_segcovid, "PersonaBasicaID", "Defunciones", "Segcovid")
contar_cruce(df_defunciones, "PersonaID", df_rips, "personaid", "Defunciones", "RIPS")

contar_cruce(df_vacunas, "PersonaBasicaID", df_sivigila, "PersonaBasicaID", "Vacunas", "Sivigila")
contar_cruce(df_vacunas, "PersonaBasicaID", df_segcovid, "PersonaBasicaID", "Vacunas", "Segcovid")
contar_cruce(df_vacunas, "PersonaBasicaID", df_rips, "personaid", "Vacunas", "RIPS")

contar_cruce(df_sivigila, "PersonaBasicaID", df_segcovid, "PersonaBasicaID", "Sivigila", "Segcovid")
contar_cruce(df_sivigila, "PersonaBasicaID", df_rips, "personaid", "Sivigila", "RIPS")

contar_cruce(df_segcovid, "PersonaBasicaID", df_rips, "personaid", "Segcovid", "RIPS")

                                                                                

Registros coincidentes entre Nacimientos y Defunciones: 15933


                                                                                ]

Registros coincidentes entre Nacimientos y Vacunas: 3262221


                                                                                

Registros coincidentes entre Nacimientos y Sivigila: 737072


                                                                                

Registros coincidentes entre Nacimientos y Segcovid: 539285


                                                                                ]]

Registros coincidentes entre Nacimientos y RIPS: 4415675


                                                                                ]

Registros coincidentes entre Defunciones y Vacunas: 547541


                                                                                

Registros coincidentes entre Defunciones y Sivigila: 248445


                                                                                

Registros coincidentes entre Defunciones y Segcovid: 213944


                                                                                ]

Registros coincidentes entre Defunciones y RIPS: 2287979


                                                                                

Registros coincidentes entre Vacunas y Sivigila: 3146974


                                                                                ]

Registros coincidentes entre Vacunas y Segcovid: 4911524


                                                                                ]]

Registros coincidentes entre Vacunas y RIPS: 35586807


                                                                                

Registros coincidentes entre Sivigila y Segcovid: 568567


                                                                                1]]]

Registros coincidentes entre Sivigila y RIPS: 5539934




Registros coincidentes entre Segcovid y RIPS: 5469196




5469196

In [4]:
# Detener la sesión de Spark
spark.stop()