# utils
Shared helper functions for the ETL pipeline.

In [1]:
import os
import pandas as pd
import xarray as xr
from datetime import datetime
from typing import Dict, List

In [2]:
def get_current_date_parts():
    """Return the current local date as a ``(year, month, day)`` tuple of strings.

    The month and day are zero-padded to two digits; the year is rendered
    with ``str`` and is not padded.
    """
    today = datetime.now()
    year_text = str(today.year)
    month_text = format(today.month, "02d")
    day_text = format(today.day, "02d")
    return year_text, month_text, day_text

In [3]:
# Example call; the recorded output below reflects the date on which the cell was run.
get_current_date_parts()

('2025', '08', '31')

In [4]:
def build_input_path(base, year, month, folder_name):
    """Return ``base/year/month/folder_name`` joined with the OS path separator.

    The path is only composed; nothing is created or checked on disk.
    """
    segments = (year, month, folder_name)
    return os.path.join(base, *segments)

In [5]:
def build_output_path(base, year, month, folder_name):
    """Compose ``base/year/month/folder_name``, create it on disk, and return it.

    Missing intermediate directories are created as well; a directory that
    already exists is left untouched.
    """
    target_dir = os.path.join(base, *(year, month, folder_name))
    # exist_ok=True makes repeated calls for the same path a no-op.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir

In [6]:
def save_dataframe(df, output_folder, folder_name, file_format):
    """Write *df* to ``<output_folder>/cleaned_<folder_name>.<file_format>``.

    Supported formats are ``'csv'`` and ``'parquet'``; both are written
    without the index. Any other format raises ``ValueError`` before
    anything is written. A confirmation line is printed on success.
    """
    destination = os.path.join(
        output_folder, "cleaned_{}.{}".format(folder_name, file_format)
    )

    # Reject unknown formats up front so no file is touched.
    if file_format not in ('csv', 'parquet'):
        raise ValueError(f"Unsupported file format: {file_format}")

    writer = df.to_csv if file_format == 'csv' else df.to_parquet
    writer(destination, index=False)
    print(f"[INFO] Saved data → {destination}")

In [7]:
def process_single_netcdf_file(file_path, variable, date_parts, source):
    """Load one variable from a NetCDF file into a flat, tagged DataFrame.

    Args:
        file_path: Path of the dataset to open with ``xr.open_dataset``.
        variable: Name of the data variable to extract.
        date_parts: Mapping indexed by ``'year'``, ``'month'`` and ``'day'``;
            each value is passed through ``int``.
        source: Value stored in the ``source`` column of every row.

    Returns:
        A DataFrame with the variable's coordinates as columns, rows where
        the variable is NaN removed, plus ``year``, ``month``, ``day`` and
        ``source`` columns. An empty DataFrame is returned when the variable
        is missing (a warning is printed) or when any error occurs (the error
        is printed); this function does not raise.
    """
    try:
        # The context manager closes the underlying file on every exit path;
        # previously the dataset handle was never released.
        with xr.open_dataset(file_path) as ds:
            if variable not in ds:
                print(f"[WARN] Missing {variable} in {file_path}")
                return pd.DataFrame()
            # Converting to pandas pulls the values into memory, so the frame
            # stays usable after the dataset is closed.
            df = ds[variable].to_dataframe().reset_index()

        df = df.dropna(subset=[variable])
        df['year'] = int(date_parts['year'])
        df['month'] = int(date_parts['month'])
        df['day'] = int(date_parts['day'])
        df['source'] = source
        return df
    except Exception as e:
        # Deliberate best-effort boundary: one unreadable file must not abort
        # a whole folder run, so the failure is reported and skipped.
        print(f"[ERROR] {file_path}: {e}")
        return pd.DataFrame()


In [8]:
def process_internal_folder(input_path, variable, date_parts, folder_name):
    """Combine every ``.nc`` file directly inside *input_path* into one DataFrame.

    Args:
        input_path: Directory to scan (not recursive).
        variable: Data variable name forwarded to ``process_single_netcdf_file``.
        date_parts: Date mapping forwarded to ``process_single_netcdf_file``.
        folder_name: Forwarded as the ``source`` argument for each file.

    Returns:
        The per-file frames concatenated with a fresh index, in file-name
        order. An empty DataFrame is returned when *input_path* is not an
        existing directory or when no file yields any rows.
    """
    # isdir rather than exists: a regular file at this path would make
    # os.listdir raise NotADirectoryError.
    if not os.path.isdir(input_path):
        return pd.DataFrame()

    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order of the combined frame reproducible between runs.
    nc_names = sorted(name for name in os.listdir(input_path) if name.endswith('.nc'))

    frames = []
    for name in nc_names:
        frame = process_single_netcdf_file(
            os.path.join(input_path, name), variable, date_parts, folder_name
        )
        if not frame.empty:
            frames.append(frame)

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)