In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import pandas as pd
from minio import Minio
import os
import tempfile

# Configuración de conexión a MySQL utilizando SQLAlchemy
user = 'user_arcope'
password = 'passgrupo3'
host = 'redarcope.ddns.net'
port = '3306'
database = 'arcope_uber'

# Conexión a MySQL utilizando SQLAlchemy
db_engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?connect_timeout=60')

# Configuración de MinIO
MINIO_ENDPOINT = 'redarcope.ddns.net:9000'
MINIO_ACCESS_KEY = 'andres'
MINIO_SECRET_KEY = 'andresarcope01'
MINIO_BUCKET_NAME = 'transport-bucket'

# Configuración de los datasets
DATASETS_CONFIG = [
    {'file': 'trips.parquet', 'table': 'trips', 'unique_column': 'unique_trips'},
    {'file': 'vehicles.parquet', 'table': 'vehicles', 'unique_column': 'unique_vehicles'},
    {'file': 'Geographic_Data.parquet', 'table': 'geographic_data', 'unique_column': 'unique_geographic'},
    {'file': 'Meteorological_Data.parquet', 'table': 'meteorological_data', 'unique_column': 'unique_meteorological'},
    {'file': 'fuel_stations.parquet', 'table': 'fuel_stations', 'unique_column': 'unique_stations'},
    {'file': 'Fuel_Economy_Data.parquet', 'table': 'fuel_economy', 'unique_column': 'unique_economy'},
    {'file': 'air_quality_measurement.parquet', 'table': 'air_quality_measurement', 'unique_column': 'unique_air'},
    {'file': 'car_resale_prices.parquet', 'table': 'car_resale_prices', 'unique_column': 'unique_resale_price'}
]

# Función para extraer datos desde MinIO
def extract_from_minio(file, **kwargs):
    client = Minio(MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)
    response = client.get_object(MINIO_BUCKET_NAME, file)
    
    # Crear un archivo temporal en una ruta segura
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.parquet')
    with open(temp_file.name, 'wb') as file_data:
        for d in response.stream(32*1024):
            file_data.write(d)
    
    # Guardar el nombre del archivo temporal en XCom para uso posterior
    kwargs['ti'].xcom_push(key='temp_file_path', value=temp_file.name)
    print(f"Datos extraídos de {file}")

# Función para verificar si la columna existe en la tabla de MySQL
def check_column_exists(table_name, column_name):
    with db_engine.connect() as connection:
        result = connection.execute(f"DESCRIBE {table_name}")
        columns = [row['Field'] for row in result.fetchall()]
        return column_name in columns

# Función para filtrar datos nuevos basados en una columna única
def filter_new_data(table_name, unique_column, **kwargs):
    temp_file = kwargs['ti'].xcom_pull(key='temp_file_path', task_ids='extract_data')
    df = pd.read_parquet(temp_file)
    
    # Verificar si la columna existe antes de intentar filtrar
    if not check_column_exists(table_name, unique_column):
        raise ValueError(f"La columna '{unique_column}' no existe en la tabla '{table_name}'. Verifica el nombre de la columna.")
    
    # Leer los datos existentes desde la tabla usando SQLAlchemy y un cursor
    with db_engine.connect() as connection:
        result = connection.execute(f"SELECT {unique_column} FROM {table_name}")
        existing_data = pd.DataFrame(result.fetchall(), columns=[unique_column])
    
    # Filtrar los datos que no están presentes en la base de datos
    new_data = df[~df[unique_column].isin(existing_data[unique_column])]
    kwargs['ti'].xcom_push(key='new_data', value=new_data.to_json())  # Guardar los nuevos datos en XCom
    print(f"Datos nuevos filtrados para la tabla {table_name}")

# Función para simular la carga de datos a MySQL
def load_data(table_name, **kwargs):
    new_data_json = kwargs['ti'].xcom_pull(key='new_data', task_ids='filter_new_data')
    new_data = pd.read_json(new_data_json)
    
    if not new_data.empty:
        print(f"Nuevos datos a cargar en la tabla {table_name}:")
        print(new_data.head())  # Muestra las primeras filas para verificar
    else:
        print(f"No hay nuevos datos para cargar en la tabla {table_name}")

# Definición del DAG de Airflow
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='etl_process_pipeline',
    default_args=default_args,
    description='Pipeline ETL para extraer, transformar y cargar datos desde MinIO a MySQL',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 9, 26),
    catchup=False,
) as dag:
    
    for index, config in enumerate(DATASETS_CONFIG):
        # Tareas del DAG para cada archivo de configuración

        extract_data = PythonOperator(
            task_id=f'extract_data_{index}',
            python_callable=extract_from_minio,
            op_kwargs={'file': config['file']},
            provide_context=True,
        )

        filter_new_data = PythonOperator(
            task_id=f'filter_new_data_{index}',
            python_callable=filter_new_data,
            op_kwargs={'table_name': config['table'], 'unique_column': config['unique_column']},
            provide_context=True,
        )

        load_data = PythonOperator(
            task_id=f'load_data_{index}',
            python_callable=load_data,
            op_kwargs={'table_name': config['table']},
            provide_context=True,
        )

        # Definir el orden de ejecución de las tareas
        extract_data >> filter_new_data >> load_data


In [7]:

import os
import pandas as pd
from minio import Minio
from sqlalchemy import create_engine
import tempfile

# Configuración de MinIO y MySQL
MINIO_ENDPOINT = 'redarcope.ddns.net:9000'
MINIO_ACCESS_KEY = 'andres'
MINIO_SECRET_KEY = 'andresarcope01'
MINIO_BUCKET_NAME = 'transport-bucket'

# Configuración de conexión a MySQL
user = 'user_arcope'
password = 'passgrupo3'
host = 'redarcope.ddns.net'
port = '3306'
database = 'arcope_uber'

# Conexión a MySQL
db_engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?connect_timeout=60')

# Configuración de los datasets
DATASETS_CONFIG = [
    {'file': 'trips.parquet', 'table': 'trips', 'unique_column': 'unique_trips'},
    {'file': 'vehicles.parquet', 'table': 'vehicles', 'unique_column': 'unique_vehicles'},
    {'file': 'Geographic_Data.parquet', 'table': 'geographic_data', 'unique_column': 'unique_geographic'},
    {'file': 'Meteorological_Data.parquet', 'table': 'meteorological_data', 'unique_column': 'unique_meteorological'},
    {'file': 'fuel_stations.parquet', 'table': 'fuel_stations', 'unique_column': 'unique_stations'},
    {'file': 'Fuel_Economy_Data.parquet', 'table': 'fuel_economy', 'unique_column': 'unique_economy'},
    {'file': 'air_quality_measurement.parquet', 'table': 'air_quality_measurement', 'unique_column': 'unique_air'},
    {'file': 'car_resale_prices.parquet', 'table': 'car_resale_prices', 'unique_column': 'unique_resale_price'}
]
# Función para extraer datos desde MinIO
def extract_from_minio(file):
    client = Minio(MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)
    response = client.get_object(MINIO_BUCKET_NAME, file)
    
    # Crear un archivo temporal en una ruta segura
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.parquet')
    with open(temp_file.name, 'wb') as file_data:
        for d in response.stream(32*1024):
            file_data.write(d)
    
    return temp_file.name

# Función para verificar columnas en la tabla
def check_column_exists(table_name, column_name):
    with db_engine.connect() as connection:
        # Obtener la lista de columnas de la tabla
        result = connection.execute(f"DESCRIBE {table_name}")
        columns = [row['Field'] for row in result.fetchall()]
        # Verificar si la columna existe en la lista
        return column_name in columns

# Función para filtrar datos nuevos basados en una columna única
def filter_new_data(df, table_name, unique_column):
    # Verificar si la columna existe en la tabla antes de filtrar
    if not check_column_exists(table_name, unique_column):
        raise ValueError(f"La columna '{unique_column}' no existe en la tabla '{table_name}'. Verifica el nombre de la columna.")

    # Leer los datos existentes desde la tabla usando SQLAlchemy y un cursor
    with db_engine.connect() as connection:
        result = connection.execute(f"SELECT {unique_column} FROM {table_name}")
        existing_data = pd.DataFrame(result.fetchall(), columns=[unique_column])
    
    # Filtrar los datos que no están presentes en la base de datos
    new_data = df[~df[unique_column].isin(existing_data[unique_column])]
    
    return new_data

# Función para simular la carga de datos a MySQL
def load_data(new_data, table_name):
    # Simular la carga de datos a la base de datos
    if not new_data.empty:
        print(f"Nuevos datos a cargar en la tabla {table_name}:")
        print(new_data.head())  # Muestra las primeras filas para verificar
    else:
        print(f"No hay nuevos datos para cargar en la tabla {table_name}")




In [2]:
def check_column_exist(table_name):
    with db_engine.connect() as connection:
        # Obtener la lista de columnas de la tabla
        result = connection.execute(f"DESCRIBE {table_name}")
        columns = [row['Field'] for row in result.fetchall()]
    return(columns)

index = 7
config = DATASETS_CONFIG[index]

#chequear columnas

check= check_column_exist(config['table'])

print(check)

  result = connection.execute(f"DESCRIBE {table_name}")


['full_name', 'registered_year', 'transmission_type', 'fuel_type', 'max_power', 'resale_price', 'unique_resale_price']


In [8]:
# Ejecución de las funciones para un dataset específico
# Cambia 'index' para probar con otro dataset de la lista DATASETS_CONFIG
index = 7
config = DATASETS_CONFIG[index]

# Extraer los datos
temp_file = extract_from_minio(config['file'])
print(f"Datos extraídos de {config['file']}")

# Leer los datos extraídos en un DataFrame
df = pd.read_parquet(temp_file)

# Filtrar los datos nuevos
new_data = filter_new_data(df, config['table'], config['unique_column'])

# Simular la carga a MySQL
load_data(new_data, config['table'])

# Eliminar el archivo temporal
os.remove(temp_file)
print(f"Archivo temporal {temp_file} eliminado.")

Datos extraídos de car_resale_prices.parquet
Nuevos datos a cargar en la tabla car_resale_prices:
                      full_name registered_year transmission_type fuel_type  \
0  2017 Maruti Baleno 1.2 Alpha            2017            Manual    Petrol   
1            2018 Tata Hexa XTA            2018         Automatic    Diesel   
2   2015 Maruti Swift Dzire VXI            2015            Manual    Petrol   
3   2015 Maruti Swift Dzire VXI            2015            Manual    Petrol   
4    2009 Hyundai i10 Magna 1.1            2009            Manual    Petrol   

   max_power resale_price                   unique_resale_price  
0    83.1bhp  ₹ 5.45 Lakh  b9bf9c91-6502-4926-9cf6-05c66100757c  
1  153.86bhp    ₹ 10 Lakh  233edf53-bada-4a39-abb4-50b1e2d7275d  
2   83.14bhp  ₹ 4.50 Lakh  fb4324e4-6bad-4792-b3d1-c1d7ff313c43  
3   83.14bhp  ₹ 4.50 Lakh  c1a05365-5b31-4c18-98ed-579d08e1f26f  
4   68.05bhp  ₹ 1.60 Lakh  c1bd98f9-e236-4d0a-87ce-3a88cfd7b96f  
Archivo temporal C:\Users\david