In [None]:
import gymnasium
import numpy as np
import itertools
import mujoco_py
import matplotlib.pyplot as plt
import random

In [2]:
class discretize_action:
    def __init__(self, env) -> None:
        self.env = env
        self.action_space = self.env.action_space

    def discretize_action_space(self, steps_per_dimension):
        """
        Discretizes the action space.

        Parameters:
        - low: The lower bound of the action space.
        - high: The higher bound of the action space.
        - steps_per_dimension: Number of discrete steps per dimension.

        Returns:
        - A list of discretized actions.
        """
        low = self.action_space.low
        high = self.action_space.high

        # Generate a range of values for each dimension
        ranges = [np.linspace(low[i], high[i], steps_per_dimension) for i in range(len(low))]
        
        # Create a meshgrid of all possible combinations
        mesh = np.meshgrid(*ranges)
        
        # Reshape the meshgrid to create a list of actions
        self.actions = np.vstack([m.flatten() for m in mesh]).T
        
        return self.actions
    
    def allocate_action(self, action):
        distances = np.linalg.norm(self.actions - action, axis=1)
        nearest_index = np.argmin(distances)
        return self.actions[nearest_index]

In [3]:
env = gymnasium.make("Hopper-v4")

In [7]:
def empirical_range(env):
    states = []
    rewards = []
    for i in range(5000):
        state, info = env.reset()
        range_reward = 0
        while True:
            states.append(state)
            action = env.action_space.sample()
            state, reward, terminated, truncated, info = env.step(action)
            range_reward += reward
            if terminated or truncated:
                break
        rewards.append(range_reward)
    env.close()
    print(np.mean(rewards))
    return np.max(states, axis = 0), np.min(states, axis = 0)

state_range = empirical_range(env)
print(state_range)

18.487879862147356
(array([1.32204448, 0.19949392, 0.04759451, 0.05032007, 0.84226284,
       2.50705314, 0.97596636, 5.75795449, 6.64300498, 6.95190269,
       8.25392854]), array([  0.70436945,  -0.19999362,  -1.33468869,  -1.68129874,
        -0.72579529,  -2.04953341,  -2.79013046,  -7.70453989,
        -9.12327969, -10.        ,  -9.03711978]))


In [9]:
class Fourier_Basis:
  def __init__(self, order, k):
    #self.env = env
    self.order = [order]*k
    self.coefficients = np.array([])

  def get_coefficients(self):
    prods = [range(0, i+1) for i in self.order]
    #print(prods)
    coeffs = [v for v in itertools.product(*prods)]
    self.coefficients = np.array(coeffs)
    return self.coefficients
  
  def value(self, state):
    self.get_coefficients()
    return np.cos(np.pi*np.dot(self.coefficients, state))

class Expected_SARSA_Continuous_Action:
  def __init__(self, env, gamma, order, k):
    self.state_num = env.observation_space.shape[0]
    discrete_actions = discretize_action(env)
    action_list = discrete_actions.discretize_action_space(steps_per_dimension)
    self.action_num = len(action_list)
    self.w = np.random.uniform(-0.001, 0.001, ((order + 1)**k, self.action_num))
    self.gamma = gamma

  #define the greedy policy
  def policy(self, states, epsilon):
    if random.uniform(0, 1) < epsilon:
        return np.random.randint(self.action_num)
    else:
        return states.argmax()

  def apply_weight(self, state): #dot product for linear approximation function with weights vector
    return np.dot(state, self.w)

  def update(self, current, target, reward, state, action, alpha, done):
    error = reward + self.gamma * target* (not done) - current
    #print(error)
    self.w[:, action] += alpha * error * state

def Expected_SARSA_implement(env, alpha, epsilon, order, k, gamma = 0.99):
  trial_reward = []
  for j in range(trials):
    Expected_SAR = Expected_SARSA_Continuous_Action(env, gamma, order, k)
    FB = Fourier_Basis(order, k)
    u_state = state_range[0]
    l_state = state_range[1]
    d_state = u_state - l_state
    print(d_state)


    rewards = []
    for i in range(episodes):
      s, _ = env.reset()
      reward_ep = 0
      done = False
      state_encoding = (s - l_state) / d_state
      #print(state_encoding)
      state_cos = FB.value(state_encoding)
      state_value = Expected_SAR.apply_weight(state_cos)
      for i in range(episode_length):
        a = Expected_SAR.policy(state_value, epsilon)
        v_i = state_value[a]
        #print(action_list[a])
        s, reward, done, info, _ = env.step(action_list[a])
        next_state = (s - l_state) / d_state
        next_state_cos = FB.value(next_state)
        v_next = epsilon * np.mean(Expected_SAR.apply_weight(next_state_cos)) + (1 - epsilon) * np.max(Expected_SAR.apply_weight(next_state_cos))
        Expected_SAR.update(v_i, v_next, reward, state_cos, a, alpha, done)
        state_encoding = next_state
        state_cos = FB.value(state_encoding)
        state_value = Expected_SAR.apply_weight(state_cos)
        reward_ep += reward
      rewards.append(reward_ep)
    trial_reward.append(rewards)

  return trial_reward

In [8]:
env = gymnasium.make("Hopper-v4")
obs = env.reset()

#define the parameters
epsilon = [0.1]#, 0.01]
order = 2
k = env.observation_space.shape[0]
alpha = [0.01]#, 0.001]
steps_per_dimension = 5
discrete_actions = discretize_action(env)
action_list = discrete_actions.discretize_action_space(steps_per_dimension)
trials = 2
gamma = 0.99
episodes = 100
episode_length = 100

In [None]:
HP_epsilon_return_SARSA = []
for j in range(len(alpha)):
  learning_return_SARSA = []
  for i in range(len(epsilon)):
    avg_return = Expected_SARSA_implement(env, alpha[j], epsilon[i], order, k, gamma)
    learning_return_SARSA.append(avg_return)
  HP_epsilon_return_SARSA.append(learning_return_SARSA)