In [84]:
import numpy as np
class Features():
    """Builds per-action feature vectors for a linear action-value model.

    For every action the (engineered) observation is placed in that action's
    slot of a vector of length ``n_actions * len(observation)``; all other
    slots are zero.
    """

    def __init__(self):
        # Placeholder attribute; get_feature_matrix returns its result and
        # does not write to this list.
        self.feature_matrix = []

    def get_feature_matrix(self, n_actions, observation):
        """Return one block-encoded feature vector per action.

        Args:
            n_actions: Number of discrete actions (number of slots).
            observation: Sized sequence/array describing the state.

        Returns:
            A list with ``n_actions`` flat numpy arrays. Entry ``k`` holds the
            engineered observation in slot ``k`` and zeros in every other slot.
        """
        state = self.get_engineered_feature(observation)
        width = len(state)
        return [
            np.array(
                [state if slot == chosen else np.zeros(width) for slot in range(n_actions)]
            ).flatten()
            for chosen in range(n_actions)
        ]

    def get_engineered_feature(self, observation):
        """Hook for feature engineering; currently the identity mapping."""
        return observation

# f = Features().get_feature_matrix(4, np.array([1,2,3,4]))
# f

In [3]:
import gym
import numpy as np
import matplotlib.pyplot as plt
from gym.wrappers.monitoring.video_recorder import VideoRecorder


# class Features():
#     def __init__(self):
#         self.feature_matrix = []

#     def get_feature_matrix(self, n_actions, observation):
#         observation = self.get_engineered_feature(observation)
#         num_obs = len(observation)
#         features = []
#         for j in range(n_actions):
#           feature = [[None]*num_obs]*n_actions
#           feature[j] = observation
#           for i in range(j, n_actions-1+j):
#             feature[(i+1)%n_actions] = np.zeros(num_obs)
#           features.append(np.array(feature).flatten())
#         self.feature_matrix = features
    
#     # def get_action(self, observation):
#     #     features =self.get_feature_matrix(observation)
#     #     action = np.argmax([feature for feature in features])
#     #     delta = reward + self.gamma*max([feature for feature in features])
#     #     self.w = (1-self.alpha)*(self.w) + self.alpha*(features[action]*delta) 

#     def get_engineered_feature(self, observation):
#       return observation
    
class CartPoleEnvLinear(gym.Env):
    """Q-learning agent for CartPole-v1 using a linear action-value model.

    Wraps a ``gym`` CartPole environment and learns a weight vector ``w`` so
    that the value of action ``a`` in a state is ``phi(s, a) . w``, where the
    per-action feature vectors ``phi`` come from
    ``Features.get_feature_matrix``. Episode lengths observed while training
    and testing are appended to ``metric`` and ``test_metric``.
    """

    metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 50}

    def __init__(self, render_mode="rgb_array", n_observations=1000, q_table=np.array(None), alpha=0.5):
        """Create the wrapped environment and initialise learning state.

        Args:
            render_mode: Passed through to ``gym.make``.
            n_observations: Number of environment steps ``train`` will run;
                also the horizon over which epsilon decays from 1 to 0.
            q_table: Optional pre-built tabular Q-table. The default 0-d
                sentinel (or ``None``) means "not supplied", in which case a
                zero table is allocated.
            alpha: Blending factor used in ``update_q_table``.
        """
        self.env = gym.make("CartPole-v1", render_mode=render_mode)
        self.action_space = self.env.action_space
        self.n_actions = 2
        self.observation_space = self.env.observation_space
        self.n_observation_space = 4

        # Discretisation state used by get_index / bucket_observation.
        # NOTE(review): these look like leftovers from a tabular variant; the
        # linear train/test path below never reads them - confirm before removing.
        self.n_buckets = 4
        self.bucket_size = self.env.observation_space.high / (self.n_buckets/2)

        # Only the 0-d sentinel default (or None) means "no table given".
        # Testing truthiness of the contents would discard any supplied table
        # that happens to contain a zero entry.
        q_table_supplied = q_table is not None and np.ndim(q_table) > 0
        if q_table_supplied:
            self.q_table = q_table
        else:
            self.q_table = np.zeros(
                (self.env.action_space.n, np.power(self.n_buckets, self.n_observation_space))
            )

        # Exploration schedule: epsilon shrinks by a fixed amount on each policy() call.
        self.epsilon = 1
        self.n_episodes = n_observations
        self.epsilon_delta = self.epsilon / self.n_episodes

        self.metric = []
        self.test_metric = []
        self.alpha = alpha
        self.gamma = 0.99

        # One weight per (action, observation component) pair, matching the
        # length of the vectors returned by Features.get_feature_matrix.
        self.w = np.zeros(self.n_observation_space * self.n_actions)
        self.features = Features()

    def get_index(self, observation):
        """Map an observation to a single integer via its bucketed components.

        The bucketed components are combined as digits in base ``n_buckets``,
        most significant first.
        """
        observation_bucketed = self.bucket_observation(observation)
        exponent = len(observation_bucketed) - 1
        idx = 0
        for bucket in observation_bucketed:
            idx = idx + (self.n_buckets**exponent * bucket)
            exponent -= 1
        return int(idx)

    def bucket_observation(self, observation):
        """Return ``floor(observation / bucket_size)`` element-wise."""
        return np.floor(observation/self.bucket_size)

    def update_q_table(self, observation, observation_prime, reward, action=1):
        """Update the weight vector from one observed transition.

        Args:
            observation: State in which ``action`` was taken.
            observation_prime: State reached after the action.
            reward: Reward received for the transition.
            action: Index of the action that was actually taken.

        The target is ``reward + gamma * max_a' phi(s', a') . w`` and the
        weights are blended toward ``phi(s, action) * target`` with factor
        ``alpha``.
        """
        # Use the features of the action that was actually executed; the
        # caller-supplied ``action`` must not be replaced by the greedy one.
        features = self.features.get_feature_matrix(n_actions=self.n_actions, observation=observation)
        max_future_reward = self.get_max_future_reward(observation=observation_prime)
        target = reward + self.gamma*max_future_reward
        # NOTE(review): this is an exponential blend toward phi * target, not
        # the usual TD-error step (w += alpha * (target - phi.w) * phi), and it
        # bootstraps from observation_prime even when the episode ended there.
        # Left as written - confirm the intended update rule.
        self.w = (1-self.alpha)*(self.w) + self.alpha*(features[action]*target)

    def policy(self, observation):
        """Epsilon-greedy action selection with linearly decaying epsilon.

        Each call first lowers epsilon by ``epsilon_delta``; a uniformly random
        action is then taken with probability epsilon, otherwise the greedy
        action under the current weights is returned.
        """
        self.epsilon = self.epsilon - self.epsilon_delta
        # Explore with probability epsilon: high early on, falling towards 0.
        take_random_action = np.random.random() < self.epsilon
        if take_random_action:
            return self.action_space.sample()
        action, _ = self.get_action(observation)
        return action

    def get_action(self, observation):
        """Return ``(greedy_action, per_action_features)`` for ``observation``."""
        features = self.features.get_feature_matrix(n_actions=self.n_actions, observation=observation)
        action = np.argmax([feature.dot(self.w) for feature in features])
        return action, features

    def get_max_future_reward(self, observation):
        """Return the largest estimated action value available in ``observation``."""
        features = self.features.get_feature_matrix(n_actions=self.n_actions, observation=observation)
        return np.max([feature.dot(self.w) for feature in features])

    def train(self):
        """Run ``n_episodes`` environment steps, updating weights after each.

        Despite its name, ``n_episodes`` counts steps here. Whenever an episode
        terminates or is truncated its length is appended to ``metric`` and the
        environment is reset. The environment is closed when training ends.
        """
        observation, _ = self.env.reset()
        episode_length = 0
        for _ in range(self.n_episodes):
            action = self.policy(observation)
            observation_prime, reward, terminated, truncated, info = self.env.step(action)
            self.update_q_table(observation, observation_prime, reward, action)
            observation = observation_prime
            episode_length += 1
            if terminated or truncated:
                self.metric.append(episode_length)
                episode_length = 0
                observation, info = self.env.reset()
        # NOTE(review): test() reuses this same env after it has been closed
        # here - confirm the installed gym version tolerates reset-after-close.
        self.env.close()

    def test(self):
        """Roll out the greedy policy for 1000 steps while recording video.

        Episode lengths are appended to ``test_metric``. The environment and
        the video recorder are closed even if the rollout raises.
        """
        vid = VideoRecorder(self.env, enabled=True, path="./qlearning_linear_test_video/vid.mp4")
        try:
            observation, _ = self.env.reset()
            episode_length = 0
            for _ in range(1000):
                vid.capture_frame()
                # Same greedy selection used during training.
                action, _ = self.get_action(observation)
                observation, reward, terminated, truncated, info = self.env.step(action)
                episode_length += 1
                if terminated or truncated:
                    self.test_metric.append(episode_length)
                    episode_length = 0
                    observation, _ = self.env.reset()
        finally:
            try:
                self.env.close()
            finally:
                vid.close()

In [82]:
# env = CartPoleEnvLinear(n_observations=100000)
# env.train()
# env.test()
# plt.plot(env.metric)
# plt.show()
# plt.plot(env.test_metric)
# plt.show()
# print(np.mean(env.metric))
# print(np.median(env.metric))
# print(np.quantile(env.metric, [0, 0.25, 0.5, 0.75, .95, .99, 0.999, 1]))
# plt.hist(env.metric, bins=10, range=[50,np.max(env.metric)])

In [83]:
# y=[]
# r = 10000
# seq = -np.arange(-r, r)
# for x in seq:
#     y.append(np.max((1/ (1+np.exp(-x*5/r)), 0.05)))
# plt.plot(np.array(y))