In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym

In [None]:
# Shared environment instance used by the TD(lambda) agent defined below.
# MountainCar-v0's constructor does not accept a `domain_randomize` keyword,
# so passing it makes gym.make() fail with a TypeError; it is omitted here.
env = gym.make("MountainCar-v0")

In [None]:
from torch import nn
import torch.nn.functional as F
from gymnasium import spaces
import numpy as np

class TDLambda(nn.Module):
    """Tabular TD(lambda) state-value estimation for MountainCar.

    The continuous observation ``(position, velocity)`` is bucketed onto a
    regular grid (position step 0.1, velocity step 0.01).  ``V`` maps each
    grid cell to its estimated value and ``E`` maps it to its accumulating
    eligibility trace.  Actions are drawn uniformly at random (see
    :meth:`policy`), so the table estimates the value of the random policy.

    NOTE: ``train`` shadows ``nn.Module.train(mode)``; calling ``eval()`` on
    this object therefore ends up in the training loop below.  The name is
    kept for backward compatibility.
    """

    # Grid definition.  A cell key is ``(index / scale)`` per dimension, so
    # keys are plain Python floats such as ``(-1.2, 0.03)``.
    _POS_SCALE = 10              # 1 / position step (0.1)
    _VEL_SCALE = 100             # 1 / velocity step (0.01)
    _POS_INDEX_RANGE = (-12, 6)  # positions -1.2 .. 0.6
    _VEL_INDEX_RANGE = (-7, 7)   # velocities -0.07 .. 0.07
    _GOAL_POSITION = 0.5         # cells at or beyond this position are terminal

    def __init__(
        self,
        lam: float = 0.9,
        discount: float = 0.95,
        alpha: float = 0.05,
        action_size=3,
        observation_size=2,
        *,
        environment=None,
    ):
        """Store hyper-parameters and create empty value/trace tables.

        Args:
            lam: Trace-decay parameter (lambda).
            discount: Discount factor (gamma).
            alpha: Step size of the value update.
            action_size: Number of discrete actions.
            observation_size: Dimensionality of an observation.
            environment: Environment to interact with.  When omitted, the
                module-level ``env`` is used, as in the original code.
        """
        # nn.Module requires its own initialiser to run before use.
        super().__init__()

        self.lam = lam
        self.discount = discount
        self.alpha = alpha

        # environment specific vars
        self.action_space = spaces.Discrete(action_size)
        self.action_size = action_size
        self.observation_size = observation_size

        self.env = env if environment is None else environment

        # metrics
        self.actions = []
        self.rewards = []
        self.observations = []

        self.E = {}
        self.V = {}

    def train(self, total_steps: int = 5000):
        """Run ``total_steps`` environment steps of online TD(lambda).

        Values and traces are (re-)initialised at the start of every call.
        Observations, rewards and actions are appended to the metric lists.
        """
        self.initialize_values_and_traces()

        # Gymnasium's reset() returns (observation, info).
        obs, _ = self.env.reset()
        self.observations.append(obs)

        trace_decay = self.discount * self.lam

        for _ in range(total_steps):
            action = self.policy()
            prev_obs = obs
            obs, rew, done, trunc, _ = self.env.step(action)

            # No bootstrapping from a terminal state: its value is zero by
            # definition.  A truncated (time-limit) episode still bootstraps.
            next_value = 0.0 if done else self.value(obs)
            td = rew + self.discount * next_value - self.value(prev_obs)
            self.eligibility_trace(prev_obs)

            scaled_td = self.alpha * td
            for s in self.V:
                self.V[s] += scaled_td * self.E[s]
                self.E[s] *= trace_decay

            self.observations.append(obs)
            self.rewards.append(rew)
            self.actions.append(action)

            if done or trunc:
                # Traces must not leak credit across episode boundaries.
                self.reset_traces()
                obs, _ = self.env.reset()
                self.observations.append(obs)

    def _grid_states(self):
        """Yield every ``(position, velocity)`` cell key of the table."""
        pos_lo, pos_hi = self._POS_INDEX_RANGE
        vel_lo, vel_hi = self._VEL_INDEX_RANGE
        for i in range(pos_lo, pos_hi + 1):
            for j in range(vel_lo, vel_hi + 1):
                yield (i / self._POS_SCALE, j / self._VEL_SCALE)

    def _discretize(self, state):
        """Map a raw observation to the key of the grid cell containing it.

        Works on integer cell indices so the resulting floats are exactly the
        ones produced by :meth:`_grid_states`, whatever the dtype of
        ``state``.  Out-of-range observations are clamped to the border cell.
        """
        pos_lo, pos_hi = self._POS_INDEX_RANGE
        vel_lo, vel_hi = self._VEL_INDEX_RANGE
        pos_idx = int(round(float(state[0]) * self._POS_SCALE))
        vel_idx = int(round(float(state[1]) * self._VEL_SCALE))
        pos_idx = min(max(pos_idx, pos_lo), pos_hi)
        vel_idx = min(max(vel_idx, vel_lo), vel_hi)
        return (pos_idx / self._POS_SCALE, vel_idx / self._VEL_SCALE)

    def initialize_values_and_traces(self):
        """Fill ``V`` with random values in [0, 1) and zero every trace.

        Cells whose position is at or beyond the goal are terminal and get a
        value of zero.
        """
        for key in self._grid_states():
            at_goal = key[0] >= self._GOAL_POSITION
            self.V[key] = 0.0 if at_goal else np.random.random_sample()
            self.E[key] = 0.0

    def reset_traces(self):
        """Set the eligibility trace of every grid cell back to zero."""
        for key in self._grid_states():
            self.E[key] = 0.0

    def eligibility_trace(self, state):
        """Increment the trace of the cell containing ``state`` and return it."""
        key = self._discretize(state)
        self.E[key] += 1
        return self.E[key]

    def value(self, state):
        """Return the current value estimate of the cell containing ``state``."""
        return self.V[self._discretize(state)]

    def policy(self):
        """Return an action sampled uniformly from the action space."""
        return self.action_space.sample()