# **Entrega 2 - Transformation and Processing**

#### Importación de librerías

In [1]:
import pandas as pd
from deltalake import write_deltalake, DeltaTable
from datetime import datetime
import os
pd.options.display.max_rows = None # Visualizar todas las filas de los df
pd.options.display.max_columns = None # Visualizar todas las columnas de los df

### **1. Current Weather**

In [21]:
# Traemos los datos desde la capa Bronze
bronze_dir_current = 'datalake/bronze/current_weather_data'
current_data = DeltaTable(bronze_dir_current).to_pandas()

# Validar si Bronze contiene datos
if current_data.empty:
    print("No hay datos disponibles en Bronze para procesar.")
else:
    # Filtrar los datos por la fecha más reciente
    latest_date = current_data['date'].max()

    # Filtrar los datos de la hora más reciente dentro de la fecha más reciente
    latest_hour = current_data[current_data['date'] == latest_date]['hour_time'].max()

    # Crear una máscara booleana para los registros que no cumplen con las condiciones
    mask_to_drop = (current_data['date'] != latest_date) | (current_data['hour_time'] != latest_hour)
    
    # Eliminar los registros que no cumplen con la máscara booleana usando drop con axis=0
    current_data = current_data.drop(current_data[mask_to_drop].index, axis=0)

    # Setear en un diccionario las columnas y sus futuros tipos de datos compatibles con silver
    conversion_mapping = {
        'id': 'int32',
        'visibility': 'int16',
        'name': 'string',
        'coord.lon': 'float32',
        'coord.lat': 'float32',
        'main.temp': 'int8',
        'main.feels_like': 'int8',
        'main.temp_min': 'int8',
        'main.temp_max': 'int8',
        'main.pressure': 'int16',
        'main.humidity': 'int16',
        'main.sea_level': 'int16',
        'main.grnd_level': 'int16',
        'wind.speed': 'float32',
        'wind.deg': 'int16',
        'date': 'datetime64[ns]',
        'hour_time': 'int8'
    }

    # Aplicar conversiones
    current_data = current_data.astype(conversion_mapping)

    # Directorio de la capa silver
    silver_dir_current = 'datalake/silver/current_weather_data'
    
    # Validar si el directorio silver existe
    if not os.path.exists(silver_dir_current):
        # Si no existe, inicializar silver con los datos actuales
        write_deltalake(
            table_or_uri=silver_dir_current,
            data=current_data,
            mode='append',
            partition_by=['date', 'hour_time'],
        )
        print("Delta Lake (silver) inicializado con los datos actuales.")
    else:
        try:
            # Obtener los datos existentes que hay actualmente en Silver
            silver_data = DeltaTable(silver_dir_current).to_pandas()

            # Validar si ya existen datos para la misma combinación de 'date' y 'hour_time' sin afectar el esquema
            date_to_check = current_data['date'].iloc[0]
            hour_to_check = current_data['hour_time'].iloc[0]

            # Filtrar para verificar si ya existe el dato en Silver
            existing_data = silver_data[(silver_data['date'] == date_to_check) & (silver_data['hour_time'] == hour_to_check)]

            if not existing_data.empty:
                print(f"Ya existen datos en silver para la fecha {date_to_check} y la hora {hour_to_check}. La inserción ha sido cancelada.")
            else:
                # Si no hay duplicados, escribir los datos transformados en el silver Delta Lake
                write_deltalake(
                    table_or_uri=silver_dir_current,
                    data=current_data,
                    mode='append', # Usa 'append' para realizar incrementos en cada extracción
                    partition_by=['date', 'hour_time'] # Columnas de partición
                )
                print(f"Datos de la última hora ({hour_to_check}) agregados correctamente a la capa silver.")
        except Exception as e:
            print(f"Error al cargar los datos en la capa silver: {e}")

# Ver cantidad total de registros actuales de la silver deltatable
if os.path.exists(silver_dir_current):
    current_dt = DeltaTable(silver_dir_current)
    print(f"Cantidad total de registros en la silver deltatable actualizada: {current_dt.to_pandas().shape[0]}")

# Descomentar las siguientes lineas para imprimir el último dataframe transformado
# current_data

Datos de la última hora (20) agregados correctamente a la capa silver.
Cantidad total de registros en la silver deltatable actualizada: 27


------

### **2. History Weather**

In [20]:
# Traemos los datos desde 'datalake/history_weather_data/bronze'
history_data = DeltaTable('datalake/bronze/history_weather_data').to_pandas()

# Diccionario de las columnas innecesarias
columns_to_drop = ['rain.3h', 'sys.pod', 'wind.gust', 'clouds.all', 
                   'main.temp_kf', 'dt', 'pop', 'weather']

# Eliminar las columnas innecesarias del DataFrame
history_data = history_data.drop(columns=columns_to_drop)

# Mapeo: asignar tipos de datos correspondiente y reducir el consumo de memoria
conversion_mapping = {
    "dt_txt": "datetime64[ns]",
    "visibility": "int16",
    "main.temp": "int8", 
    "main.feels_like": "int8", 
    "main.temp_min": "int8",
    "main.temp_max": "int8",
    "main.pressure": "int16",
    "main.sea_level": "int16",
    "main.grnd_level": "int16",
    "main.humidity": "int8",
    "wind.speed": "int8",
    "wind.deg": "int16"
}

# Realizar el casteo utilizando el mapeo previo
history_data = history_data.astype(conversion_mapping)

# Crear nuevas columnas 'date' y 'time' a partir de 'dt_txt'
history_data['date'] = history_data['dt_txt'].dt.date
history_data['hour_time'] = history_data['dt_txt'].dt.hour

# Eliminar la columna 'dt_txt'
history_data.drop(columns=['dt_txt'], inplace=True)

# Reordenar las columnas
history_data = history_data[['date', 'hour_time', 'visibility', 'main.temp', 'main.feels_like', 'main.temp_min', 
                             'main.temp_max', 'main.pressure', 'main.sea_level', 'main.grnd_level', 'main.humidity', 
                             'wind.speed', 'wind.deg']]

# URL directorio
silver_dir_history = 'datalake/silver/history_weather_data'

# Generar deltalake
write_deltalake(
    table_or_uri=silver_dir_history,
    data=history_data,
    mode='overwrite',
)

# Ver cantidad de registros en la tabla actualizada
history_dt = DeltaTable(silver_dir_history)
print(f'Cantidad total de registros en la silver deltatable actualizada: {history_dt.to_pandas().shape[0]}') # La cantidad se mantiene constante en 40 registros porque la data se sobreescribe

# Descomentar la siguiente línea para ver el último DataFrame transformado
# history_data

Cantidad total de registros en la silver deltatable actualizada: 40
