In [None]:
import gymnasium as gym  # Defines RL environments

import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (4,4)  # Set size of visualization
from IPython.display import clear_output  # For inline visualization

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import cma
from math import factorial

# Define task: the CartPole-v1 balancing environment from Gymnasium
env = gym.make('CartPole-v1')
# Length of one observation vector, read from the environment's observation space
state_space_dimension = env.observation_space.shape[0]
# The policy emits a single scalar; the rollout code in this notebook turns it into
# a discrete action via `int(out > 0)`.
action_space_dimension = 1  # env.action_space.n - 1

In [None]:
# Model definition
class Policy(nn.Module):
    """Small feed-forward policy: linear layer -> tanh -> linear layer.

    Parameters:
        state_space_dimension: Number of input features (size of one state).
        action_space_dimension: Number of output units.
        num_neurons: Width of the hidden layer.
        bias: Whether both linear layers carry a bias term.
    """

    def __init__(self, state_space_dimension, action_space_dimension, num_neurons=5, bias = False):
        super().__init__()
        # Attribute names and creation order are kept stable: they determine the
        # parameter order seen by torch.nn.utils.vector_to_parameters.
        self.fc = nn.Linear(in_features=state_space_dimension, out_features=num_neurons, bias=bias)
        self.fc1 = nn.Linear(in_features=num_neurons, out_features=action_space_dimension, bias=bias)

    def forward(self, x):
        """Map a batch of states to raw (unbounded) policy outputs."""
        return self.fc1(self.fc(x).tanh())

# Untrained policy instance; the name `policy_net` is rebound to the trained
# policy returned by train_agent in the training cell further down.
policy_net = Policy(state_space_dimension, action_space_dimension)


In [None]:
#This just comes from the CMA assignment
def fitness_cart_pole(x, nn, env):
    '''
    Returns the negative accumulated reward of one episode (the optimizer
    minimizes, so a lower value means a better policy).

    Parameters:
        x: Parameter vector encoding the weights; must hold as many entries
           as `nn` has parameters.
        nn: Parameterized model. Called with a (1, state_dim) float tensor and
            expected to return a single value whose sign selects the action.
        env: Environment ('CartPole-v?') exposing reset() -> (state, info) and
             step(action) -> (state, reward, terminated, truncated, info).

    Returns:
        -1000 when the episode is truncated, otherwise minus the reward
        accumulated up to and including the terminating step.
    '''
    # The parameter `nn` shadows the notebook-level alias `torch.nn as nn` inside
    # this function, hence the fully qualified `torch.nn.utils` below.
    torch.nn.utils.vector_to_parameters(torch.Tensor(x), nn.parameters())  # Set the policy parameters

    observation = env.reset()[0]  # Forget about previous episode; keep only the state
    R = 0  # Accumulated reward

    # Rollouts never need gradients (the search is derivative-free), so skip autograd bookkeeping.
    with torch.no_grad():
        while True:
            # reshape(1, -1) derives the state size from the observation itself instead of
            # relying on a notebook-level global.
            state_tensor = torch.as_tensor(observation, dtype=torch.float32).reshape(1, -1)
            a = int(nn(state_tensor) > 0)
            observation, reward, terminated, truncated, _ = env.step(a)  # Simulate pole
            R += reward  # Accumulate
            if truncated:
                return -1000  # Episode ended, final goal reached, we consider minimization
            if terminated:
                return -R  # Episode ended, we consider minimization

In [None]:
#Only used for evaluating the model performance
def one_run(policy_net, env):
    """Run one episode with `policy_net` and record every state reached.

    Parameters:
        policy_net: Model called with a (1, state_dim) float tensor; the sign of
            its single output selects the action (`int(out > 0)`).
        env: Environment exposing reset() -> (state, info),
            step(action) -> (state, reward, terminated, truncated, info) and close().

    Returns:
        (steps, trajectory_features): the number of environment steps taken and a
        list with one float32 array per step holding the state observed *after*
        that step (the initial state returned by reset() is not recorded).

    Note:
        `env.close()` is called before returning, as in the original implementation.
        NOTE(review): get_performance calls this function repeatedly on the same
        env, i.e. it resets an env that was already closed - confirm this is intended.
    """
    trajectory_features = []  # Store the features of the trajectory
    observation = env.reset()[0]  # Forget about previous episode; keep only the state
    steps = 0

    # Pure evaluation: no gradients are needed.
    with torch.no_grad():
        while True:
            # reshape(1, -1) takes the state size from the observation rather than from a global.
            state_tensor = torch.as_tensor(observation, dtype=torch.float32).reshape(1, -1)
            a = int(policy_net(state_tensor) > 0)
            observation, reward, terminated, truncated, _ = env.step(a)  # Simulate pole
            steps += 1
            trajectory_features.append(np.array(observation, dtype=np.float32).reshape(-1))
            if terminated or truncated:
                break

    env.close()
    return steps, trajectory_features

In [None]:
#Again, just comes from the assignment
def train_agent(policy_net , env):
    """Optimize the weights of `policy_net` with CMA-ES on `env`.

    The search starts from small Gaussian random weights and minimizes
    fitness_cart_pole. Once the optimizer stops, `env` is closed and the first
    entry of the `cma.fmin` result is written into `policy_net`.

    Parameters:
        policy_net: Model whose parameters are optimized (modified in place).
        env: Environment handed to the fitness function; closed before returning.

    Returns:
        The same `policy_net` object, carrying the weights found by the search.
    """
    # Dimension of the search space = total number of scalar weights in the model
    num_weights = sum(p.numel() for p in policy_net.parameters())
    start_point = np.random.normal(0, 0.01, num_weights)  # Random parameters for the initial policy
    start_sigma = .01  # Initial global step-size sigma

    cma_options = {'ftarget': -9999.9, 'tolflatfitness':1000, 'eval_final_mean':False}
    result = cma.fmin(
        fitness_cart_pole,         # Objective function
        start_point,               # Initial search point
        start_sigma,               # Initial global step-size sigma
        args=[policy_net, env],    # Extra arguments passed to the fitness function
        options=cma_options,
    )
    env.close()

    # Load the final solution (first element of the fmin result) into the policy
    final_weights = torch.Tensor(result[0])
    torch.nn.utils.vector_to_parameters(final_weights, policy_net.parameters())

    return policy_net

In [None]:
def get_performance(policy_net, env, no_runs = 10):
    """Evaluate `policy_net` over several episodes.

    Parameters:
        policy_net: Policy to evaluate.
        env: Environment passed on to one_run for every episode.
        no_runs: Number of episodes to run.

    Returns:
        (pole_bal, trajectories): a list with the step count of each episode, and a
        NumPy array stacking the states recorded by one_run across all episodes.
    """
    episode_lengths = []
    visited_states = []

    for _ in range(no_runs):
        n_steps, episode_states = one_run(policy_net, env)
        episode_lengths.append(n_steps)
        # Flatten: states of all episodes end up in one long list
        visited_states.extend(episode_states)

    return episode_lengths, np.array(visited_states)

In [None]:

# Train a freshly initialised Policy on a fresh CartPole-v1 environment.
policy_net = train_agent(Policy(state_space_dimension, action_space_dimension), gym.make('CartPole-v1')) # train_agent returns the same module with the weights found by CMA-ES loaded

In [None]:
# Run the trained agent for 20 episodes and store the results.
# get_performance(policy_net, env, no_runs) requires the environment as its second
# positional argument; the previous call omitted it and raised a TypeError.
# A fresh environment with a 5000-step limit is used so the evaluation matches the
# "max_steps = 5000" message printed in the next cell and the eval_* functions below.
pole_bal, trajectories = get_performance(
    policy_net,
    gym.make('CartPole-v1', max_episode_steps=5000),
    no_runs = 20,
)
# The visited states (trajectories) are also kept: they are the training data for the Neural Conditioner.

In [None]:
# Mean episode length over the evaluation runs stored in pole_bal.
# NOTE(review): the message assumes the evaluation environment was created with
# max_episode_steps=5000 - confirm against the get_performance call above.
print("The average number of steps with max_steps = 5000, is: ", np.mean(pole_bal))

In [None]:
#Define the Neural Conditioner, which comes from this paper: https://arxiv.org/pdf/1902.08401
#It is basically a variational autoencoder with GAN-like training, where the generator is the neural conditioner, and the discriminator is a neural network that tries to distinguish between real and fake data.
class NeuralConditioner(nn.Module):
    """Generator that fills in requested features given the observed ones.

    The conditioning vector is the concatenation [x_a, a, r]:
        x_a: feature values with the unobserved entries zeroed out,
        a:   binary mask of available (observed) features,
        r:   binary mask of requested (to be predicted) features.

    Parameters:
        input_dim: Number of features per sample. All layer widths are derived
            from it (previously they were hard-coded for input_dim == 4).
        latent_dim: Width of the encoder output; also the size of the noise
            vector `z` that callers are expected to pass.
    """

    def __init__(self, input_dim=4, latent_dim=64):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        # x_a, a and r each have input_dim entries
        conditioning_dim = 3 * input_dim

        # Encoder: [x_a + a + r] -> latent_dim
        self.encoder = nn.Sequential(
            nn.Linear(conditioning_dim, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim)
        )

        # Decoder: [h (latent_dim) + x_a + a + r] -> input_dim
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim + conditioning_dim, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )

    def forward(self, x_a, a, r, z):
        """Predict the requested features.

        Parameters:
            x_a, a, r: Tensors of shape [batch_size, input_dim].
            z: Noise of shape [batch_size, latent_dim]. It is accepted for
               interface compatibility but NOT used: the decoder is fed the
               encoder output `h` instead, so the output is a deterministic
               function of (x_a, a, r).

        Returns:
            Tensor of shape [batch_size, input_dim]; entries where r == 0 are
            zeroed out by the final multiplication with `r`.
        """
        conditioning = torch.cat([x_a, a, r], dim=1)
        h = self.encoder(conditioning)

        decoder_input = torch.cat([h, conditioning], dim=1)  # Use encoded h instead of raw z
        return self.decoder(decoder_input) * r

class Discriminator(nn.Module):
    """Scores whether requested feature values are real or generated.

    Parameters:
        input_dim: Number of features per sample. The first layer width is
            derived from it (previously hard-coded for input_dim == 4).
    """

    def __init__(self, input_dim=4):
        super().__init__()
        # Input: [x_r + x_a + a + r], each of size input_dim
        self.net = nn.Sequential(
            nn.Linear(4 * input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x_r, x_a, a, r):
        """Return a value in (0, 1) per sample, shape [batch_size, 1].

        Parameters:
            x_r: Requested feature values (real or generated), [batch_size, input_dim].
            x_a: Observed feature values, [batch_size, input_dim].
            a, r: Binary masks of observed / requested features, [batch_size, input_dim].
        """
        inputs = torch.cat([x_r, x_a, a, r], dim=1)
        return self.net(inputs)

#Training Function
def _sample_masks(reference):
    """Draw complementary observed/requested masks with the shape of `reference`.

    For every row a uniformly random number k in [1, n_features - 1] of features is
    marked as observed (a == 1); all remaining features are requested (r == 1 - a),
    so each row has at least one observed and at least one requested feature.
    """
    batch_size, n_features = reference.shape
    n_observed = torch.randint(1, n_features, (batch_size, 1), device=reference.device)
    # argsort of i.i.d. noise yields an independent random permutation per row;
    # the columns whose permutation value is below k form a random subset of size k.
    ranks = torch.rand(batch_size, n_features, device=reference.device).argsort(dim=1)
    a = (ranks < n_observed).to(reference.dtype)
    return a, 1 - a


def train_nc(nc, discriminator, dataloader, epochs):
    """Adversarially train the neural conditioner against the discriminator.

    Parameters:
        nc: Generator, called as nc(x_a, a, r, z); must expose `latent_dim`.
        discriminator: Called as discriminator(x_r, x_a, a, r); must return
            probabilities in [0, 1].
        dataloader: Iterable yielding 2-D float tensors [batch_size, n_features]
            with n_features >= 2.
        epochs: Number of passes over `dataloader`.

    Raises:
        ValueError: If a batch is not 2-D or has fewer than two features (then it
            is impossible to have both an observed and a requested feature).

    Prints the generator and discriminator loss of the last batch after every epoch
    (nan if the dataloader yielded no batch yet).
    """
    nc.train()
    discriminator.train()

    opt_nc = torch.optim.Adam(nc.parameters(), lr=1e-4)
    opt_d = torch.optim.Adam(discriminator.parameters(), lr=1e-4)

    # Defined up front so the per-epoch report also works for an empty dataloader.
    loss_g = loss_d = torch.tensor(float('nan'))

    for epoch in range(epochs):
        for x_real in dataloader:
            if x_real.dim() != 2 or x_real.size(1) < 2:
                raise ValueError("train_nc expects batches of shape [batch_size, n_features] with n_features >= 2")
            batch_size = x_real.size(0)

            # Random masks: at least 1 feature observed and 1 predicted per sample
            a, r = _sample_masks(x_real)

            # Generate samples
            z = torch.randn(batch_size, nc.latent_dim, device=x_real.device)
            x_a = x_real * a
            x_r_fake = nc(x_a, a, r, z)
            x_r_real = x_real * r

            # Train discriminator. binary_cross_entropy computes the same
            # -(log(d_real) + log(1 - d_fake)).mean() objective but clamps the log
            # terms, so saturated sigmoid outputs cannot produce inf/NaN losses.
            opt_d.zero_grad()
            d_real = discriminator(x_r_real, x_a, a, r)
            d_fake = discriminator(x_r_fake.detach(), x_a, a, r)
            loss_d = (F.binary_cross_entropy(d_real, torch.ones_like(d_real))
                      + F.binary_cross_entropy(d_fake, torch.zeros_like(d_fake)))
            loss_d.backward()
            opt_d.step()

            # Train generator: make the discriminator label generated values as real
            opt_nc.zero_grad()
            d_fake = discriminator(x_r_fake, x_a, a, r)
            loss_g = F.binary_cross_entropy(d_fake, torch.ones_like(d_fake))
            loss_g.backward()
            opt_nc.step()

        print(f"Epoch {epoch+1}/{epochs} | G Loss: {loss_g.item():.4f} | D Loss: {loss_d.item():.4f}")



In [None]:
class StateFeatureDataset(Dataset):
    """Thin torch Dataset wrapper around an array of state vectors.

    Each item is one row of the data, returned as a float tensor.
    """

    def __init__(self, data):
        # Store everything as a single float tensor so rows can be indexed directly
        state_tensor = torch.FloatTensor(data)
        self.data = state_tensor

    def __len__(self):
        """Number of stored rows."""
        n_rows = len(self.data)
        return n_rows

    def __getitem__(self, idx):
        """Return the row at position `idx`."""
        row = self.data[idx]
        return row

# Wrap the states collected by get_performance (`trajectories`) in a PyTorch dataset
# so they can be batched by a DataLoader when training the Neural Conditioner.
dataset = StateFeatureDataset(trajectories)

In [None]:
# Training configuration and model instances for the Neural Conditioner
batch_size = 32
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)  # reshuffles the states every epoch
input_dim = 4  # size of the data (number of features per state)

latent_dim = 64  # Size of the latent space
NC = NeuralConditioner(input_dim, latent_dim)  # generator
discriminator = Discriminator(input_dim)  # adversary used only during training


In [None]:
# Train generator and discriminator in place; train_nc prints the last-batch losses after each epoch.
train_nc(NC, discriminator, dataloader, epochs=50)

In [None]:
#Function to predict missing features using the trained Neural Conditioner
#Takes the neural conditioner, all features (where missing features can be np.nan, but can also have their true values which will just be ignored), 
#and a mask that indicates which features are missing (1 = observed, 0 = missing)
def predict_missing_features(nc, observed_features, observed_mask):
    """
    Predict the features marked as missing with the neural conditioner.

    Parameters:
        nc: Model called as nc(x_a, a, r, z); must expose `latent_dim`.
        observed_features: Array of shape (n_features,). Missing entries may be NaN
            or hold any value; every entry whose mask is 0 is ignored.
        observed_mask: Binary array or list of the same length (1 = observed, 0 = missing).

    Returns:
        NumPy array of shape (n_features,) with the model output for a single
        sample. With the NeuralConditioner defined in this notebook the output is
        multiplied by the requested mask, so observed positions come back as 0.
    """
    # Zero out NaNs first (NaN * 0 would stay NaN), then mask away the unobserved entries
    masked_features = np.nan_to_num(observed_features, nan=0.0) * observed_mask

    # Convert to PyTorch tensors with a leading batch dimension of 1
    x_a = torch.as_tensor(masked_features, dtype=torch.float32).unsqueeze(0)
    a = torch.as_tensor(observed_mask, dtype=torch.float32).unsqueeze(0)
    r = 1 - a  # Predict missing features

    with torch.no_grad():
        # Noise must be [batch, latent_dim] like the other inputs (it used to be 1-D,
        # which breaks any model that actually concatenates z with the batch).
        z = torch.randn(1, nc.latent_dim)
        preds = nc(x_a, a, r, z)

    # A single sample is drawn, so the mean over the batch axis just drops that axis
    return preds.mean(0).numpy()

# Example usage: hide the first feature of a hand-picked state and let the
# trained conditioner fill it back in.
observed_features = np.array([ 0.01735522 , 0.23706736 , 0.03950975 ,-0.28725505])
mask = np.array([0, 1, 1, 1])  # 1 = observed, 0 = missing



mean_pred = predict_missing_features(NC, observed_features, mask)

# Printed before the array is modified below, so this shows the original values
print("True values: ", observed_features)

# Overwrite (in place) only the entries that were marked as missing
for i in range(4):
    if mask[i] == 0:
        observed_features[i] = mean_pred[i]

# Same array after the missing entries were replaced by the model's predictions
print("Predicted features: ", observed_features)




In [None]:
#Basically the local SVERL. Uses the neural conditioner to predict missing features in the first step, and then has full observability afterwards.
#Very uninteresting to be honest, since the cart pole can only go left (0) or right (1). And even though the model gives different values, the decision
#will usually still be the same, just with more or less certainty. And even if the missing features lead to a bad decision
#in the initial step, it can be recovered from, so it doesn't really matter much.
#I haven't used this function much
def eval_from_state(policy_net, seed, believed_initial_state):
    '''
    Evaluate the policy for one episode, using a believed state for the first
    decision and the true environment state for every later decision.

    Parameters:
        policy_net: Policy; the sign of its single output selects the action.
        seed: Seed passed to env.reset(), which fixes the true initial state.
        believed_initial_state: Array with the state the agent believes it starts in.

    Returns:
        The number of environment steps until the episode terminated or was
        truncated (the environment is created with max_episode_steps=5000).
    '''
    steps = 0
    env_render = gym.make('CartPole-v1', render_mode='rgb_array', max_episode_steps=5000)
    try:
        true_state = env_render.reset(seed=seed)[0]  # Forget about previous episode
        believed_tensor = torch.as_tensor(np.asarray(believed_initial_state), dtype=torch.float32).reshape(1, -1)
        true_tensor = torch.as_tensor(true_state, dtype=torch.float32).reshape(1, -1)

        with torch.no_grad():  # evaluation only
            print("Believed state: ", believed_tensor)
            print("True state: ", true_tensor)

            print("Action if evaluated on true state: ", int(policy_net(true_tensor)>0))
            print("Action if evaluated on believed state: ", int(policy_net(believed_tensor)>0))

            # The first decision is based on the believed state; afterwards the
            # tensor is replaced by the true state returned by the environment.
            state_tensor = believed_tensor
            while True:
                a = int(policy_net(state_tensor) > 0)
                state, reward, terminated, truncated, _ = env_render.step(a)  # Simulate pole
                steps += 1
                # Checked after every step, including the very first one
                # (previously an episode ending on step 1 was stepped once more).
                if terminated or truncated:
                    break
                state_tensor = torch.as_tensor(state, dtype=torch.float32).reshape(1, -1)
    finally:
        # Release the environment even if the rollout raises
        env_render.close()
    return steps  # Number of steps survived

In [None]:
#Now this is juicy. Gives a global evaluation of the policy, but with missing features.
#In every step, it uses the neural conditioner to predict the missing features, and then uses the policy to decide what to do.

def _impute_missing(NC, true_state, mask):
    """Return a copy of `true_state` whose entries with mask == 0 are replaced by NC's prediction."""
    true_state = np.asarray(true_state).reshape(-1)
    pred = predict_missing_features(NC, true_state, mask)
    return np.where(np.asarray(mask) == 0, pred, true_state)


def eval_with_missing_features(policy_net, seed, NC, mask):
    '''
    Evaluate the policy for one episode while some features are hidden in EVERY step.

    Before each decision, the features with mask == 0 are replaced by the neural
    conditioner's prediction (computed from the features with mask == 1); the
    policy then acts on this believed state.

    Parameters:
        policy_net: Policy; the sign of its single output selects the action.
        seed: Seed passed to env.reset(), which fixes the initial state.
        NC: Trained neural conditioner, used through predict_missing_features.
        mask: Binary sequence, one entry per feature (1 = observed, 0 = missing).

    Returns:
        The number of environment steps until the episode terminated or was
        truncated (the environment is created with max_episode_steps=5000).
    '''
    steps = 0
    env_render = gym.make('CartPole-v1', render_mode='rgb_array', max_episode_steps=5000)
    try:
        state = env_render.reset(seed=seed)[0]  # Forget about previous episode

        with torch.no_grad():  # evaluation only
            while True:
                believed_tensor = torch.as_tensor(_impute_missing(NC, state, mask), dtype=torch.float32)
                a = int(policy_net(believed_tensor) > 0)
                state, reward, terminated, truncated, _ = env_render.step(a)  # Simulate pole
                steps += 1
                # Stop before imputing again: the state after the final step is never acted on
                if terminated or truncated:
                    break
    finally:
        # Release the environment even if the rollout raises
        env_render.close()
    return steps  # Number of steps survived

In [None]:
#Takes a feature i, and a mask C. Gives the marginal gain of adding feature i to the mask C, via eval_with_missing_features.
def marginal_gain(i, C, seed):
    """Marginal contribution of feature `i` to the coalition (mask) `C`.

    Both values come from eval_with_missing_features with the notebook-level
    `policy_net` and `NC`: once with mask `C`, once with `C` plus feature `i`.

    Parameters:
        i: Index of the feature to add.
        C: Binary mask (1 = observed, 0 = missing); not modified.
        seed: Environment seed used for both evaluations.

    Returns:
        v(C with i) - v(C), i.e. the difference in steps survived.
    """
    # Open question kept from the original notes: v(Ø) = 0 in game theory, and for ML
    # v(Ø) is the expected prediction of the model. What v(Ø) should be for SVERL and
    # the general RL case is yet to be determined, so the empty coalition is simply
    # evaluated like every other mask.
    mask_with_feature = np.copy(C)
    mask_with_feature[i] = 1

    value_without = eval_with_missing_features(policy_net, seed, NC, C)
    value_with = eval_with_missing_features(policy_net, seed, NC, mask_with_feature)
    return value_with - value_without


In [None]:
#Gets all masks (coalitions) in which one feature is fixed to 0.
#Basically all C \subseteq F \setminus {i}

def get_all_subsets(feature, n_features=4):
    """Enumerate every binary mask of length `n_features` whose entry `feature` is 0.

    Parameters:
        feature: Index of the position that is fixed to 0.
        n_features: Total number of features (defaults to the 4 CartPole features).

    Returns:
        A list of 2 ** (n_features - 1) masks, each a list of 0/1 ints. The masks are
        ordered by counting upwards in binary over the free positions, the leftmost
        free position being the most significant bit.

    Raises:
        ValueError: If `feature` is not a valid index for `n_features` positions.
    """
    from itertools import product  # local import: only needed here

    if not 0 <= feature < n_features:
        raise ValueError(f"feature must be in [0, {n_features - 1}], got {feature}")

    variations = []
    # product((0, 1), ...) counts in binary with the first element as the most significant bit
    for free_bits in product((0, 1), repeat=n_features - 1):
        mask = list(free_bits)
        mask.insert(feature, 0)  # put the fixed 0 at its position
        variations.append(mask)
    return variations



In [None]:
#Calculates Shapley values for a feature, using the marginal gain function and the get_all_subsets function.
def shapley_value(feature, seed):
    """Shapley value of `feature` for the episode started with `seed`.

    Sums the marginal gain of `feature` over every coalition C that excludes it,
    weighted by |C|! * (n - |C| - 1)! / n! with n = 4 features.
    """
    total = 0

    for C in get_all_subsets(feature):
        coalition_size = np.sum(C)
        weight = (factorial(coalition_size)*factorial(4 - coalition_size - 1)) / factorial(4)
        # Explicit running sum (not the builtin) keeps the accumulation order unchanged
        total += marginal_gain(feature, C, seed) * weight
    return total

In [None]:
# Show the initial state obtained with seed 0. Because the seed is fixed, this
# state is the same on every run.
# This is the state which we start in every time for the purpose of this experiment. To get proper Shapley values, all this should probably be evaluated
# many times with different seeds.
env_render = gym.make('CartPole-v1', render_mode='rgb_array', max_episode_steps=5000)
env_render.reset(seed = 0)  # Forget about previous episode
# Second reset with the same seed; element [0] of the reset result is the state
state = env_render.reset(seed=0)[0].reshape((1, state_space_dimension))
print("STATE TO EXPLAIN: ", state)
state_tensor = torch.Tensor( state )
# Render the environment as an image (render_mode='rgb_array') and display it inline
plt.imshow(env_render.render())
plt.show()

In [None]:
# The loop index i doubles as the environment seed: seeding is the only handle
# used here to control the starting position.
# Shapley values are computed for NUM_ROUNDS different seeds and then averaged.
NUM_ROUNDS = 100
shapley_totals = [0, 0, 0, 0]  # running sums, one per feature (same order as the state vector)
for i in range(NUM_ROUNDS):
    for feature_index in range(4):
        shapley_totals[feature_index] += shapley_value(feature_index, i)

shapley_cart_pos, shapley_cart_vel, shapley_pole_angle, shapley_pole_vel = (
    total / NUM_ROUNDS for total in shapley_totals
)

print("Shapley value of Cart Position: ", shapley_cart_pos)
print("Shapley value of Cart Velocity: ", shapley_cart_vel)
print("Shapley value of Pole Angle: ", shapley_pole_angle)
print("Shapley value of Pole Angular Velocity: ", shapley_pole_vel)

