In [None]:
import pandas as pd
import io
import os
from google.cloud import storage
from google.oauth2 import service_account

# Crea cliente para interactuar con Cloud Storage
storage_client = storage.Client()

# Función de limpieza
def clean_data_yellow(df):
    # Limpieza de valores faltantes
    df.columns = [col.lower() for col in df.columns]

    median_values = {
        'passenger_count': df['passenger_count'].median(),
        'ratecodeid': df['ratecodeid'].median(),
        'congestion_surcharge': df['congestion_surcharge'].median(),
        'airport_fee': df['airport_fee'].median()
    }

    mode_value = df['store_and_fwd_flag'].mode()[0]

    df.fillna(median_values, inplace=True)
    df['store_and_fwd_flag'].fillna(mode_value, inplace=True)

    # Corrección de valores anómalos
    distance_limit = 100
    fare_lower_limit = 0
    fare_upper_limit = 500

    # Crear una máscara de filtro y aplicar el filtro
    filter_mask = (df['trip_distance'] <= distance_limit) & \
                  (df['fare_amount'] >= fare_lower_limit) & \
                  (df['fare_amount'] <= fare_upper_limit)

    # Crear una copia del DataFrame filtrado para evitar SettingWithCopyWarning
    df = df.loc[filter_mask].copy()

    # Normalización y enriquecimiento de datos
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    # Conversión a entero con manejo de valores no numéricos o infinitos
    df['passenger_count'] = pd.to_numeric(df['passenger_count'], errors='coerce').fillna(0).astype(int)
    df['ratecodeid'] = pd.to_numeric(df['ratecodeid'], errors='coerce').fillna(0).astype(int)

    # Tratamiento de registros duplicados
    df.drop_duplicates(inplace=True)

    return df

def clean_data_green(df):
    # Limpieza de valores faltantes
    df.columns = [col.lower() for col in df.columns]

    # Normalización de datos
    df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])

    # Conversión a entero con manejo de valores no numéricos o infinitos
    df['ehail_fee'] = pd.to_numeric(df['ehail_fee'], errors='coerce').fillna(0).astype(float)
    df['passenger_count'] = pd.to_numeric(df['passenger_count'], errors='coerce').fillna(0).astype(int)
    df['payment_type'] = pd.to_numeric(df['payment_type'], errors='coerce').fillna(0).astype(int)
    df['ratecodeid'] = pd.to_numeric(df['ratecodeid'], errors='coerce').fillna(0).astype(int)
    df['trip_type'] = pd.to_numeric(df['trip_type'], errors='coerce').fillna(0).astype(int)

    # Tratamiento de registros duplicados
    df.drop_duplicates(inplace=True)

    return df

def clean_data_alquiler(df):
    # Crear una copia del DataFrame para evitar modificar el original
    df = df.copy()

    # Limpieza de valores faltantes
    df.columns = [col.lower() for col in df.columns]

    # Normalización de datos
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])

    # Conversión a entero con manejo de valores no numéricos o infinitos
    df['pulocationid'] = pd.to_numeric(df['pulocationid'], errors='coerce').fillna(0).astype(int)
    df['dolocationid'] = pd.to_numeric(df['dolocationid'], errors='coerce').fillna(0).astype(int)
    df['sr_flag'] = pd.to_numeric(df['sr_flag'], errors='coerce').fillna(0).astype(int)

    # Tratamiento de registros duplicados
    df.drop_duplicates(inplace=True)

    return df

def clean_and_store_data(data, context):

    # El nombre del bucket y el nombre del archivo se obtienen del contexto del evento
    bucket_name = data['bucket']
    file_name = data['name']

    # Verifica si el archivo se encuentra en la carpeta correspondiente
    if (file_name.startswith('data/taxis-amarillos/') or 
        file_name.startswith('data/taxis-verdes/') or 
        file_name.startswith('data/taxis-alquiler/')):

        try:
            # Leer datos del bucket
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(file_name)
            data = blob.download_as_bytes()
            df = pd.read_parquet(io.BytesIO(data))
            print(f"Datos leídos del bucket {bucket_name}.")

            # Aplicar las reglas de limpieza
            if file_name.startswith('data/taxis-amarillos/'):
                df_cleaned = clean_data_yellow(df)
            elif file_name.startswith('data/taxis-verdes/'):
                df_cleaned = clean_data_green(df)
            elif file_name.startswith('data/taxis-alquiler/'):
                df_cleaned = clean_data_alquiler(df)
            
            print("Datos limpiados.")

            # Guardar datos limpios en el nuevo bucket
            cleaned_bucket_name = 'ecoride-data'
            if file_name.startswith('data/taxis-amarillos/'):
                cleaned_file_name = 'pre-data/taxis-amarillos/cleaned_' + os.path.basename(file_name)
            elif file_name.startswith('data/taxis-verdes/'):
                cleaned_file_name = 'pre-data/taxis-verdes/cleaned_' + os.path.basename(file_name)
            elif file_name.startswith('data/taxis-alquiler/'):
                cleaned_file_name = 'pre-data/taxis-alquiler/cleaned_' + os.path.basename(file_name)
            cleaned_blob = storage_client.bucket(cleaned_bucket_name).blob(cleaned_file_name)

            # Convertir DataFrame a Parquet y subirlo
            buffer = io.BytesIO()
            df_cleaned.to_parquet(buffer)
            cleaned_blob.upload_from_string(buffer.getvalue(), content_type='application/octet-stream')
            print(f"Archivo limpio guardado en: gs://{cleaned_bucket_name}/{cleaned_file_name}")
        except Exception as e:
            print("Se produjo un error:", e)
    else:
        print(f"Ignorando archivo fuera de las carpetas especificadas: {file_name}")