In [2]:
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium.wrappers import FrameStack
from gymnasium.wrappers import PixelObservationWrapper
from gymnasium.wrappers import StepAPICompatibility
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os
from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
import matplotlib.pyplot as plt
import cv2
import numpy as np
import gc
import torch.optim as optim
import torch.nn.functional as F
import retro





class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        """Return only every `skip`-th frame"""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward"""
        total_reward = 0.0
        for i in range(self._skip):
            
            obs, reward, done, trunk, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, trunk, info


class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=240, shape=obs_shape, dtype=np.uint8)

    def permute_orientation(self, observation):
        observation = np.transpose(observation, (2, 0, 1))
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        observation = self.permute_orientation(observation)
        transform = T.Grayscale()
        observation = transform(observation)
        return observation


class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=240, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        transforms = T.Compose(
            [T.Resize(self.shape, antialias=True), T.Normalize(0, 240)]
        )
        observation = transforms(observation).squeeze(0)
        return observation


In [3]:
stacked_frames = 6
env = retro.make(game='GhostsnGoblins-Nes', render_mode='rgb-array')
env = SkipFrame(env, skip=stacked_frames) 
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=(67,72))

if gym.__version__ < '0.26':
    env = FrameStack(env, num_stack=stacked_frames, new_step_api=True)
else:
    env = FrameStack(env, num_stack=stacked_frames)
env.reset()

(<gymnasium.wrappers.frame_stack.LazyFrames at 0x77f12d930c70>, {})

In [4]:
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        c, h, w = input_dim
        self.conv1 = nn.Conv2d(c, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(self._calculate_conv_output_size((c, h, w)), 512)
        self.fc2 = nn.Linear(512, output_dim)

    def _calculate_conv_output_size(self, shape):
        c, h, w = shape
        x = torch.randn(1, c, h, w)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        return int(np.prod(x.size()))

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)


In [21]:
from collections import deque
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class DQNAgent:
    def __init__(self, env, input_dim, output_dim, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.9995, lr=0.001, replay_memory_size=10000, batch_size=32):
        """
        Initieren von Hyperparameter
        """
        self.env = env
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.lr = lr
        self.batch_size = batch_size
        self.device =torch.device("cpu")

        
        self.replay_memory = TensorDictReplayBuffer(storage=LazyMemmapStorage(replay_memory_size, device=torch.device("cpu")))

        self.model = DQN(input_dim, output_dim)
        self.target_model = DQN(input_dim, output_dim)
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

    def choose_action(self, state):

        if isinstance(state, (int, float)): 
            state = torch.FloatTensor([state]).unsqueeze(0) 
        else:
            state = state[0].__array__() if isinstance(state, tuple) else state.__array__()
            state = torch.tensor(state, device=self.device).unsqueeze(0)  
    
        if np.random.rand() <= self.epsilon:
            return self.env.action_space.sample()  
        else:
            q_values = self.model(state)
            probs = F.softmax(q_values, dim=-1)
            #print(probs)
            threshold = 0.2 
            q_values = (probs > threshold).int() #entscheidung welche Erfahrungen wertvoll sind
            q_values = q_values.squeeze().tolist()
            #print(q_values)
            return q_values 


    def remember(self, state, action, reward, next_state, done):
        """
        Store the experience in replay memory
        """
        def first_if_tuple(x):
            return x[0] if isinstance(x, tuple) else x
        state = first_if_tuple(state).__array__()
        next_state = first_if_tuple(next_state).__array__()

        state = torch.tensor(np.array(state))
        self.replay_memory.add(TensorDict({
            "state": torch.tensor(np.array(state)),
            "action": torch.tensor([action]),
            "reward": torch.tensor([reward]),
            "next_state": torch.tensor(next_state),
            "done": torch.tensor([done])
        }, batch_size=[]))

    def replay(self):
     
        if len(self.replay_memory) < self.batch_size:
            return
        
        batch = self.replay_memory.sample(self.batch_size)#.to(self.device)
        states = batch['state']
        actions = batch['action'].squeeze()
        rewards = batch['reward'].squeeze()
        next_states = batch['next_state']
        dones = batch['done'].squeeze().float()

        current_q_values = self.model(states)
        #current_q_values = current_q_values[torch.arange(self.batch_size), actions]
        next_q_values = self.target_model(next_states).max(1)[0].detach()
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

       
        target_q_values = target_q_values.unsqueeze(1)
        current_q_values = current_q_values.unsqueeze(1)

        loss = self.loss_fn(current_q_values, target_q_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def train(self, episodes=10000, max_steps_per_episode=1000):
        scores = []
        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            for step in range(max_steps_per_episode):
                action = self.choose_action(state)
                #print(action)
                next_state, reward, done, truncated, info = self.env.step(action)

                self.remember(state, action, reward, next_state, done)
                state = next_state

                self.replay()

                total_reward += reward
                if done:
                    break

            scores.append(total_reward)
            print(f"Episode {episode + 1}/{episodes}, Score: {total_reward}, Epsilon: {self.epsilon:.2f}")
            if (episode + 1) % 10 == 0:
                self.update_target_model()

        print('Training complete.')
        return scores



In [22]:
input_dim = (stacked_frames, 67, 72) 

output_dim = env.action_space.n 
agent = DQNAgent(env, input_dim, output_dim)
scores = agent.train()


state = env.reset()
done = False
while not done:
    action = agent.choose_action(state)
    next_state, reward, done, _ = env.step(action)
    state = next_state

env.close()

tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1112, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0, 0, 0, 0, 0, 0, 0, 0]
tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0, 0, 0, 0, 0, 0, 0, 0]
tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0, 0, 0, 0, 0, 0, 0, 0]
tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0, 0, 0, 0, 0, 0, 0, 0]
tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0, 0, 0, 0, 0, 0, 0, 0]
tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0, 0, 0, 0, 0, 0, 0, 0]
tensor([[0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111, 0.1111]],
       grad_fn=<SoftmaxBackward0>)
[0, 0,

KeyboardInterrupt: 