# Data transformer for PCSE

### Climate

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import ee
import geemap
from geeagri.extract import extract_timeseries_to_point
import os

import yaml

In [2]:
class ClimateConfig:
    """
    Single-pass index over a climate variable configuration.

    The ``"variables"`` mapping of the supplied config is walked exactly
    once at construction time, and the following lookup tables are built:

    - ``all_bands``: flat list of every band name, in config order
    - ``var_to_bands``: variable name -> list of its bands (one or several,
      e.g. wind u/v)
    - ``var_to_units``: variable name -> ``(native_unit, target_unit)``
    - ``var_to_conversion``: variable name -> conversion function name
    - ``derived``: set of variable names flagged as derived
    """

    def __init__(self, config_dict):
        # Keep the untouched config next to the section we actually index.
        self.raw = config_dict
        self.variables = config_dict["variables"]

        # Lookup tables, populated by _parse_variables().
        self.all_bands = []
        self.var_to_bands = {}
        self.var_to_units = {}
        self.var_to_conversion = {}
        self.derived = set()

        self._parse_variables()

    def _parse_variables(self):
        """Fill every lookup table in one pass over the variables."""
        for name, spec in self.variables.items():
            # (native, target) unit pair; either side may be None.
            self.var_to_units[name] = (
                spec.get("native_unit"),
                spec.get("target_unit"),
            )

            # Name of the conversion routine (None when not configured).
            self.var_to_conversion[name] = spec.get("conversion")

            # Remember variables explicitly flagged as derived.
            if spec.get("derived", False):
                self.derived.add(name)

            # Any key starting with "band" contributes a band, unless its
            # value is None.
            var_bands = [
                band
                for key, band in spec.items()
                if key.startswith("band") and band is not None
            ]
            self.all_bands.extend(var_bands)
            self.var_to_bands[name] = var_bands

    # Convenience accessors
    def get_all_bands(self):
        """Return a copy of the flat band list."""
        return self.all_bands[:]

    def get_bands_for(self, var_name):
        """Return the band list registered for ``var_name``."""
        return self.var_to_bands[var_name]

    def get_units(self, var_name):
        """Return the ``(native, target)`` unit pair for ``var_name``."""
        return self.var_to_units[var_name]

    def get_conversion(self, var_name):
        """Return the conversion name configured for ``var_name``."""
        return self.var_to_conversion[var_name]

    def is_derived(self, var_name):
        """Tell whether ``var_name`` was flagged as derived."""
        return var_name in self.derived

    def summary(self):
        """Return the parsed tables bundled into a single dict."""
        return dict(
            bands=self.all_bands,
            units=self.var_to_units,
            conversions=self.var_to_conversion,
            derived=[*self.derived],
        )

In [3]:
def K_to_C(series):
    """Convert temperatures from Kelvin to degrees Celsius."""
    kelvin_offset = 273.15
    return series - kelvin_offset

def J_to_kJ(series):
    """Convert energy values from joules to kilojoules."""
    joules_per_kilojoule = 1000
    return series / joules_per_kilojoule

def m_to_cm(series):
    """Convert lengths from metres to centimetres."""
    cm_per_metre = 100
    return series * cm_per_metre

def m_to_mm(series):
    """Convert lengths from metres to millimetres."""
    mm_per_metre = 1000
    return series * mm_per_metre

def none(series):
    """Identity conversion: hand the input back untouched."""
    return series

def Td_to_VAP(dewpoint_K):
    """Compute vapour pressure (kPa) from dewpoint temperature (K).

    The dewpoint is shifted to degrees Celsius and fed into
    ``0.6108 * exp(17.27 * Td / (Td + 237.3))``. The 0.6108 coefficient
    yields kilopascals; multiply the result by 10 if hPa is required.
    """
    dewpoint_c = dewpoint_K - 273.15
    exponent = (17.27 * dewpoint_c) / (dewpoint_c + 237.3)
    return 0.6108 * np.exp(exponent)

def uv_to_wind(u, v):
    """Return the wind speed magnitude (m/s) from its u and v components."""
    squared_speed = u**2 + v**2
    return np.sqrt(squared_speed)

In [4]:
# Registry mapping the conversion names used in the config to their callables.
# Every key is identical to the function's own name, so the table is derived
# from ``__name__`` instead of repeating each name twice.
CONVERSION_FUNCS = {
    func.__name__: func
    for func in (
        K_to_C,
        J_to_kJ,
        m_to_cm,
        m_to_mm,
        none,
        Td_to_VAP,
        uv_to_wind,
    )
}

In [5]:
import ee
# Interactive Earth Engine login, left disabled here; presumably only needed
# when no credentials are stored yet (TODO confirm).
# ee.Authenticate()
# Initialise the Earth Engine client against the 'ee-geonextgis' project.
ee.Initialize(project='ee-geonextgis')

In [None]:
import yaml
import importlib.resources as pkg_resources
from . import configs

class GEEWeatherDataProvider:
    """
    Fetch point weather data from Google Earth Engine and export it to Excel.

    The provider reads the per-source settings from the packaged
    ``meteo.yaml``, extracts a time series of the configured bands at one
    location, converts them to the configured target variables and can write
    the result, preceded by a site-metadata block, to an Excel sheet.

    Args:
        latitude: Latitude of the point of interest.
        longitude: Longitude of the point of interest.
        start_date: Start of the extraction period (passed through to
            ``extract_timeseries_to_point``).
        end_date: End of the extraction period (passed through as well).
        source: Top-level key of ``meteo.yaml`` selecting the dataset config.
        filepath: Default output path used by ``save_weather_excel``.

    Raises:
        ValueError: If ``source`` is not a key of ``meteo.yaml``.
    """

    # Cache of the processed DataFrame, filled by the first process() call.
    # Declared at class level so the attribute exists on every instance;
    # previously process() read it before anything had ever assigned it and
    # failed with AttributeError.
    _cached_df = None

    def __init__(self, latitude, longitude, start_date, end_date, source='era5_land', filepath=None):
        self.latitude = latitude
        self.longitude = longitude
        self.start_date = start_date
        self.end_date = end_date
        self.source = source
        self.filepath = filepath

        # The YAML file ships inside the ``configs`` package.
        with pkg_resources.files(configs).joinpath("meteo.yaml").open("r") as f:
            full_config = yaml.safe_load(f)

        if source not in full_config:
            raise ValueError(
                f"Source '{source}' not found in meteo.yaml. "
                f"Available: {list(full_config.keys())}"
            )

        self.weather_config = full_config[source]
        self.cfg = ClimateConfig(self.weather_config)
        self.summary = self.cfg.summary()

    def _get_elevation(self):
        """
        Fetch the elevation at the configured point from a DEM collection.

        Returns:
            float: The sampled ``b1`` value, or ``0.0`` when the lookup fails
            for any reason (a warning is printed in that case).
        """
        # Deliberately best-effort: any failure of the remote lookup degrades
        # to an elevation of 0 instead of aborting the caller.
        try:
            geom = ee.Geometry.Point(self.longitude, self.latitude)
            # Use the config for the DEM source if available, else default to GLO-30
            dem_source = self.weather_config.get('dem_collection', "projects/sat-io/open-datasets/GLO-30")

            elev = (
                ee.ImageCollection(dem_source)
                .filterBounds(geom)
                .first()
                .sample(geom, scale=30)
                .first()
                .get('b1')
            )
            return float(elev.getInfo())
        except Exception as e:
            print(f"Warning: Could not fetch elevation via GEE ({e}). Defaulting to 0.")
            return 0.0

    def _extract_data(self):
        """Fetch the raw time series of all configured bands from GEE."""
        return extract_timeseries_to_point(
            lat=self.latitude,
            lon=self.longitude,
            image_collection=ee.ImageCollection(self.weather_config.get('collection')),
            start_date=self.start_date,
            end_date=self.end_date,
            band_names=self.cfg.get_all_bands(),
            scale=self.weather_config.get('default_scale')
        )

    def process(self):
        """
        Convert the raw GEE data into the final variable table.

        The result is cached on the instance, so repeated calls (for example
        from ``save_weather_excel``) do not trigger another extraction.

        Returns:
            pandas.DataFrame: A ``date`` column plus one column per configured
            variable, rounded to 3 decimals.

        Raises:
            ValueError: If a derived variable names an unknown conversion.
            KeyError: If a direct variable names a conversion that is not in
                ``CONVERSION_FUNCS``.
        """
        # Return cached data if it exists
        if self._cached_df is not None:
            return self._cached_df

        df_raw = self._extract_data()
        output = pd.DataFrame(index=df_raw.index)
        output['date'] = df_raw['time']

        for var, bands in self.cfg.var_to_bands.items():
            conversion = self.cfg.var_to_conversion[var]

            # Derived variables need dedicated handling because they may
            # combine several bands.
            if self.cfg.is_derived(var):
                if conversion == 'Td_to_VAP':
                    dew = df_raw[bands[0]]
                    output[var] = Td_to_VAP(dew)
                elif conversion == 'uv_to_wind':
                    u, v = df_raw[bands[0]], df_raw[bands[1]]
                    output[var] = uv_to_wind(u, v)
                else:
                    raise ValueError(f"Unknown derived converter: {conversion}")

            # Direct variables: one band, one registered converter.
            else:
                raw_series = df_raw[bands[0]]
                converter = CONVERSION_FUNCS[conversion]
                output[var] = converter(raw_series)

        # Apply global rounding to 3 decimal places
        self._cached_df = output.round(3)
        return self._cached_df

    def save_weather_excel(self, filepath=None, **site_kwargs):
        """
        Save the weather data to Excel.

        The sheet contains a metadata block, an "Observed data" label, a
        header row, a units row and then the data rows. Header, units and
        data always have the same columns: a variable that is absent from
        the processed data is written as a column of the missing-value code.

        Args:
            filepath: Optional override for the filepath given at
                construction time.
            **site_kwargs: Metadata overrides (e.g. ``Country``, ``Station``,
                ``Elevation``). Supplying ``Elevation`` skips the GEE
                elevation lookup.

        Raises:
            ValueError: If no usable output path is available.
        """
        # Determine output path
        target_path = filepath or self.filepath
        if not target_path or target_path == "*":
            raise ValueError("Please provide a valid filepath to save the Excel file.")

        # 1. Process data first so a failed extraction aborts before any
        #    further remote call or file write.
        df = self.process()

        # 2. Prepare metadata. The elevation lookup is a remote call, so it is
        #    only made when the caller did not provide the value.
        if "Elevation" in site_kwargs:
            elevation = site_kwargs["Elevation"]
        else:
            elevation = self._get_elevation()

        metadata_defaults = {
            "Country": "Unknown",
            "Station": "GEE_Point",
            "Description": "Processed with GEEWeatherDataProvider",
            "Source": self.source,
            "Contact": "",
            "Missing values": -999,
            "Longitude": self.longitude,
            "Latitude": self.latitude,
            "Elevation": elevation,
            "Angstrom_a": "",
            "Angstrom_b": "",
            "HasSunshine": False,
        }
        metadata = {**metadata_defaults, **site_kwargs}

        # 3. Prepare headers
        variable_order = ["IRRAD", "TMIN", "TMAX", "VAP", "WIND", "RAIN", "SNOWDEPTH"]
        header_row1 = ["DAY"] + variable_order

        # Units row: fall back to "-" only for the variable whose units are
        # missing, instead of discarding the units of every variable.
        units_row = ["date"]
        for var in variable_order:
            units = self.cfg.var_to_units.get(var)
            if units is None:
                print(f"Warning: Variable missing in config units: {var!r}")
                units_row.append("-")
            else:
                units_row.append(units[-1])

        # Align the data with the header: reindex() returns a new frame (the
        # cached one is left untouched) holding exactly the header columns,
        # with NaN columns for variables the processed data lacks. NaNs are
        # then replaced by the missing-value code.
        absent_vars = [v for v in variable_order if v not in df.columns]
        if absent_vars:
            print(f"Warning: Variables missing in processed data: {absent_vars}")
        df_export = df.reindex(columns=["date"] + variable_order)
        df_export = df_export.fillna(metadata["Missing values"])

        # 4. Write to Excel
        with pd.ExcelWriter(target_path, engine="openpyxl") as writer:
            # Metadata
            meta_df = pd.DataFrame(list(metadata.items()))
            meta_df.to_excel(writer, sheet_name="Sheet1", index=False, header=False)

            start_row = len(meta_df) + 2

            # "Observed data" label
            pd.DataFrame([["Observed data"]]).to_excel(
                writer, sheet_name="Sheet1", index=False, header=False, startrow=start_row
            )

            # Header + Units
            pd.DataFrame([header_row1, units_row]).to_excel(
                writer, sheet_name="Sheet1", index=False, header=False, startrow=start_row + 2
            )

            # Data
            df_export.to_excel(
                writer, sheet_name="Sheet1", index=False, header=False, startrow=start_row + 4
            )

        print(f"File saved successfully to {target_path}")

ImportError: attempted relative import with no known parent package

In [19]:
# Build a provider for one point and a three-year window, then pull and
# convert the weather table in one go.
meteo = GEEWeatherDataProvider(
    latitude=30.50,
    longitude=-98.15,
    start_date='2020-01-01',
    end_date='2022-12-31',
    source='era5_land',
)
meteo_data = meteo.process()

In [25]:
# var_to_units holds (native, target) pairs, so [-1] is TMIN's target unit.
meteo.cfg.var_to_units['TMIN'][-1]

'Celcius'

In [21]:
# Sample the DEM at the provider's point; the method returns 0.0 if the lookup fails.
meteo._get_elevation()

247.39358520507812