<a href="https://colab.research.google.com/github/Maxxx-VS/The-Founder/blob/master/44_4_Pong%2BDQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the libraries we need
!pip install gym torch numpy opencv-python

In [None]:
# Import the required libraries
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
import os
from gym.wrappers import RecordVideo

In [None]:
# Neural network class
class DQN(nn.Module):
    """Fully connected network mapping a flat input vector to ``output_dim`` values.

    The network has two hidden layers of 128 units with ReLU activations,
    followed by a linear output layer with no activation.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 128
        # Layers are created in input-to-output order.
        self.fc1 = nn.Linear(input_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, output_dim)

    def forward(self, x):
        """Return the raw output of the final linear layer for input ``x``."""
        for hidden_layer in (self.fc1, self.fc2):
            x = torch.relu(hidden_layer(x))
        return self.fc3(x)

In [None]:
# Agent class
class DQNAgent:
    """Epsilon-greedy Q-learning agent with an experience replay buffer.

    The agent stores transitions in a bounded deque, picks actions
    epsilon-greedily from a ``DQN`` model and trains that same model on
    random mini-batches with an MSE loss against one-step bootstrapped
    targets.

    Args:
        state_dim: Length of the flat state vector fed to the model.
        action_dim: Number of actions; also the model's output width.
        lr: Learning rate for the Adam optimizer.
        gamma: Discount factor applied to the next state's best Q-value.
        epsilon: Initial probability of choosing a random action.
        epsilon_decay: Factor epsilon is multiplied by after each training step.
        epsilon_min: Floor that decay will not push epsilon below.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        # Bounded buffer: once full, the oldest transitions are dropped.
        self.memory = deque(maxlen=10000)
        self.model = DQN(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

    def act(self, state):
        """Return an action index for ``state`` using an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the largest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_dim)
        # Action selection is inference only, so skip autograd bookkeeping.
        with torch.no_grad():
            state = torch.as_tensor(np.asarray(state), dtype=torch.float32).unsqueeze(0)
            q_values = self.model(state)
        return torch.argmax(q_values).item()

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        """Run one optimisation step on a random mini-batch, then decay epsilon.

        Does nothing while the buffer holds fewer than ``batch_size``
        transitions.
        """
        if len(self.memory) < batch_size:
            return
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Go through NumPy with explicit dtypes so Python and NumPy scalars
        # (ints, floats, bools) are all accepted.
        states = torch.as_tensor(np.array(states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(actions, dtype=np.int64))
        rewards = torch.as_tensor(np.asarray(rewards, dtype=np.float32))
        next_states = torch.as_tensor(np.array(next_states), dtype=torch.float32)
        dones = torch.as_tensor(np.asarray(dones, dtype=np.float32))

        # squeeze(1) keeps the batch axis even when batch_size == 1, so the
        # prediction and the target always have the same shape.
        current_q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # Targets are constants for this step: no gradient flows through them.
        with torch.no_grad():
            next_q = self.model(next_states).max(1)[0]
        # Terminal transitions (done == 1) contribute only their reward.
        target_q = rewards + (1 - dones) * self.gamma * next_q

        loss = self.criterion(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay only while above the floor, and never overshoot below it.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [None]:
# State preprocessing
def preprocess(state):
    """Convert a raw frame into a flat binary vector.

    Rows 35 to 194 are kept, every second row and column is sampled and only
    channel 0 is used. Pixels whose value is 0, 109 or 144 (treated as
    background) map to 0.0; every other pixel maps to 1.0.

    The input array is left unmodified.

    Args:
        state: Array indexable as ``[row, column, channel]``.

    Returns:
        A 1-D ``float64`` array. For a 210x160x3 frame it has
        80 * 80 = 6400 elements.
    """
    # One strided slice does the crop, the 2x downsampling and the channel pick.
    frame = np.asarray(state)[35:195:2, ::2, 0]
    # Build the mask from comparisons instead of assigning into the slice:
    # the slice is a view, so in-place writes would corrupt the caller's frame.
    foreground = (frame != 0) & (frame != 109) & (frame != 144)
    # np.float64 replaces the np.float alias, which NumPy 1.24 removed.
    return foreground.astype(np.float64).ravel()

In [None]:
# Create the environment
env = gym.make('Pong-v0')
state_dim = 80 * 80  # State size (flattened 80x80 black-and-white image)
action_dim = env.action_space.n  # Number of actions, read from the environment's action space

In [None]:
# Create the agent (all other hyperparameters use the DQNAgent defaults)
agent = DQNAgent(state_dim, action_dim)

In [None]:
# Training parameters
episodes = 1000  # Number of episodes the training loop runs
batch_size = 32  # Number of transitions passed to agent.replay per step
best_reward = -float('inf')  # Highest episode total so far; starts below any real score

In [None]:
# Directory where recorded videos are saved
video_dir = "pong_videos"
os.makedirs(video_dir, exist_ok=True)  # No error if the directory already exists

In [None]:
# Training
for episode in range(episodes):
    state = preprocess(env.reset())
    total_reward = 0
    done = False

    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = preprocess(next_state)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        if done:
            print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {agent.epsilon}")
            break

        # One optimisation step per non-terminal environment step.
        agent.replay(batch_size)

    # Record a video whenever an episode sets a new best total reward
    if total_reward > best_reward:
        best_reward = total_reward
        # Record in a separate environment. Wrapping and closing the training
        # env itself would leave later episodes running on a closed, repeatedly
        # re-wrapped env, and every one of them would be recorded as well.
        video_env = RecordVideo(
            gym.make(env.spec.id),
            video_dir,
            episode_trigger=lambda x: True,
        )
        try:
            # NOTE: agent.act still explores with probability agent.epsilon, so
            # this is a fresh rollout of the current policy, not a replay of
            # the episode that just finished.
            state = preprocess(video_env.reset())
            done = False
            while not done:
                action = agent.act(state)
                next_state, _, done, _ = video_env.step(action)
                state = preprocess(next_state)
        finally:
            # Always close so the video file is finalised even if a step fails.
            video_env.close()