In [2]:
from pathlib import Path
from typing import List, Dict, Union

import pandas as pd
import xarray

from neuralhydrology.datasetzoo.basedataset import BaseDataset
from neuralhydrology.utils.config import Config

In [3]:
class NovaScotiaBasins(BaseDataset):
    """Template class for adding a new data set.
    
    Each dataset class has to derive from `BaseDataset`, which implements most of the logic for preprocessing data and 
    preparing data for model training. Only two methods have to be implemented for each specific dataset class: 
    `_load_basin_data()`, which loads the time series data for a single basin, and `_load_attributes()`, which loads 
    the static attributes for the specific data set. 
    
    Usually, we outsource the functions to load the time series and attribute data into separate functions (in the
    same file), which we then call from the corresponding class methods. This way, we can also use specific basin data
    or dataset attributes without these classes.
    
    To make this dataset available for model training, don't forget to add it to the `get_dataset()` function in 
    'neuralhydrology.datasetzoo.__init__.py'

    Parameters
    ----------
    cfg : Config
        The run configuration.
    is_train : bool 
        Defines if the dataset is used for training or evaluating. If True (training), means/stds for each feature
        are computed and stored to the run directory. If one-hot encoding is used, the mapping for the one-hot encoding 
        is created and also stored to disk. If False, a `scaler` input is expected and similarly the `id_to_int` input
        if one-hot encoding is used. 
    period : {'train', 'validation', 'test'}
        Defines the period for which the data will be loaded
    basin : str, optional
        If passed, the data for only this basin will be loaded. Otherwise the basin(s) are read from the appropriate
        basin file, corresponding to the `period`.
    additional_features : List[Dict[str, pd.DataFrame]], optional
        List of dictionaries, mapping from a basin id to a pandas DataFrame. This DataFrame will be added to the data
        loaded from the dataset, and all columns are available as 'dynamic_inputs', 'evolving_attributes' and
        'target_variables'
    id_to_int : Dict[str, int], optional
        If the config argument 'use_basin_id_encoding' is True in the config and period is either 'validation' or 
        'test', this input is required. It is a dictionary, mapping from basin id to an integer (the one-hot encoding).
    scaler : Dict[str, Union[pd.Series, xarray.DataArray]], optional
        If period is either 'validation' or 'test', this input is required. It contains the centering and scaling
        for each feature and is stored to the run directory during training (train_data/train_data_scaler.yml).

    """

    def __init__(self,
                 cfg: Config,
                 is_train: bool,
                 period: str,
                 basin: str = None,
                 additional_features: List[Dict[str, pd.DataFrame]] = [],
                 id_to_int: Dict[str, int] = {},
                 scaler: Dict[str, Union[pd.Series, xarray.DataArray]] = {}):
        # initialize parent class
        super(NovaScotiaBasins, self).__init__(cfg=cfg,
                                              is_train=is_train,
                                              period=period,
                                              basin=basin,
                                              additional_features=additional_features,
                                              id_to_int=id_to_int,
                                              scaler=scaler)

    def _load_basin_data(self, basin: str) -> pd.DataFrame:
        """Load timeseries data for a single basin.
        
        This function is used to load the time series data (meteorological 
        forcing, streamflow, etc.) and make available as time series input for 
        model training later on. Make sure that the returned dataframe is 
        time-indexed.
        
        Parameters
        ----------
        basin : str
            Basin identifier as string.

        Returns
        -------
        pd.DataFrame
            Time-indexed DataFrame, containing the time series data (e.g., 
            forcings + discharge).
        """

        return load_ns_rdrs_timeseries(self.cfg.data_dir, basin)

    def _load_attributes(self) -> pd.DataFrame:
        """Load basin static variables for specified basins in a list given as
        an argument.
        
        This function is used to load basin attribute data (e.g. CAMELS 
        catchments attributes) as a basin-indexed dataframe with features 
        in columns.
        
        Returns
        -------
        pd.DataFrame of .csv file containing the static attributes for the
        specified basins in the dataset. If no basins are specified, all
        basins are returned.
        """

        return load_basin_attributes(self.cfg.data_dir, self.basins)


def load_ns_rdrs_timeseries(data_dir: Path, basin: str) -> pd.DataFrame:
    """
    Load the time series data for a specific basin.
    
    Arguments:
        data_dir: path to the root directory called 'LSTM_NS_Data'.
        
        basin: the name of the .csv file containing the time series data for 
        the basin which should be the gauge station id eg. '01FJ002'.
        
        Returns: the .csv file as a pandas DataFrame.
    """


    preprocessed_dir = data_dir / "PreprocessedTimeseries"

    # Make sure the data was already preprocessed and per-basin files exist.
    if not preprocessed_dir.is_dir():
        msg = [
            f"No preprocessed data directory found at {preprocessed_dir}. Use" 
            "preprocessed_camels_cl_dataset ,"
            "in neuralhydrology.datasetzoo.camelscl to preprocess the CAMELS" 
            "CL data set once into ,"
            "per-basin files."
        ]
        raise FileNotFoundError("".join(msg))

    # Load the data for the specific basin into a time-indexed dataframe.
    basin_file = preprocessed_dir / f"{basin}.csv"
    df = pd.read_csv(basin_file, index_col='date', parse_dates=['date'])

    return df

def load_basin_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame:
    """
    Load the static attributes for all basins in the dataset.
    
    Arguments: 
        data_dir: path to the root directory called 'LSTM_NS_Data'.
        
        basins: a list of basin identifiers for which to load the attributes. 
        If empty, all basins will be loaded.
        
    Returns: a pandas DataFrame of the basin static variables for the specified
    basins.
    """


    # Load attributes into basin-indexed dataframe.
    attributes_file = data_dir / 'basin_attributes.csv'
    df = pd.read_csv(attributes_file, index_col="basin_id").transpose()

    # Convert all columns, where possible, to numeric.
    df = df.apply(pd.to_numeric, errors='ignore')

    # Convert the two columns specifying record period start and end to 
    # datetime format
    df["record_period_start"] = pd.to_datetime(df["record_period_start"])
    df["record_period_end"] = pd.to_datetime(df["record_period_end"])

    if basins:
        if any(b not in df.index for b in basins):
            raise ValueError('Some basins are missing static attributes.')
        # Filter the dataframe to only include the specified basins.
        df = df.loc[basins]

    return df



In [5]:
NovaScotiaBasins(BaseDataset, is_train=True, period=)

TypeError: NovaScotiaBasins.__init__() missing 2 required positional arguments: 'is_train' and 'period'

In [None]:
# from neuralhydrology.datasetzoo.camelsus import NovaScotiaBasins
from neuralhydrology.datasetzoo.camelscl import CamelsCL
from neuralhydrology.datasetzoo.camelsgb import CamelsGB
from neuralhydrology.datasetzoo.camelsus import CamelsUS
from neuralhydrology.datasetzoo.hourlycamelsus import HourlyCamelsUS

def get_dataset(cfg: Config,
                is_train: bool,
                period: str,
                basin: str = None,
                additional_features: list = [],
                id_to_int: dict = {},
                scaler: dict = {}) -> BaseDataset:
    """Get data set instance, depending on the run configuration.

    Arguments:
        See documentation at: 
        https://neuralhydrology.readthedocs.io/en/latest/api/neuralhydrology.
        datasetzoo.html#neuralhydrology.datasetzoo.get_dataset

    Currently implemented datasets are 'caravan', 'camels_aus', 'camels_br', 
    'camels_cl', 'camels_gb', 'camels_us', and 'hourly_camels_us', as well as 
    the 'generic' dataset class, which we have added one called 
    'novascotia_basin', that can be used for any kind of dataset as long as it 
    is in the correct format.
    """

    # Check config argument and select appropriate data set class.
    if cfg.dataset == "novascotia_basins":
        Dataset = NovaScotiaBasins
    if cfg.dataset == "camels_us":
        Dataset = CamelsUS
    elif cfg.dataset == "camels_gb":
        Dataset = CamelsGB
    elif cfg.dataset == "hourly_camels_us":
        Dataset = HourlyCamelsUS
    elif cfg.dataset == "camels_cl":
        Dataset = CamelsCL
    else:
        raise NotImplementedError(
            f"No dataset class implemented for dataset {cfg.dataset}"
            )

    # initialize dataset
    ds = Dataset(cfg=cfg,
                 is_train=is_train,
                 period=period,
                 basin=basin,
                 additional_features=additional_features,
                 id_to_int=id_to_int,
                 scaler=scaler)
    return ds