In [3]:
import torch  # imported in this cell so it also runs on a fresh kernel (Restart & Run All)

# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

  return torch._C._cuda_getDeviceCount() > 0


In [4]:
# Display the selected torch device as the cell output.
device

device(type='cpu')

In [9]:
import numpy as np
import random
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import json
import torch.nn.functional as F



class Qfunction(nn.Module):
    """Plain MLP Q-network.

    Two hidden layers of 64 ReLU units followed by a linear output layer that
    produces ``action_dim`` values for each input state.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_dim = 64
        layers = [
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        ]
        # Stored under ``model`` so the state-dict keys remain ``model.<index>.*``.
        self.model = nn.Sequential(*layers)

    def forward(self, states):
        """Run ``states`` through the MLP and return the resulting Q-values."""
        return self.model(states)

class DuelingQfunction(nn.Module):
    """Dueling Q-network: a shared trunk feeding separate value and advantage heads.

    The trunk (``model``) is a two-layer ReLU MLP of width 64. ``v_head``
    outputs a single value per state and ``a_head`` outputs one advantage per
    action; both heads have one hidden layer of 256 ReLU units. The heads are
    combined as ``Q = V + A - mean(A)``.

    Args:
        state_dim: Size of a single state vector.
        action_dim: Number of discrete actions (width of the advantage head).
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        # Shared feature extractor.
        self.model = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
        )
        # State-value stream: one scalar per state.
        self.v_head = nn.Sequential(
            nn.Linear(64, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
        )
        # Advantage stream: one value per action.
        self.a_head = nn.Sequential(
            nn.Linear(64, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        )

    def forward(self, states):
        """Return Q-values for ``states``.

        Args:
            states: Tensor whose last axis has size ``state_dim``; may be a
                batch ``(batch, state_dim)`` or a single unbatched state
                ``(state_dim,)``.

        Returns:
            Tensor with the same leading axes as ``states`` and a last axis of
            size ``action_dim``.
        """
        features = self.model(states)
        v = self.v_head(features)
        a = self.a_head(features)

        # Average over the action axis (the last one) rather than a hard-coded
        # axis 1, so unbatched inputs work too; identical for 2-D batches.
        q = v + a - a.mean(dim=-1, keepdim=True)
        return q

class QNetwork(nn.Module):
    """Compact dueling Q-network with a single shared hidden layer.

    A 64-unit ReLU layer feeds two 256-unit ReLU streams (value and
    advantage), which are combined as ``Q = V + A - mean(A)``.

    Args:
        state_dim: Size of a single state vector (input width of ``fc1``).
        action_dim: Number of discrete actions (output width of ``adv``).
    """

    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()

        # Layer sizes follow the constructor arguments instead of the
        # previously hard-coded 4 inputs / 2 actions.
        self.fc1 = nn.Linear(state_dim, 64)
        self.relu = nn.ReLU()
        self.fc_value = nn.Linear(64, 256)
        self.fc_adv = nn.Linear(64, 256)

        self.value = nn.Linear(256, 1)
        self.adv = nn.Linear(256, action_dim)

    def forward(self, state):
        """Return Q-values for ``state``.

        Args:
            state: Tensor whose last axis has size ``state_dim``.

        Returns:
            Tensor with the same leading axes as ``state`` and a last axis of
            size ``action_dim``.
        """
        y = self.relu(self.fc1(state))
        value = self.relu(self.fc_value(y))
        adv = self.relu(self.fc_adv(y))

        value = self.value(value)
        adv = self.adv(adv)

        # Centre the advantages over the action axis (last axis).
        advAverage = torch.mean(adv, dim=-1, keepdim=True)
        Q = value + adv - advAverage

        return Q


class DQN_double:
    """Double DQN agent with epsilon-greedy exploration and a target network.

    The online network (``q_function``) picks the greedy next action and the
    target network (``freezing_q_function``) evaluates it. The target network
    starts as a copy of the online network and is re-synchronised every
    ``period`` training steps.

    Args:
        state_dim: Size of a single state vector.
        action_dim: Number of discrete actions.
        gamma: Discount factor used in the bootstrap target.
        lr: Learning rate passed to Adam.
        batch_size: Number of transitions sampled per training step.
        epsilon_decrease: Stored on the instance; NOTE(review): the decay in
            ``fit`` uses a fixed step and does not read this value.
        epsilon_min: Lower bound for the exploration rate.
        period: Number of training steps between target-network syncs.
    """

    def __init__(self, state_dim, action_dim, gamma=0.99, lr=1e-3, batch_size=64, epsilon_decrease=0.0001, epsilon_min=0.0001, period=100):
        self.state_dim = state_dim
        self.action_dim = action_dim
        # Qfunction / QNetwork can be swapped in here: the target sync below
        # goes through state_dict and does not depend on sub-module names.
        self.q_function = DuelingQfunction(state_dim, action_dim).to(device)
        self.freezing_q_function = DuelingQfunction(self.state_dim, self.action_dim).to(device)
        # Start the target network as an exact copy of the online network so
        # the very first targets are not produced by unrelated random weights.
        self._sync_target()
        self.gamma = gamma
        self.batch_size = batch_size
        self.epsilon = 0.5
        self.epsilon_decrease = epsilon_decrease
        self.epsilon_min = epsilon_min
        self.memory = []
        self.optimizer = torch.optim.Adam(self.q_function.parameters(), lr=lr)
        # Backward-compatible alias for the original (misspelled) attribute name.
        self.optimzaer = self.optimizer
        self.counter = 1
        self.period = period

    def _sync_target(self):
        """Copy all parameters and buffers of the online network into the target network."""
        self.freezing_q_function.load_state_dict(self.q_function.state_dict())

    def get_action(self, state):
        """Sample an action epsilon-greedily with respect to the online network.

        Args:
            state: A single state (array-like of length ``state_dim``).

        Returns:
            The sampled action index, as returned by ``np.random.choice``.
        """
        with torch.no_grad():
            state_tensor = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(dim=0)
            q_values = self.q_function(state_tensor).squeeze(0)

        argmax_action = int(torch.argmax(q_values).item())
        # Spread epsilon uniformly over all actions; the rest goes to the greedy one.
        probs = self.epsilon * np.ones(self.action_dim) / self.action_dim
        probs[argmax_action] += 1.0 - self.epsilon
        # Periodic diagnostics, keyed on the training-step counter.
        if self.counter % 1_000 == 0:
            print(probs)
            print(self.epsilon)
            print(q_values)
        action = np.random.choice(np.arange(self.action_dim), p=probs)
        return action

    def fit(self, state, action, reward, done, next_state):
        """Store one transition and, once warmed up, run one training step.

        Args:
            state: State in which ``action`` was taken.
            action: Index of the action taken.
            reward: Reward received for the transition.
            done: Truthy if ``next_state`` is terminal (disables bootstrapping).
            next_state: State observed after the action.
        """
        self.memory.append([state, action, reward, int(done), next_state])

        # Warm-up: wait for more than 128 transitions, and never sample more
        # transitions than the memory holds (random.sample would raise).
        if len(self.memory) <= 128 or len(self.memory) < self.batch_size:
            return

        if self.counter % self.period == 0:
            self._sync_target()

        self.counter += 1

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, dones, next_states = zip(*batch)
        # Stack with numpy first: building a tensor from a sequence of
        # ndarrays element by element is much slower.
        states = torch.as_tensor(np.array(states), dtype=torch.float32, device=device)
        actions = torch.as_tensor(np.array(actions), dtype=torch.int64, device=device).unsqueeze(1)
        rewards = torch.as_tensor(np.array(rewards), dtype=torch.float32, device=device).unsqueeze(1)
        dones = torch.as_tensor(np.array(dones), dtype=torch.float32, device=device).unsqueeze(1)
        next_states = torch.as_tensor(np.array(next_states), dtype=torch.float32, device=device)

        with torch.no_grad():
            # Double DQN: the online network selects, the target network evaluates.
            best_next_actions = torch.argmax(self.q_function(next_states), dim=1, keepdim=True)
            next_q = self.freezing_q_function(next_states).gather(1, best_next_actions)
            targets = rewards + self.gamma * (1 - dones) * next_q

        loss = F.mse_loss(self.q_function(states).gather(1, actions), targets)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        if self.counter % 10_000 == 0:
            print(f"loss: {loss.item()}")
        if self.epsilon > self.epsilon_min:
            # Same fixed per-step decrement as before, but clamped so epsilon
            # cannot drop below epsilon_min (or below zero when epsilon_min is 0).
            decay_step = (0.1 - self.epsilon_min) / 20_000
            self.epsilon = max(self.epsilon_min, self.epsilon - decay_step)
            


In [5]:
def DQN_learning(env, agent, episode_n = 100, t_max = 500):
    """Train ``agent`` on ``env`` and return the total reward of every episode.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(next_state, reward, terminated, truncated, info)``.
        agent: Object exposing ``get_action(state)`` and
            ``fit(state, action, reward, done, next_state)``.
        episode_n: Number of episodes to run.
        t_max: Maximum number of steps per episode.

    Returns:
        List with one total reward per episode.
    """
    total_rewards = []
    for episode in range(episode_n):
        total_reward = 0

        state, _ = env.reset()
        for _ in range(t_max):
            action = agent.get_action(state)

            next_state, reward, terminated, truncated, _ = env.step(action)

            total_reward += reward

            # Only a real terminal state should disable bootstrapping. A
            # time-limit truncation ends the episode but is not terminal, so
            # it must not be stored as done.
            agent.fit(state, action, reward, terminated, next_state)

            state = next_state

            if terminated or truncated:
                break

        total_rewards.append(total_reward)
        # Report the mean over the most recent (up to 10) episodes.
        if episode % 10 == 0:
            print(f'episode: {episode}, total_reward: {np.mean(total_rewards[-10:])}')

    return total_rewards

In [21]:
### The implementation bug was that new layers had been added to the model, but I was not copying those layers into the target network: only the shared backbone of the model was copied, as in the old version.

In [10]:
import gym

# Seed every random source the experiment draws from: the agent samples
# replay batches with `random` and actions with `np.random`, the networks are
# initialised by torch, and the environment has its own generator.
SEED = 43
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env = gym.make('CartPole-v1')
# Seed the environment once; the unseeded reset() calls made during training
# then continue from this generator.
env.reset(seed=SEED)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

episode_n=1000

agent = DQN_double(state_dim, action_dim)

total_rewards = DQN_learning(env, agent, episode_n=episode_n)


loss: 83.88592529296875
[0.8998475 0.1001525]
0.20030499499954207
tensor([163.7247, 163.6149])
episode: 910, total_reward: 66.1


KeyboardInterrupt: 