## Import required libraries

In [None]:
import numpy as np
import scipy as sp
from numpy.linalg import LinAlgError

## Parameters Dictionary

In [None]:
class gen_param_dict(object):
    '''
    Init function not required. Currently there for debugging.
        init: define the class constructor of with no arguments
    '''
    def __init__(self, verbose=True):
        self.verbose = verbose
        if self.verbose:
            print('-'*25)
            print('Call to the class gen_param_dict has started')
            print('-'*25)
        
    def gen_param_ALG(self, inner_iterations=1, min_eta=1e-5, kl_step=0.2, min_step_mult=0.01,
                     max_step_mult=10.0, min_mult=0.1, max_mult=5.0, initial_state_var=1e-6, 
                     init_traj_distr=None, traj_opt=None, max_ent_traj=0.0, dynamics=None, 
                     cost=None, sample_on_policy=False, fit_dynamics=True):
        # Algorithm
        ALG = {
            # Number of iterations.
            'inner_iterations': inner_iterations,  
            # Minimum initial lagrange multiplier in DGD for trajectory optimization.
            'min_eta': min_eta,        
            'kl_step': kl_step,
            'min_step_mult': min_step_mult,
            'max_step_mult': max_step_mult,
            'min_mult': min_mult,
            'max_mult': max_mult,
            # Trajectory settings.
            'initial_state_var': initial_state_var,
            # A list of initial LinearGaussianPolicy objects for each condition.
            'init_traj_distr': init_traj_distr, 
            # Trajectory optimization.
            'traj_opt': traj_opt,
            # Weight of maximum entropy term in trajectory optimization.
            'max_ent_traj': max_ent_traj,
            # Dynamics hyperaparams.
            'dynamics': dynamics,
            # Costs.
            'cost': cost,  # A list of Cost objects for each condition.
            # Whether or not to sample with neural net policy (only for badmm/mdgps).
            'sample_on_policy': sample_on_policy,
            # Inidicates if the algorithm requires fitting of the dynamics.
            'fit_dynamics': fit_dynamics,    
        }
        return ALG
    
    def gen_param_ALG_BADMM(self, inner_iterations=4.0, policy_dual_rate=0.1, policy_dual_rate_covar=0.0,
                           fixed_lg_step=0.0, lg_step_schedule=10.0, ent_reg_schedule=0.0, init_pol_wt=0.01,
                           policy_sample_mode='add', exp_step_increase=2.0, exp_step_decrease=0.5, 
                            exp_step_upper=0.5, exp_step_lower=1.0): 
        # Algorithm BADMM
        ALG_BADMM = {
            'inner_iterations': inner_iterations,
            'policy_dual_rate': policy_dual_rate,
            'policy_dual_rate_covar': policy_dual_rate_covar,
            'fixed_lg_step': fixed_lg_step,
            'lg_step_schedule': lg_step_schedule,
            'ent_reg_schedule': ent_reg_schedule,
            'init_pol_wt': init_pol_wt,
            'policy_sample_mode': policy_sample_mode,
            'exp_step_increase': exp_step_increase,
            'exp_step_decrease': exp_step_decrease,
            'exp_step_upper': exp_step_upper,
            'exp_step_lower': exp_step_lower,
        }
        return ALG_BADMM
    
    def gen_param_TRAJ_OPT_LQR(self, del0=1e-4, eta_error_threshold=1e16, min_eta=1e-8, max_eta=1e16,
                              cons_per_step=False, use_prev_distr=False, update_in_bwd_pass=True):
        TRAJ_OPT_LQR = {
            'del0': del0,
            'eta_error_threshold': eta_error_threshold,
            'min_eta': min_eta,
            'max_eta': max_eta,
            # Whether or not to enforce separate KL constraints at each time step.
            'cons_per_step': cons_per_step,  
            # Whether or not to measure expected KL under the previous traj distr.
            'use_prev_distr': use_prev_distr,  
            # Whether or not to update the TVLG controller during the bwd pass.
            'update_in_bwd_pass': update_in_bwd_pass,  
        }
        return TRAJ_OPT_LQR
    
    def gen_param_INIT_LG_LQR(self, init_var=1.0, stiffness=1.0, stiffness_vel=0.5,
                             final_weight=1.0):
        # Initial Linear Gaussian Trajectory distribution, LQR-based initializer.
        INIT_LG_LQR = {
            'init_var': init_var,
            'stiffness': stiffness,
            'stiffness_vel': stiffness_vel,
            'final_weight': final_weight,
            # Parameters for guessing dynamics
            # dU vector of accelerations, default zeros.
            'init_acc': [],  
            # dU vector of gains, default ones.
            'init_gains': [],  
        }
        return INIT_LG_LQR
    
    def gen_param_POLICY_PRIOR(self, strength=1e-4):
        # PolicyPrior
        POLICY_PRIOR = {
            'strength': strength,
        }
        return POLICY_PRIOR
    
    def gen_param_POLICY_PRIOR_GMM(self, min_samples_per_cluster=20.0, max_clusters=50.0, max_samples=20.0,
                                  strength=1.0):
        # PolicyPriorGMM
        POLICY_PRIOR_GMM = {
            'min_samples_per_cluster': min_samples_per_cluster,
            'max_clusters': max_clusters,
            'max_samples': max_samples,
            'strength': strength,
        }
        return POLICY_PRIOR_GMM
    
    def gen_param_COST_STATE(self, RAMP_CONSTANT, l1=0.0, l2=1.0, alpha=1e-2, wp_final_multiplier=1.0, 
                            target_state=None, wp=None):
        # CostState
        COST_STATE = {
            # How target cost ramps over time.
            'ramp_option': RAMP_CONSTANT,  
            'l1': l1,
            'l2': l2,
            'alpha': alpha,
            # Weight multiplier on final time step.
            'wp_final_multiplier': 1.0,  
            'data_types': {
                'JointAngle': {
                    # Target state - must be set.
                    'target_state': target_state,  
                    # State weights - must be set.
                    'wp': wp,  
                },
            },
        }
        return COST_STATE
    
    def gen_param_COST_SUM(self, costs=[], weights=[]):
        # CostSum
        COST_SUM = {
            # A list of hyperparam dictionaries for each cost.
            'costs': costs,  
            # Weight multipliers for each cost.
            'weights': weights,  
        }
        return COST_SUM
    
    def gen_param_COST_ACTION(self, wu=np.array([])):
        # CostAction
        COST_ACTION = {
            # Torque penalties, must be 1 x dU numpy array.
            'wu': np.array([]),  
        }
        return COST_ACTION
        


## Required class for different functions

### Class General Utilities

In [None]:
class general_utlis(object):
    '''
    Init function not required. Currently there for debugging.
        init: define the class constructor of with no arguments
    '''
    def __init__(self, verbose=True):
        self.verbose = verbose
        if self.verbose:
            print('-'*25)
            print('Call to the class general_utlis has started')
            print('-'*25)
    
    def check_shape(self, value, expected_shape, name=''):
    """
    Throws a ValueError if value.shape != expected_shape.
    Args:
        value: Matrix to shape check.
        expected_shape: A tuple or list of integers.
        name: An optional name to add to the exception message.
    """
    if value.shape != tuple(expected_shape):
        raise ValueError('Shape mismatch %s: Expected %s, got %s' % (name, str(expected_shape), str(value.shape)))
    
    # TODO: Don't know how to use this function. Write explanation. 
    def extract_condition(self, hyperparams, m):
    """
    Pull the relevant hyperparameters corresponding to the specified
    condition, and return a new hyperparameter dictionary.
    """
    return {var: val[m] if isinstance(val, list) else val for var, val in hyperparams.items()}

    def approx_equal(self, a, b, threshold=1e-5):
    """
    Return whether two numbers are equal within an absolute threshold.
    Returns:
        True if a and b are equal within threshold.
    """
    return np.all(np.abs(a - b) < threshold)

    def finite_differences(self, func, inputs, func_output_shape=(), epsilon=1e-5):
    """
    Computes gradients via finite differences.
    derivative = (func(x+epsilon) - func(x-epsilon)) / (2*epsilon)
    Args:
        func: Function to compute gradient of. Inputs and outputs can be
            arbitrary dimension.
        inputs: Vector value to compute gradient at.
        func_output_shape: Shape of the output of func. Default is
            empty-tuple, which works for scalar-valued functions.
        epsilon: Difference to use for computing gradient.
    Returns:
        Gradient vector of each dimension of func with respect to each
        dimension of input.
    """
    gradient = np.zeros(inputs.shape+func_output_shape)
    for idx, _ in np.ndenumerate(inputs):
        test_input = np.copy(inputs)
        test_input[idx] += epsilon
        obj_d1 = func(test_input)
        assert obj_d1.shape == func_output_shape
        test_input = np.copy(inputs)
        test_input[idx] -= epsilon
        obj_d2 = func(test_input)
        assert obj_d2.shape == func_output_shape
        diff = (obj_d1 - obj_d2) / (2 * epsilon)
        gradient[idx] += diff
    return gradient

### Class Linear Gaussian Policy

In [None]:
class LinearGaussianPolicy(object):
    """
    Time-varying linear Gaussian policy.
    U = K*x + k + noise, where noise ~ N(0, chol_pol_covar)
    """
    # TODO: Add additional noise patterns
    def __init__(self, K, k, pol_covar, chol_pol_covar, inv_pol_covar, check_shape, verbose=True):
        self.verbose = verbose
        if self.verbose: 
            print('-'*25)
            print('Call to the class LinearGaussianPolicy has started')
            print('-'*25)
        
        # Assume K has the correct shape, and make sure others match.
        self.T = K.shape[0]
        self.dU = K.shape[1]
        self.dX = K.shape[2]

        check_shape(k, (self.T, self.dU))
        check_shape(pol_covar, (self.T, self.dU, self.dU))
        check_shape(chol_pol_covar, (self.T, self.dU, self.dU))
        check_shape(inv_pol_covar, (self.T, self.dU, self.dU))

        self.K = K
        self.k = k
        self.pol_covar = pol_covar
        self.chol_pol_covar = chol_pol_covar
        self.inv_pol_covar = inv_pol_covar

    def act(self, x, obs, t, noise=None):
        """
        Return an action for a state.
        Args:
            x: State vector.
            obs: Observation vector.
            t: Time step.
            noise: Action noise. This will be scaled by the variance.
        """
        u = self.K[t].dot(x) + self.k[t]
        u += self.chol_pol_covar[t].T.dot(noise)
        return u

    def fold_k(self, noise):
        """
        Fold noise into k.
        Args:
            noise: A T x dU noise vector with mean 0 and variance 1.
        Returns:
            k: A T x dU bias vector.
        """
        k = np.zeros_like(self.k)
        for i in range(self.T):
            scaled_noise = self.chol_pol_covar[i].T.dot(noise[i])
            k[i] = scaled_noise + self.k[i]
        return k

    def nans_like(self):
        """
        Returns:
            A new linear Gaussian policy object with the same dimensions
            but all values filled with NaNs.
        """
        policy = LinearGaussianPolicy(
            np.zeros_like(self.K), np.zeros_like(self.k),
            np.zeros_like(self.pol_covar), np.zeros_like(self.chol_pol_covar),
            np.zeros_like(self.inv_pol_covar)
        )
        policy.K.fill(np.nan)
        policy.k.fill(np.nan)
        policy.pol_covar.fill(np.nan)
        policy.chol_pol_covar.fill(np.nan)
        policy.inv_pol_covar.fill(np.nan)
        return policy

### Class Trajectory Optimization Utilities 

In [None]:
class traj_opt_utils(object):
    '''
    init: defines some important parameters used by this function
    '''
    def __init__(self, DGD_MAX_ITER=50, DGD_MAX_LS_ITER=20, DGD_MAX_GD_ITER=200, 
                 ALPHA=0.005, BETA1=0.9, BETA2=0.999, EPS=1e-8, verbose=True):
        self.verbose = verbose
        if self.verbose:
            print('-'*25)
            print('Call to the class traj_opt_utils has started')
            print('-'*25)
        # TODO: Maybe add this parameter to TrajOptLQR
        # Constants used in TrajOptLQR.
        self.DGD_MAX_ITER = 50
        self.DGD_MAX_LS_ITER = 20
        self.DGD_MAX_GD_ITER = 200
        # Adam parameters
        self.ALPHA, self.BETA1, self.BETA2, self.EPS = 0.005, 0.9, 0.999, 1e-8  
        
    def traj_distr_kl(self, new_mu, new_sigma, new_traj_distr, prev_traj_distr, tot=True):
    """
    Compute KL divergence between new and previous trajectory
    distributions.
    Args:
        new_mu: T x dX, mean of new trajectory distribution.
        new_sigma: T x dX x dX, variance of new trajectory distribution.
        new_traj_distr: A linear Gaussian policy object, new
            distribution.
        prev_traj_distr: A linear Gaussian policy object, previous
            distribution.
        tot: Whether or not to sum KL across all time steps.
    Returns:
        kl_div: The KL divergence between the new and previous
            trajectories.
    """
    # Constants.
    T = new_mu.shape[0]
    dU = new_traj_distr.dU

    # Initialize vector of divergences for each time step.
    kl_div = np.zeros(T)

    # Step through trajectory.
    for t in range(T):
        # Fetch matrices and vectors from trajectory distributions.
        mu_t = new_mu[t, :]
        sigma_t = new_sigma[t, :, :]
        K_prev = prev_traj_distr.K[t, :, :]
        K_new = new_traj_distr.K[t, :, :]
        k_prev = prev_traj_distr.k[t, :]
        k_new = new_traj_distr.k[t, :]
        chol_prev = prev_traj_distr.chol_pol_covar[t, :, :]
        chol_new = new_traj_distr.chol_pol_covar[t, :, :]

        # Compute log determinants and precision matrices.
        logdet_prev = 2 * sum(np.log(np.diag(chol_prev)))
        logdet_new = 2 * sum(np.log(np.diag(chol_new)))
        prc_prev = sp.linalg.solve_triangular(
            chol_prev, sp.linalg.solve_triangular(chol_prev.T, np.eye(dU),
                                                  lower=True)
        )
        prc_new = sp.linalg.solve_triangular(
            chol_new, sp.linalg.solve_triangular(chol_new.T, np.eye(dU),
                                                 lower=True)
        )

        # Construct matrix, vector, and constants.
        M_prev = np.r_[
            np.c_[K_prev.T.dot(prc_prev).dot(K_prev), -K_prev.T.dot(prc_prev)],
            np.c_[-prc_prev.dot(K_prev), prc_prev]
        ]
        M_new = np.r_[
            np.c_[K_new.T.dot(prc_new).dot(K_new), -K_new.T.dot(prc_new)],
            np.c_[-prc_new.dot(K_new), prc_new]
        ]
        v_prev = np.r_[K_prev.T.dot(prc_prev).dot(k_prev),
                       -prc_prev.dot(k_prev)]
        v_new = np.r_[K_new.T.dot(prc_new).dot(k_new), -prc_new.dot(k_new)]
        c_prev = 0.5 * k_prev.T.dot(prc_prev).dot(k_prev)
        c_new = 0.5 * k_new.T.dot(prc_new).dot(k_new)

        # Compute KL divergence at timestep t.
        kl_div[t] = max(
            0,
            -0.5 * mu_t.T.dot(M_new - M_prev).dot(mu_t) -
            mu_t.T.dot(v_new - v_prev) - c_new + c_prev -
            0.5 * np.sum(sigma_t * (M_new-M_prev)) - 0.5 * logdet_new +
            0.5 * logdet_prev
        )

    # Add up divergences across time to get total divergence.
    return np.sum(kl_div) if tot else kl_div


    def traj_distr_kl_alt(self, new_mu, new_sigma, new_traj_distr, prev_traj_distr, tot=True):
    """
    This function computes the same quantity as the function above.
    However, it is easier to modify and understand this function, i.e.,
    passing in a different mu and sigma to this function will behave properly.
    """
    T, dX, dU = new_mu.shape[0], new_traj_distr.dX, new_traj_distr.dU
    kl_div = np.zeros(T)

    for t in range(T):
        K_prev = prev_traj_distr.K[t, :, :]
        K_new = new_traj_distr.K[t, :, :]

        k_prev = prev_traj_distr.k[t, :]
        k_new = new_traj_distr.k[t, :]

        sig_prev = prev_traj_distr.pol_covar[t, :, :]
        sig_new = new_traj_distr.pol_covar[t, :, :]

        chol_prev = prev_traj_distr.chol_pol_covar[t, :, :]
        chol_new = new_traj_distr.chol_pol_covar[t, :, :]

        inv_prev = prev_traj_distr.inv_pol_covar[t, :, :]
        inv_new = new_traj_distr.inv_pol_covar[t, :, :]

        logdet_prev = 2 * sum(np.log(np.diag(chol_prev)))
        logdet_new = 2 * sum(np.log(np.diag(chol_new)))

        K_diff, k_diff = K_prev - K_new, k_prev - k_new
        mu, sigma = new_mu[t, :dX], new_sigma[t, :dX, :dX]

        kl_div[t] = max(
                0,
                0.5 * (logdet_prev - logdet_new - new_traj_distr.dU +
                       np.sum(np.diag(inv_prev.dot(sig_new))) +
                       k_diff.T.dot(inv_prev).dot(k_diff) +
                       mu.T.dot(K_diff.T).dot(inv_prev).dot(K_diff).dot(mu) +
                       np.sum(np.diag(K_diff.T.dot(inv_prev).dot(K_diff).dot(sigma))) +
                       2 * k_diff.T.dot(inv_prev).dot(K_diff).dot(mu))
        )

    return np.sum(kl_div) if tot else kl_div


    def approximated_cost(self, sample_list, traj_distr, traj_info):
    """
    This function gives the LQR estimate of the cost function given the noise
    experienced along each sample in sample_list.
    Args:
        sample_list: List of samples to extract noise from.
        traj_distr: LQR controller to roll forward.
        traj_info: Used to obtain dynamics estimate to simulate trajectories.
    Returns:
        mu_all: Trajectory means corresponding to each sample in sample_list.
        predicted_cost: LQR estimates of cost of each sample in sample_list.
    """
    T = traj_distr.T
    N = len(sample_list)
    dU = traj_distr.dU
    dX = traj_distr.dX
    noise = sample_list.get_noise()

    # Constants.
    idx_x = slice(dX)
    mu_all = np.zeros((N, T, dX+dU))

    # Pull out dynamics.
    Fm = traj_info.dynamics.Fm
    fv = traj_info.dynamics.fv
    dyn_covar = traj_info.dynamics.dyn_covar

    for i in range(N):
        mu = np.zeros((T, dX+dU))
        mu[0, idx_x] = traj_info.x0mu
        for t in range(T):
            mu[t, :] = np.hstack([
                mu[t, idx_x],
                (traj_distr.K[t, :, :].dot(mu[t, idx_x]) + traj_distr.k[t, :]
                 + traj_distr.chol_pol_covar[t].T.dot(noise[i, t]))
            ])
            if t < T - 1:
                mu[t+1, idx_x] = Fm[t, :, :].dot(mu[t, :]) + fv[t, :]
        mu_all[i, :, :] = mu

    # Compute cost.
    predicted_cost = np.zeros((N, T))

    for i in range(N):
        for t in range(T):
            predicted_cost[i, t] = traj_info.cc[t] + \
                    0.5 * mu_all[i,t,:].T.dot(traj_info.Cm[t, :, :]).dot(mu_all[i,t,:]) + \
                    mu_all[i,t,:].T.dot(traj_info.cv[t, :])

    return mu_all, predicted_cost
        

## Scrap code

In [13]:
# algorithm = {
#     'type': 'init_lqr',
#     'init_gains': np.zeros(6),
#     'init_acc': np.zeros(6),
#     'init_var': 0.1,
#     'stiffness': 0.01,
#     'dt': 1,
#     'T': 2,
# }
# INIT_LG_LQR = {
#     'wow': algorithm,
#     'init_var': 1,
#     'stiffness': 2,
#     'stiffness_vel': 3,
#     'final_weight': 4,
#     # Parameters for guessing dynamics
#     # dU vector of accelerations, default zeros.
#     'init_acc': [],  
#     # dU vector of gains, default ones.
#     'init_gains': [],  
# }

# check_shape(np.full((10), (10)), (10,), 'Sam')

# temp = range(10)
# b = extract_condition(algorithm, temp)
# b = extract_condition(INIT_LG_LQR, temp[0])
# print(b)
# for var, val in INIT_LG_LQR.items():
#     if isinstance(val, list):
#         print('in if')
#         a = {var: val[1]}
#         print(a)
#     else:
#         print('in else')
#         print(val)

# def test():
#     print('inside test function, arg pass func test success')
    
# class a(object):
#     def try_new(self, new_func):
#         new_func()
        
# wow = a()
# wow.try_new(test)

inside test function, arg pass func test success


In [15]:
param = gen_param_dict()
a = param.gen_param_ALG()
print(a)

NameError: name 'gen_param_dict' is not defined