# Progetto di Machine Learning
#### Double inverted pendulum - Lorenzo Frangella 1899674

In [None]:
# Code to run in only in colab for packet download
!pip3 install gymnasium
!pip3 install gym[mujoco]

In [1]:
import gymnasium as gym
import math 
import os 
import torch
import torch.nn as nn
import pandas as pd
import torch.nn.functional as F
from collections import deque
import numpy as np


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


  from .autonotebook import tqdm as notebook_tqdm


#### Hyperparameters used in our neural network

In [2]:
DISCOUNT = 0.95
LEARNING_RATE = 1e-4
torch.set_printoptions(threshold=10_000)


A sample of the environment

In [3]:
# test of the environment

env = gym.make('InvertedDoublePendulum-v4',render_mode="human") #change w "human" if needed
observation, info = env.reset()

for i in range(1000):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

env.close


OBSERVATIONS = env.observation_space.shape[0]
ACTIONS = 64



/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/glfw/__init__.py:916: GLFWError: (65537) b'The GLFW library is not initialized'


: 

### Convert the output of the neural network to the interval -1,1
Since as action we have only one scalar value that represent magnitude and direction of the force applied to the cart, we have to "cast" the output of the neural network into a scalar contiguos value

In [4]:
def discrete_to_continue_action(index):
    value = (2/ACTIONS) * index - 1
    return value

Defining the structure of the neural network needed for DQN algorithm

In [5]:
class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)
    
print(OBSERVATIONS)

main_nn = DQN(OBSERVATIONS,ACTIONS).to(device)
target_nn = DQN(OBSERVATIONS,ACTIONS).to(device)

optimizer = torch.optim.Adam(main_nn.parameters(), lr=LEARNING_RATE)
loss_fn = nn.MSELoss()

11


Defining the data structure that is needed for the training

In [6]:
class ReplayMemory(object):

    def __init__(self, size, device ="cpu"):
        self.buffer = deque(maxlen=size)
        self.device = device
    
    def add(self,state,action, reward, next_state, done):
        self.buffer.append((state, action,reward,next_state,done))

    def __len__(self):
        return len(self.buffer)
    
    def sample(self, num_samples):
        states, actions, rewards, next_states, dones = [] , [] , [] , [] , []
        idx = np.random.choice(len(self.buffer), num_samples)
        for i in idx:
            elem = self.buffer[i]
            state, action, reward, next_state, done = elem
            states.append(np.array(state, copy=False))
            actions.append(np.array(action, copy=False))
            rewards.append(reward)
            next_states.append(np.array(next_state, copy=False))
            dones.append(done)
        states = np.array(states)
        actions = np.array(actions)
        rewards = np.array(rewards, dtype=np.float32)
        next_states = np.array(next_states)
        dones = np.array(dones, dtype=np.float32)
        return states, actions, rewards, next_states, dones
    
                    

#### The following part of code is needed to develop a epsilon-greedy policy

With a given probability epsilon a random action is choosen otherwise is choosen the best action


In [7]:
def select_epsilon_greedy_action(state, epsilon):
    result = np.random.uniform()
    if result < epsilon:
        return env.action_space.sample()
    else:
        qs = main_nn(state).cpu().data.numpy()
        action_decimal = np.array([(np.argmax(qs)*(2/ACTIONS))-1])
        return action_decimal

#### Definition of a train step that is performed on the replay memory of size batch

In [32]:
def training_step(states, actions, rewards, next_states, dones):
    
    max_next_qs = target_nn(next_states).max(-1).values
    target = rewards + (1.0 - dones) * DISCOUNT * max_next_qs
    qs = main_nn(states)
    action_masks = F.one_hot(actions.type(torch.int64), ACTIONS)
    masked_qs = (action_masks * qs).sum(dim=-1)
    loss = loss_fn(masked_qs, target.detach())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss

In [31]:
# Hyperparameters.
num_episodes = 10000
epsilon = 1.0
batch_size = 32
discount = 0.99
buffer = ReplayMemory(100000, device=device)
cur_frame = 0

# Start training. Play game once and then train with a batch.
last_100_ep_rewards = []
for episode in range(num_episodes+1):
  state = env.reset()[0].astype(np.float32)
  ep_reward, done = 0, False
  while not done:
    state_in = torch.from_numpy(np.expand_dims(state, axis=0)).to(device)
    action = select_epsilon_greedy_action(state_in, epsilon)
    next_state, reward, done, info, unknown_attribute = env.step(action)
    next_state = next_state.astype(np.float32)
    ep_reward += reward
    # Save to experience replay.
    integer_action = (action + 1)*(ACTIONS/2)
    #print(f"the actions is {action} and its integer {integer_action} multiplied by {ACTIONS}")
    buffer.add(state, integer_action, reward, next_state, done)
    state = next_state
    cur_frame += 1
    # Copy main_nn weights to target_nn.
    if cur_frame % 2000 == 0:
      target_nn.load_state_dict(main_nn.state_dict())
    
    # Train neural network.
    if len(buffer) > batch_size:
      states, actions, rewards, next_states, dones = buffer.sample(batch_size)
      next_states = torch.from_numpy(next_states)
      rewards = torch.from_numpy(rewards)
      states = torch.from_numpy(states)
      dones = torch.from_numpy(dones)
      actions = torch.from_numpy(actions)


      loss = training_step(states, actions, rewards, next_states, dones)

  if episode < 950:
    epsilon -= 0.001

  if len(last_100_ep_rewards) == 100:
    last_100_ep_rewards = last_100_ep_rewards[1:]
  last_100_ep_rewards.append(ep_reward)

  if episode % 50 == 0:
    print(f'Episode {episode}/{num_episodes}. Epsilon: {epsilon:.3f}.'
          f' Reward in last 100 episodes: {np.mean(last_100_ep_rewards):.2f}')
    
env.close()

Episode 0/10000. Epsilon: 0.999. Reward in last 100 episodes: 45.57
printing the action masks: tensor([[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],

        [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]],

        [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],

        [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]],

     

KeyboardInterrupt: 

In [140]:

env_test = gym.make('InvertedDoublePendulum-v4',render_mode="human")

observation = env_test.reset()[0].astype(np.float32)


for i in range(1000):
    inp = torch.from_numpy(observation)
    action = select_epsilon_greedy_action(inp,0)
    observation, reward, terminated, truncated, info = env_test.step(action)
    observation = observation.astype(np.float32)

    

    if terminated or truncated:
        observation, info = env.reset()
        observation = observation.astype(np.float32)


env_test.close

<bound method Wrapper.close of <TimeLimit<OrderEnforcing<PassiveEnvChecker<InvertedDoublePendulumEnv<InvertedDoublePendulum-v4>>>>>>

: 

code to export the trained model

In [24]:
PATH = r"./"
torch.save(main_nn.state_dict(), PATH + r"RNN.pth")

code to import a pretrained model

In [126]:
PATH = r"./"
main_nn.load_state_dict(torch.load(PATH + r"RNN128-128-64.pth",map_location=device))

<All keys matched successfully>