# 基于DDPG算法的电磁式阻尼器控制

本笔记本实现了基于DDPG算法的二自由度电磁阻尼器控制系统。参数来源于MATLAB仿真文件，方法迁移自《数据驱动的动力学系统建模及控制策略研究》。

In [None]:
import numpy as np
import torch
import random
import matplotlib.pyplot as plt
from datetime import datetime
import os
import json
import logging
from pathlib import Path

from env import ElectromagneticDamperEnv # 自定义环境
from ddpg_agent import DDPGAgent, ReplayBuffer # DDPG智能体和经验回放缓冲区
from train import train_ddpg # 训练函数
from af import plot_rewards, plot_state_comparison, find_checkpoint_files # 绘图和工具函数

# 设置随机种子，保证结果可重现
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# 设置中文字体
plt.rcParams['font.family'] = ['SimHei', 'Arial']

# 检查CUDA是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")

# 创建保存模型的基础目录
save_base_dir = "./saved_models"
save_plot_dir = "./saved_plots"
os.makedirs(save_base_dir, exist_ok=True)
os.makedirs(save_plot_dir, exist_ok=True)

In [None]:
# 配置日志系统和训练会话名称
def setup_training_session(base_dir="./saved_models"):
    """设置训练会话，创建目录和配置日志"""
    # 获取默认训练名称（时间戳）
    default_name = datetime.now().strftime("%m%d_%H%M")
    
    # 让用户输入自定义训练名称
    custom_name = input(f"请输入本次训练名称 (默认: {default_name}): ").strip()
    session_name = custom_name if custom_name else default_name
    
    # 创建会话目录
    session_dir = os.path.join(base_dir, session_name)
    os.makedirs(session_dir, exist_ok=True)
    
    # 配置日志
    log_file = os.path.join(session_dir, "training.log")
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    
    logging.info(f"创建训练会话: {session_name}")
    logging.info(f"日志保存路径: {log_file}")
    
    return session_name, session_dir

# 设置训练会话
session_name, save_model_dir = setup_training_session(save_base_dir)

## 超参数设置

In [None]:
# 控制参数
T = 3  # 系统运行时间
Ts = 0.001  # 采样时间步长

# 保存训练配置参数
training_config = {
    "session_name": session_name,
    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "device": str(device),
    "training": {
        "n_episodes": 200,
        "batch_size": 64,
        "min_buffer_size": 1000,
        "print_interval": 2,
        "save_interval": 5
    }
}

# 记录系统参数
logging.info(f"训练会话: {session_name}")
logging.info(f"设备: {device}")
logging.info(f"仿真时间: T={T}, Ts={Ts}")

## 系统参数设置

从MATLAB仿真文件中提取二自由度电磁阻尼器系统的参数。

In [None]:
# 系统参数（来自MATLAB文件）
m = 1.6    # 电磁吸振器质量
M = 100.0  # 待减振对象质量
k_m = 3000.0  # 电磁吸振器刚度
k_M = 200000.0  # 平台刚度
k_f = 45.0  # 电—力常数 N/A
k_E = 0.0  # 作动器反电动势系数
L = 0.0045  # 线圈的电感
R_m = 5.0  # 线圈的电阻
c_m = 0.2  # 电磁吸振器阻尼
c_M = 1.0  # 平台阻尼

# 状态空间矩阵（来自MATLAB文件）
A = np.array([
    [0.0,    1.0,       0.0,         0.0],
    [-k_m/m, -c_m/m,    k_m/m,       c_m/m],
    [0.0,    0.0,       0.0,         1.0],
    [k_m/M,  c_m/M,     -(k_m+k_M)/M, -(c_m+c_M)/M]
])

B = np.array([[0.0], [k_f/m], [0.0], [-k_f/M]])

C = np.array([
    [-k_m/m, -c_m/m,    k_m/m,       c_m/m],
    [k_m/M,  c_m/M,     -(k_m+k_M)/M, -(c_m+c_M)/M]
])

D = np.array([[k_f/m], [-k_f/M]])

E = np.array([
    [0.0, 0.0, 0.0, c_M/M],
    [0.0, 0.0, 0.0, k_M/M]
]).T

# 更新训练配置参数中的系统参数
training_config["parameters"] = {
    "system": {
        "m": m,
        "M": M,
        "k_m": k_m,
        "k_M": k_M,
        "k_f": k_f,
        "k_E": k_E,
        "L": L,
        "R_m": R_m,
        "c_m": c_m,
        "c_M": c_M
    },
    "simulation": {
        "T": T,
        "Ts": Ts
    }
}

# 记录系统参数
logging.info("系统参数:")
logging.info(f"  吸振器质量 m = {m}")
logging.info(f"  减振对象质量 M = {M}")
logging.info(f"  吸振器刚度 k_m = {k_m}")
logging.info(f"  平台刚度 k_M = {k_M}")
logging.info(f"  电-力常数 k_f = {k_f}")

# 将配置保存为JSON文件
config_file = os.path.join(save_model_dir, "config.json")
with open(config_file, "w") as f:
    json.dump(training_config, f, indent=4)

logging.info(f"训练配置已保存至: {config_file}")

### 无控制输出环境验证

In [None]:
rl_env = ElectromagneticDamperEnv(A, B, C, D, E, Ts=0.0001, T=1)

# 定义正弦扰动函数用于测试
def sine_disturbance(amp, freq):
    """正弦扰动函数"""
    def func(t):
        return amp * np.sin(2 * np.pi * freq * t)
    return func

# 运行无控制的仿真
results_no_control = rl_env.run_simulation(z_func=sine_disturbance(0.01, 30))
plt.plot(results_no_control['times'], results_no_control['states'][:, 2], label='No Control')
plt.xlabel('时间 (s)')
plt.ylabel('位移')
plt.title('无控制系统响应')
plt.legend()
plt.grid(True)
plt.show()

logging.info("无控制系统验证完成")

## 初始化环境和DDPG代理

使用从MATLAB文件提取的参数创建电磁阻尼器系统环境和DDPG代理。

In [None]:
# 创建仿真环境
rl_env = ElectromagneticDamperEnv(A, B, C, D, E, Ts=Ts, T=T)

# 设置外部扰动函数 - 这里设置为简单的正弦波扰动
rl_env.set_disturbance(sine_disturbance(0.001, 30)) # 设置扰动函数

agent = DDPGAgent(
    state_dim=1, # 状态维度
    action_dim=1, # 动作维度
    hidden_dim=64, # 隐藏层维度
    action_bound=5.0, # 动作范围
    actor_lr=1e-4, # 调整学习率可能需要
    critic_lr=1e-3,
    gamma=0.99,
    tau=0.005,
    sigma=0.2 # 初始噪声标准差
)

# 创建经验回放池
replay_buffer = ReplayBuffer(capacity=100000, batch_size=64)

# 记录DDPG代理参数
logging.info("DDPG代理参数:")
logging.info(f"  状态维度: 1")
logging.info(f"  动作维度: 1")
logging.info(f"  隐藏层维度: 64")
logging.info(f"  动作范围: 5.0")
logging.info(f"  Actor学习率: 1e-4")
logging.info(f"  Critic学习率: 1e-3")
logging.info(f"  折扣因子gamma: 0.99")
logging.info(f"  软更新参数tau: 0.005")
logging.info(f"  探索噪声标准差: 0.2")
logging.info(f"  经验回放池容量: 100000")
logging.info(f"  批次大小: 64")

# 更新训练配置中的DDPG参数
training_config["agent_params"] = {
    "state_dim": 1,
    "action_dim": 1,
    "hidden_dim": 64,
    "action_bound": 5.0,
    "actor_lr": 1e-4,
    "critic_lr": 1e-3,
    "gamma": 0.99,
    "tau": 0.005,
    "sigma": 0.2
}

# 重新保存更新后的配置
with open(config_file, "w") as f:
    json.dump(training_config, f, indent=4)

## 训练DDPG代理

训练DDPG代理来控制电磁阻尼器系统。

In [None]:
def r_func(obs:np.ndarray, action:np.ndarray, next_obs:np.ndarray)-> float:
    """自定义奖励函数"""
    sum_next_obs = np.sum(next_obs**2) # 计算下一个状态的平方和
    sum_action = np.sum(action**2) # 计算动作的平方和
    # 奖励函数：负的状态平方和和动作平方和之和
    reward:float = - (10 * sum_next_obs + 0.001 * sum_action) # 奖励越大越好
    return reward

# 记录奖励函数
logging.info("奖励函数: reward = -(10 * sum_next_obs + 0.001 * sum_action)")
training_config["reward_function"] = "-(10 * sum_next_obs + 0.001 * sum_action)"

# 重新保存更新后的配置
with open(config_file, "w") as f:
    json.dump(training_config, f, indent=4)

# 是否加载先前的训练模型
load_previous_model = input("是否加载先前的训练模型? (y/n): ").strip().lower() == 'y'

start_episode = 0
initial_episode_rewards = None
previous_model_path = None

if load_previous_model:
    checkpoint_files = find_checkpoint_files(save_base_dir)
    
    if checkpoint_files:
        print("\n找到以下检查点文件:")
        for i, file in enumerate(checkpoint_files):
            file_name = os.path.basename(file)
            file_time = os.path.getmtime(file)
            print(f"{i+1}. {file_name} (修改时间: {datetime.fromtimestamp(file_time).strftime('%Y-%m-%d %H:%M:%S')})")
        
        choice = input("请选择要加载的检查点文件编号 (输入数字，直接回车取最新): ")
        
        if choice.strip():
            selected_index = int(choice) - 1
            if 0 <= selected_index < len(checkpoint_files):
                previous_model_path = checkpoint_files[selected_index]
        else:
            # 默认选择最新的检查点
            previous_model_path = checkpoint_files[0]
        
        print(f"加载模型: {os.path.basename(previous_model_path)}")
        logging.info(f"加载检查点: {os.path.basename(previous_model_path)}")
        
        # 加载模型并获取训练状态
        episode_rewards, current_episode = agent.load_checkpoint(previous_model_path)
        start_episode = current_episode
        initial_episode_rewards = episode_rewards
        logging.info(f"继续从第 {start_episode} 轮训练，已完成 {len(episode_rewards)} 轮")
        print(f"继续从第 {start_episode} 轮训练，已完成 {len(episode_rewards)} 轮")
    else:
        print("未找到可加载的检查点文件，将从零开始训练")
        logging.info("未找到可加载的检查点，从零开始训练")

# 记录训练参数
training_params = {
    "n_episodes": 200,
    "batch_size": 64,
    "min_buffer_size": 1000,
    "print_interval": 2,
    "save_interval": 5,
    "start_episode": start_episode
}

logging.info(f"训练参数: {training_params}")
logging.info(f"开始DDPG训练...")

# 训练DDPG代理
training_results = train_ddpg(
    env=rl_env,
    agent=agent,
    replay_buffer=replay_buffer,
    n_episodes=training_params["n_episodes"],  # 训练轮数
    batch_size=training_params["batch_size"], # 批次大小
    min_buffer_size=training_params["min_buffer_size"], # 最小回放池大小
    print_interval=training_params["print_interval"], # 打印信息的间隔
    save_interval=training_params["save_interval"], # 保存模型的间隔
    save_path=save_model_dir, # 保存模型的路径
    log_path=save_model_dir, # 日志保存路径
    r_func=r_func, # 奖励函数
    start_episode=start_episode,
    initial_episode_rewards=initial_episode_rewards,
    load_previous=load_previous_model,
    previous_model=previous_model_path
)

logging.info(f"训练完成，最终平均奖励: {training_results['avg_rewards'][-1]:.2f}")

# 绘制训练奖励曲线
rewards_plot_path = os.path.join(save_model_dir, "training_rewards.png")
plot_rewards(training_results['episode_rewards'], training_results['avg_rewards'], save_path=rewards_plot_path)
logging.info(f"奖励图表已保存: {rewards_plot_path}")

# 记录训练结果摘要
logging.info("训练结果摘要:")
logging.info(f"  总轮次: {len(training_results['episode_rewards'])}")
logging.info(f"  最终平均奖励: {training_results['avg_rewards'][-1]:.4f}")
logging.info(f"  最高单轮奖励: {max(training_results['episode_rewards']):.4f}")
logging.info(f"  最终模型路径: {training_results['final_model']}")

## 测试与结果对比

测试不同控制策略（无控制和DDPG控制）的效果，并进行对比分析。

In [None]:
# 创建测试环境
test_env = ElectromagneticDamperEnv(A, B, C, D, E, Ts=0.001, T=1)

# --- 运行仿真 ---
logging.info("开始测试仿真")

# 运行无控制的仿真
logging.info("运行无控制仿真")
results_no_control = test_env.run_simulation(z_func=sine_disturbance(0.001, 30))

# 选择加载的模型
present_model_options = [
    ("最新训练模型", f"{save_model_dir}/ddpg_agent_{session_name}_ep{start_episode+training_params['n_episodes']}_final.pth"),
    ("手动选择模型", None)
]

choice = input(f"请选择测试模型: 1 = 最新训练模型, 2 = 手动选择模型: ").strip()

if choice == "2":
    # 允许用户手动选择模型文件
    model_files = []
    for root, dirs, files in os.walk(save_base_dir):
        for file in files:
            if file.endswith('.pth') and not file.endswith('_checkpoint.pth'):
                model_files.append(os.path.join(root, file))
                
    if not model_files:
        print("未找到模型文件，使用最新训练的模型")
        model_path = present_model_options[0][1]
    else:
        print("\n找到以下模型文件:")
        for i, file in enumerate(model_files):
            print(f"{i+1}. {os.path.basename(file)}")
                
        idx = int(input("请选择模型文件编号: ")) - 1
        if 0 <= idx < len(model_files):
            model_path = model_files[idx]
        else:
            model_path = present_model_options[0][1]
else:
    # 使用当前训练的最新模型
    model_path = present_model_options[0][1]

# 加载选择的DDPG模型
logging.info(f"加载DDPG模型: {os.path.basename(model_path)}")
print(f"加载模型: {os.path.basename(model_path)}")
agent.load(model_path)

# 运行DDPG控制的仿真
logging.info("运行DDPG控制仿真")
results_ddpg = test_env.run_simulation(z_func=sine_disturbance(0.001, 30), controller=agent.actor)

# 计算DDPG控制效果改善百分比
max_displacement_no_control = np.max(np.abs(results_no_control['states'][:, 2]))
max_displacement_ddpg = np.max(np.abs(results_ddpg['states'][:, 2]))
improvement = (max_displacement_no_control - max_displacement_ddpg) / max_displacement_no_control * 100

logging.info(f"位移控制效果: 无控制最大位移={max_displacement_no_control:.6f}, DDPG最大位移={max_displacement_ddpg:.6f}")
logging.info(f"位移改善: {improvement:.2f}%")

# --- 绘制对比结果 ---
comparison_plot_path = os.path.join(save_model_dir, "control_comparison.png")
fig = plot_state_comparison(results_no_control, results_ddpg, save_path=comparison_plot_path)
logging.info(f"对比结果图表已保存: {comparison_plot_path}")

# --- 保存测试结果 ---
test_results = {
    "no_control": {
        "states_mean": results_no_control['states'].mean(axis=0).tolist(),
        "states_max": results_no_control['states'].max(axis=0).tolist(),
        "states_min": results_no_control['states'].min(axis=0).tolist(),
    },
    "ddpg_control": {
        "states_mean": results_ddpg['states'].mean(axis=0).tolist(),
        "states_max": results_ddpg['states'].max(axis=0).tolist(),
        "states_min": results_ddpg['states'].min(axis=0).tolist(),
        "actions_mean": float(results_ddpg['actions'].mean()),
        "actions_max": float(results_ddpg['actions'].max()),
        "actions_min": float(results_ddpg['actions'].min()),
    }
}

results_file = os.path.join(save_model_dir, "test_results.json")
with open(results_file, "w") as f:
    json.dump(test_results, f, indent=4)

logging.info(f"测试结果已保存至: {results_file}")