# Evolutionary Strategies tests

In [None]:
# from environments.lunarlander import LunarLanderWrapper
import torch
import torch.nn as nn
import numpy as np
from core_algorithms.replay_memory import ReplayMemory
from core_algorithms.utils import calc_smoothness, Episode
from tqdm import tqdm

In [None]:
class LayerNorm(nn.Module):
    """
    Normalize the last dimension of the input, then scale and shift it.

    `gamma` (initialised to ones) is the learned scale and `beta`
    (initialised to zeros) the learned shift, both of length `features`.
    `eps` is added to the standard deviation so the division stays finite.
    """

    def __init__(self, features, eps=1e-6):
        super().__init__()
        self.eps = eps
        # Registration order (gamma, then beta) fixes the order reported by
        # named_parameters().
        self.gamma = nn.Parameter(torch.ones(features))
        self.beta = nn.Parameter(torch.zeros(features))

    def forward(self, x):
        # Statistics are taken over the last axis only and kept broadcastable.
        centered = x - x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True) + self.eps
        scaled = self.gamma * centered / spread
        return scaled + self.beta


In [None]:
# test layernorm:
lnorm = LayerNorm(4)
for name, param in lnorm.named_parameters():
    print(name, param)
    print(len(param.shape))
    

In [None]:
class Parameters:
    """Hyper-parameter container used to build the actor networks.

    Args:
        conf: Optional configuration, either a mapping or an object with
            attributes. Only `disable_cuda` is read. When it is absent the
            device is the CPU; when it is present and falsy, CUDA is used if
            `torch.cuda.is_available()`.
        hidden_size: Width of the actor's hidden layers.
        init: When False, the constructor returns immediately and no
            attribute is set.
    """

    def __init__(self, conf=None, hidden_size=16, init=True):
        if not init:
            return

        # `conf` defaults to "no configuration". A dict has no attributes, so
        # it must be queried by key; any other object is queried by attribute.
        if isinstance(conf, dict):
            disable_cuda = conf.get('disable_cuda')
        else:
            disable_cuda = getattr(conf, 'disable_cuda', None)

        # CUDA is opt-in: it is only considered when `disable_cuda` was
        # explicitly provided and is falsy.
        use_cuda = (
            disable_cuda is not None
            and not disable_cuda
            and torch.cuda.is_available()
        )

        self.device = torch.device("cuda" if use_cuda else "cpu")
        # Capacity handed to the per-actor replay buffers.
        self.individual_bs = 10000

        self.hidden_size = hidden_size
        self.actor_num_layers = 2
        # Key into the module-level `activations` table.
        self.activation_layer = 'tanh'

        # Default dimensions; the notebook overwrites them from the
        # environment's observation and action spaces.
        self.state_dim = 6
        self.action_dim = 3

In [None]:
# Lookup table from an activation name (as stored in the parameter objects)
# to a ready-made activation module instance.
activations = dict(
    relu=nn.ReLU(),
    tanh=nn.Tanh(),
    leakyRelu=nn.LeakyReLU(),
)

In [None]:

class Actor(torch.nn.Module):
    """Feed-forward policy whose weight matrices form the evolutionary genome.

    The network is: input Linear + activation, `actor_num_layers` blocks of
    Linear + LayerNorm + activation, and an output Linear followed by Tanh.

    Only 2-D parameters (the Linear weight matrices) are read or written by
    `extract_parameters` / `inject_parameters` / `count_parameters`; 1-D
    parameters (Linear biases, LayerNorm gamma/beta) are skipped.
    NOTE(review): the skipped biases therefore keep whatever values they got
    at construction time and are not controlled by the genome -- confirm this
    is intended.

    Args:
        args: Parameter object providing `individual_bs`, `hidden_size`,
            `actor_num_layers`, `activation_layer`, `state_dim`, `action_dim`
            and `device`.
        init: Unused; kept for call-site compatibility.
    """

    def __init__(self, args: Parameters, init=False):
        super(Actor, self).__init__()
        self.args = args

        self.buffer = ReplayMemory(self.args.individual_bs)
        self.critical_buffer = ReplayMemory(self.args.individual_bs)
        self.h = self.args.hidden_size
        self.L = self.args.actor_num_layers
        self.activation = activations[self.args.activation_layer]

        # input layer:
        layers = [
            nn.Linear(args.state_dim, self.h),
            self.activation,
        ]

        # hidden layer(s):
        for _ in range(self.L):
            layers.extend([
                nn.Linear(self.h, self.h),
                LayerNorm(self.h),
                self.activation,
            ])

        # output layer:
        layers.extend([
            nn.Linear(self.h, args.action_dim),
            nn.Tanh(),
        ])
        self.net = nn.Sequential(*layers)

        # select_action() moves its input to args.device, so the weights must
        # live there too (RNN_Actor does the same).
        self.to(self.args.device)

    def forward(self, state: torch.Tensor):
        """Return the network output for `state`."""
        return self.net(state)

    def select_action(self, state):
        """Return the action for one observation as a flat numpy array.

        `state` is reshaped to a single row, converted to a float tensor and
        moved to `args.device`. No autograd graph is recorded.
        """
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.args.device)
        with torch.no_grad():
            action = self.forward(state)
        return action.cpu().numpy().flatten()

    def _genome_parameters(self):
        """Yield, in `named_parameters()` order, the parameters in the genome."""
        for name, param in self.named_parameters():
            if 'lnorm' in name or len(param.shape) != 2:
                continue
            yield param

    def extract_parameters(self):
        ''' Extract the genome parameters and flatten them into a single vector.
        This is used for the genetic algorithm.

        Returns:
            torch.Tensor: Detached float32 copy of the flattened parameters,
            on `args.device`.
        '''
        pieces = [p.detach().reshape(-1) for p in self._genome_parameters()]
        if not pieces:
            return torch.zeros(0, dtype=torch.float32, device=self.args.device)
        # torch.cat allocates new storage, so the result never aliases the
        # live network weights.
        return torch.cat(pieces).to(dtype=torch.float32, device=self.args.device)

    def inject_parameters(self, parameters):
        ''' Inject the parameters into the network. This is used for the genetic algorithm.

        Args:
            parameters (torch.Tensor or array-like): Flattened parameters, laid
                out in the same order `extract_parameters` produces.
        '''
        parameters = torch.as_tensor(parameters)
        offset = 0
        with torch.no_grad():
            for param in self._genome_parameters():
                size = param.numel()
                chunk = parameters[offset:offset + size]
                # copy_ converts dtype/device to match the destination.
                param.copy_(chunk.reshape(param.shape))
                offset += size

    def count_parameters(self):
        ''' Count the number of parameters that belong to the genome.'''
        return sum(param.numel() for param in self._genome_parameters())

In [None]:
# test actor:
actor = Actor(Parameters(), init=True)

# for name, param in actor.named_parameters():
#     print(param.data.view(-1))
print(actor.extract_parameters().shape)      

In [None]:
class EvolutionStrategy:
    """Skeleton of an ask/tell evolution strategy over a population of actors.

    `ask`, `tell` and `result` are placeholders that currently do nothing.

    Args:
        actor: Template individual the population is seeded from.
        pop_size: Number of individuals in the population.
    """

    def __init__(self, actor: "Actor", pop_size = 10):
        import copy

        super(EvolutionStrategy, self).__init__()
        self.pop_size = pop_size
        # Each individual must be an independent copy: a list of references to
        # the same object would make every change affect the whole population.
        self.pop = [copy.deepcopy(actor) for _ in range(pop_size)]

    def ask(self):
        """Placeholder: not implemented yet (returns None)."""
        pass

    def tell(self, fitness_lst):
        """Placeholder: not implemented yet (returns None)."""
        pass

    def result(self):
        """Placeholder: not implemented yet (returns None)."""
        pass
    

##### Environment


In [None]:
import gymnasium as gym
# Module-level environment shared by the evaluation helpers defined below.
env = gym.make('LunarLanderContinuous-v2')
# One fixed seed for torch, numpy and (via env.reset(seed=seed)) the episodes.
seed = 7
torch.manual_seed(seed)
np.random.seed(seed)


In [None]:
params = Parameters()
params.action_dim = env.action_space.shape[0]
params.state_dim = env.observation_space.shape[0]
params.action_dim, params.state_dim

##### Fitness function - Actor evaluation
###### Fitness = total episode reward + action-smoothness term

In [None]:
def evaluate_agent(agent: Actor, is_action_noise: bool, store_transition: bool):
    """ Evaluate the agent in the module-level `env` for one episode.

    The episode starts from `env.reset(seed=seed)` and ends when the
    environment reports either termination or truncation.

    Args:
        agent (Actor): The agent to evaluate; it is put in eval mode.
        is_action_noise (bool): Whether to add Gaussian noise (sd 0.1) to each
            action before clipping it to [-1, 1].
        store_transition (bool): Whether to push every transition into
            `agent.buffer`.

    Returns:
        tuple: `(fitness, smoothness)` where `fitness` is the sum of rewards
        plus `smoothness`, and `smoothness` is `calc_smoothness` applied to
        the episode's actions.
    """
    rewards, action_lst = [], []
    obs, _ = env.reset(seed=seed)
    agent.eval()
    done = False
    while not done:
        action = agent.select_action(obs)
        if is_action_noise:
            action = np.clip(action + np.random.normal(0, 0.1, size=env.action_space.shape[0]), -1.0, 1.0)
        next_obs, reward, terminated, truncated, _ = env.step(action.flatten())
        action_lst.append(action.flatten())
        rewards.append(reward)

        if store_transition:
            # Only true termination is stored as the done flag; truncation
            # merely stops the rollout.
            transition = (obs, action, next_obs, reward, float(terminated))
            agent.buffer.push(*transition)
        obs = next_obs
        # A time-limit truncation must end the episode as well, otherwise the
        # loop keeps stepping an environment that has already finished.
        done = terminated or truncated
    # NOTE(review): `env` is shared and reused by later evaluations; closing
    # it after every episode is kept from the original -- confirm it is needed.
    env.close()

    actions = np.asarray(action_lst)
    smoothness = calc_smoothness(actions, plot_spectra=False)
    fitness = np.sum(rewards) + smoothness
    return fitness, smoothness

In [None]:
def fitness_fn(x: torch.Tensor, params: Parameters, num_evals: int = 3):
    """ Fitness of every candidate in a population.

    Each row of `x` is injected into a freshly built `Actor` and evaluated
    with action noise for `num_evals` episodes; the candidate's fitness is the
    mean over those episodes.

    Args:
        x: A tensor of shape (lambda, N), one flattened genome per row.
        params: Parameter object used to construct each `Actor`.
        num_evals: Number of evaluation episodes per candidate (at least 1).

    Returns:
        torch.Tensor: Mean fitness per candidate, shape (lambda, 1).

    Raises:
        ValueError: If `num_evals` is smaller than 1.
    """
    if num_evals < 1:
        raise ValueError("num_evals must be at least 1")

    # Unpacking also asserts that `x` is 2-D.
    lamda, _ = x.shape
    fitness_lst = torch.zeros((num_evals, lamda))
    for j in tqdm(range(lamda), total=lamda, desc='Population Eval', colour='green'):
        for i in range(num_evals):
            # A new actor is built for every episode, as in the original.
            agent = Actor(params, init=True)
            agent.inject_parameters(x[j])
            fitness, _smoothness = evaluate_agent(agent=agent, is_action_noise=True, store_transition=False)
            fitness_lst[i, j] = fitness
    pop_fitness = fitness_lst.mean(dim=0)
    return pop_fitness.reshape(-1, 1)
     
    

In [None]:
torch.zeros(5-2)

### CMA-ES: Covariance Matrix Adaptation - Evolution Strategy

In [None]:
import logging

logger = logging.getLogger(__name__)


class CMA(object):
    """
    Covariance Matrix Adaptation Evolution Strategy (CMA-ES) implemented with pytorch.

    The optimizer MINIMIZES `fitness_function`. The fitness function is always
    called with a 2-D tensor holding one candidate per row and must return one
    value per row.
    """

    def __init__(
            self,
            initial_solution: torch.Tensor,
            initial_step_size,
            fitness_function,
            population_size=None,
            enforce_bounds=None,
            cc=None,
            c_sigma=None,
            c_mu=None,
            c1=None,
            damps=None,
            termination_no_effect=1e-8,
            store_trace=False,
            callback_function=None,
            dtype=torch.float32,
     ):
        """
        Args:
            initial_solution: search start point, a 1-D torch.Tensor

            initial_step_size: initial step size sigma, a positive scalar

            fitness_function: function to be minimized; receives a
                (population, N) tensor and returns one value per row

            population_size: number of samples produced at each generation
                (must be greater than 4 when given)

            enforce_bounds: 2D list or array with one [min, max] row per
                dimension. Ensures the fitness function is never called with
                out of bounds values.

            cc, c_sigma, c_mu, c1, damps: hyper-parameters of CMA-ES; defaults
                are derived from the dimension and population size in init()

            termination_no_effect: threshold used by the NoEffectAxis and
                NoEffectCoord termination criteria

            store_trace: store a per-generation trace or not

            callback_function: called as callback(cma, logger) before the first
                generation and at the end of each generation. Intended for
                logging purposes.

            dtype: data type of all internal tensors

        Raises:
            ValueError: if any argument fails the checks below.
        """
        if not isinstance(initial_solution, torch.Tensor):
            raise ValueError("initial_solution must be a torch.Tensor")
        elif initial_solution.dim() != 1:
            ndim = initial_solution.dim()
            raise ValueError("initial_solution must be a 1-D torch.Tensor, but got {}-D".format(ndim))
        elif not np.isscalar(initial_step_size) or initial_step_size <= 0:
            raise ValueError("initial_step_size must be a positive scalar")
        elif not callable(fitness_function):
            raise ValueError("fitness_function must be callable")
        elif population_size is not None and population_size <= 4:
            raise ValueError("population_size must be greater than 4")
        elif enforce_bounds is not None and not isinstance(enforce_bounds, (np.ndarray, list)):
            raise ValueError("enforce_bounds must be a list or numpy array")
        elif enforce_bounds is not None and np.ndim(enforce_bounds) != 2:
            raise ValueError("enforce_bounds must be a 2-D list or numpy array")
        elif callback_function is not None and not callable(callback_function):
            raise ValueError("callback_function must be callable")

        self.generation = 0
        self.initial_solution = initial_solution
        self.initial_step_size = initial_step_size
        self.fitness_fn = fitness_function
        self.population_size = population_size
        self.enforce_bounds = enforce_bounds
        self._cc = cc
        self._csigma = c_sigma
        self._cmu = c_mu
        self._c1 = c1
        self._damps = damps
        self.termination_no_effect = termination_no_effect
        self.store_trace = store_trace
        self.callback_fn = callback_function
        self.dtype = dtype
        self.termination_criterion_met = False

        self._initialized = False

    def init(self):
        """Derive the strategy constants and reset the search state.

        Returns:
            CMA: self, to allow chaining.

        Raises:
            ValueError: if already initialized (use reset() instead) or if the
                initial solution is empty.
        """
        if self._initialized:
            raise ValueError("CMA-ES has already been initialized")

        self.generation = 0
        self.dimension = self.initial_solution.shape[0]
        if self.dimension < 1:
            raise ValueError("initial_solution must contain at least one element")
        self._enforce_bounds = self.enforce_bounds is not None
        self.trace = []

        # Plain Python ints are used for every tensor size; float tensors are
        # used for the arithmetic. The public attributes stay tensors.
        n = int(self.dimension)
        n_f = torch.tensor(float(n), dtype=self.dtype)

        # Solution dimension:
        self.N = torch.tensor(n, dtype=torch.int32)
        # population size:
        if self.population_size is not None:
            lam = int(self.population_size)
        else:
            lam = int(torch.floor(3 * torch.log(n_f) + 8))
        self.lamda = torch.tensor(lam, dtype=torch.int32)
        # shape of the population of solutions (torch.Size takes no dtype):
        self.shape = torch.Size([lam, n])
        # number of parents used for recombination:
        mu = lam // 2
        self.mu = torch.tensor(float(mu), dtype=self.dtype)
        # recombination weights: log-decreasing for the best mu candidates,
        # zero for the rest:
        ranks = torch.arange(1, mu + 1, dtype=self.dtype)
        positive = torch.log(torch.tensor(mu + 0.5, dtype=self.dtype)) - torch.log(ranks)
        weights = torch.cat([positive, torch.zeros(lam - mu, dtype=self.dtype)], dim=0)
        # normalize and reshape into column matrix:
        self.weights = (weights / weights.sum()).reshape(-1, 1)
        # variance effective size of mu:
        self.mu_eff = (self.weights.sum() ** 2) / ((self.weights ** 2).sum())
        # time constant for cumulation for covariance matrix:
        if self._cc is not None:
            self.cc = torch.tensor(self._cc, dtype=self.dtype)
        else:
            self.cc = (4 + self.mu_eff / n_f) / (4 + n_f + 2 * self.mu_eff / n_f)
        # Time constant for cumulation for step size control or sigma:
        if self._csigma is not None:
            self.c_sigma = torch.tensor(self._csigma, dtype=self.dtype)
        else:
            self.c_sigma = (self.mu_eff + 2) / (n_f + self.mu_eff + 5)
        # Learning rate for rank one update of C:
        if self._c1 is not None:
            self.c1 = torch.tensor(self._c1, dtype=self.dtype)
        else:
            self.c1 = 2 / ((n_f + 1.3) ** 2 + self.mu_eff)
        # Learning rate for rank mu update of C:
        if self._cmu is not None:
            self.c_mu = torch.tensor(self._cmu, dtype=self.dtype)
        else:
            self.c_mu = 2 * (self.mu_eff - 2 + 1 / self.mu_eff) / ((n_f + 2) ** 2 + self.mu_eff)
        # Damping for sigma usually close to 1:
        if self._damps is not None:
            self.damps = torch.tensor(self._damps, dtype=self.dtype)
        else:
            excess = torch.sqrt((self.mu_eff - 1) / (n_f + 1)) - 1
            self.damps = 1 + self.c_sigma + 2 * torch.clamp(excess, min=0)
        # Expectation of ||N(0, I)|| == norm(randn(N,1)):
        self.chiN = torch.sqrt(n_f) * (1 - 1 / (4 * n_f) + 1 / (21 * n_f ** 2))

        # Per-dimension clipping limits taken from the [min, max] rows:
        if self._enforce_bounds:
            bounds = torch.tensor(self.enforce_bounds, dtype=self.dtype)
            self.clip_value_min = bounds[:, 0]
            self.clip_value_max = bounds[:, 1]

        # ----------------------------------------
        # Search state:
        # ----------------------------------------
        # Mean (an independent copy in the working dtype):
        self.m = self.initial_solution.detach().clone().to(self.dtype)
        # Step size or sigma:
        self.sigma = torch.tensor(self.initial_step_size, dtype=self.dtype)
        # Covariance matrix:
        self.C = torch.eye(n, dtype=self.dtype)
        # Evolution path for sigma:
        self.p_sigma = torch.zeros(n, dtype=self.dtype)
        # Evolution path for C:
        self.p_C = torch.zeros(n, dtype=self.dtype)
        # Coordinate system (normalized eigenvectors, one per column)
        self.B = torch.eye(n, dtype=self.dtype)
        # scaling (square root of the eigenvalues):
        self.D = torch.eye(n, dtype=self.dtype)

        self._initialized = True
        return self

    def search(self, max_generations=500):
        """
        Args:
            max_generations: maximum number of generations
            note that the search can be interrupted by the termination criterion
        Returns:
            the best solution (the final mean) and its fitness value

        Raises:
            ValueError: if the fitness function does not return exactly one
                value per candidate.
        """
        if not self._initialized:
            self.init()

        # call user defined callback function at generation 0:
        if self.callback_fn is not None:
            self.callback_fn(self, logger)

        for _ in range(max_generations):
            self.generation += 1
            logger.debug("Generation: %d", self.generation)
            # -----------------------
            # (1) Sample a new population of solutions ~ N(m, sigma^2*C)
            # -----------------------
            z = torch.randn(self.shape, dtype=self.dtype)  # sample N(0, I)
            # Candidates are rows, so y_k = B D z_k becomes z @ (B D)^T; this
            # gives each row covariance B D^2 B^T = C.
            y = torch.matmul(z, torch.matmul(self.B, self.D).T)
            x = self.m + self.sigma * y

            penalty = 0.0
            if self._enforce_bounds:
                x_corr = torch.clip(x, self.clip_value_min, self.clip_value_max)
                penalty = torch.norm(x - x_corr) ** 2
                x = x_corr

            # -----------------------
            # (2) Selection and Recombination: Moving the mean:
            # -----------------------
            # Evaluate and sort solutions (ascending: best first). The result
            # is flattened so (lambda,) and (lambda, 1) outputs both work.
            f_x = torch.as_tensor(self.fitness_fn(x)).reshape(-1) + penalty
            if f_x.numel() != x.shape[0]:
                raise ValueError(
                    "fitness_function must return one value per candidate "
                    "({} expected, got {})".format(x.shape[0], f_x.numel())
                )
            self.x_sorted = x[f_x.argsort()]

            # Record the state this generation was sampled from:
            if self.store_trace:
                self._store_trace()

            # Update the mean as the weighted average of the top-mu solutions:
            x_diff = (self.x_sorted - self.m)
            x_mean = torch.multiply(self.weights, x_diff).sum(axis=0)
            m = self.m + x_mean

            # --------------------
            # (3) Adapting the Covariance Matrix:
            # --------------------
            # Update the evolution path for rank-one-update:
            y_mean = x_mean / self.sigma
            p_C = (
                (1 - self.cc) * self.p_C +
                torch.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * y_mean
            )

            # Rank-mu update: sum_k w_k * y_k y_k^T, computed as one matrix
            # product instead of stacking lambda outer products.
            y_k = x_diff / self.sigma
            y_s = torch.matmul(y_k.T, self.weights * y_k)

            # Combine rank-one and rank-mu update to obtain the new covariance matrix:
            C = (
                (1 - self.c1 - self.c_mu) * self.C +
                self.c1 * torch.outer(p_C, p_C) +
                self.c_mu * y_s
            )

            # Enforce the symmetry of the covariance matrix by mirroring the
            # upper triangle:
            C = torch.triu(C) + torch.triu(C, 1).T

            # ---------------------------------
            # (4) Step size control:
            # ---------------------------------
            # Update the evolution path for sigma using C^(-1/2) = B D^-1 B^T
            # of the covariance the population was sampled from:
            D_inv = torch.diag(torch.reciprocal(torch.diag(self.D)))
            C_inv_sqrt = torch.matmul(torch.matmul(self.B, D_inv), self.B.T)
            C_inv_sqrt_y = torch.matmul(C_inv_sqrt, y_mean)
            p_sigma = (
                (1 - self.c_sigma) * self.p_sigma +
                torch.sqrt(self.c_sigma * (2 - self.c_sigma) * self.mu_eff) * C_inv_sqrt_y
            )

            # update sigma:
            sigma = self.sigma * torch.exp((self.c_sigma / self.damps) * (torch.norm(p_sigma) / self.chiN - 1))

            # -----------------------------------------
            # (5) Update B and D: eigen decomposition:
            # -----------------------------------------
            # C is symmetric, so eigh applies; eigenvalues are clamped at zero
            # to absorb round-off before taking the square root.
            eigvals, B = torch.linalg.eigh(C)
            diag_D = torch.sqrt(torch.clamp(eigvals, min=0))
            D = torch.diag(diag_D)

            # -----------------------------------------
            # (6) Assign new variable values:
            # -----------------------------------------
            # Cache computations necessary for the termination criterion:
            self._prev_sigma = self.sigma
            self._prev_D = self.D
            self._diag_D = diag_D

            # Assign values:
            self.p_C = p_C
            self.p_sigma = p_sigma
            self.C = C
            self.sigma = sigma
            self.B = B
            self.D = D
            self.m = m

            # ------------------------------------------
            # (7) Terminate early if necessary:
            # ------------------------------------------
            self.termination_criterion_met = self.should_terminate()

            # Call user defined function last:
            if self.callback_fn is not None:
                self.callback_fn(self, logger)

            if self.termination_criterion_met:
                break
        return self.best_solution(), self.best_fitness()

    def inject_RL(self, rl_actor):
        """Placeholder: not implemented yet."""
        pass

    def best_solution(self):
        """Return the current mean as a numpy array."""
        return self.m.detach().numpy()

    def best_fitness(self):
        """Return the fitness of the current mean as a 0-d numpy array.

        The mean is passed as a one-row batch so the fitness function sees the
        same (rows, N) layout it gets during the search.
        """
        value = torch.as_tensor(self.fitness_fn(self.m[None, :])).reshape(-1)[0]
        return value.detach().numpy()

    def should_terminate(self, returns_details=False):
        """Evaluate the termination criteria for the current state.

        Requires at least one completed generation (it reads values cached by
        search()).

        Args:
            returns_details: when True, also return a dict with one bool per
                criterion.

        Returns:
            bool, or (bool, dict) when `returns_details` is True.
        """
        i = self.generation % self.dimension
        # NoEffectAxis: stop if adding 0.1 stdev along principal axis i (the
        # i-th eigenvector, i.e. column i of B) does not change the solution:
        m_nea = self.m + 0.1 * self.sigma * self._diag_D[i] * self.B[:, i]
        m_nea_diff = (self.m - m_nea).abs()
        no_effect_axis = torch.all(m_nea_diff < self.termination_no_effect)

        # NoEffectCoord: stop if adding 0.2 stdev in any single coordinate does
        # not change the solution (stdev of coordinate j is sigma*sqrt(C_jj)):
        coord_std = torch.sqrt(torch.clamp(torch.diag(self.C), min=0))
        m_nec = self.m + 0.2 * self.sigma * coord_std
        m_nec_diff = (self.m - m_nec).abs()
        no_effect_coord = torch.any(m_nec_diff < self.termination_no_effect)

        # ConditionCov: stop if the condition number of the covariance matrix exceeds 10^14:
        max_D = torch.max(self._diag_D)
        min_D = torch.min(self._diag_D)
        condition_number = max_D ** 2 / min_D ** 2
        condition_cov = condition_number > 1e14

        # TolXUp: stop if sigma * max(D) changed by more than 10^4:
        # this usually indicates a far too small initial sigma or divergent behavior.
        prev_max_D = torch.max(torch.diag(self._prev_D))
        tol_x_up_diff = (self.sigma * max_D - self._prev_sigma * prev_max_D).abs()
        tol_x_up = tol_x_up_diff > 1e4

        do_terminate = bool(no_effect_axis or no_effect_coord or condition_cov or tol_x_up)

        if not returns_details:
            return do_terminate
        else:
            return (
                do_terminate,
                dict(
                    no_effect_axis=bool(no_effect_axis.numpy()),
                    no_effect_coord=bool(no_effect_coord.numpy()),
                    condition_cov=bool(condition_cov.numpy()),
                    tol_x_up=bool(tol_x_up.numpy()),
                )
            )

    def reset(self):
        """Discard the search state and re-run init(); returns self."""
        self._initialized = False
        return self.init()

    def _store_trace(self):
        """Append a numpy snapshot of the current state and sorted population."""
        self.trace.append({
            'm': self.m.detach().numpy(),
            'sigma': self.sigma.detach().numpy(),
            'C': self.C.detach().numpy(),
            'p_sigma': self.p_sigma.detach().numpy(),
            'p_C': self.p_C.detach().numpy(),
            'B': self.B.detach().numpy(),
            'D': self.D.detach().numpy(),
            'population': self.x_sorted.detach().numpy(),
        })
        
        
                
        

In [None]:
torch.tensor(4).int()

### Test CMA-ES on Lunar Lander Env

In [None]:
def callback_fn(cma: CMA, logger):
    """Log the optimizer's best fitness on every tenth generation."""
    # Guard clause: nothing to do on generations that are not multiples of 10.
    if cma.generation % 10 != 0:
        return
    logger.info(f'Generation {cma.generation}: {cma.best_fitness()}')

In [None]:
sln = Actor(args=params, init=True).extract_parameters()
type(sln), sln.dim()

##### Initialization


In [None]:
np.random.seed(seed)
torch.manual_seed(seed)
# The search space has one entry per element of the flattened actor genome
# `sln` extracted above (`sln_dim` was never defined).
initial_solution = torch.randn(sln.numel())
initial_step_size = 1.0



In [None]:
initial_solution.dim()

In [None]:
def cma_objective(x):
    """Objective handed to CMA: negated population fitness, one value per row.

    CMA calls its fitness function with the candidates only and minimizes the
    result, whereas `fitness_fn` also needs `params` and returns a
    reward-based score where higher is better.
    """
    # Accept a single 1-D candidate as well as a (lambda, N) population.
    batch = x.reshape(-1, x.shape[-1])
    return -fitness_fn(batch, params).reshape(-1)


cma = CMA(
    initial_solution=initial_solution,
    initial_step_size=initial_step_size,
    fitness_function=cma_objective,
    # callback_function=callback_fn,
    dtype=torch.float32,
)

In [None]:
cma.init()
cma.lamda

In [None]:
cma.search(max_generations=10)

In [None]:
import numpy as np
import torch


def compute_weight_decay(weight_decay, model_params):
    """Return the L2 penalty `-weight_decay * mean(p**2)` for each solution.

    Args:
        weight_decay: Penalty coefficient.
        model_params: Floating-point parameters; either one vector or a
            (population, num_params) array/tensor/nested list.

    Returns:
        torch.Tensor: One (non-positive) penalty per solution, averaged over
        the last axis; a 0-d tensor for a single vector.
    """
    # as_tensor reuses an existing tensor/ndarray instead of copying it (and
    # avoids the copy-construct warning torch.tensor emits for tensors).
    model_params_grid = torch.as_tensor(model_params)
    # Reducing over the last axis equals dim=1 for a 2-D population and also
    # supports a single 1-D parameter vector.
    return -weight_decay * torch.mean(model_params_grid * model_params_grid, dim=-1)


class CMAES:
    '''CMA-ES algorithm implementation (wrapper around the `cma` package).

    The wrapper exposes a maximizing ask/tell interface: fitness values given
    to `tell` are negated before they reach the underlying minimizer.
    '''

    def __init__(self, num_params, sigma_init=0.10, pop_size=20, weight_decay=0.01):
        """
        Args:
            num_params: Length of each solution vector.
            sigma_init: Initial step size passed to the strategy.
            pop_size: Population size (`popsize` option).
            weight_decay: Coefficient of the L2 penalty added to the fitness
                in `tell`; no penalty when it is not positive.
        """
        self.num_params = num_params
        self.sigma_init = sigma_init
        self.pop_size = pop_size
        self.weight_decay = weight_decay
        # Solutions returned by the latest ask(); None until ask() is called.
        self.solutions = None

        # Imported lazily so the class can be defined without the package.
        import cma
        self.es = cma.CMAEvolutionStrategy(
            self.num_params * [0], self.sigma_init, {'popsize': self.pop_size})

    def rms_stdev(self):
        """Return the mean absolute value of entry 6 of the strategy result."""
        sigma = self.es.result[6]
        return np.mean(np.sqrt(sigma*sigma))

    def ask(self):
        '''Sample a new population and return it as a tensor (one row each).'''
        self.solutions = np.array(self.es.ask())
        return torch.tensor(self.solutions)

    def tell(self, fitness_list):
        """Report the fitness of the solutions returned by the last `ask`.

        Args:
            fitness_list: One fitness value per solution (higher is better);
                a tensor, array or list of shape (lambda,) or (lambda, 1).

        Raises:
            ValueError: If `ask` has not been called or the number of fitness
                values differs from the number of solutions.
        """
        if self.solutions is None:
            raise ValueError("ask() must be called before tell()")

        if isinstance(fitness_list, torch.Tensor):
            fitness_table = fitness_list.detach().cpu().to(torch.float64)
        else:
            fitness_table = torch.as_tensor(np.asarray(fitness_list, dtype=np.float64))
        # The strategy expects a flat list of scalars, so a (lambda, 1) column
        # is flattened rather than forwarded as a nested list.
        fitness_table = fitness_table.reshape(-1)
        if fitness_table.numel() != len(self.solutions):
            raise ValueError(
                "expected {} fitness values, got {}".format(
                    len(self.solutions), fitness_table.numel()))

        if self.weight_decay > 0:
            l2_decay = compute_weight_decay(self.weight_decay, self.solutions).reshape(-1)
            # Out-of-place add: never modifies the caller's tensor.
            fitness_table = fitness_table + l2_decay
        # convert minimizer to maximizer:
        self.es.tell(self.solutions,
                     (-fitness_table).tolist())

    def current_param(self):
        return self.es.result[5]  # mean solution, presumably better with noise

    def set_mu(self):
        """Placeholder: not implemented yet."""
        pass

    def best_param(self):
        return self.es.result[0]  # best evaluated solution

    def result(self):
        '''return the best params so far along with historically best reward, current reward, sigma'''
        r = self.es.result
        # NOTE(review): both reward slots are filled from r[1]; confirm
        # whether the "current reward" entry should come from elsewhere.
        return (r[0], -r[1], -r[1], r[6])


In [None]:
from parameters import Parameters


In [None]:
from core_algorithms.cmaes import CMAES
from core_algorithms.cmaes import fitness_function


In [None]:
params = Parameters(hidden_size=64)
params.action_dim = env.action_space.shape[0]
params.state_dim = env.observation_space.shape[0]
num_params = Actor(args=params, init=True).count_parameters()

In [None]:
lamda = torch.floor(3*torch.log(torch.tensor(num_params))+8).int()
num_params, lamda

In [None]:
strategy = CMAES(num_params, pop_size=10, weight_decay=0.01)

In [None]:
sample_sln = strategy.ask()

In [None]:
sample_sln.shape

In [None]:
fx = fitness_fn(sample_sln, params)

In [None]:
fx

In [None]:
strategy.tell(fx)

In [None]:
strategy.best_param().shape

In [None]:
sltn = Actor(args=params, init=True)
sltn.inject_parameters(torch.tensor(strategy.best_param()))

In [None]:
sltn.parameters

In [None]:
# Ask/evaluate/tell loop: record the best fitness of every generation.
pop_fit = []
max_gen = 200
generation = 0
for generation in range(1, max_gen + 1):
    print("Generation: ", generation)
    solution = strategy.ask()
    fx = fitness_fn(solution, params)
    # Best individual of this generation, as a numpy scalar for plotting.
    max_fit = fx.max().detach().numpy()
    pop_fit.append(max_fit)
    print("maximum fitness: ", max_fit)
    strategy.tell(fx)
    

In [None]:
import matplotlib.pyplot as plt


In [None]:
plt.plot(pop_fit)

In [None]:
c = torch.randn((3,3))
c

In [None]:
u, s, v = torch.svd(c)
u, s, v

In [None]:
u @ u.T

In [None]:
torch.diag(torch.reciprocal(torch.diag(c)))

In [None]:
c_up = torch.triu(c)
torch.triu(c) + torch.triu(c, 1).T

In [None]:
x_diff = torch.tensor([[1, 2, 3], [4, 5, 6]])

C_m = torch.stack([torch.outer(e, e) for e in x_diff/2])

C_m

In [None]:
w = w.reshape(-1, 1)
w[:, None]

In [None]:
torch.multiply(C_m, w[:, None]).sum(axis=0)


In [None]:
# Scratch check of the CMA-ES recombination weights for a population of 24.
lamda = torch.tensor(24)
# Number of selected parents: half the population (the original line was
# truncated to `torch.floo`).
mu = torch.floor(lamda / 2).int()
n_selected = int(mu)

# Log-decreasing weights for the selected parents, zeros for the rest.
weights = torch.concat([
    torch.log(mu + 0.5) - torch.log(torch.arange(1, n_selected + 1, dtype=torch.float32)),
    torch.zeros(size=(int(lamda) - n_selected,), dtype=torch.float32),
], dim=0)
weights

In [None]:
from environments.config import select_env


In [None]:
ph_env = select_env("Phlab_attitude_nominal")

In [None]:
obs =ph_env.observation_space.sample()

In [None]:
obs

In [None]:
obs_bar = obs + np.random.normal(0, 0.05, size=obs.shape)

In [None]:
import torch.nn.functional as F

In [None]:
l = 0.1 * F.mse_loss(torch.tensor(obs), torch.tensor(obs_bar))
l.item()

In [None]:
w = weights/weights.sum()
w.reshape(-1, 1)

In [None]:
x = torch.tensor([[4, 5, 6], [1, 2, 3], [7, 8, 9]])
x

In [None]:
x - torch.tensor([1, 2, 3])

In [None]:
weights = torch.tensor([0.1, 0.2, 0.3]).reshape(-1, 1)
torch.multiply(weights, x).sum(axis=0)

In [None]:
def fit_fn(x):
    """Return, for every row of `x`, the sum of its squared entries."""
    squared = x ** 2
    return squared.sum(dim=1)

# x = torch.randn((10, 4))
# x

In [None]:
f_x = fit_fn(x)
f_x

In [None]:
len(x.shape)

In [None]:
idx = f_x.argsort(descending=True)
idx

In [None]:
x[idx], x

In [None]:
# torch.gather(x[:], 1, torch.argsort(f_x))
x.gather(0, torch.argsort(f_x, 0).reshape(-1, 1))

In [None]:
from parameters_es import ESParameters

In [None]:
class RLNN(nn.Module):
    '''Shared base for the RL networks: action selection and flat genome access.

    Only 2-D parameters whose name does not contain 'lnorm' take part in
    extract/inject/count; every other parameter is skipped.
    '''

    def __init__(self, args: ESParameters):
        super(RLNN, self).__init__()
        self.args = args

    def forward(self, state: torch.tensor):
        # Subclasses provide the actual network.
        raise NotImplementedError

    def select_action(self, state):
        '''Run one observation through the network; return a flat numpy array.'''
        batch = torch.FloatTensor(state.reshape(1, -1))
        batch = batch.to(self.args.device)
        output = self.forward(batch)
        return output.cpu().data.numpy().flatten()

    def extract_parameters(self):
        ''' Flatten the selected network parameters into one vector.
        This is used for the genetic algorithm.

        Returns:
            torch.tensor: Detached copy of the flattened parameters.
        '''
        flat = torch.zeros(self.count_parameters(), dtype=torch.float32).to(self.args.device)
        start = 0
        for name, param in self.named_parameters():
            if len(param.shape) == 2 and 'lnorm' not in name:
                stop = start + param.numel()
                flat[start:stop] = param.view(-1)
                start = stop
        return flat.detach().clone()

    def inject_parameters(self, parameters):
        ''' Write a flat vector back into the selected network parameters.
        This is used for the genetic algorithm.

        Args:
            parameters (torch.tensor): Flattened parameters of the network.
        '''
        start = 0
        for name, param in self.named_parameters():
            if len(param.shape) == 2 and 'lnorm' not in name:
                stop = start + param.numel()
                block = parameters[start:stop].reshape(param.shape)
                param.data.copy_(block.data)
                start = stop

    def count_parameters(self):
        ''' Number of scalar entries in the selected parameters.'''
        total = 0
        for name, param in self.named_parameters():
            if len(param.shape) == 2 and 'lnorm' not in name:
                total += np.prod(param.shape)
        return total

    def get_grads(self):
        # Placeholder: not implemented yet.
        pass


In [None]:
class RNN_Actor(RLNN):
    """Recurrent actor: input block -> LSTM/RNN stack -> Tanh-bounded output.

    The recurrent module is an `nn.LSTM` when `rnn_type` is 'LSTM' and an
    `nn.RNN` for any other value. The hidden state returned by the recurrent
    module is discarded on every forward call.
    """

    def __init__(self, params: ESParameters, rnn_type='LSTM'):
        super(RNN_Actor, self).__init__(params)
        self.args = params
        self.h = self.args.actor_hidden_size
        self.L = self.args.actor_num_layers
        activation = activations[self.args.activation_actor.lower()]
        # NOTE(review): the whole parameter object is passed here, whereas
        # Actor passes a capacity -- confirm against ReplayMemory's signature.
        self.critical_buffer = ReplayMemory(self.args)

        # Modules are created in the order input -> recurrent -> output, and
        # `rnn` is registered before the two Sequential blocks, so parameter
        # initialisation and named_parameters() ordering are unchanged.
        in_layer = [
            nn.Linear(self.args.state_dim, self.h),
            LayerNorm(self.h),
            activation,
        ]

        # hidden RNN layers:
        recurrent_cls = nn.LSTM if rnn_type == 'LSTM' else nn.RNN
        self.rnn = recurrent_cls(self.h, self.h, num_layers=self.L)

        # output layer:
        out_layer = [
            nn.Linear(self.h, self.args.action_dim),
            nn.Tanh(),
        ]
        self.in_net = nn.Sequential(*in_layer)
        self.ou_net = nn.Sequential(*out_layer)
        self.to(self.args.device)

    def forward(self, state: torch.tensor):
        features = self.in_net(state)
        rnn_out, _hidden = self.rnn(features)
        return self.ou_net(rnn_out)

    


In [None]:
params = ESParameters()
params.action_dim=3
params.state_dim=6
params.device='cpu'
params.actor_hidden_size=40
params.actor_num_layers=1

In [None]:
actor_lstm = RNN_Actor(params, rnn_type='LSTM')
actor_rnn = RNN_Actor(params, rnn_type='RNN')

In [None]:
actor_lstm.count_parameters(), actor_rnn.count_parameters()

In [None]:
actor_rnn

In [None]:
obs

In [None]:
actor_rnn.select_action(obs), actor_lstm.select_action(obs)

In [None]:
from environments.config import select_env

In [None]:
env = select_env("Phlab_attitude_nominal")

In [None]:
env.action_space.sample()

In [3]:
from core_algorithms.actor_model import Actor
from parameters_es import ESParameters

In [4]:
conf = {
    'use_td3': True,
}



import torch
params = ESParameters(conf=conf, init=True)
params.state_dim = 6
params.action_dim = 3
params.device = torch.device("cpu")

Current device: cuda


In [5]:
params.actor_hidden_size = 40

In [4]:
actor = Actor(args=params, init=True)

In [5]:
print(actor)

Actor(
  (activation): Tanh()
  (net): Sequential(
    (0): Linear(in_features=6, out_features=40, bias=True)
    (1): LayerNorm()
    (2): Tanh()
    (3): Linear(in_features=40, out_features=40, bias=True)
    (4): LayerNorm()
    (5): Tanh()
    (6): Linear(in_features=40, out_features=40, bias=True)
    (7): LayerNorm()
    (8): Tanh()
    (9): Linear(in_features=40, out_features=3, bias=True)
    (10): Tanh()
  )
)


In [6]:
actor.count_parameters()

3560

In [7]:
from core_algorithms.actor_model import CriticTD3

In [8]:
critic = CriticTD3(args=params)

In [9]:
critic.count_parameters()

4800

In [10]:
new_d = dict(
    actor_hidden_size=64,
    actor_num_layers=3,
    critic_hidden_size=[64, 128],
)

In [11]:
params.stdout()

{'activation_actor': 'tanh',
 'activation_critic': 'tanh',
 'actor_hidden_size': 64,
 'actor_lr': 0.001,
 'actor_num_layers': 3,
 'batch_size': 100,
 'critic_hidden_size': [32, 64],
 'critic_lr': 0.001,
 'gamma': 0.99,
 'mem_size': 1000000,
 'n_evals': 2,
 'n_generations': 100,
 'noise_sd': 0.33,
 'pop_size': 10,
 'save_foldername': './logs',
 'seed': 7,
 'use_caps': False}


In [12]:
params.update_from_dict(new_d)
params.stdout()

{'activation_actor': 'tanh',
 'activation_critic': 'tanh',
 'actor_hidden_size': 64,
 'actor_lr': 0.001,
 'actor_num_layers': 3,
 'batch_size': 100,
 'critic_hidden_size': [64, 128],
 'critic_lr': 0.001,
 'gamma': 0.99,
 'mem_size': 1000000,
 'n_evals': 2,
 'n_generations': 100,
 'noise_sd': 0.33,
 'pop_size': 10,
 'save_foldername': './logs',
 'seed': 7,
 'use_caps': False}


In [15]:
import numpy as np
np.deg2rad(10)

0.17453292519943295