In [1]:
!pip install gym
!pip install tensorboard


Collecting gym
  Downloading gym-0.26.2.tar.gz (721 kB)
[K     |████████████████████████████████| 721 kB 1.3 MB/s eta 0:00:01
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h    Preparing wheel metadata ... [?25ldone
Collecting gym-notices>=0.0.4
  Downloading gym_notices-0.0.8-py3-none-any.whl (3.0 kB)
Collecting cloudpickle>=1.2.0
  Downloading cloudpickle-3.0.0-py3-none-any.whl (20 kB)
Building wheels for collected packages: gym
  Building wheel for gym (PEP 517) ... [?25ldone
[?25h  Created wheel for gym: filename=gym-0.26.2-py3-none-any.whl size=827623 sha256=62a2f2b3427dea713638c468fa4a3adc2a18c9772c94739ab687ae58cfd7ea81
  Stored in directory: /Users/chenhongyan/Library/Caches/pip/wheels/b9/22/6d/3e7b32d98451b4cd9d12417052affbeeeea012955d437da1da
Successfully built gym
Installing collected packages: gym-notices, cloudpickle, gym
Successfully installed cloudpickle-3.0.0 gym-0.26.2 gym-notices-0.0.8
You shou

In [4]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt

# Custom production-line environment (example)
class ProductionLineEnv(gym.Env):
    """Toy production-line environment whose states are random vectors.

    Every observation is a length-4 float64 vector drawn uniformly from
    [0, 1). There are three discrete actions; the reward for an action is the
    state component at that index multiplied by 100. The next state is drawn
    independently of the action, and an episode never terminates on its own.

    Attributes:
        state_space: Box describing the observations (name used by existing
            callers in this notebook).
        observation_space: The same Box under the attribute name that gym
            tooling looks for.
        action_space: ``Discrete(3)``.
        state: The current observation.
    """

    def __init__(self):
        super().__init__()
        # The sampled states are float64, so declare that dtype explicitly
        # instead of inheriting Box's float32 default.
        self.state_space = gym.spaces.Box(low=0, high=1, shape=(4,), dtype=np.float64)
        self.observation_space = self.state_space
        self.action_space = gym.spaces.Discrete(3)
        # None means "draw from NumPy's global RNG", which is what the
        # environment did before seeding support was added.
        self._rng = None
        self.reset()

    def reset(self, seed=None, options=None):
        """Draw a fresh random state and return it.

        Args:
            seed: Optional seed. When given, the environment switches to a
                private generator seeded with it; otherwise the generator
                already in use is kept.
            options: Accepted for call compatibility; ignored.

        Returns:
            The new state as a NumPy array of shape (4,).
        """
        if seed is not None:
            self._rng = np.random.default_rng(seed)
        self.state = self._sample_state()
        return self.state

    def step(self, action):
        """Score ``action`` against the current state, then draw the next state.

        Args:
            action: Integer in ``[0, action_space.n)``.

        Returns:
            ``(next_state, reward, done, info)`` where ``done`` is always
            ``False`` and ``info`` is an empty dict.

        Raises:
            ValueError: If ``action`` is outside the action space. Without
                this check index 3 and negative indices would silently select
                a state component that is not a declared action.
        """
        if not 0 <= action < self.action_space.n:
            raise ValueError(
                f"action must be in [0, {self.action_space.n}), got {action!r}"
            )
        reward = self.calculate_reward(action)
        done = False
        self.state = self._sample_state()
        return self.state, reward, done, {}

    def calculate_reward(self, action):
        """Return 100 times the current state component selected by ``action``."""
        efficiency = self.state[action]
        return efficiency * 100

    def _sample_state(self):
        """Draw one uniform [0, 1) state vector from the active RNG."""
        source = np.random if self._rng is None else self._rng
        return source.uniform(0, 1, 4)

# DQN model
class DQN(nn.Module):
    """Fully connected Q-network: state vector in, one Q-value per action out.

    Two hidden layers of 128 units with ReLU activations, followed by a
    linear output layer of size ``action_size``.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return the Q-values for ``x`` (last dimension must be ``state_size``)."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = nn.functional.relu(layer(hidden))
        return self.fc3(hidden)

# Replay Buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns:
            A 5-tuple of NumPy arrays: states, actions, rewards, next states
            and done flags, each with ``batch_size`` entries along axis 0.
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        columns = (states, actions, rewards, next_states, dones)
        return tuple(np.array(column) for column in columns)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

# Single DQN training step
def train_dqn(dqn, optimizer, memory, batch_size=128, gamma=0.99):
    """Run one gradient step of Q-learning on a batch sampled from ``memory``.

    The loss is the mean squared error between Q(s, a) and the one-step
    bootstrap target ``r + gamma * max_a' Q(s', a') * (1 - done)``. The same
    network provides both the prediction and the target.

    Args:
        dqn: Network mapping a batch of states to per-action Q-values.
        optimizer: Optimizer over ``dqn``'s parameters.
        memory: Object whose ``sample(batch_size)`` returns five array-likes:
            states, actions, rewards, next states and done flags.
        batch_size: Number of transitions to sample.
        gamma: Discount factor applied to the bootstrap term.

    Returns:
        The batch loss as a Python float.
    """
    # Use the device the model lives on rather than a module-level global,
    # so the function works wherever it is called from.
    first_param = next(dqn.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    state, action, reward, next_state, done = memory.sample(batch_size)
    state = torch.as_tensor(state, dtype=torch.float32, device=model_device)
    next_state = torch.as_tensor(next_state, dtype=torch.float32, device=model_device)
    action = torch.as_tensor(action, dtype=torch.long, device=model_device)
    reward = torch.as_tensor(reward, dtype=torch.float32, device=model_device)
    done = torch.as_tensor(done, dtype=torch.float32, device=model_device)

    # Q-value of the action that was actually taken in each transition.
    q_value = dqn(state).gather(1, action.unsqueeze(1)).squeeze(1)

    # The target is treated as a constant, so skip building its autograd graph.
    with torch.no_grad():
        next_q_value = dqn(next_state).max(1)[0]
        expected_q_value = reward + gamma * next_q_value * (1 - done)

    loss = (q_value - expected_q_value).pow(2).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.item()

# Main program
# Seed every RNG used below before anything random is created, so that the
# weight initialisation, exploration and replay sampling are repeatable.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

env = ProductionLineEnv()
state_size = env.state_space.shape[0]
action_size = env.action_space.n

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

dqn = DQN(state_size, action_size).to(device)
optimizer = optim.Adam(dqn.parameters())
memory = ReplayBuffer(1000)

# Training configuration
num_episodes = 10000
max_steps_per_episode = 1000
batch_size = 128
# Epsilon-greedy exploration: start fully random and decay once per episode.
# A purely greedy policy only ever collects data for the actions the
# untrained network happens to prefer.
epsilon = 1.0
epsilon_min = 0.05
epsilon_decay = 0.995

best_reward = -float('inf')
best_episode = None
best_state_dict = None  # copy of the weights at the end of the best episode
losses = []  # loss of every training step
all_rewards = []  # total reward of every episode
loss = float('nan')  # keeps the per-episode print valid before the first update

for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    for t in range(max_steps_per_episode):
        if random.random() < epsilon:
            action = random.randrange(action_size)
        else:
            # Action selection needs no gradients.
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
                action = torch.argmax(dqn(state_tensor)).item()
        next_state, reward, done, _ = env.step(action)

        memory.push(state, action, reward, next_state, done)
        total_reward += reward
        state = next_state

        if len(memory) > batch_size:
            loss = train_dqn(dqn, optimizer, memory, batch_size=batch_size)
            losses.append(loss)

        # Stop stepping an environment that has reported the episode as over.
        if done:
            break

    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    all_rewards.append(total_reward)
    print(f"Episode {episode + 1}: Total Reward: {total_reward}, Loss: {loss:.4f}")

    if total_reward > best_reward:
        best_reward = total_reward
        best_episode = episode + 1
        # state_dict() returns references to the live parameters, so clone
        # them; otherwise later updates would overwrite the snapshot.
        best_state_dict = {
            name: tensor.detach().clone() for name, tensor in dqn.state_dict().items()
        }

# Report the best episode
print(f"\n最佳 Episode 是第 {best_episode} 集，总奖励为 {best_reward}")

# Plot the loss curve
plt.figure(figsize=(24, 12))
plt.plot(losses)
plt.title("Training Loss Curve")
plt.xlabel("Training step")
plt.ylabel("Loss")
plt.show()

# Plot the reward curve
plt.figure(figsize=(24, 12))
plt.plot(all_rewards)
plt.title("every eposide total reward")
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.show()

# Save the weights recorded at the best episode, so the file name is accurate.
# Falls back to the current weights if no episode was run.
torch.save(
    best_state_dict if best_state_dict is not None else dqn.state_dict(),
    'best_dqn_model.pth',
)


Episode 1: Total Reward: 50479.14062665884, Loss: 797.0499
