# Car Racing Domain Adaptation with DANN

使用 DANN (Domain Adversarial Neural Network) 进行强化学习的领域适应项目。



In [None]:
# Clone the project repository and import all of its modules.
# stderr from git is discarded; if the clone command fails, the `|| echo`
# fallback prints a message instead of failing the cell.
!git clone https://github.com/bochendong/car-racing-revistied.git 2>/dev/null || echo "Repository already exists or clone failed"
%cd car-racing-revistied

# Import all required modules (using the new modular layout).
import sys
sys.path.append('src')

# NOTE(review): the imported `eval` shadows Python's built-in eval() for the
# rest of the notebook.
from src.utils import Env, get_random_buffer, eval, ExperimentLogger
from src.models import DANN
from src.agents import Agent

print("✅ 所有模块导入成功！")

In [None]:
# Colab 环境设置
%%capture
%pip install swig
!sudo apt update && sudo apt install python-opengl
!sudo apt update && sudo apt install xvfb
%pip install gym-notebook-wrapper stable-baselines[mpi] pyglet
%pip install pyvirtualdisplay -qq
!apt-get install -y xvfb python-opengl ffmpeg -qq
%pip install gym[box2d]
%pip install box2d-kengz

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import os
import glob
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [None]:
# Select the compute device: CUDA when available, otherwise the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(f"使用设备: {device}")

# Create the output directory if it does not exist yet; `exist_ok=True`
# makes re-running the cell safe.
os.makedirs("./output_r", exist_ok=True)

# Empty the output directory of PNG files left by a previous run.
# A single pass is enough; nothing recreates the files in between.
for stale_png in glob.glob("./output_r/*.png"):
    os.remove(stale_png)

## 1. 查看 unseen 环境

In [None]:
# Environment preview: build one environment per background colour code
# ('g' for the source, 'c1' and 'c2' for the two unseen ones), all with seed 0.
source_env, unseen_1_env, unseen_2_env = (
    Env(color=colour_code, seed=0) for colour_code in ('g', 'c1', 'c2')
)

# Lookup table from a discrete action id to a 3-component continuous action.
# NOTE(review): components are presumably [steering, gas, brake] as in Gym's
# CarRacing; confirm against the Env wrapper.
discrete_actions = {
    0: np.array([0,0,0]),       1: np.array([-1,0,0]),      2: np.array([1,0,0]),
    3: np.array([-0.5,0,0]),   4: np.array([0.5,0,0]),     5: np.array([0,1,0]),
    6: np.array([0,0.5,0]),    7: np.array([0,0.25,0]),   8: np.array([0,0,1]),
    9: np.array([0,0,0.5]),    10: np.array([0,0,0.25])
}


def get_obs(env, n_steps=30):
    """Drive `env` with random discrete actions and return a preview image.

    Args:
        env: Object whose ``step(action)`` accepts a 3-element action and
            returns a 4-tuple whose first item is the observation.
        n_steps: Number of random steps to take before sampling the
            observation. Must be at least 1.

    Returns:
        ``(obs[0] + 1) / 2.0`` for the observation of the last step.
        NOTE(review): this presumably maps pixel values from [-1, 1] to
        [0, 1] for ``imshow``; confirm against the Env wrapper.

    Raises:
        ValueError: If ``n_steps`` is smaller than 1 (no observation would
            be available to return).
    """
    if n_steps < 1:
        raise ValueError("n_steps must be at least 1")

    # Derive the sampling range from the table so the two cannot drift apart.
    n_actions = len(discrete_actions)
    for _ in range(n_steps):
        action_id = int(torch.randint(low=0, high=n_actions, size=(1,))[0])
        action_vec = discrete_actions[action_id]
        # Only the observation is needed; the other return values are ignored.
        obs, _, _, _ = env.step(list(action_vec))
    return (obs[0] + 1) / 2.0

# Collect one preview image per environment, in source / unseen-1 / unseen-2 order.
env_preview = [
    get_obs(preview_env)
    for preview_env in (source_env, unseen_1_env, unseen_2_env)
]

# Show the three previews side by side in a single row.
f, axs = plt.subplots(1, 3, figsize=(12, 4))
axs = axs.flatten()
for panel_index, img in enumerate(env_preview):
    axs[panel_index].imshow(img)
plt.show()

## 2. 训练

In [None]:
# Create the environments: the green one is the source, and the 'c1' / 'c2'
# ones make up the target list.
green_env = Env(seed=0, color='g')
target_env = [Env(seed=0, color=colour_code) for colour_code in ('c1', 'c2')]
env_c1, env_c2 = target_env

source_env = green_env

In [None]:
# Initialise the model and the agent.
# The network is cast to double precision and then moved to the selected device.
net = DANN(num_out=2).double()
net = net.to(device)

criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

optimizer = optim.Adam(net.parameters(), lr=1e-4)

# Note: the imported Agent class must be given the `device` argument.
agent = Agent(
    net=net,
    criterion=criterion,
    optimizer=optimizer,
    buffer_capacity=1024,
    batch_size=128,
    device=device,
)

print("✅ 模型和智能体初始化完成！")

In [None]:
# Per-episode score on the source environment, and its exponential moving average.
training_records = []
running_score_records = []
running_score = 0

# Scores returned by eval() on the two target environments, one entry per evaluation.
c1_training_records = []
c2_training_records = []

# `eta` passed to the first agent.update() call; every later call receives 0.1
# because of the reassignment after the update below.
eta = 0.2

# Affine map applied to the agent's action before it is sent to the environment:
# the first component is scaled by 2 and shifted by -1, the other two are unchanged.
# Built once here instead of on every environment step.
ACTION_SCALE = np.array([2., 1., 1.])
ACTION_OFFSET = np.array([-1., 0., 0.])


def _save_training_plot(i_ep, panels):
    """Save a 2x2 grid of line plots to ./output_r/result_<i_ep>.png.

    Args:
        i_ep: Episode index, zero-padded to four digits in the file name.
        panels: Four ``(title, records)`` pairs, drawn row by row.
    """
    f, axs = plt.subplots(2, 2, figsize=(16, 8))
    for ax, (title, records) in zip(axs.flatten(), panels):
        ax.plot(range(len(records)), records)
        ax.set_title(title)

    f.savefig('./output_r/result_%04d.png' % i_ep)
    # Close the figure so periodic plotting does not accumulate open figures.
    plt.close(f)


for i_ep in range(3000):
    score = 0
    state = source_env.reset()

    for t in range(1000):
        action, a_logp = agent.select_action(state)
        state_, reward, done, die = source_env.step(action * ACTION_SCALE + ACTION_OFFSET)
        score += reward

        # store() reports whether the agent should run an update now.
        should_update = agent.store((state, action, a_logp, reward, state_))

        if should_update:
            # NOTE(review): eta_max is computed but never used; it looks like
            # the remnant of an eta schedule. Confirm the intended behaviour.
            eta_max = 0.5 if i_ep < 500 else (0.45 if i_ep < 1500 else 0.3)
            print("eta: {:.2f}".format(eta))
            agent.update(epoch=i_ep, eta=eta)
            eta = 0.1

        state = state_

        if done or die:
            break

    # Record the episode score and update the moving average.
    training_records.append(score)
    running_score = running_score * 0.99 + score * 0.01
    running_score_records.append(running_score)

    # Every 15 episodes: report progress, evaluate on both targets, save the curves.
    if (i_ep + 1) % 15 == 0:
        print('Ep {}\tLast score: {:.2f}\tMoving average score: {:.2f}'.format(i_ep, score, running_score))

        c1_score = eval(agent, target_env[0])
        c2_score = eval(agent, target_env[1])
        c1_training_records.append(c1_score)
        c2_training_records.append(c2_score)

        print('c1 score: {:.2f}\t c2 score: {:.2f}'.format(c1_score, c2_score))

        # Plot the training curves.
        _save_training_plot(i_ep, [
            ("Training Records", training_records),
            ("Running Score", running_score_records),
            ("C1 Training Records", c1_training_records),
            ("C2 Training Records", c2_training_records),
        ])