In [None]:
import random
import numpy as np
import gymnasium
from gymnasium import spaces
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib


In [1]:
class Multilayerthinfilm(gymnasium.Env):
    def __init__(self, 
                 N: np.array, 
                 maximum_layers: int, 
                 target: dict, 
                 weights: np.array = None, 
                 normalization: bool = True, 
                 sparse_reward: bool = True, 
                 max_thickness: float = 150e-9, 
                 min_thickness: float = 10e-9, 
                 work_path: str = '') -> None:
        """
        Parameters:
        -----------
        N : np.array of shape [M]
            where M is the number of available materials
        maximum_layers : integer
            maximum_layers defines the maximum number of layers to stack
        target : dictionary
            with keys 'spectrum' and 'target'
            target['spectrum'] holds the spectrum [m] under consideration and is of shape S
            target['target'] holds the target reflectivity of a MLTF and is of shape S
        weights : np.array of same shape S
            This array allows you to steer the pixels' relative influence on the optimization/reward
        normalization : bool
            Determines whether to exponentially transform the reward or not (Look publication for details)
        sparse_reward : bool
            Determines whether to simulate and reward each intermediate stack or only the final stack
        max_thickness : float
            Determines the maximum layer thickness in meters.
        min_thickness : float
            Determines the minimum layer thickness in meters.
        work_path : str
            Path to the working directory, e.g., to save images
        """

        self.N = N
        self.maximum_layers = maximum_layers
        # target-related:
        # wavelength range
        self.wl = target['spectrum']
        # desired value for reflectivity (pixel-wise)
        self.target = target['target']
        if weights is None:
            self.weights = np.ones_like(self.target)
        else:
            self.weights = weights
            assert weights.shape[0] == self.target.shape[0], 'Shape of weights and target must coincide!'
            assert np.all(weights >= 0), 'Weights are supposed to be non-negative!'
            if np.all(weights == 0):
                self.weights = np.ones_like(self.target)
                print('All weights were zero -> if nothing is important quit optimization; we set each weight to one for you ;)...')
        self.weights = self.weights / np.max(self.weights)

        self.work_path = work_path
        # borders for thicknesses:
        self.max_thickness = max_thickness
        self.min_thickness = min_thickness

        # initialization for some attributes:
        self.number_of_materials = N.shape[0]
        self.simulation = np.nan * np.zeros_like(self.target)
        self.reward = None
        self.old_reward = None
        self._reward_track = []
        self.layers = []
        self._initial_nmb_layers = 0

        self.n = []
        self.d = []
        self.f = None
        self.axs = None
        # OpenAI/Gym related settings:
        # action space:
        space_list = [spaces.Discrete((self.number_of_materials + 1))]
        for space in range(self.number_of_materials + 1):
            space_list.append(spaces.Box(low=0, high=1, shape=(1,)))
        self.action_space = spaces.Tuple(space_list)

        # simulation state space:
        self.observation_space = spaces.Box(low=0, high=1, shape=((self.number_of_materials + 1) * maximum_layers, ), dtype=np.float64)

        
    def set_cladding(self):
        
        self.n_substrate = np.ones((1, self.wl.shape[0]))
        self.d_substrate = np.array([np.inf]).squeeze()
        self.n_ambient = np.ones((1, self.wl.shape[0]))
        self.d_ambient = np.array([np.inf]).squeeze()
        print('--- cladding set to default values ---')
        if np.iscomplex(self.n_substrate[0, :]).any():
            self.n_substrate[0, :] = np.real(self.n_substrate[0, :])
            print('n_substrate must feature real-valued refractive indices in the first layer for computational/physical reasons (TMM); adopted via np.real()')
        if np.iscomplex(self.n_ambient[-1, :]).any():
            self.n_ambient[-1, :] = np.real(self.n_ambient[-1, :])
            print('n_ambient must feature real-valued refractive indices in the last layer for computational/physical reasons (TMM); adopted via np.real()')
        assert not np.iscomplex(self.n_substrate[0, :]).any(), 'n_substrate must feature real-valued refractive indices in the first layer for computational/physical reasons (TMM)..'
        assert not np.iscomplex(self.n_ambient[-1, :]).any(), 'n_ambient must feature real-valued refractive indices in the last layer for computational/physical reasons (TMM)..'
        self.d_substrate = self.d_substrate.reshape(-1, 1)
        self.d_ambient = self.d_ambient.reshape(-1, 1)
        print('cladding set....')
        
        

    def step(self, action):
        
        """
        This method implements the conduction of an action in the environment. Namely, to stack a layer of a
        particular thickness on top of an existing stack.

        Args:
            action: np.array of shape [2]
                action[0 holds the material chosen by the agent as an integer value. Note that 0 leads to
                termination of stacking. action[1] is a float between 0 and 1 and encodes the assigned thickness.

        Returns:
            observation: np.array
                The new state of the environment after taking the action.
            reward: float
                The reward obtained from the action.
            done: bool
                A flag that determines whether the episode is done or not.
            info: dict
                Additional information (empty in this case).
        """

        done = False
        self.old_reward = self.reward

        if action[0] == 0 or len(self.layers) >= self.maximum_layers:
            done = True
        else:
            self.layers.append(int(action[0]))
            n_layer = self.N[int(action[0] - 1), :].reshape(1, -1)
            d_layer = self.denormalize_thickness(action[1])
            self.n.append(n_layer)
            self.d.append(d_layer)

        cladded_n, cladded_d = self.stack_layers()
        self.simulation = self.simulate(cladded_n, cladded_d)
        self.reward = 0

        if done or not self.sparse_reward:
            self.reward, mse = self.reward_func(self.simulation, self.target, self.weights, self.baseline_mse, self.normalization)
            if done:
                self._reward_track.append(mse)  # track reward to compute baseline

        if np.all(np.isnan(self.simulation)):
            print('All simulated values in TMM are NaN!')

        observation = [self.simulation, self.n, self.d, self.one_hot_layer_status()]
        handback_reward = self.reward

        return observation, handback_reward, done, {}
    
    def one_hot_layer_status(self):
        one_hot_vectors = []
        for layer in range(self.maximum_layers):
            one_hot_vector = np.zeros((self.number_of_materials + 1))
            if layer < len(self.layers):
                one_hot_vector[int(self.layers[layer])] = 1 * self.normalize_thickness(self.d[layer])
            one_hot_vectors.append(one_hot_vector)
        one_hot_vectors = np.hstack(one_hot_vectors)
        return one_hot_vectors
    
    
    
    
    
    

    def reset(self):
        """
        This method implements the reset of the environment to an initial state. However, to ease exploration,
        a defined number of layers can be stacked before handing it to the agent.

        Args:
            self

        Returns:
            observation: np.array
                The initial state of the environment after resetting.
            reward: float
                The initial reward.
            done: bool
                A flag that determines whether the episode is done or not (empty list in this case).
            info: dict
                Additional information (empty in this case).
        """
        self.layers = []
        self.n = []
        self.d = []

        if self._initial_nmb_layers > 0:
            num_layers = random.randint(1, self._initial_nmb_layers - 1)
            for _ in range(num_layers):
                rnd_material_idx = random.randint(0, self.number_of_materials - 1)
                rnd_material_d = random.uniform(0, 1)
                self.layers.append(rnd_material_idx + 1)
                n_layer = self.N[rnd_material_idx].reshape(1, -1)
                d_layer = (self.max_thickness - self.min_thickness) * rnd_material_d + self.min_thickness
                self.n.append(n_layer)
                self.d.append(d_layer)

        cladded_n, cladded_d = self.stack_layers()
        self.simulation = self.simulate(cladded_n, cladded_d)

        self.reward = 0
        if not self.sparse_reward:
            self.reward, _ = self.reward_func(self.simulation, self.target, self.weights, self.baseline_mse, self.normalization)

        one_hot_status = self.one_hot_layer_status()

        return [self.simulation, self.n, self.d, one_hot_status], self.reward, False, {}
    
    
    
    
    
    
    
  

    def render_target(self):
        assert self.wl.shape[0] > 1, 'No rendering for a single wavelength!'

        f_target, axs_target = plt.subplots(nrows=1, ncols=2)

        xaxis = np.linspace(0, self.wl.shape[0], 10, dtype=int)
        xtickslabels = np.linspace(np.min(self.wl * 10 ** 9), np.max(self.wl * 10 ** 9), 10, dtype=int)

        # Target
        plt.sca(axs_target[0])
        plt.plot(self.target.squeeze())
        plt.xticks(xaxis, xtickslabels)
        plt.xlabel('Wavelength [nm]')
        plt.ylabel(self.mode + ' [1]')
        plt.ylim([0, 1.05])
        plt.title('Target')

        # Weights
        plt.sca(axs_target[1])
        plt.plot(self.weights.squeeze())
        plt.xticks(xaxis, xtickslabels)
        plt.xlabel('Wavelength [nm]')
        plt.ylabel('Weight [1]')
        plt.ylim([0, 1.05 * np.max(self.weights)])
        plt.title('Weights')

        plt.tight_layout()
        plt.show(block=False)
        plt.pause(0.1)

        return [f_target, axs_target]

    
    
    def simulate(self, n, d):
        """
        This method implements the simulation of the reflectivity of a particular stack. The TMM and its
        parallelization is based on previous work of Alexander Luce.

        Args:
            n: np.array of shape [(Sub + L + Am) x S]
                n holds the refractive indices of L stacked layers by the agent, including substrate and ambient.
            d: np.array of shape Sub + L + Am
                d holds the thicknesses in meters of the layers

        Returns:
            r: np.array of shape [D x S]
                r holds the pixel-wise reflectivity values for the directions and wavelengths under consideration
        """
        result_dicts = tmm('s', n, d, (np.pi/180)*self.angle, self.wl)
        result_dictp = tmm('p', n, d, (np.pi/180)*self.angle, self.wl)
        if self.mode == 'reflectivity':
            rs = result_dicts['R']
            rp = result_dictp['R']
            r = (rs + rp) / 2
            return r
        else:
            ts = result_dicts['T']
            tp = result_dictp['T']
            t = (ts + tp) / 2
            return t

        
    def create_action(self, mat_number, thickness, is_normalized=True):
        if not is_normalized:
            normalized_thickness = (thickness - self.min_thickness) / (self.max_thickness - self.min_thickness)
        else:
            normalized_thickness = thickness
        action = (mat_number, np.array([normalized_thickness]))
        return action


    def create_stack(self, material_list, thickness_list=None):
        if thickness_list is not None:
            t = np.array(thickness_list)
        else:
            t = np.empty(0)
        n = []
        for material in material_list:
            n.append(self.N[material-1, :])
        n = np.vstack(n)
        dictionary = {'n': n, 'd': t}
        return n, t, dictionary
    
    
    def stack_layers(self, d_array=None, n_array=None):
        """
        This method clads the stack suggested by the agent with the pre-defined cladding.
        The returned arrays n, d describe a particular stack, it includes the cladding.
        """

        if n_array is not None:
            n_list = list(n_array)
        else:
            n_list = self.n
        if d_array is not None:
            d_list = list(d_array)
        else:
            d_list = self.d

        if len(n_list) != 0:
            cladded_n = np.vstack(n_list)
            cladded_d = np.vstack(d_list).reshape(-1, 1)
            cladded_n = np.vstack((self.n_substrate, cladded_n))
            cladded_d = np.vstack((self.d_substrate, cladded_d))
            cladded_n = np.vstack((cladded_n, self.n_ambient))
            cladded_d = np.vstack((cladded_d, self.d_ambient))
        else:
            cladded_n = np.vstack((self.n_substrate, self.n_ambient))
            cladded_d = np.vstack((self.d_substrate, self.d_ambient))
        return cladded_n.squeeze(), cladded_d.squeeze()
    
    
    def steps_made(self):
        """
        Returns the number of steps made in the environment
        """
        return len(self.layers)

    def reset_reward_track(self):
        """
        To reset private property _reward_track.
        """
        self._reward_track = []

    def set_initial_layers(self, nmb_of_initial_layers):
        """
        Setter for the private property that specifies an initial number of layers during environment reset
        """
        if nmb_of_initial_layers > self.maximum_layers:
            raise ValueError("Initial number of layers already exceeds the total number of allowed layers!")
        self._initial_nmb_layers = nmb_of_initial_layers
        
        
        
            @property
    def baseline_mse(self):
        """
        Returns the baseline mse for reward computation/transformation (See publication for details)
        """
        if len(self._reward_track) == 0:
            return 0.4
        else:
            return 0.4  # You can update this with your desired baseline MSE value.

    @property
    def num_layers(self) -> float:
        """
        Returns the explicit number of layers of a stack
        """
        if len(self.layers) == 0:
            return 0
        else:
            counter = 0
            prev_layer = -1
            for layer in self.layers:
                if not layer == prev_layer:
                    counter += 1
                    prev_layer = layer
            return counter

    @staticmethod
    def reward_func(reflectivity, target, weights=None, baseline_mse=1.0, normalization=False, low_reward=0.01, high_reward=1.0):
        """
        An unconstrained reward computation based on the observed reflectivity and the given target.
        """
        if weights is None:
            weights = np.ones_like(target)
        else:
            assert np.all(weights >= 0), 'weights are supposed to be non-negative!'
        temp = np.abs(reflectivity - target) * weights
        temp[weights == 0] = np.nan
        baseline_error = np.nanmean(temp)
        if normalization:
            assert low_reward > 0, 'low_rewards needs to be non-negative!'
            highest_measureable_reward = high_reward
            lowest_measureable_reward = low_reward  # > 0
            a = highest_measureable_reward
            b = np.log(lowest_measureable_reward / highest_measureable_reward) / baseline_mse
            reward = a * np.exp(b * baseline_error)
        else:
            reward = np.exp(-baseline_error)

        return reward, baseline_error







        
        
       


NameError: name 'gymnasium' is not defined