### car racing - torch

In [67]:
from collections import deque
import random
import matplotlib.pyplot as plt
import numpy as np
import gym
import pylab as p
import torch
import torch.optim as optim
import torch.nn.functional as F
import torch.nn as nn


class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random mini-batch sampling.

    Args:
        buffer_size: Maximum number of transitions kept; once full, the
            oldest transition is dropped for every new one appended.
        batch_size: Number of transitions returned by ``get_batch``.
    """

    def __init__(self, buffer_size, batch_size):
        self.batch_size = batch_size
        # A deque with ``maxlen`` evicts from the left automatically.
        self.buffer = deque(maxlen=buffer_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def add(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def get_batch(self):
        """Sample ``batch_size`` distinct transitions and return them field-wise.

        Returns:
            A 5-tuple ``(state, action, reward, next_state, done)`` of NumPy
            arrays whose first axis indexes the sampled transitions.
            ``done`` is cast to ``int32``.

        Raises:
            ValueError: If fewer than ``batch_size`` transitions are stored
                (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, self.batch_size)

        # Transpose "list of transitions" into "one sequence per field".
        states, actions, rewards, next_states, dones = zip(*batch)

        return (
            np.stack(states),
            np.array(actions),
            np.array(rewards),
            np.stack(next_states),
            np.array(dones).astype(np.int32),
        )



# CNN feature extractor
class CNNFeatureExtractor(nn.Module):
    """Convolutional encoder that maps image batches to 128-dim feature vectors.

    Three ReLU-activated convolutions are followed by a flatten and a
    ReLU-activated linear projection to 128 features.

    The projection is sized and registered in ``__init__`` so that it is
    already part of ``parameters()`` / ``state_dict()`` when an optimizer is
    built or weights are copied right after construction.  Creating it only
    inside the first ``forward`` call would leave it out of any optimizer
    constructed before that call, and the layer would never be trained.

    Args:
        input_shape: ``(channels, height, width)`` of one input image, used
            to size the linear projection.  ``channels`` must be 3 to match
            ``conv1``.  The default corresponds to 3-channel 96x96 images.
            Pass ``None`` to defer creating the projection until the first
            forward pass (the legacy behaviour).
    """

    def __init__(self, input_shape=(3, 96, 96)):
        super(CNNFeatureExtractor, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=8, stride=4, padding=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)

        # Filled in by _build_linear, either right below or (legacy lazy
        # mode) on the first forward pass.
        self.feature_size = None
        self.linear = None
        self.initialized = False

        if input_shape is not None:
            # Probe the conv stack with a dummy image to learn the flattened
            # feature size; no gradients are needed for this.
            with torch.no_grad():
                probe = self._conv_features(torch.zeros(1, *input_shape))
            self._build_linear(probe.size(1), probe.device)

    def _conv_features(self, x):
        """Run the three conv layers and flatten to ``(batch, features)``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        return x.view(x.size(0), -1)

    def _build_linear(self, feature_size, device):
        """Create the 128-unit projection for ``feature_size`` inputs on ``device``."""
        self.feature_size = feature_size
        self.linear = nn.Linear(feature_size, 128).to(device)
        self.initialized = True

    def forward(self, x):
        """Encode a batch of images.

        Args:
            x: Float tensor of shape ``(batch, 3, height, width)``.

        Returns:
            Tensor of shape ``(batch, 128)``.

        Raises:
            RuntimeError: If the flattened conv output does not match the
                size the linear projection was built for.
        """
        flat_x = self._conv_features(x)

        if not self.initialized:
            # Legacy lazy mode: size the projection from the first batch and
            # place it on the same device as the data.
            self._build_linear(flat_x.size(1), flat_x.device)
        elif flat_x.size(1) != self.feature_size:
            raise RuntimeError(
                f"CNNFeatureExtractor was built for {self.feature_size} "
                f"flattened features but received {flat_x.size(1)}; "
                "construct it with the matching input_shape."
            )

        return F.relu(self.linear(flat_x))

class QNet(nn.Module):
    """Q-value network: a CNN feature extractor followed by a three-layer MLP.

    Args:
        action_size: Number of discrete actions, i.e. the width of the
            output layer.
    """

    def __init__(self, action_size):
        super().__init__()
        hidden_units = 256

        self.feature = CNNFeatureExtractor()

        # Fully connected head; its input width (128) matches the first
        # linear layer's expected input below.
        self.l1 = nn.Linear(128, hidden_units)
        self.l2 = nn.Linear(hidden_units, hidden_units)
        self.l3 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return one value per action for every input in the batch."""
        hidden = self.feature(x)
        for layer in (self.l1, self.l2):
            hidden = F.relu(layer(hidden))
        # The output layer is left linear (no activation).
        return self.l3(hidden)

In [61]:
def print_model_structure(model):
    """Print a title line, then ``model`` framed by two 50-character ``=`` rules."""
    rule = "=" * 50
    print("📐 Neural Network Structure:", rule, model, rule, sep="\n")

In [None]:
class DQNAgent:
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    Args:
        device: Torch device (or device string) that holds both networks and
            every training batch.
        state_shape: ``(channels, height, width)`` of one preprocessed
            state.  It is used for a single warm-up forward pass so that any
            layer the networks create on first use already exists when the
            optimizer collects ``parameters()`` and when the target network
            is first synchronised.
    """

    def __init__(self, device='cpu', state_shape=(3, 96, 96)):
        self.gamma = 0.98
        self.lr = 0.0005
        self.epsilon = 0.1
        self.buffer_size = 100000
        self.batch_size = 32
        self.action_size = 5
        self.device = device

        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
        self.qnet = QNet(self.action_size).to(self.device)
        self.qnet_target = QNet(self.action_size).to(self.device)

        # Warm-up pass: materialise lazily built layers BEFORE the optimizer
        # is created, otherwise their weights would never be optimised.
        probe = torch.zeros(1, *state_shape, dtype=torch.float32, device=self.device)
        with torch.no_grad():
            self.qnet(probe)
            self.qnet_target(probe)

        # Start the target network as an exact copy of the online network
        # instead of an unrelated random initialisation.
        self.sync_qnet()

        self.optimizer = optim.Adam(self.qnet.parameters(), lr=self.lr)

    def get_action(self, state):
        """Choose an action epsilon-greedily.

        Args:
            state: A single preprocessed state as a NumPy array (no batch
                axis).

        Returns:
            The chosen action index as a Python ``int``.
        """
        if np.random.rand() < self.epsilon:
            return int(np.random.choice(self.action_size))

        # Add the batch axis the network expects.
        batch = torch.tensor(state[np.newaxis, :], dtype=torch.float32, device=self.device)
        with torch.no_grad():
            qs = self.qnet(batch)
        return qs.argmax(dim=1).item()

    def update(self, state, action, reward, next_state, done):
        """Store one transition and run one gradient step on a sampled batch.

        No gradient step is taken until the replay buffer holds at least
        ``batch_size`` transitions.
        """
        self.replay_buffer.add(state, action, reward, next_state, done)

        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        state = torch.tensor(state, dtype=torch.float32, device=self.device)
        action = torch.tensor(action, dtype=torch.long, device=self.device)
        reward = torch.tensor(reward, dtype=torch.float32, device=self.device)
        next_state = torch.tensor(next_state, dtype=torch.float32, device=self.device)
        done = torch.tensor(done, dtype=torch.float32, device=self.device)

        # Q(s, a) for the actions actually taken.  ``gather`` keeps all
        # indices on the same device as ``qs``.
        qs = self.qnet(state)
        q = qs.gather(1, action.unsqueeze(1)).squeeze(1)

        # TD target from the frozen target network; ``(1 - done)`` removes
        # the bootstrap term for transitions that ended an episode.
        with torch.no_grad():
            next_q = self.qnet_target(next_state).max(dim=1)[0]
            target = reward + (1 - done) * self.gamma * next_q

        loss = F.mse_loss(q, target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def sync_qnet(self):
        """Copy the online network's weights into the target network."""
        self.qnet_target.load_state_dict(self.qnet.state_dict())


def preprocess(state):
    """Reorder an ``(H, W, C)`` image to ``(C, H, W)`` and divide by 255.

    The result is returned as ``float32``.  Dividing an integer image by
    ``255.0`` directly would produce ``float64``, doubling the memory of
    every frame kept in a replay buffer even though the networks consume
    ``float32`` tensors.

    Args:
        state: 3-D array-like image with the channel axis last.
            Presumably ``uint8`` pixels in ``[0, 255]`` -- confirm against
            the environment's observation space.

    Returns:
        A new ``float32`` NumPy array with the channel axis first; the
        input is not modified.
    """
    channels_first = np.transpose(state, (2, 0, 1))
    return channels_first.astype(np.float32) / 255.0



# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
episodes = 1000
sync_interval = 10  # episodes between target-network synchronisations


env = gym.make('CarRacing-v2', continuous=False, render_mode='rgb_array')
agent = DQNAgent(device=device)

# Print observation and action space
print("Observation Space:", env.observation_space)
print("Action Space:", env.action_space)

# Not used by the training loop below.
input_dim = env.observation_space.shape[0]

reward_history = []

for episode in range(episodes):
    # reset() returns (observation, info); only the observation is needed.
    state, _ = env.reset()
    state = preprocess(state)
    done = False
    total_reward = 0

    while not done:
        action = agent.get_action(state)
        next_state, reward, terminated, truncated, info = env.step(action)
        next_state = preprocess(next_state)
        # The episode ends on either a true terminal state or a time limit.
        done = terminated or truncated

        agent.update(state, action, reward, next_state, done)

        state = next_state
        total_reward += reward

    if episode % sync_interval == 0:
        agent.sync_qnet()

    reward_history.append(total_reward)
    if episode % 10 == 0:
        print(f"Episode {episode} | Total Reward {total_reward}")

plt.xlabel('Episodes')
plt.ylabel('Total Reward')
# x and y are separate arguments; the original `range(...).reward_history`
# raised AttributeError after training finished.
plt.plot(range(len(reward_history)), reward_history)
plt.show()

Observation Space: Box(0, 255, (96, 96, 3), uint8)
Action Space: Discrete(5)
