In [8]:
import keyboard
from itertools import count

import numpy as np
import gymnasium as gym

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Choose the compute backend in priority order: CUDA, then Apple MPS, else CPU.
# Each probe is only evaluated when the previous one reported unavailable.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Random action

In [9]:
# Sanity check: drive the environment with up to 100 uniformly sampled actions
# and print every transition.
env = gym.make("CartPole-v1", render_mode="human")
try:
    env.reset()

    for t in range(100):
        action = env.action_space.sample()
        next_state, reward, done, truncated, info = env.step(action)
        print(f"Next State: {next_state} Action: {action} Reward: {reward}")

        # The episode is over when the environment reports either flag;
        # `truncated` was previously ignored.
        if done or truncated:
            break
finally:
    # Release the render window even if the loop raises or is interrupted.
    env.close()

Next State: [ 0.00780858  0.17839824 -0.04811668 -0.27993217] Action: 1 Reward: 1.0
Next State: [ 0.01137654  0.37417236 -0.05371533 -0.5873943 ] Action: 1 Reward: 1.0
Next State: [ 0.01885999  0.1798422  -0.06546322 -0.31210423] Action: 0 Reward: 1.0
Next State: [ 0.02245684 -0.01428901 -0.0717053  -0.04076356] Action: 0 Reward: 1.0
Next State: [ 0.02217105 -0.20831338 -0.07252057  0.22846179] Action: 0 Reward: 1.0
Next State: [ 0.01800479 -0.40232807 -0.06795134  0.49741656] Action: 0 Reward: 1.0
Next State: [ 0.00995823 -0.5964294  -0.058003    0.7679343 ] Action: 0 Reward: 1.0
Next State: [-0.00197036 -0.79070693 -0.04264432  1.0418172 ] Action: 0 Reward: 1.0
Next State: [-0.0177845  -0.5950453  -0.02180797  0.736058  ] Action: 1 Reward: 1.0
Next State: [-0.02968541 -0.78985935 -0.00708681  1.0217985 ] Action: 0 Reward: 1.0
Next State: [-0.04548259 -0.98488617  0.01334916  1.3122479 ] Action: 0 Reward: 1.0
Next State: [-0.06518032 -0.78993577  0.03959412  1.0237728 ] Action: 1 Rewa

# Custom environment

In [10]:
class CustomCartPole(gym.Env):
    """CartPole-v1 wrapper with configurable failure limits and episode cap.

    The requested limits are written onto the underlying (unwrapped) CartPole
    environment. Checking them only after `step()` has no effect when they are
    looser than the wrapped environment's own thresholds, because the wrapped
    environment reports termination first.

    Args:
        x_limit: Absolute cart position beyond which the episode terminates.
        theta_limit: Absolute pole angle (radians) beyond which the episode
            terminates.
        max_episode_steps: Step cap forwarded to `gym.make`.
        render_mode: Render mode forwarded to `gym.make`.

    Attributes:
        env: The wrapped gymnasium environment.
        new_x_limit: Cart position limit checked in `step`.
        new_theta_limit: Pole angle limit checked in `step`.
    """

    def __init__(self, x_limit=10.0, theta_limit=0.7, max_episode_steps=300, render_mode="human"):
        # Use the public `max_episode_steps` argument instead of poking the
        # private `_max_episode_steps` attribute of whichever wrapper happens
        # to be outermost.
        self.env = gym.make(
            "CartPole-v1",
            render_mode=render_mode,
            max_episode_steps=max_episode_steps,
        )
        self.new_x_limit = x_limit
        self.new_theta_limit = theta_limit

        # Apply the limits to the base environment so they actually take
        # effect (limits are applied once, at construction time).
        base_env = self.env.unwrapped
        base_env.x_threshold = self.new_x_limit
        base_env.theta_threshold_radians = self.new_theta_limit

        # Forward the spaces so callers can use `env.action_space` /
        # `env.observation_space` as with any other environment.
        # NOTE(review): the forwarded observation_space bounds are those of the
        # wrapped env and are not widened to match the new limits.
        self.observation_space = self.env.observation_space
        self.action_space = self.env.action_space

    def reset(self, **kwargs):
        """Reset the wrapped environment; returns whatever it returns."""
        return self.env.reset(**kwargs)

    def step(self, action):
        """Advance one step and apply the custom termination limits.

        Returns:
            The wrapped environment's `(obs, reward, done, truncated, info)`
            tuple, with `done` forced to True when the cart position or pole
            angle exceeds the configured limits.
        """
        obs, reward, done, truncated, info = self.env.step(action)

        # Observation layout: (x, x_dot, theta, theta_dot).
        x, _, theta, _ = obs

        # Guard kept so that limits tightened on the instance after
        # construction are still honoured.
        if abs(x) > self.new_x_limit or abs(theta) > self.new_theta_limit:
            done = True

        return obs, reward, done, truncated, info

    def render(self):
        """Render via the wrapped environment."""
        return self.env.render()

    def close(self):
        """Close the wrapped environment and its render window."""
        self.env.close()

# Balancing based on the pole's angular velocity

In [11]:
# Hand-written controller: choose the action from the sign of the pole's
# angular velocity and run for at most 300 steps.
env = CustomCartPole()
try:
    state, info = env.reset()

    for t in range(300):
        # state[3] is theta_dot (fourth observation component); action 0 when
        # it is negative, action 1 otherwise.
        action = 0 if state[3] < 0 else 1

        state, reward, done, truncated, info = env.step(action)
        print(f"State: {state} Action: {action} Reward: {reward}")

        # Stop on either end-of-episode flag; `truncated` was previously ignored.
        if done or truncated:
            break
finally:
    # Release the render window even if the loop raises or is interrupted.
    env.close()

State: [-0.04855463 -0.20277236 -0.01405544  0.28728727] Action: 0 Reward: 1.0
State: [-0.05261008 -0.00745282 -0.0083097  -0.00979528] Action: 1 Reward: 1.0
State: [-0.05275913 -0.20245463 -0.0085056   0.2802543 ] Action: 0 Reward: 1.0
State: [-0.05680823 -0.00721238 -0.00290051 -0.01509909] Action: 1 Reward: 1.0
State: [-0.05695247 -0.2022926  -0.0032025   0.2766673 ] Action: 0 Reward: 1.0
State: [-0.06099832 -0.00712512  0.00233085 -0.01702399] Action: 1 Reward: 1.0
State: [-0.06114083 -0.20228042  0.00199037  0.27639344] Action: 0 Reward: 1.0
State: [-0.06518643 -0.00718692  0.00751824 -0.01566106] Action: 1 Reward: 1.0
State: [-0.06533018 -0.20241588  0.00720502  0.27938443] Action: 0 Reward: 1.0
State: [-0.0693785  -0.00739745  0.01279271 -0.01101736] Action: 1 Reward: 1.0
State: [-0.06952644 -0.20270051  0.01257236  0.2856742 ] Action: 0 Reward: 1.0
State: [-0.07358045 -0.0077601   0.01828584 -0.00301712] Action: 1 Reward: 1.0
State: [-0.07373565 -0.20313945  0.0182255   0.29537

In [12]:
class DQN(nn.Module):
    """Fully connected Q-network: observation vector in, one value per action out.

    Architecture: n_observations -> 256 -> 128 -> 64 -> n_actions, with a ReLU
    after every layer except the last. The layers live in `self.model`
    (an `nn.Sequential`), so parameter names are `model.<index>.weight/bias`.

    Args:
        n_observations: Size of the input feature vector.
        n_actions: Number of output units.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()

        # Consecutive pairs of widths define the hidden Linear layers.
        widths = (n_observations, 256, 128, 64)

        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU()]

        # Output head: no activation after the final projection.
        stack.append(nn.Linear(widths[-1], n_actions))

        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Run `x` through the layer stack and return the result."""
        return self.model(x)

In [13]:
# Read the observation/action sizes from a throw-away environment. No render
# mode is requested and the environment is closed immediately, so no window is
# opened or left behind just to read two integers.
probe_env = gym.make("CartPole-v1")
try:
    n_observations = probe_env.observation_space.shape[0]
    n_actions = int(probe_env.action_space.n)
finally:
    probe_env.close()

policy_net = DQN(n_observations, n_actions).to(device)
# NOTE(security): weights_only=False lets torch.load unpickle arbitrary
# objects, which can execute code from the checkpoint file. Only load
# checkpoints from a trusted source; a plain state_dict should also load with
# weights_only=True -- TODO confirm for this file and switch.
policy_net.load_state_dict(torch.load("cartpole_policy_8.pth", map_location=torch.device('cpu'), weights_only=False))

<All keys matched successfully>

In [14]:
# Run the trained policy with live rendering; holding the left/right arrow key
# overrides the policy's action for that step.
env = gym.make("CartPole-v1", render_mode="human")
env.unwrapped.theta_threshold_radians = np.deg2rad(45)

# Build input tensors on the device the network actually lives on. Hard-coding
# "cpu" fails with a device-mismatch error when policy_net was moved to
# CUDA/MPS.
policy_device = next(policy_net.parameters()).device

try:
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=policy_device).unsqueeze(0)

    for t in count():
        # Pure inference: skip autograd bookkeeping.
        with torch.no_grad():
            action = policy_net(state).max(1).indices.view(1, 1)

        if keyboard.is_pressed("left"):
            action = torch.tensor([[0]], dtype=torch.int64)
        elif keyboard.is_pressed("right"):
            action = torch.tensor([[1]], dtype=torch.int64)

        observation, reward, terminated, truncated, _ = env.step(action.item())
        state = torch.tensor(observation, dtype=torch.float32, device=policy_device).unsqueeze(0)

        # NOTE(review): with render_mode="human" the environment may already
        # render inside step(); this explicit call is kept from the original.
        env.render()

        # Only `terminated` ends the loop; `truncated` is not checked, so the
        # run continues past any step cap (the recorded run reached 703 steps).
        # NOTE(review): presumably intentional -- confirm.
        if terminated:
            print('Iteration: ', t+1, ' steps')
            break
finally:
    # Release the render window even if the loop raises or is interrupted.
    env.close()

Iteration:  703  steps
