# General utils

> Some general utility functions that are used throughout the package.

In [None]:
#| default_exp utils

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export

from torch.utils.data import Dataset
from typing import Union, List, Tuple, Literal
from gymnasium.spaces import Space
from ddopai.dataloaders.base import BaseDataLoader

import logging

import numpy as np

In [None]:
#| export

def check_parameter_types(
                            *args,                      # any number of parameters to be checked
                            parameter_type=np.ndarray   # the expected type for each parameter
):
    
    """
    Checks if each argument in args is of the specified type, defaulting to np.ndarray.

    `parameter_type` may be anything that `isinstance` accepts as its second argument,
    i.e. a single type or a tuple of types. A TypeError is raised for the first argument
    that does not match; the message reports its 1-based position.
    """

    def _type_label(expected):
        # A tuple of types has no __name__, so it is spelled out member by member;
        # anything else without a __name__ falls back to its repr.
        if isinstance(expected, tuple):
            return " or ".join(_type_label(member) for member in expected)
        return getattr(expected, "__name__", repr(expected))

    for position, arg in enumerate(args, start=1):
        if not isinstance(arg, parameter_type):
            raise TypeError(f"Argument {position} of {len(args)} is of type {type(arg).__name__}, expected {_type_label(parameter_type)}")

Example usage for the `check_parameter_types` function.

In [None]:

array_argument = np.array([1, 2, 3])  # matches the default expected type (np.ndarray)
list_argument = [1, 2, 3]             # a plain list, so the check rejects it

# The second argument fails the check; the raised TypeError is printed instead of propagated.
try:
    check_parameter_types(array_argument, list_argument)
except TypeError as type_error:
    print(type_error)

Argument 2 of 2 is of type list, expected ndarray


In [None]:
#| export
class Parameter():

    """
    Simple class to handle parameters in the environment. The advantage of this class is that it can be
    used to set parameters that may change over time and accessed by multiple objects such as the 
    environment, agent or dataloaders.
    """
    
    def __init__(self,
            value: int | float | list[int] | list[float] | np.ndarray,  # the value of the parameter  
            min_value: int | float | list[int] | list[float] | np.ndarray = None,  # the minimum value of the parameter
            max_value: int | float | list[int] | list[float] | np.ndarray = None,  # the maximum value of the parameter
            shape: tuple[int] = (1,)): # the shape of the parameter

        self._min_value = min_value
        self._max_value = max_value
        
        self.set_value(value, shape)

    def __call__(self):
        """
        Returns the parameter value. Alternative to get_value().

        """
        return self.get_value()

    def get_value(self):
        
        """
        Returns the parameter value.

        """

        return self._value

    def set_value(self, 
                    value: Union[int, float, List[int], List[float], np.ndarray], # the value of the parameter
                    shape: Tuple[int] = (1,)): # the shape of the parameter
       
        """
        Set the value of the parameter. The value can be a scalar, list or numpy array. The shape of the value
        must be the same as the shape of the parameter.

        All values, including scalars, are converted to numpy arrays. If the value is a scalar, it is reshaped
        to the shape of the parameter. If the value is a list or numpy array, it must have the same shape as the
        parameter. If the value is a scalar, it is reshaped to the shape of the parameter.

        Optionally, the value can be clipped to a minimum and maximum value.
        """

        if isinstance(value, (int, float)):
            self._value = np.array([value])
            self._value.reshape(shape)
        
        elif isinstance(value, list):
            value = np.array(value)
            assert value.shape == shape, "Shape of value must be the same as the shape of the parameter"
            self._value = value
        
        elif isinstance(value, np.ndarray):
            assert value.shape == shape, "Shape of value must be the same as the shape of the parameter"
            self._value = value
        
        else:
            raise ValueError("Value must be a scalar or numpy array")

        if self._min_value is not None:
            self._value = np.maximum(self._value, self._min_value)
        if self._max_value is not None:
            self._value = np.minimum(self._value, self._max_value)

    @property
    def shape(self):
        """
        Returns: The shape of the table of parameters.
        """
        return self._value.shape
    
    @property
    def size(self):
        """
        Returns: The size of the table of parameters.
        """
        return self._value.size 

In [None]:
show_doc(Parameter, title_level=2)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L32){target="_blank" style="float:right; font-size:smaller"}

## Parameter

>      Parameter (value:int|float|list[int]|list[float]|numpy.ndarray,
>                 min_value:int|float|list[int]|list[float]|numpy.ndarray=None,
>                 max_value:int|float|list[int]|list[float]|numpy.ndarray=None,
>                 shape:tuple[int]=(1,))

*Simple class to handle parameters in the environment. The advantage of this class is that it can be
used to set parameters that may change over time and accessed by multiple objects such as the 
environment, agent or dataloaders.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| value | int \| float \| list[int] \| list[float] \| numpy.ndarray |  | the value of the parameter |
| min_value | int \| float \| list[int] \| list[float] \| numpy.ndarray | None | the minimum value of the parameter |
| max_value | int \| float \| list[int] \| list[float] \| numpy.ndarray | None | the maximum value of the parameter |
| shape | tuple | (1,) | the shape of the parameter |

In [None]:
show_doc(Parameter.__call__)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L51){target="_blank" style="float:right; font-size:smaller"}

### Parameter.__call__

>      Parameter.__call__ ()

*Returns the parameter value. Alternative to get_value().*

In [None]:
show_doc(Parameter.get_value)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L58){target="_blank" style="float:right; font-size:smaller"}

### Parameter.get_value

>      Parameter.get_value ()

*Returns the parameter value.*

In [None]:
show_doc(Parameter.set_value)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L67){target="_blank" style="float:right; font-size:smaller"}

### Parameter.set_value

>      Parameter.set_value
>                           (value:Union[int,float,List[int],List[float],numpy.n
>                           darray], shape:Tuple[int]=(1,))

*Set the value of the parameter. The value can be a scalar, list or numpy array. The shape of the value
must be the same as the shape of the parameter.

All values, including scalars, are converted to numpy arrays. If the value is a scalar, it is reshaped
to the shape of the parameter. If the value is a list or numpy array, it must have the same shape as the
parameter.

Optionally, the value can be clipped to a minimum and maximum value.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| value | Union |  | the value of the parameter |
| shape | Tuple | (1,) | the shape of the parameter |

In [None]:
show_doc(Parameter.shape)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L104){target="_blank" style="float:right; font-size:smaller"}

### Parameter.shape

>      Parameter.shape ()

*Returns: The shape of the table of parameters.*

In [None]:
show_doc(Parameter.size)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L111){target="_blank" style="float:right; font-size:smaller"}

### Parameter.size

>      Parameter.size ()

*Returns: The size of the table of parameters.*

Example usage for the `Parameter` class:

In [None]:
# One Parameter per supported input kind; non-scalar inputs need their shape stated explicitly.
overage_cost = Parameter(1)  # integer
underage_cost = Parameter(2.13)  # float
ordering_cost = Parameter([2] * 5, shape=(5,))  # list
holding_cost = Parameter(np.array([0, 1]), shape=(2,))  # numpy array

# Every parameter is stored (and therefore printed) as a numpy array.
for parameter in (overage_cost, underage_cost, ordering_cost, holding_cost):
    print(parameter.get_value())

[1]
[2.13]
[2 2 2 2 2]
[0 1]


In [None]:
#| export

class MDPInfo():
    """
    Container for the information describing an environment, modelled after the MDPInfo
    class of MushroomRL (https://github.com/MushroomRL). Agents can query it for what they
    need to know about the environment, such as the state and action spaces.

    The key difference to MushroomRL is that the state and action spaces are gymnasium spaces.
    """
    
    def __init__(self,
                observation_space: Space,
                action_space: Space,  
                gamma: float,
                horizon: int,
                dt: float = 1e-1,
                backend: Literal['numpy'] = 'numpy'  # Currently only numpy is supported
            ) -> None: 

        # All arguments are stored as given; nothing is validated or converted here.
        # NOTE(review): gamma / horizon / dt presumably follow MushroomRL's meaning
        # (discount factor, episode horizon, control timestep) — confirm.
        self.observation_space, self.action_space = observation_space, action_space
        self.gamma, self.horizon = gamma, horizon
        self.dt, self.backend = dt, backend

    @property
    def size(self):
        """
        Returns: The `size` of the observation space plus the `size` of the action space
        (for discrete spaces: number of states plus number of actions). Only works for
        spaces that expose a `size` attribute.

        """
        # NOTE(review): gymnasium spaces may not provide `.size` — confirm which spaces are used here.
        observation_size = self.observation_space.size
        action_size = self.action_space.size
        return observation_size + action_size

    @property
    def shape(self):
        """
        Returns: The shape tuple of the observation space followed by the shape tuple
        of the action space, concatenated into one tuple.

        """
        observation_shape = self.observation_space.shape
        action_shape = self.action_space.shape
        return observation_shape + action_shape

In [None]:
show_doc(MDPInfo, title_level=2)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L118){target="_blank" style="float:right; font-size:smaller"}

## MDPInfo

>      MDPInfo (observation_space:gymnasium.spaces.space.Space,
>               action_space:gymnasium.spaces.space.Space, gamma:float,
>               horizon:int, dt:float=0.1, backend:Literal['numpy']='numpy')

*This class is used to store the information of the environment.
It is based on MushroomRL (https://github.com/MushroomRL). It can be accessed by 
agents that need the information of the environment, such as the state and action spaces.

Key difference with MushroomRL is that the state and action spaces are gymnasium spaces.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| observation_space | Space |  |  |
| action_space | Space |  |  |
| gamma | float |  |  |
| horizon | int |  |  |
| dt | float | 0.1 |  |
| backend | Literal | numpy | Currently only numpy is supported |
| **Returns** | **None** |  |  |

In [None]:
show_doc(MDPInfo.size)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L144){target="_blank" style="float:right; font-size:smaller"}

### MDPInfo.size

>      MDPInfo.size ()

*Returns: The sum of the number of discrete states and discrete actions. Only works for discrete spaces.*

In [None]:
show_doc(MDPInfo.shape)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L152){target="_blank" style="float:right; font-size:smaller"}

### MDPInfo.shape

>      MDPInfo.shape ()

*Returns: The concatenation of the shape tuple of the state and action spaces.*

In [None]:
#| export

class DatasetWrapper(Dataset):
    """
    Pytorch Dataset that wraps a ddopai dataloader so that the Pytorch Dataloader
    can be used during training. Agents that are trained with Pytorch without
    interacting with the environment can thereby train directly on the data
    generated by the dataloader.
    
    """

    def __init__(self, 
            dataloader: BaseDataLoader, # Any dataloader that inherits from BaseDataLoader
            obsprocessors: List = None # processors (to mimic the environment processors)
            ):
        self.dataloader = dataloader
        # A missing (or empty) processor list means: leave the features untouched.
        self.obsprocessors = obsprocessors if obsprocessors else []
    
    def __getitem__(self, idx):
        """
        Return the sample at position idx. The first element of the sample is passed
        through all obsprocessors; the remaining elements are returned unchanged.

        """

        sample = self.dataloader[idx]

        # Single datapoints come without a batch dimension, but the obsprocessors
        # expect one, so it is added here and removed again after processing.
        features = np.expand_dims(sample[0], axis=0)

        for process in self.obsprocessors:
            features = process(features)

        features = np.squeeze(features, axis=0)

        return (features, *sample[1:])


    def __len__(self):

        """

        Returns the length of the dataset. Depends on the state of
        the dataloader (train, val, test).

        """

        # Maps each dataset type to the dataloader attribute holding its length.
        # Only the attribute of the active dataset type is read.
        length_attributes = (
            ('train', 'len_train'),
            ('val', 'len_val'),
            ('test', 'len_test'),
        )

        for dataset_type, attribute in length_attributes:
            if self.dataloader.dataset_type == dataset_type:
                return getattr(self.dataloader, attribute)

        raise ValueError("Dataset type must be either 'train', 'val' or 'test'")

In [None]:
show_doc(DatasetWrapper, title_level=2)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L160){target="_blank" style="float:right; font-size:smaller"}

## DatasetWrapper

>      DatasetWrapper (dataloader:ddopai.dataloaders.base.BaseDataLoader,
>                      obsprocessors:List=None)

*This class is used to wrap a Pytorch Dataset around the ddopai dataloader
to enable the usage of the Pytorch Dataloader during training. This way,
agents that are trained using Pytorch without interacting with the environment
can directly train on the data generated by the dataloader.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| dataloader | BaseDataLoader |  | Any dataloader that inherits from BaseDataLoader |
| obsprocessors | List | None | processors (to mimic the environment processors) |

In [None]:
show_doc(DatasetWrapper.__getitem__)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L176){target="_blank" style="float:right; font-size:smaller"}

### DatasetWrapper.__getitem__

>      DatasetWrapper.__getitem__ (idx)

*Get the item at the provided idx.*

In [None]:
show_doc(DatasetWrapper.__len__)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L200){target="_blank" style="float:right; font-size:smaller"}

### DatasetWrapper.__len__

>      DatasetWrapper.__len__ ()

*Returns the length of the dataset. Depends on the state of
the dataloader (train, val, test).*

In [None]:
#| export

class DatasetWrapperMeta(DatasetWrapper):
    """
    Variant of the DatasetWrapper for meta settings: in addition to the data from the
    ddopai dataloader, every item contains parameters that are freshly drawn with
    `draw_parameter_function`. The drawn parameters and the features are combined into
    one observation dictionary, which is then passed through the obsprocessors.

    Currently only a single parameter is supported; passing lists for distribution,
    bounds_low or bounds_high raises a NotImplementedError.
    
    """

    def __init__(self, 
            dataloader: BaseDataLoader, # Any dataloader that inherits from BaseDataLoader
            draw_parameter_function: callable = None, # function to draw parameters from distribution
            distribution: Literal["fixed", "uniform"] | List = "fixed", # distribution for params during training, can be List for multiple parameters
            parameter_names: List[str] = None, # names of the parameters
            bounds_low: Union[int, float] | List = 0, # lower bound for params during training, can be List for multiple parameters
            bounds_high: Union[int, float] | List = 1, # upper bound for params during training, can be List for multiple parameters
            obsprocessors: List = None # processors (to mimic the environment processors)
            ):

        if isinstance(distribution, list) or isinstance(bounds_low, list) or isinstance(bounds_high, list):
            raise NotImplementedError("Multiple parameters not yet implemented")
        if obsprocessors is None:
            raise ValueError("Obsprocessors must be provided")
        
        # Stored as one-element lists so that __getitem__ can already iterate per parameter.
        self.distribution = [distribution]
        self.bounds_low = [bounds_low]
        self.bounds_high = [bounds_high]
        
        self.dataloader = dataloader

        self.draw_parameter = draw_parameter_function
        self.obsprocessors = obsprocessors

        self.parameter_names = parameter_names
    
    def __getitem__(self, idx):
        """
        Get the item at the provided idx.

        Returns a tuple (obs, demand, params): the processed observation, the demand
        returned by the dataloader and the dictionary of drawn parameters (keyed by
        parameter name).

        """

        features, demand = self.dataloader[idx] 

        features = np.expand_dims(features, axis=0) # add batch dimension as meta environments also return a batch dimension (needed for obsprocessor)

        params = {}
        # Each parameter is drawn with its own distribution and bounds (entry i of each list).
        for i, (distribution, low, high) in enumerate(zip(self.distribution, self.bounds_low, self.bounds_high)):
            param = self.draw_parameter(distribution, low, high, samples=1) # idx always gets a single sample
            params[self.parameter_names[i]] = param
        
        # The observation is a shallow copy so that adding the features does not alter params.
        obs = params.copy()
        obs["features"] = features

        for obsprocessor in self.obsprocessors:
            obs = obsprocessor(obs)

        obs = np.squeeze(obs, axis=0) # remove batch dimension after observation has been processed as the pytorch dataloader adds the batch dimension

        return obs, demand, params

In [None]:
#| export
def merge_dictionaries(dict1, dict2):
    """
    Return a new dictionary with the entries of dict1 and dict2. Neither input is modified.
    Raises a KeyError if any key is present in both dictionaries.
    """
    # Collected in dict2's iteration order, so the first entry is the first clash found.
    duplicates = [key for key in dict2 if key in dict1]
    if duplicates:
        raise KeyError(f"Duplicate key found: {duplicates[0]}")

    # No key clashes, so the update cannot overwrite anything
    merged_dict = dict(dict1)
    merged_dict.update(dict2)
    return merged_dict

In [None]:
show_doc(merge_dictionaries, title_level=2)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L280){target="_blank" style="float:right; font-size:smaller"}

## merge_dictionaries

>      merge_dictionaries (dict1, dict2)

*Merge two dictionaries. If a key is found in both dictionaries, raise a KeyError.*

In [None]:
#| export
def set_param(obj,
                name: str, # name of the parameter (will become the attribute name)
                input: Parameter | int | float | np.ndarray | List | None , # input value of the parameter
                shape: tuple = (1,), # shape of the parameter
                new: bool = False, # whether to create a new parameter or update an existing one
                ): 
    """
        Set a parameter as attribute `name` on `obj`. Scalars are converted to numpy arrays of
        the given shape, lists are converted to numpy arrays, and Parameter objects are stored
        as they are (their shape must equal `shape`). Lists and arrays must either have the
        given shape or contain a single element, which is then broadcast to the shape.
        None is stored as None.

        If new is set to True, the attribute is created; if it already exists, it is
        overwritten and a warning is logged. If new is set to False, the attribute must
        already exist, otherwise an AttributeError is raised.
    """

    if input is None:
        param = None

    elif isinstance(input, Parameter):
        if input.shape != shape:
            raise ValueError("Parameter shape must be equal to the shape specified for this environment parameter")
        param = input

    elif isinstance(input, (int, float, np.integer, np.floating)):
        param = np.full(shape, input)

    elif isinstance(input, (list, np.ndarray)):
        # Lists are converted first; from there on lists and arrays are treated alike.
        array = np.array(input) if isinstance(input, list) else input
        if array.shape == shape:
            param = array
        elif array.size == 1:  # a single element is broadcast to the requested shape
            param = np.full(shape, array.item())
        else:
            raise ValueError("Error in setting parameter. Input array must match the specified shape or be a single-element array")

    else:
        raise TypeError(f"Input must be a Parameter, scalar, or numpy array, got {type(input).__name__} with value {input}")

    # Creating may overwrite (with a warning); updating requires the attribute to exist.
    already_defined = hasattr(obj, name)
    if new and already_defined:
        logging.warning(f"Parameter {name} already exists. Overwriting it.")
    if not new and not already_defined:
        raise AttributeError(f"Parameter {name} does not exist")

    setattr(obj, name, param)

In [None]:
show_doc(set_param, title_level=2)

---

[source](https://github.com/opimwue/ddopai/blob/main/ddopai/utils.py#L291){target="_blank" style="float:right; font-size:smaller"}

## set_param

>      set_param (obj, name:str,
>                 input:Union[__main__.Parameter,int,float,numpy.ndarray,List,No
>                 neType], shape:tuple=(1,), new:bool=False)

*Set a parameter for the class. It converts scalar values to numpy arrays and ensures that
environment parameters are either of the Parameter class or Numpy arrays. If new is set to True,
the function will create a new parameter (or overwrite an existing one, logging a warning). If new is set to
False, the function will raise an error if the parameter does not exist.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| obj |  |  |  |
| name | str |  | name of the parameter (will become the attribute name) |
| input | Union |  | input value of the parameter |
| shape | tuple | (1,) | shape of the parameter |
| new | bool | False | whether to create a new parameter or update an existing one |

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()