In [1]:
import pandas as pd
from pathlib import Path

def load_generation_data(gen_path: Path) -> pd.DataFrame:
    """
    Read the generation Parquet file and index it by UTC timestamp.

    Args:
        gen_path (Path): Location of the generation Parquet file. The file
            must contain a 'timestamp' column.

    Returns:
        pd.DataFrame: The file contents with 'timestamp' parsed as
        timezone-aware (UTC) datetimes and moved into the index.
    """
    generation = pd.read_parquet(gen_path)
    utc_timestamps = pd.to_datetime(generation["timestamp"], utc=True)
    # Replace the raw column with its parsed version, then promote it to index.
    return generation.assign(timestamp=utc_timestamps).set_index("timestamp")

def load_lookup_data(lookup_path: Path) -> pd.DataFrame:
    """
    Read the plant lookup CSV and add normalized helper columns.

    The CSV must provide:
      - 'CENTRAL': the plant name
      - 'FirstAppearance': the date the plant first appears

    Two columns are appended to the loaded data (the originals are kept):
      - 'firstappearance': 'FirstAppearance' parsed as UTC datetimes
      - 'central': 'CENTRAL' converted to lowercase

    Args:
        lookup_path (Path): Location of the lookup CSV file.

    Returns:
        pd.DataFrame: The lookup table including the two normalized columns.

    Raises:
        KeyError: If 'CENTRAL' or 'FirstAppearance' is absent from the CSV.
    """
    lookup = pd.read_csv(lookup_path)

    # Report the first required column that is missing, if any.
    missing = next(
        (name for name in ("CENTRAL", "FirstAppearance") if name not in lookup.columns),
        None,
    )
    if missing is not None:
        raise KeyError(f"Expected column '{missing}' not found in lookup file. Found columns: {lookup.columns.tolist()}")

    return lookup.assign(
        firstappearance=pd.to_datetime(lookup["FirstAppearance"], utc=True),
        central=lookup["CENTRAL"].str.lower(),
    )

def load_and_process_meteo_file(meteo_file: Path) -> pd.DataFrame:
    """
    Read a meteorological Parquet file and index it by UTC date.

    The date information is taken from the 'date' column when present,
    otherwise from 'timestamp'. Whichever is found is parsed as UTC
    datetimes, named 'date', and moved into the index.

    Args:
        meteo_file (Path): Location of the meteorological Parquet file.

    Returns:
        pd.DataFrame: Meteorological data indexed by 'date'.

    Raises:
        KeyError: If the file has neither a 'date' nor a 'timestamp' column.
    """
    meteo = pd.read_parquet(meteo_file)

    # 'date' takes precedence over 'timestamp' when both exist.
    source_col = next(
        (name for name in ("date", "timestamp") if name in meteo.columns),
        None,
    )
    if source_col is None:
        raise KeyError(f"Neither 'date' nor 'timestamp' column found in file: {meteo_file}")

    meteo[source_col] = pd.to_datetime(meteo[source_col], utc=True)
    return meteo.rename(columns={source_col: "date"}).set_index("date")

def merge_generation_and_meteo(df_gen: pd.DataFrame, df_meteo: pd.DataFrame, plant_name: str) -> pd.DataFrame:
    """
    Inner-join one plant's generation series with meteorological data on 'date'.

    The indexes of both inputs are turned back into ordinary columns so the
    join can run on a shared 'date' column. On the generation side the
    'timestamp' column becomes 'date' and the plant's column becomes
    'generation'.

    Args:
        df_gen (pd.DataFrame): Generation data indexed by 'timestamp', with one
            column per plant.
        df_meteo (pd.DataFrame): Meteorological data indexed by 'date'.
        plant_name (str): Name of the plant's column in ``df_gen``.

    Returns:
        pd.DataFrame: Rows whose date is present in both inputs, with 'date'
        as a regular column.

    Raises:
        KeyError: If ``df_gen`` has no column called ``plant_name``.
    """
    if plant_name not in df_gen.columns:
        raise KeyError(f"Generation column '{plant_name}' not found in generation data.")

    column_map = {"timestamp": "date", plant_name: "generation"}
    gen_flat = df_gen[[plant_name]].reset_index().rename(columns=column_map)
    meteo_flat = df_meteo.reset_index()
    return gen_flat.merge(meteo_flat, on="date", how="inner")

def update_first_appearance_date(df_model: pd.DataFrame, plant_name: str, df_lookup: pd.DataFrame) -> pd.DataFrame:
    """
    Replace the earliest date of a merged frame with the plant's lookup date.

    The plant is looked up in ``df_lookup`` by its 'central' value. When a
    valid 'firstappearance' date is found, every row of ``df_model`` whose
    'date' equals the frame's minimum date gets that lookup date instead.

    The input frame is never modified: the replacement is applied to a copy.
    When no replacement can be made (plant not in the lookup, or its lookup
    date is missing), a warning is printed and ``df_model`` is returned as-is.

    Args:
        df_model (pd.DataFrame): Merged DataFrame with a 'date' column.
        plant_name (str): The formatted plant name (in lowercase).
        df_lookup (pd.DataFrame): Lookup DataFrame with columns 'central' and
            'firstappearance'.

    Returns:
        pd.DataFrame: A frame whose earliest date has been replaced by the
        lookup date, or the unchanged input when no valid lookup date exists.
    """
    # Single filter instead of a membership test followed by a second lookup.
    candidates = df_lookup.loc[df_lookup["central"] == plant_name, "firstappearance"]
    if candidates.empty:
        print(f"Warning: Plant '{plant_name}' not found in lookup file.")
        return df_model

    # If the plant appears more than once, the first entry wins.
    lookup_date = candidates.iloc[0]
    if pd.isna(lookup_date):
        # Writing NaT would destroy the earliest real timestamp, so skip.
        print(f"Warning: Plant '{plant_name}' has no valid first appearance date in lookup file.")
        return df_model

    earliest_date = df_model["date"].min()
    # Work on a copy so the caller's DataFrame is left untouched.
    updated = df_model.copy()
    updated.loc[updated["date"] == earliest_date, "date"] = lookup_date
    return updated

def process_all_meteo_files(gen_path: Path, meteo_dir: Path, lookup_path: Path) -> dict:
    """
    Build one merged generation/meteo DataFrame per meteorological file.

    The generation and lookup data are loaded once. Then, for every
    '*.parquet' file found directly inside ``meteo_dir``:
      - the plant name is derived from the file stem (underscores become
        spaces, lowercased);
      - the meteo data is loaded and merged with that plant's generation;
      - the earliest date is updated from the lookup data;
      - the result is stored under the plant name.

    A file whose processing raises a KeyError is reported and skipped.

    Args:
        gen_path (Path): Path to the generation Parquet file.
        meteo_dir (Path): Directory containing meteorological Parquet files.
        lookup_path (Path): Path to the lookup CSV file.

    Returns:
        dict: Maps each successfully processed plant name to its merged
        DataFrame.
    """
    generation = load_generation_data(gen_path)
    lookup = load_lookup_data(lookup_path)

    results = {}
    for parquet_path in meteo_dir.glob("*.parquet"):
        # e.g. "parque_eolico_agua_clara" -> "parque eolico agua clara"
        plant_key = parquet_path.stem.replace("_", " ").lower()
        try:
            meteo = load_and_process_meteo_file(parquet_path)
            combined = merge_generation_and_meteo(generation, meteo, plant_key)
            combined = update_first_appearance_date(combined, plant_key, lookup)
        except KeyError as e:
            print(f"Skipping file '{parquet_path.name}': {e}")
            continue

        results[plant_key] = combined
        print(f"Processed '{plant_key}' - shape: {combined.shape}")

    return results

# Input locations, relative to the working directory (edit to match your layout)
GEN_PATH = Path("../data/interim/post_despacho_transformed.parquet")
METEO_DIR = Path("../data/raw/open_meteo_data/")
LOOKUP_PATH = Path("../data/lookup/central_info.csv")

# Build the per-plant merged DataFrames, keyed by plant name
merged_data = process_all_meteo_files(
    gen_path=GEN_PATH,
    meteo_dir=METEO_DIR,
    lookup_path=LOOKUP_PATH,
)

# Destination folder for the per-plant Parquet files; created if it is missing
output_dir = Path("../data/interim/meteo_data_with_generation")
output_dir.mkdir(parents=True, exist_ok=True)

for plant_name, df in merged_data.items():
    # File names use underscores where the plant name has spaces
    file_name = plant_name.replace(" ", "_") + ".parquet"
    output_path = output_dir.joinpath(file_name)
    df.to_parquet(output_path, index=False)
    print(f"Saved data for '{plant_name}' to {output_path}")

Processed 'parque eolico agua clara' - shape: (53271, 31)
Saved data for 'parque eolico agua clara' to ..\data\interim\meteo_data_with_generation\parque_eolico_agua_clara.parquet
