# Flujo del proceso ETL

## Extract

Para el proceso de Extracción, se recopilaron los datos de las pruebas Saber 11 proporcionados por el Instituto Colombiano para la Evaluación de la Educación (ICFES), disponibles en Datos Abiertos de Colombia. Esta base de datos, actualizada el 20 de abril de 2024, contiene más de 7,11 millones de observaciones distribuidas en 51 columnas.

La extracción de estos datos involucró la utilización de técnicas de tratamiento de datos estructurados (CSV). Los datos, proporcionados por el ICFES y gestionados a través de la Oficina Asesora de Investigaciones, fueron descargados y procesados utilizando herramientas herramientas como Dbeaver, Docker y Python. Esto establece una base sólida para las etapas posteriores del proceso de análisis de datos, como la transformación, visualización y aplicación de técnicas de machine learning.

Las librerías que usaremos son:
- pandas: para la manipulación de datos
- sqlalchemy: para la conexión con la base de datos
- prefect: para la creación de pipelines y la orquestación de tareas

In [None]:
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base, Session
from prefect import task, flow

Se creaó el diccionario `config_bd`, el cual contiene los datos de conexión a la base de datos local y remota de AWS. Ambas bases de datos están bajo el motor PostgreSQL.

In [None]:
config_bd = {
    "local": {
        "host": "localhost",
        "port": "5432",
        "user": "postgres",
        "password": "postgres",
        "database": "juan_rocha",
    },
    "aws": {
        "host": "postgres.cspcvpb5rw4y.us-east-1.rds.amazonaws.com",
        "port": "5432",
        "user": "jrocha",
        "password": "fjwvacC_d6iupULVdyK7",
        "database": "analitica",
    },
}

La función `config_conexion` es un decorador que permite la conexión a la base de datos de manera dinámica según el entorno en el que se esté trabajando, (local o aws).

In [None]:
def config_conexion(func):
    def inner(tipo, *args, **kwargs):
        data = config_bd[tipo]
        return func(data, *args, **kwargs)

    return inner

Con la función `conectar_bd`, se establece la conexión a la base de datos. Para definir cuál tipo de conexión se va a realizar, se debe agregar el decorador `@config_conexion`, y recibir como parámetro el la llave del diccionario `config_bd` que se desea utilizar. Adicionalmente, se configura el decorador @task para que la función sea reconocida por Prefect.

In [None]:
@config_conexion
@task
def conectar_bd(env_data):
    print(f'connected to {env_data["database"]} db')

    host = env_data["host"]
    port = env_data["port"]
    user = env_data["user"]
    password = env_data["password"]
    database = env_data["database"]

    return sa.create_engine(
        f"postgresql://{user}:{password}@{host}:{port}/{database}", echo=False
    )

## Transform
Para la fase de Transformación, se emplearon SQL Alchemy y Pandas para realizar consultas y manipular los datos. A través de estas herramientas, se realizaron diferentes consultas para obtener varios DataFrames específicos, que permiten un análisis más enfocado en distintas áreas de interés. Estos DataFrames incluyen:

- `analitica_colegio`: Análisis enfocado en las características y desempeño de los colegios.
- `analitica_educacion_familia`: Análisis de la influencia del nivel educativo de la familia en los resultados.
- `analitica_puntaje_estrato`: Evaluación de la relación entre el puntaje y el estrato socioeconómico.
- `analitica_puntaje_genero`: Comparación de puntajes entre géneros.
- `analitica_puntaje_internet`: Análisis del impacto del acceso a internet en los puntajes.
- `analitica_puntaje_periodo`: Evaluación de los puntajes en diferentes periodos.

Estos DataFrames proporcionan una visión detallada y segmentada de los datos, facilitando la identificación de patrones y tendencias específicos que pueden ser cruciales para comprender los factores que influyen en los resultados de las pruebas Saber 11.

Las funciones `obtener_datos_colegio` y `obtener_datos_estudiante` se encargan de realizar las consultas a la base de datos y retornar los DataFrames correspondientes. Estas funciones se decoran con `@task` para que sean reconocidas por Prefect.

In [None]:
def obtener_datos_colegio(env):
    conn = conectar_bd(env)
    sql_query = """
        select
            s.cole_cod_dane_establecimiento,
            s.cole_jornada,
            s.cole_naturaleza,
            s.cole_calendario,
            s.cole_nombre_establecimiento,
            s.cole_caracter,
            s.cole_area_ubicacion,
            s.cole_genero,
            AVG(s.punt_global) as prom_punt_global
        from
            saber_11 s
        group by
            s.cole_cod_dane_establecimiento,
            s.cole_jornada,
            s.cole_naturaleza,
            s.cole_calendario,
            s.cole_nombre_establecimiento,
            s.cole_caracter,
            s.cole_area_ubicacion,
            s.cole_genero
        order by
            prom_punt_global desc;
      """
    datos_colegio = pd.read_sql_query(sql_query, con=conn)

    conn.dispose()
    return datos_colegio

def obtener_datos_estudiante(env):
    conn = conectar_bd(env)
    sql_query = """
        select
            s.estu_consecutivo,
            s.periodo,
            s.cole_cod_dane_establecimiento,
            s.estu_fechanacimiento,
            s.estu_genero,
            s.fami_estratovivienda,
            s.fami_tieneinternet,
            s.fami_tienecomputador,
            s.fami_educacionmadre,
            s.fami_educacionpadre,
            s.punt_ingles,
            s.punt_matematicas,
            s.punt_sociales_ciudadanas,
            s.punt_c_naturales,
            s.punt_lectura_critica,
            s.punt_global
        from
            saber_11 s;
      """
    datos_estudiante = pd.read_sql_query(sql_query, con=conn)

    conn.dispose()
    return datos_estudiante

La función `nivel_educacion_maximo` se encarga de obtener el nivel educativo máximo de los padres de los estudiantes. Para ello, se crea una lista con los niveles educativos en orden jerárquico y se asigna un valor numérico a cada uno según su índice. Luego, se obtiene el nivel educativo máximo de los padres de los estudiantes y se asigna el valor correspondiente. Esta función no se decoró con `@task` ya que es un proceso inidividual por cada registro de los estudiantes, lo cual no es significativo para el análisis del flujo de trabajo.

In [None]:
def nivel_educacion_maximo(row):
    education_levels = [
        "Ninguno",
        "Primaria incompleta",
        "Primaria completa",
        "Secundaria (Bachillerato) incompleta",
        "Secundaria (Bachillerato) completa",
        "Técnica o tecnológica incompleta",
        "Técnica o tecnológica completa",
        "Educación profesional incompleta",
        "Educación profesional completa",
        "Postgrado",
    ]

    levels = [row["fami_educacionmadre"], row["fami_educacionpadre"]]

    levels = [level for level in levels if level in education_levels]

    if levels:
        return max(levels, key=lambda level: education_levels.index(level))
    return "Ninguno"


Siguiendo el flujo, tenemos la función `limpiar_datos_estudiantes`, la cual se encarga de limpiar los datos de los estudiantes. Se eliminan las columnas que no son relevantes para el análisis y se renombran las columnas restantes. Esta función se decoró con `@task` para que sea reconocida por Prefect.

In [None]:
@task
def limpiar_datos_estudiantes(datos_estudiante):
    datos_estudiante = datos_estudiante[
        datos_estudiante["estu_genero"].isin(["F", "M"])
    ]

    datos_estudiante["fami_estratovivienda"] = datos_estudiante[
        "fami_estratovivienda"
    ].replace("", "Sin Estrato")

    datos_estudiante = datos_estudiante[
        datos_estudiante["fami_tieneinternet"].isin(["Si", "No"])
    ]

    datos_estudiante["nivel_educacion_maximo"] = datos_estudiante.apply(
        nivel_educacion_maximo, axis=1
    )

    return datos_estudiante


Iniciamos obteniendo el promedio de puntaje global por periodo, naturaleza y jornada del colegio con la función `promedio_puntaje_periodo`. Para ello, se realiza un merge entre los DataFrames `datos_colegio` y `datos_estudiante` y se agrupa por periodo, naturaleza y jornada. Se calcula el promedio del puntaje global y el número de estudiantes por grupo. Esta función se decoró con `@task` para que sea reconocida por Prefect.

In [None]:
def promedio_puntaje_periodo(datos_colegio, datos_estudiante):
    merged_data = pd.merge(
        datos_estudiante,
        datos_colegio,
        on="cole_cod_dane_establecimiento",
        how="inner",
        validate="m:m",
    )

    return (
        merged_data.groupby(["periodo", "cole_naturaleza", "cole_jornada"])
        .agg(
            prom_punt_global=("punt_global", "mean"),
            numero_estudiantes=("punt_global", "size"),
        )
        .reset_index()
    )

De manera similar, la función `promedio_puntaje_genero` se encarga de obtener el promedio de puntaje global por género. Se realiza un merge entre los DataFrames `datos_colegio` y `datos_estudiante` y se agrupa por género. Se calcula el promedio del puntaje global y el número de estudiantes por grupo. Esta función se decoró con `@task` para que sea reconocida por Prefect.

In [None]:
@task
def promedio_puntaje_genero(datos_colegio, datos_estudiante):
    merged_data = pd.merge(
        datos_estudiante,
        datos_colegio,
        on="cole_cod_dane_establecimiento",
        how="inner",
        validate="m:m",
    )

    return (
        merged_data.groupby(["estu_genero", "cole_naturaleza", "cole_jornada"])
        .agg(
            prom_punt_global=("punt_global", "mean"),
            numero_estudiantes=("punt_global", "size"),
        )
        .reset_index()
    )

Se realiza un análisis similar con la función `promedio_puntaje_estrato`, la cual obtiene el promedio de puntaje global por estrato socioeconómico.

In [None]:
@task
def promedio_puntaje_estrato(datos_colegio, datos_estudiante):
    merged_data = pd.merge(
        datos_estudiante,
        datos_colegio,
        on="cole_cod_dane_establecimiento",
        how="inner",
        validate="m:m",
    )

    return (
        merged_data.groupby(["fami_estratovivienda", "cole_naturaleza", "cole_jornada"])
        .agg(
            prom_punt_global=("punt_global", "mean"),
            numero_estudiantes=("punt_global", "size"),
        )
        .reset_index()
    )

Continuando con la transformación de los datos, la función `promedio_puntaje_internet` se encarga de obtener el promedio de puntaje global por acceso a internet.

In [None]:
@task
def promedio_puntaje_internet(datos_colegio, datos_estudiante):
    merged_data = pd.merge(
        datos_estudiante,
        datos_colegio,
        on="cole_cod_dane_establecimiento",
        how="inner",
        validate="m:m",
    )

    return (
        merged_data.groupby(["fami_tieneinternet", "cole_naturaleza", "cole_jornada"])
        .agg(
            prom_punt_global=("punt_global", "mean"),
            numero_estudiantes=("punt_global", "size"),
        )
        .reset_index()
    )

Finalmente, la función `promedio_puntaje_educacion_familia` se encarga de obtener el promedio de puntaje global por nivel educativo de la familia. Esta función utiliza la columna generada en la función `nivel_educacion_maximo` para agrupar los datos y calcular el promedio del puntaje global.

In [None]:
@task
def promedio_puntaje_educacion_familia(datos_colegio, datos_estudiante):
    merged_data = pd.merge(
        datos_estudiante,
        datos_colegio,
        on="cole_cod_dane_establecimiento",
        how="inner",
        validate="m:m",
    )

    return (
        merged_data.groupby(
            ["nivel_educacion_maximo", "cole_naturaleza", "cole_jornada"]
        )
        .agg(
            prom_punt_global=("punt_global", "mean"),
            numero_estudiantes=("punt_global", "size"),
        )
        .reset_index()
    )

## Load

Para el proceso de carga de datos se hizo uso de ORM con la librería SQLAlchemy para organizar los datos estudiados en un modelo objeto-relacional. Para esto se crean las clases que se encargaran de definir las tablas con la información que se quiere cargar y definiendo cada uno de los atributos internos y los tipos de datos. Seguido a esto se define una función que devuelve un objeto a la conexión con la base de datos y otra función para la creación todas las tablas definidas.

Finalmente para la carga masiva de datos se realiza la insersión de estos utilizando el método try finally para asegurar una correcta ejecución.

In [None]:
Base = declarative_base()

@task
def cargar_tablas_analitica(
    env,
    datos_colegio,
    puntaje_periodo,
    puntaje_genero,
    puntaje_estrato,
    puntaje_internet,
    puntaje_educacion_familia,
):
    class AnaliticaColegio(Base):
        __tablename__ = "analitica_colegio"

        id = sa.Column(sa.Integer, primary_key=True)
        cole_cod_dane_establecimiento = sa.Column(sa.String)
        cole_jornada = sa.Column(sa.String)
        cole_naturaleza = sa.Column(sa.String)
        cole_calendario = sa.Column(sa.String)
        cole_nombre_establecimiento = sa.Column(sa.String)
        cole_caracter = sa.Column(sa.String)
        cole_area_ubicacion = sa.Column(sa.String)
        cole_genero = sa.Column(sa.String)
        prom_punt_global = sa.Column(sa.Float)

        def _repr_(self):
            return f"AnaliticaColegio(id={self.id!r}, cole_cod_dane_establecimiento={self.cole_cod_dane_establecimiento!r}, \
                    cole_jornada = {self.cole_jornada!r}, cole_naturaleza = {self.cole_naturaleza!r}, cole_calendario = {self.cole_calendario!r}, \
                    cole_nombre_establecimiento = {self.cole_nombre_establecimiento!r}, cole_caracter = {self.cole_caracter!r}, \
                    cole_area_ubicacion = {self.cole_area_ubicacion!r}, cole_genero = {self.cole_genero!r}, prom_punt_global = {self.prom_punt_global!r}"

    class AnaliticaPuntajePeriodo(Base):
        __tablename__ = "analitica_puntaje_periodo"

        id = sa.Column(sa.Integer, primary_key=True)
        periodo = sa.Column(sa.Integer)
        cole_naturaleza = sa.Column(sa.String)
        cole_jornada = sa.Column(sa.String)
        prom_punt_global = sa.Column(sa.Float)
        numero_estudiantes = sa.Column(sa.Integer)

        def _repr_(self):
            return f"AnaliticaPuntajePeriodo(id={self.id!r}, periodo={self.periodo!r}, cole_naturaleza={self.cole_naturaleza!r}, \
                    cole_jornada = {self.cole_jornada!r}, prom_punt_global = {self.prom_punt_global!r}, numero_estudiantes = {self.numero_estudiantes!r}"

    class AnaliticaPuntajeGenero(Base):
        __tablename__ = "analitica_puntaje_genero"

        id = sa.Column(sa.Integer, primary_key=True)
        estu_genero = sa.Column(sa.String)
        cole_naturaleza = sa.Column(sa.String)
        cole_jornada = sa.Column(sa.String)
        prom_punt_global = sa.Column(sa.Float)
        numero_estudiantes = sa.Column(sa.Integer)

        def _repr_(self):
            return f"AnaliticaPuntajeGenero(id={self.id!r}, estu_genero={self.estu_genero!r}, cole_naturaleza={self.cole_naturaleza!r}, \
                    cole_jornada = {self.cole_jornada!r}, prom_punt_global = {self.prom_punt_global!r}, numero_estudiantes = {self.numero_estudiantes!r}"

    class AnaliticaPuntajeEstrato(Base):
        __tablename__ = "analitica_puntaje_estrato"

        id = sa.Column(sa.Integer, primary_key=True)
        fami_estratovivienda = sa.Column(sa.String)
        cole_naturaleza = sa.Column(sa.String)
        cole_jornada = sa.Column(sa.String)
        prom_punt_global = sa.Column(sa.Float)
        numero_estudiantes = sa.Column(sa.Integer)

        def _repr_(self):
            return f"AnaliticaPuntajeEstrato(id={self.id!r}, fami_estratovivienda={self.fami_estratovivienda!r}, cole_naturaleza={self.cole_naturaleza!r}, \
                    cole_jornada = {self.cole_jornada!r}, prom_punt_global = {self.prom_punt_global!r}, numero_estudiantes = {self.numero_estudiantes!r}"

    class AnaliticaPuntajeInternet(Base):
        __tablename__ = "analitica_puntaje_internet"

        id = sa.Column(sa.Integer, primary_key=True)
        fami_tieneinternet = sa.Column(sa.String)
        cole_naturaleza = sa.Column(sa.String)
        cole_jornada = sa.Column(sa.String)
        prom_punt_global = sa.Column(sa.Float)
        numero_estudiantes = sa.Column(sa.Integer)

        def _repr_(self):
            return f"AnaliticaPuntajeInternet(id={self.id!r}, fami_tieneinternet={self.fami_tieneinternet!r}, cole_naturaleza={self.cole_naturaleza!r}, \
                    cole_jornada = {self.cole_jornada!r}, prom_punt_global = {self.prom_punt_global!r}, numero_estudiantes = {self.numero_estudiantes!r}"

    class AnaliticaEducacionFamilia(Base):
        __tablename__ = "analitica_educacion_familia"

        id = sa.Column(sa.Integer, primary_key=True)
        nivel_educacion_maximo = sa.Column(sa.String)
        cole_naturaleza = sa.Column(sa.String)
        cole_jornada = sa.Column(sa.String)
        prom_punt_global = sa.Column(sa.Float)
        numero_estudiantes = sa.Column(sa.Integer)

        def _repr_(self):
            return f"AnaliticaEducacionFamilia(id={self.id!r}, nivel_educacion_maximo={self.nivel_educacion_maximo!r}, cole_naturaleza={self.cole_naturaleza!r}, \
                    cole_jornada = {self.cole_jornada!r}, prom_punt_global = {self.prom_punt_global!r}, numero_estudiantes = {self.numero_estudiantes!r}"

    conn = conectar_bd(env)

    Base.metadata.create_all(conn)

    try:
        with Session(conn) as session:
            session.bulk_insert_mappings(
                AnaliticaColegio, datos_colegio.to_dict(orient="records")
            )
            session.bulk_insert_mappings(
                AnaliticaPuntajePeriodo, puntaje_periodo.to_dict(orient="records")
            )
            session.bulk_insert_mappings(
                AnaliticaPuntajeGenero, puntaje_genero.to_dict(orient="records")
            )
            session.bulk_insert_mappings(
                AnaliticaPuntajeEstrato, puntaje_estrato.to_dict(orient="records")
            )
            session.bulk_insert_mappings(
                AnaliticaPuntajeInternet, puntaje_internet.to_dict(orient="records")
            )
            session.bulk_insert_mappings(
                AnaliticaEducacionFamilia,
                puntaje_educacion_familia.to_dict(orient="records"),
            )
            session.commit()
    finally:
        conn.dispose()

Compilando todo en la función `flujo_analitica`, se orquesta el flujo de trabajo completo. Se obtienen los datos de los colegios y los estudiantes, se limpian los datos de los estudiantes, se realizan los cálculos de los promedios de puntaje global por diferentes variables y se cargan los resultados en la base de datos de AWS. Al finalizar el proceso, se imprime un mensaje indicando que el proceso ETL ha finalizado. Esta función se decoró con `@flow` para que sea reconocida por Prefect como el inicializador de todo el flujo.

In [None]:
@flow(name="proyecto_grupal", log_prints=True)
def flujo_analitica():
    datos_colegio = obtener_datos_colegio("local")
    datos_estudiante = obtener_datos_estudiante("local")

    datos_estudiante = limpiar_datos_estudiantes(datos_estudiante)

    puntaje_periodo = promedio_puntaje_periodo(datos_colegio, datos_estudiante)
    puntaje_genero = promedio_puntaje_genero(datos_colegio, datos_estudiante)
    puntaje_estrato = promedio_puntaje_estrato(datos_colegio, datos_estudiante)
    puntaje_internet = promedio_puntaje_internet(datos_colegio, datos_estudiante)
    puntaje_educacion_familia = promedio_puntaje_educacion_familia(
        datos_colegio, datos_estudiante
    )

    cargar_tablas_analitica(
        "aws",
        datos_colegio,
        puntaje_periodo,
        puntaje_genero,
        puntaje_estrato,
        puntaje_internet,
        puntaje_educacion_familia,
    )

    print("ETL process finished")

Por último, se ejecuta el flujo de trabajo con la función `flujo_analitica()`.

In [None]:
flujo_analitica()