# 🔄 Automatización y Gestión de Datos con Airflow: 📊 Un Análisis Exploratorio del Flujo de Trabajo de Archivos CSV y Kafka 🚀

### Importaciones de módulos en Python 📚

- `os`: Módulo que proporciona funciones para interactuar con el sistema operativo. Utilizado aquí para manejar rutas de archivos y directorios.
- `shutil`: Módulo que ofrece operaciones de alto nivel en archivos y colecciones de archivos. Ayuda en operaciones como copiar y mover archivos.
- `pandas`: Biblioteca de análisis de datos que proporciona estructuras de datos y herramientas de manipulación de datos de alto rendimiento y fáciles de usar.
- `datetime`: Módulo que permite manipular fechas y horas. Es crucial para definir fechas de inicio en las tareas programadas.
- `airflow`: Framework para programar y coordinar la ejecución de tareas. Se importa el módulo `DAG` para definir el objeto DAG y `PythonOperator` para ejecutar código Python como tareas en el DAG.
- `kafka`: Módulo para interactuar con Apache Kafka, un sistema de mensajería distribuido. Se importa `KafkaProducer` para enviar mensajes a un tópico de Kafka.
- `json`: Módulo que permite la codificación y decodificación de datos en formato JSON, útil para el formateo de mensajes enviados a Kafka.


In [None]:
import os
import shutil
import pandas as pd
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from kafka import KafkaProducer
import json

La función `clear_directories` se utiliza para mantener los directorios limpios eliminando todos los archivos existentes dentro de una lista específica de directorios. Esta operación es crucial para preparar el entorno de trabajo antes de ejecutar procesos que dependen de la ausencia de archivos residuales, asegurando así un entorno limpio y organizado. 🗑️✨


In [None]:
# Función para limpiar directorios
def clear_directories(directories):
    for directory in directories:
        dir_path = os.path.join(BASE_DIR, directory)
        for filename in os.listdir(dir_path):
            file_path = os.path.join(dir_path, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
                print(f"Removed: {file_path}")


La función `copy_csv_files` facilita la transferencia de archivos CSV que contienen una palabra clave específica desde un directorio de origen a un destino. Primero, asegura que el directorio de destino exista, creándolo si es necesario. Luego, recorre todos los archivos en el directorio de origen, copiando aquellos que coinciden con la palabra clave y terminan en `.csv` al directorio de destino. Este proceso es fundamental para organizar y preparar datos para etapas posteriores de procesamiento. 🔄📁


In [None]:
# Función para copiar archivos CSV basados en una palabra clave
def copy_csv_files(source_dir, target_dir, keyword):
    source_path = os.path.join(BASE_DIR, source_dir)
    target_path = os.path.join(BASE_DIR, target_dir)
    os.makedirs(target_path, exist_ok=True)
    for filename in os.listdir(source_path):
        if keyword.lower() in filename.lower() and filename.endswith('.csv'):
            shutil.copy(os.path.join(source_path, filename), os.path.join(target_path, filename))
            print(f"Copied: {filename}")

La función `unify_csv_files` se encarga de consolidar múltiples archivos CSV de un directorio fuente en un único archivo CSV de destino. Este proceso comienza identificando y listando todos los archivos CSV en el directorio fuente. Posteriormente, lee cada uno de estos archivos y los combina en un solo DataFrame de Pandas, el cual es finalmente guardado en el archivo de destino especificado. Este método es esencial para simplificar la gestión de datos al reducir múltiples archivos a uno solo, facilitando análisis y procesamientos posteriores. 📈🗂️


In [None]:
# Función para unificar archivos CSV
def unify_csv_files(source_dir, target_file):
    source_path = os.path.join(BASE_DIR, source_dir)
    target_path = os.path.join(BASE_DIR, target_file)
    all_files = [os.path.join(source_path, f) for f in os.listdir(source_path) if f.endswith('.csv')]
    combined_df = pd.concat([pd.read_csv(f) for f in all_files])
    combined_df.to_csv(target_path, index=False)
    print(f"Unified file saved to: {target_path}")

La función `remove_duplicates` está diseñada para depurar un archivo CSV específico eliminando todas las filas duplicadas. Primero carga el archivo CSV desde una ruta especificada en un DataFrame de Pandas. Utiliza el método `drop_duplicates` de Pandas para eliminar cualquier duplicado presente, y luego guarda el DataFrame limpio de nuevo en el archivo original. Esta operación asegura la integridad y la unicidad de los datos, lo cual es crucial para análisis de datos precisos y fiables. 🚫📄


In [None]:
# Función para eliminar duplicados en un archivo CSV
def remove_duplicates(source_file):
    file_path = os.path.join(BASE_DIR, source_file)
    df = pd.read_csv(file_path)
    df.drop_duplicates(inplace=True)
    df.to_csv(file_path, index=False)
    print(f"Removed duplicates from: {file_path}")

La función `remove_empty_rows` se encarga de limpiar un archivo CSV eliminando todas las filas que contengan valores vacíos o nulos. Carga el archivo en un DataFrame de Pandas desde una ruta específica, aplica el método `dropna` para eliminar todas las filas que no tengan datos completos, y guarda el DataFrame resultante de vuelta en el archivo, sin los índices. Este proceso es vital para asegurar la calidad de los datos, eliminando entradas incompletas que podrían afectar negativamente el análisis posterior. 🗑️🚀


In [None]:
# Función para eliminar filas vacías en un archivo CSV
def remove_empty_rows(source_file):
    file_path = os.path.join(BASE_DIR, source_file)
    df = pd.read_csv(file_path)
    df.dropna(inplace=True)
    df.to_csv(file_path, index=False)
    print(f"Removed empty rows from: {file_path}")


La función `sumar_duracion` está diseñada para calcular la suma total de valores en la columna "Duración" de un archivo CSV especificado. Inicia convirtiendo los valores de la columna a números, tratando los posibles formatos de números con comas como decimales. Luego, suma todos los valores válidos, excluyendo cualquier dato nulo o incorrecto, y devuelve la suma total. Si ocurre un error durante el proceso, captura la excepción y devuelve `None`, proporcionando también un mensaje de error detallado. Esta función es esencial para obtener métricas agregadas rápidamente, lo cual es útil en análisis y reportes de duraciones o tiempos. 📊🔄


In [None]:
# Función para calcular la sumatoria de la columna "Duración" en cada archivo CSV
def sumar_duracion(source_file):
    file_path = os.path.join(BASE_DIR, source_file)
    try:
        df = pd.read_csv(file_path)
        df['Duración'] = pd.to_numeric(df['Duración'].str.replace(',', '.'), errors='coerce')
        suma_duracion = df['Duración'].dropna().sum()
        print(f"Total Duration for {source_file}: {suma_duracion}")
        return suma_duracion
    except Exception as e:
        print(f"Error processing {source_file}: {e}")
        return None

La función `sumar_duracion` está diseñada para calcular la suma total de valores en la columna "Duración" de un archivo CSV especificado. Inicia convirtiendo los valores de la columna a números, tratando los posibles formatos de números con comas como decimales. Luego, suma todos los valores válidos, excluyendo cualquier dato nulo o incorrecto, y devuelve la suma total. Si ocurre un error durante el proceso, captura la excepción y devuelve `None`, proporcionando también un mensaje de error detallado. Esta función es esencial para obtener métricas agregadas rápidamente, lo cual es útil en análisis y reportes de duraciones o tiempos. 📊🔄


In [None]:
# Función para enviar datos a Kafka
def send_to_kafka(source_file, topic):
    file_path = os.path.join(BASE_DIR, source_file)
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    with open(file_path, 'r') as file:
        data = pd.read_csv(file)
        for index, row in data.iterrows():
            message = row.to_dict()
            producer.send(topic, value=message)
            producer.flush()
        print(f"Data from {source_file} sent to Kafka topic '{topic}'")

La función `sumar_duracion_planeada` calcula la suma total de la columna "Duración" en un archivo CSV, utilizando una lógica condicional para determinar cuál columna adicional ('Nombre' o 'codigo_personal_text') debe ser considerada para filtrar los datos. Primero intenta identificar y utilizar la columna 'Nombre', y si no es aplicable, usa 'codigo_personal_text'. Después de seleccionar la columna adecuada, convierte los valores de "Duración" a números, filtrando por las entradas válidas en la columna seleccionada, y luego suma estos valores. Esta función es útil para analizar duraciones planificadas, proporcionando una suma específica basada en criterios de filtrado adicionales, y maneja errores para asegurar la robustez del proceso. Si no se encuentra una columna relevante o si ocurre un error, la función devuelve `None` y reporta el problema. 🧐📈


In [None]:
def sumar_duracion_planeada(source_file):
    file_path = os.path.join(BASE_DIR, source_file)
    try:
        df = pd.read_csv(file_path)
        if 'Nombre' in df.columns and df['Nombre'].notna().any():
            columna_usada = 'Nombre'
        elif 'codigo_personal_text' in df.columns and df['codigo_personal_text'].notna().any():
            columna_usada = 'codigo_personal_text'
        else:
            print("Columna relevante no encontrada")
            return None
        df['Duración'] = pd.to_numeric(df['Duración'].str.replace(',', '.'), errors='coerce')
        df_filtrado = df[df[columna_usada].notna() & df[columna_usada].str.strip().astype(bool)]
        suma_duracion_planeada = df_filtrado['Duración'].dropna().sum()
        print(f"Suma de duración planeada para {source_file} usando columna {columna_usada}: {suma_duracion_planeada}")
        return suma_duracion_planeada
    except Exception as e:
        print(f"Error processing {source_file}: {e}")
        return None

El diccionario `default_args` define los argumentos predeterminados para las tareas en un DAG de Airflow. Estos incluyen:

- `owner`: Define el propietario del DAG, en este caso, 'airflow'.
- `depends_on_past`: Indica si la ejecución de la tarea depende de la finalización exitosa de la misma tarea en la ejecución anterior. Aquí está configurado como `False`, lo que permite la independencia entre ejecuciones.
- `start_date`: Especifica la fecha de inicio de la primera ejecución del DAG; aquí se configura para el 24 de mayo de 2024.
- `email_on_failure`: Si se debe enviar un correo electrónico en caso de fallo de la tarea. Está configurado como `False`, por lo que no se enviarán correos automáticamente.
- `email_on_retry`: Similar a `email_on_failure`, determina si se envía un correo al reintentar la tarea, también configurado como `False` aquí.
- `retries`: Define el número máximo de reintentos en caso de fallo de una tarea. Está establecido en `1`, permitiendo un reintento.

Estos argumentos son cruciales para el manejo de errores y la automatización de notificaciones dentro de los flujos de trabajo de Airflow, proporcionando un control robusto y configurable sobre la ejecución de las tareas. 🚀🔧


In [None]:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 5, 24),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

### Configuración del DAG `indicadores_dag` en Airflow

Este DAG, llamado `indicadores_dag`, está diseñado para realizar una serie de tareas relacionadas con el manejo y procesamiento de archivos CSV diariamente:

- **Descripción del DAG**: El DAG se encarga de limpiar directorios, copiar archivos, unificar datos, eliminar duplicados y filas vacías, calcular duraciones totales, y enviar datos a Kafka.
- **Intervalo de Programación**: Se ejecuta diariamente (`@daily`).
- **No Recuperación**: Configurado con `catchup=False` para evitar la ejecución de fechas pasadas que no se hayan ejecutado.

### Tareas Definidas en el DAG

1. **Limpiar Directorios**: Elimina todos los archivos dentro de los directorios especificados para asegurar un entorno limpio antes de procesar nuevos datos.
2. **Copiar Archivos de Actividades e Inspecciones**: Copia archivos CSV que contienen ciertas palabras clave desde un directorio de origen a un directorio de destino.
3. **Unificar Archivos de Actividades e Inspecciones**: Combina varios archivos CSV en un único archivo, facilitando el procesamiento posterior.
4. **Eliminar Duplicados en Archivos Unificados**: Asegura que los archivos combinados no contengan registros duplicados, manteniendo la calidad de los datos.
5. **Eliminar Filas Vacías en Archivos Unificados**: Remueve filas que no contengan información, purificando aún más los datos.
6. **Calcular Duración Total y Planeada**: Suma los valores en la columna "Duración" de los archivos unificados, proporcionando métricas claves para análisis.
7. **Enviar Datos a Kafka**: Los datos limpios y procesados se envían a tópicos específicos en Kafka para su uso en sistemas downstream o análisis en tiempo real.


In [None]:
with DAG(
    'indicadores_dag',
    default_args=default_args,
    description='DAG that clears directories, copies, unifies, removes duplicates and empty rows, calculates total duration, and sends data to Kafka',
    schedule_interval='@daily',
    catchup=False,
) as dag:

    clear_directories_task = PythonOperator(
        task_id='clear_directories',
        python_callable=clear_directories,
        op_kwargs={'directories': ['CSV_ACTIVIDAD', 'CSV_INSPECCION', 'CSV_unifi']},
    )

    copy_activities_task = PythonOperator(
        task_id='copy_activities_csv_files',
        python_callable=copy_csv_files,
        op_kwargs={'source_dir': 'CSV_DATA', 'target_dir': 'CSV_ACTIVIDAD', 'keyword': 'actividades'},
    )

    copy_inspecciones_task = PythonOperator(
        task_id='copy_inspecciones_csv_files',
        python_callable=copy_csv_files,
        op_kwargs={'source_dir': 'CSV_DATA', 'target_dir': 'CSV_INSPECCION', 'keyword': 'inspecciones'},
    )

    unify_activities_task = PythonOperator(
        task_id='unify_activities_csv_files',
        python_callable=unify_csv_files,
        op_kwargs={'source_dir': 'CSV_ACTIVIDAD', 'target_file': 'CSV_unifi/unified_activities.csv'},
    )

    unify_inspecciones_task = PythonOperator(
        task_id='unify_inspecciones_csv_files',
        python_callable=unify_csv_files,
        op_kwargs={'source_dir': 'CSV_INSPECCION', 'target_file': 'CSV_unifi/unified_inspecciones.csv'},
    )

    remove_duplicates_activities_task = PythonOperator(
        task_id='remove_duplicates_activities_csv_files',
        python_callable=remove_duplicates,
        op_kwargs={'source_file': 'CSV_unifi/unified_activities.csv'},
    )

    remove_duplicates_inspecciones_task = PythonOperator(
        task_id='remove_duplicates_inspecciones_csv_files',
        python_callable=remove_duplicates,
        op_kwargs={'source_file': 'CSV_unifi/unified_inspecciones.csv'},
    )

    remove_empty_rows_activities_task = PythonOperator(
        task_id='remove_empty_rows_activities_csv_files',
        python_callable=remove_empty_rows,
        op_kwargs={'source_file': 'CSV_unifi/unified_activities.csv'},
    )

    remove_empty_rows_inspecciones_task = PythonOperator(
        task_id='remove_empty_rows_inspecciones_csv_files',
        python_callable=remove_empty_rows,
        op_kwargs={'source_file': 'CSV_unifi/unified_inspecciones.csv'},
    )

    calculate_duration_activities_task = PythonOperator(
        task_id='calculate_duration_activities',
        python_callable=sumar_duracion,
        op_kwargs={'source_file': 'CSV_unifi/unified_activities.csv'},
    )

    calculate_duration_inspecciones_task = PythonOperator(
        task_id='calculate_duration_inspecciones',
        python_callable=sumar_duracion,
        op_kwargs={'source_file': 'CSV_unifi/unified_inspecciones.csv'},
    )

    calculate_duration_planeada_activities_task = PythonOperator(
    task_id='calculate_duration_planeada_activities',
    python_callable=sumar_duracion_planeada,
    op_kwargs={'source_file': 'CSV_unifi/unified_activities.csv'},
    )

    calculate_duration_planeada_inspecciones_task = PythonOperator(
    task_id='calculate_duration_planeada_inspecciones',
    python_callable=sumar_duracion_planeada,
    op_kwargs={'source_file': 'CSV_unifi/unified_inspecciones.csv'},
    )

    send_to_kafka_activities_task = PythonOperator(
        task_id='send_to_kafka_activities',
        python_callable=send_to_kafka,
        op_kwargs={'source_file': 'CSV_unifi/unified_activities.csv', 'topic': 'activities_topic'},
    )

    send_to_kafka_inspecciones_task = PythonOperator(
        task_id='send_to_kafka_inspecciones',
        python_callable=send_to_kafka,
        op_kwargs={'source_file': 'CSV_unifi/unified_inspecciones.csv', 'topic': 'inspecciones_topic'},
    )

### Flujo de Tareas en el DAG `indicadores_dag`

El flujo de tareas en el DAG `indicadores_dag` está diseñado para asegurar que las operaciones se ejecuten en un orden lógico y eficiente. A continuación se describe cómo se encadenan las tareas:

1. **Inicialización de la Limpieza de Directorios**:
   - La tarea `clear_directories_task` se ejecuta primero, asegurando que los directorios especificados estén limpios antes de comenzar cualquier otra operación.
   
2. **Copia de Archivos CSV**:
   - Después de la limpieza, dos tareas se ejecutan en paralelo:
     - `copy_activities_task`: Copia archivos relacionados con actividades.
     - `copy_inspecciones_task`: Copia archivos relacionados con inspecciones.

3. **Unificación de Archivos CSV**:
   - `copy_activities_task` está seguido por:
     - `unify_activities_task`: Unifica los archivos de actividades.
   - `copy_inspecciones_task` está seguido por:
     - `unify_inspecciones_task`: Unifica los archivos de inspecciones.

4. **Eliminación de Duplicados y Filas Vacías**:
   - Después de unificar los archivos, las tareas de eliminación de duplicados y filas vacías se ejecutan secuencialmente:
     - `unify_activities_task` → `remove_duplicates_activities_task` → `remove_empty_rows_activities_task`
     - `unify_inspecciones_task` → `remove_duplicates_inspecciones_task` → `remove_empty_rows_inspecciones_task`

5. **Cálculo de Duración Total y Planeada**:
   - Una vez limpios, los archivos son procesados para calcular la duración:
     - `remove_empty_rows_activities_task` → `calculate_duration_activities_task` → `calculate_duration_planeada_activities_task`
     - `remove_empty_rows_inspecciones_task` → `calculate_duration_inspecciones_task` → `calculate_duration_planeada_inspecciones_task`

6. **Envío de Datos a Kafka**:
   - Finalmente, los datos procesados se envían a Kafka:
     - `calculate_duration_planeada_activities_task` → `send_to_kafka_activities_task`
     - `calculate_duration_planeada_inspecciones_task` → `send_to_kafka_inspecciones_task`

In [None]:
    clear_directories_task >> [copy_activities_task, copy_inspecciones_task]
    copy_activities_task >> unify_activities_task >> remove_duplicates_activities_task >> remove_empty_rows_activities_task >> calculate_duration_activities_task >> calculate_duration_planeada_activities_task >> send_to_kafka_activities_task
    copy_inspecciones_task >> unify_inspecciones_task >> remove_duplicates_inspecciones_task >> remove_empty_rows_inspecciones_task >> calculate_duration_inspecciones_task >> calculate_duration_planeada_inspecciones_task >> send_to_kafka_inspecciones_task
