In [1]:
import netCDF4 as nc
from tools import Layer
import warnings

In [13]:
#temp_file = "/Users/install/Downloads/test_WGS84.nc"
temp_file = "/Users/install/Downloads/tavg.nc"
#temp_file = "/Users/install/Downloads/temp.nc"

In [14]:
ATTRS = {
        "time": {
            "axis": ("T",),
            "units": ("since",),
            "calendar": (
                "proleptic_gregorian",
                "gregorian"
                "julian",
                "standard",
                "noleap",
                "365_day",
                "all_leap",
                "366_day",
                "360_day",
                "none",
            ),
            "standard_name": ("time",),
            "long_name": ("time",),
            "_CoordinateAxisType": ("Time",),
            "cartesian_axis": ("T",),
            "grads_dim": ("t",),
        },
        "longitude": {
            "axis": "X",
            "units": (
                "degrees_east",
                "degree_east",
                "degree_E",
                "degrees_E",
                "degreeE",
                "degreesE",
            ),
            "standard_name": ("longitude",),
            "long_name": ("longitude",),
            "_CoordinateAxisType": ("Lon",),
        },
        "latitude": {
            "axis": ("Y",),
            "units": (
                "degrees_north",
                "degree_north",
                "degree_N",
                "degrees_N",
                "degreeN",
                "degreesN",
            ),
            "standard_name": ("latitude",),
            "long_name": ("latitude",),
            "_CoordinateAxisType": ("Lat",),
        },
        "Z": {
            "axis": ("Z",),
            "standard_name": (
                "level",
                "pressure level",
                "depth",
                "height",
                "vertical level",
                "elevation",
                "altitude",
            ),
            "long_name": (
                "level",
                "pressure level",
                "depth",
                "height",
                "vertical level",
                "elevation",
                "altitude",
            ),
            "positive": ("up", "down"),
                "_CoordinateAxisType": (
                "GeoZ",
                "Height",
                "Pressure",
            ),
            "cartesian_axis": ("Z",),
            "grads_dim": ("z",),
        },
        "X": {
            "standard_name": ("projection_x_coordinate",),
            "_CoordinateAxisType": ("GeoX",),
            "axis": ("X",),
            "cartesian_axis": ("X",),
            "grads_dim": ("x",),
        },
        "Y": {
            "standard_name": "projection_y_coordinate",
            "_CoordinateAxisType": ("GeoY",),
            "axis": ("Y",),
            "cartesian_axis": ("Y",),
            "grads_dim": ("y",),
        },
    }

In [15]:
def _check_var(old, new, check_var=True):
    if check_var == True:
        if old is None or old == new:
            return new
        raise ValueError(f"Axis already defined as {old}. Found second axis {new}.")
    else:
        return old


def _check_var_attr(variable, data, name, attributes):
    if variable is not None:
        return variable, False
    else:
        for attribute_name, attribute_values in attributes.items():
            if attribute_name in data.ncattrs():
                attribute_value = getattr(data, attribute_name)
                if attribute_value in attribute_values:
                    return name, True
        return None, False


def extract_layers(dataset):
    """
    Extracts the layer information from a dataset following CF convention
    """
    layers = []
    var_list = []
    time_var = None
    x_var = None
    y_var = None
    z_var = None
    
    
    for var, data in dataset.variables.items():
        if len(data.dimensions) > 4:
            raise ValueError(f"Variable {data.name} has more than 4 possible dimensions (T, Z, Y, X).")
        # getting dimensions (T, Z, X, Y) 
        elif var in dataset.dimensions.keys():
            time_var, check_var = _check_var_attr(time_var, data, var, ATTRS["time"])
            time_var = _check_var(time_var, data.name, check_var)

            z_var, check_var = _check_var_attr(z_var, data, var, ATTRS["Z"])
            z_var = _check_var(z_var, data.name, check_var)

            x_var, check_var = _check_var_attr(x_var, data, var, ATTRS["longitude"])
            x_var = _check_var(x_var, data.name, check_var)
            x_var, check_var = _check_var_attr(x_var, data, var, ATTRS["X"])
            x_var = _check_var(x_var, data.name, check_var)

            y_var, check_var = _check_var_attr(y_var, data, var, ATTRS["latitude"])
            y_var = _check_var(y_var, data.name, check_var)
            y_var, check_var = _check_var_attr(y_var, data, var, ATTRS["Y"])
            y_var = _check_var(y_var, data.name, check_var)
        else:
            var_list.append(data.name)
    
    
    if time_var is None:
        warnings.warn("time=None. Time was assigned as a static variable.")
 
    fixed={}
    fixed[z_var]=0
    print(fixed)
    
    xyz = tuple(v for v in [x_var, y_var, z_var] if v is not None)
    if len(xyz) < 2:
        raise ValueError(
            f"CF conventions are not met or coordinates are missing. Input (X,Y) NetCDF coordinates: {xyz}."
        )
    elif len(xyz) == 3 and z_var is None:
        raise ValueError("Input NetCDF coordinate Z does not comply with CF conventions.")
    elif len(xyz) == 3 and z_var is not None:
        warnings.warn(f"Z dimension was assigned as a fixed variable: fixed={fixed}.") 
      
    
    for var in var_list:
        if len(dataset[var].dimensions) == 4 and z_var is None:
            raise ValueError(f"Input NetCDF coordinate Z does not comply with CF conventions!")
        elif len(xyz) == 3:
            layers.append(Layer(var, xyz[0:2], fixed=fixed, static=time_var is None))
        else:
            layers.append(Layer(var, xyz, static=time_var is None))

            
    return time_var, layers       
            
#========EXECUTE==============
file = nc.Dataset(temp_file)
RES = extract_layers(file)
RES

{'lev': 0}




('time', [Layer(var=tas, xyz=('lon', 'lat'), fixed={'lev': 0}, static=False)])

In [99]:
#file.variables

In [12]:
file.dimensions

{'time': <class 'netCDF4._netCDF4.Dimension'>: name = 'time', size = 5,
 'lat': <class 'netCDF4._netCDF4.Dimension'>: name = 'lat', size = 109,
 'lon': <class 'netCDF4._netCDF4.Dimension'>: name = 'lon', size = 105}

In [109]:
z=None
if z is None:
    print("NONE")


NONE


In [101]:
file.variables

{'crs': <class 'netCDF4._netCDF4.Variable'>
 int8 crs()
     grid_mapping_name: latitude_longitude
     _CoordinateAxisTypes: GeoX GeoY
     spatial_ref: GEOGCS["unknown",DATUM["WGS_1984",SPHEROID["WGS 84",6378137,298.257223563,AUTHORITY["EPSG","7030"]],AUTHORITY["EPSG","6326"]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]],AXIS["Longitude",EAST],AXIS["Latitude",NORTH]]
     epsg_code: 4326
     horizontal_datum_name: WGS84
     semi_major_axis: 6378137
     inverse_flattening: 298.257223563
     longitude_of_prime_meridian: 0.0
 unlimited dimensions: 
 current shape = ()
 filling on, default _FillValue of -127 ignored,
 'time': <class 'netCDF4._netCDF4.Variable'>
 int32 time(time)
     standard_name: time
     axis: T
     calendar: julian
     units: days since 2000-01-01 00:00:00
 unlimited dimensions: 
 current shape = (5,)
 filling on, default _FillValue of -2147483647 used,
 'lat': <class 'netCDF4._netCDF4.Variable'>
 fl

In [32]:
import xarray as xr

In [38]:
FILE=xr.open_dataset("/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/net_rad.nc")
FILE
# 

In [None]:
lai_file = "/Users/install/Documents/UFZ/finam/finam-netcdf/tests/data/lai.nc"
tavg_file = "/Users/install/Documents/UFZ/finam/finam-netcdf/tests/data/tavg.nc"

#nc files from mHM
eabs_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/eabs.nc"
mask_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/mask.nc"
net_rad_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/net_rad.nc"
ssrd_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/ssrd.nc"
strd_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/strd.nc"
tmax_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/tmax.nc"
tmin_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/tmin.nc"
windspeed_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/windspeed.nc"
pre_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/pre/pre.nc"
pet_file = "/Users/install/Documents/UFZ/mhm/test_domain/input/meteo/pet/pet.nc"

wgs84_file = "/Users/install/Downloads/test_WGS84.nc"

In [None]:
def _check_var(old, new):
    if old is None or old == new:
        return new
    raise ValueError(f"Axis already defined as {old}. Found second axis {new}.")

def extract_layers(dataset):
    """Extracts the layer information from a dataset following CF convention"""
    layers = []
    time_var = None
    x_var = None
    y_var = None
    z_var = None
    ATTRS = {
        "time": {
            "axis": "T",
            "units": "since",
            "standard_name": "time",
            "long_name": "time",
        },
        "longitude": {
            "axis": "X",
            "units": ["degrees_east", "degree_east", "degree_E", "degrees_E", "degreeE", "degreesE"],
            "standard_name": ["longitude", "lon", "degrees_east", "xc"],
            "long_name": ["longitude", "lon", "degrees_east", "xc"],
        },
        "latitude": {
            "axis": "Y",
            "units": ["degrees_north", "degree_north", "degree_N", "degrees_N", "degreeN", "degreesN"],
            "standard_name": ["latitude", "lat", "degrees_north", "yc"],
            "long_name": ["latitude", "lat", "degrees_north", "yc"],
        },
        "Z": {
            "axis": "Z",
            "standard_name": ["level", "lev", "pressure", "depth", "height", "vertical", "elevation", "altitude"],
            "long_name": ["level", "lev", "pressure", "depth", "height", "vertical", "elevation", "altitude"],
        },
    }

    for var, data in dataset.variables.items():
        
        # getting parameter variables
        if len(data.dimensions) > 2:
            var_list.append(data.name)
            
        # getting variaibles (T,Z,Y,X)
        else:
            if len(data.dimensions) > 4:
                raise ValueError(f"Variable {data.name} has more than 4 possible dimensions (T,Z,Y,X).")       

            if "calendar" in data.ncattrs():
                time_var = _check_var(time_var, data.name)

            elif "positive" in data.ncattrs():
                z_var = _check_var(z_var, data.name)

            elif "units" in data.ncattrs():
                if data.units in ATTRS["time"]["units"]:
                    time_var = _check_var(time_var, data.name)
                elif data.units in ATTRS["longitude"]["units"]:
                    x_var = _check_var(x_var, data.name)
                elif data.units in ATTRS["latitude"]["units"]:
                    y_var = _check_var(Y_var, data.name)

            elif "standard_name" in data.ncattrs():
                if data.standard_name in ATTRS["time"]["standard_name"]:
                    time_var = _check_var(time_var, data.name)
                elif data.standard_name in ATTRS["longitude"]["standard_name"]:
                    x_var = _check_var(x_var, data.name)
                elif data.standard_name in ATTRS["latitude"]["standard_name"]:
                    y_var = _check_var(Y_var, data.name)
                elif data.standard_name in ATTRS["Z"]["standard_name"]:
                    y_var = _check_var(Y_var, data.name)

            elif "long_name" in data.ncattrs():
                if data.long_name in ATTRS["time"]["long_name"]:
                    time_var = _check_var(time_var, data.name)
                elif data.long_name in ATTRS["longitude"]["long_name"]:
                    x_var = _check_var(x_var, data.name)
                elif data.long_name in ATTRS["latitude"]["long_name"]:
                    y_var = _check_var(Y_var, data.name)
                elif data.long_name in ATTRS["Z"]["long_name"]:
                    y_var = _check_var(Y_var, data.name)

            elif "axis" in data.ncattrs():
                ax = data.axis
                if ax == ATTRS["time"]["axis"]:
                    time_var = _check_var(time_var, data.name)
                elif ax == ATTRS["longitude"]["axis"]:
                    x_var = _check_var(x_var, data.name)
                elif ax == ATTRS["latitude"]["axis"]:
                    y_var = _check_var(y_var, data.name)
                elif ax == ATTRS["Z"]["axis"]:
                    z_var = _check_var(z_var, data.name)
                        
    if len(data.dimensions) == 4 and z_var == None:
        raise ValueError(f"Variable {data.name} has more than 4 possible dimensions (T,Z,Y,X).")
                
    
    # if there are 4 dimensions put Z as fixed, and if Z cannot be safely stablished raise a warning 
    
    # creating tuple X Y Z coords
    xyz = tuple(v for v in [x_var, y_var, z_var] if v is not None)
    
    return xzy?




In [None]:
from netCDF4 import Dataset, date2num

In [None]:
def NetCdfPushWriter(path, inputs, time_var, start, step):
    """
    NetCDF writer component that writes on push to its inputs.

    Usage:

    .. testcode:: constructor

       from finam_netcdf import Layer, NetCdfPushWriter

       file = "path/to/file.nc"
       writer = NetCdfPushWriter(
            path=file,
            inputs={
                "LAI": Layer(var="lai", xyz=("lon", "lat")),
                "SM": Layer(var="soil_moisture", xyz=("lon", "lat")),
            },
            time_var="time"
       )

In [None]:
def __init__(
        self,
        path: str,
        inputs: dict[str, Layer],
        time_var: str,
        start: datetime,
        step: timedelta,
    ):
        super().__init__()

        self._path = path
        self._input_dict = inputs
        self._input_names = {v.var: k for k, v in inputs.items()}
        self.time_var = time_var
        self.dataset = None
        self.timestamp_counter = 0
        self._step = step
        self._time = start
        self.last_update = None
        self.timestamps = []

        self._status = fm.ComponentStatus.CREATED

    def _initialize(self):
        for inp in self._input_dict.keys():
            self.inputs.add(
                io=fm.CallbackInput(
                    name=inp,
                    callback=partial(self._data_changed, inp),
                    time=None,
                    grid=None,
                    units=None,
                )
            )
        self.dataset = Dataset(self._path, "w")

        self.create_connector(pull_data=list(self._input_dict.keys()))

    def _connect(self, start_time):
        self.try_connect(start_time)

        if self.status != fm.ComponentStatus.CONNECTED:
            return

        _create_nc_framework(
            self.dataset,
            self.time_var,
            self._time,
            self._step,
            self.connector.in_infos,
            self.connector.in_data,
            self._input_dict,
        )

        # adding time and var data to the first timestamp
        for name in self._input_dict:
            self.dataset[name][self.timestamp_counter, :, :] = self.connector.in_data[
                name
            ].magnitude
            self.dataset[self.time_var][self.timestamp_counter] = date2num(
                self._time, self.dataset[self.time_var].units
            )

    def _validate(self):
        pass

    def _update(self):
        self._time += self._step
        self.timestamp_counter += 1
        for name, inp in self.inputs.items():
            self.dataset[name][self.timestamp_counter, :, :] = inp.pull_data(
                self._time
            ).magnitude
            self.dataset[self.time_var][self.timestamp_counter] = date2num(
                self._time, self.dataset[self.time_var].units
            )

    def _finalize(self):
        self.dataset.close()

    def _data_changed(self, name, caller, time):
        if self.status in (
            fm.ComponentStatus.CONNECTED,
            fm.ComponentStatus.CONNECTING,
            fm.ComponentStatus.CONNECTING_IDLE,
        ):
            self.last_update = time
            if len(self.timestamps) == 0:
                self.timestamps.append(time)

            return

        if not isinstance(time, datetime):
            raise ValueError("Time must be of type datetime")

        if self.status == fm.ComponentStatus.INITIALIZED:
            self.last_update = time
            return
        if time != self.last_update:
            lengths = [a.shape[0] for a in self.data_arrays.values()]
            if lengths.count(lengths[0]) != len(lengths):
                raise ValueError(f"Incomplete dataset for time {self.last_update}")

            self.timestamps.append(time)

        self.last_update = time

        layer = self._input_dict[name]
        data = caller.pull_data(self.last_update)

        self.dataset[layer.var][self.timestamp_counter, :, :] = data.magnitude
        self.dataset[self.time_var][self.timestamp_counter] = date2num(
            time, self.dataset[self.time_var].units
        )

        self.update()




In [None]:
def _create_nc_framework(
    dataset, time_var, start_date, time_freq, in_infos, in_data, layers
):
    """
    Creates coords, eg. (x, y), an empty time, and all other parameter variables, eg. temperature.
    """
    all_layers = []
    all_var = []
    equal_layers = True
    for name in in_data:
        layer = layers[name]
        if layer.var in all_var:
            raise ValueError(f"Duplicated variable {layer.var}.")
        else:
            all_layers.append(layer.xyz)
            all_var.append(layer.var)

    current_layer = all_layers[0]
    for lays in all_layers:
        if current_layer != lays:
            equal_layers = False
            break
    if not equal_layers:
        raise ValueError(
            f"NetCdfTimedWriter Inputs {all_var} have different layers: {all_layers}."
        )

    # creating time dim and var
    dim = dataset.createDimension(time_var, None)
    var = dataset.createVariable(time_var, np.float64, (time_var,))

    def days_hours_minutes(td):
        """funtion to get units of time in days, hours, minutes or seconds as str"""
        if td.days != 0:
            return "days"
        elif td.seconds // 3600 != 0:
            return "hours"
        elif (td.seconds // 60) % 60 != 0:
            return "minutes"
        else:
            return "seconds"

    freq = days_hours_minutes(time_freq)
    var.units = freq + " since " + str(start_date)
    var.calendar = "standard"  # standard may not be always the case

    just_once = True
    for name in in_data:
        # creating lat-lon var and dim just once in a while loop
        while just_once:
            grid_info = in_infos[name].grid
            for i, ax in enumerate(layer.xyz):
                if ax not in grid_info.axes_names:
                    raise ValueError(
                        f"Dimension {i} '{ax}' is not in the data for input {name}. "
                        f"Available axes are {grid_info.axes_names}"
                    )
                axis_values = in_infos[name].grid.data_axes[i]
                axis_type = in_infos[name].grid.data_axes[i].dtype
                dim = dataset.createDimension(ax, len(axis_values))
                var = dataset.createVariable(ax, axis_type, (ax,))
                dataset[ax][:] = axis_values
            just_once = False

        # creating var and dim other than time and coords X,Y
        dim = (time_var, current_layer[0], current_layer[1])
        var = dataset.createVariable(name, np.float64, dim)
        var.units = str(in_data[name].units)

In [70]:
if 3 == 3 and 2 == 2:
    print("Both conditions are True")
else:
    print("At least one condition is False")


Both conditions are True
