# 第6章·实验4：面向时序分析的 CNN

本 Notebook 以一维卷积网络完成合成时序数据的分类任务，覆盖数据准备、模型搭建、训练评估和实验思考环节，便于直接运行或在课件要求基础上改动参数重现实验。

主要步骤：

1. 构造带噪声的双类别时序数据（正弦波 vs. 方波）。
2. 使用 `DataLoader` 划分训练/验证/测试集并可视化样本。
3. 搭建一维卷积网络（Conv1d + BatchNorm + Dropout）。
4. 训练与验证，记录损失与精度曲线，评估测试集并展示混淆矩阵。
5. 预留思考与扩展小节，按照课件的“实验要求”整理观察与结论。

## 1. 环境与依赖

- 依赖：`torch`, `numpy`, `matplotlib`, `scikit-learn`（如缺失可 `pip install`）。
- 建议 GPU 可用时自动切换到 CUDA。

In [None]:
import math
import random
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

# Seed Python, NumPy and PyTorch with the same value so reruns are repeatable.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Use CUDA when PyTorch reports it as available; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device


## 2. 生成与准备时序数据

- 类别 0：含噪声的正弦波；类别 1：含噪声的方波。
- 每个样本长度固定（默认为 200），可根据实验要求修改。
- 划分训练/验证/测试集，并归一化到 [-1, 1] 范围。

In [None]:
from typing import Tuple

def generate_wave(sample_len: int, kind: str, noise_scale: float = 0.1) -> np.ndarray:
    """Return one noisy period of a sine or square wave.

    Args:
        sample_len: Number of points sampled evenly over ``[0, 2*pi]``.
        kind: ``"sine"`` or ``"square"``.
        noise_scale: Standard deviation of the additive Gaussian noise.

    Returns:
        Array of length ``sample_len`` holding the clean wave plus noise.

    Raises:
        ValueError: If ``kind`` is neither ``"sine"`` nor ``"square"``.
    """
    phase = np.linspace(0, 2 * math.pi, sample_len)
    if kind not in ("sine", "square"):
        raise ValueError("Unsupported kind")

    clean = np.sin(phase)
    if kind == "square":
        # The square wave is the sign of the sine: values in {-1, 0, 1}.
        clean = np.sign(clean)

    # Noise comes from NumPy's global RNG, so seeding it makes this repeatable.
    return clean + np.random.normal(scale=noise_scale, size=sample_len)

def build_dataset(n_samples: int = 1200, sample_len: int = 200) -> Tuple[np.ndarray, np.ndarray]:
    """Build a balanced, interleaved sine/square dataset scaled into [-1, 1].

    ``n_samples // 2`` (sine, square) pairs are generated, so an odd
    ``n_samples`` yields ``n_samples - 1`` rows.

    Args:
        n_samples: Requested number of waveforms (half per class).
        sample_len: Number of points per waveform.

    Returns:
        ``(signals, labels)``: ``signals`` is float32 with one waveform per
        row, ``labels`` is int64 with 0 for sine and 1 for square.

    Raises:
        ValueError: From ``np.stack`` when no pair is generated
            (``n_samples < 2``).
    """
    pair_count = n_samples // 2
    # Alternate sine/square inside each pair so rows 2k and 2k+1 form one pair.
    waves = [
        generate_wave(sample_len, kind)
        for _ in range(pair_count)
        for kind in ("sine", "square")
    ]
    signals = np.stack(waves).astype(np.float32)
    labels = np.tile(np.array([0, 1], dtype=np.int64), pair_count)

    # Divide by the largest absolute value in the whole dataset: maps to [-1, 1].
    peak = np.abs(signals).max()
    return signals / peak, labels

class WaveformDataset(Dataset):
    """Map-style dataset pairing each waveform with its class label."""

    def __init__(self, signals: np.ndarray, labels: np.ndarray):
        """Convert the waveform and label arrays to tensors and store them."""
        self.signals = torch.tensor(signals)
        self.labels = torch.tensor(labels)

    def __len__(self):
        """Return the number of labels, i.e. the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(waveform, label)`` with a leading channel axis on the waveform."""
        waveform = self.signals[idx]
        target = self.labels[idx]
        # Conv1d consumes (C, L) per sample; add the single channel axis here.
        return torch.unsqueeze(waveform, 0), target

signals, labels = build_dataset(n_samples=1200, sample_len=200)

# Shuffle the sample indices once, then cut them into 70% / 15% / 15% partitions.
total = len(labels)
indices = np.random.permutation(total)
train_end = int(total * 0.7)
val_end = int(total * 0.85)
train_idx, val_idx, test_idx = np.split(indices, [train_end, val_end])

# Wrap each partition in its own dataset object.
train_data, val_data, test_data = (
    WaveformDataset(signals[part], labels[part])
    for part in (train_idx, val_idx, test_idx)
)

# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=64)
test_loader = DataLoader(test_data, batch_size=64)

# Show the three partition sizes as the cell output.
tuple(map(len, (train_data, val_data, test_data)))


### 样本可视化

从数据集中按类别抽取若干条曲线，确认类别差异与噪声分布是否符合预期。

In [None]:
# Draw the first few waveforms of each class: one column per label, one row per sample.
samples_to_plot = 3
fig, axes = plt.subplots(samples_to_plot, 2, figsize=(10, 6), sharex=True, sharey=True)
for col, label in enumerate((0, 1)):
    # Positions in the full dataset whose label matches this column's class.
    class_positions = np.flatnonzero(labels == label)
    for row in range(samples_to_plot):
        idx = class_positions[row]
        ax = axes[row, col]
        ax.plot(signals[idx])
        ax.set_title(f"label={label}")
plt.tight_layout()
plt.show()


## 3. 构建一维卷积网络

网络结构示例：Conv1d → BatchNorm → ReLU → Dropout → 池化，多层堆叠后接全连接分类器。可以根据课件实验要求自由调整通道数、卷积核大小或加入残差块。

In [None]:
class TimeSeriesCNN(nn.Module):
    """1-D CNN classifier for single-channel waveforms.

    Three Conv1d -> BatchNorm1d -> ReLU -> Dropout stages, each followed by a
    pooling layer, feed a two-layer fully connected head. The last stage uses
    adaptive average pooling to length 1, so the head always receives 64
    features regardless of the input length.
    """

    def __init__(self, num_classes: int = 2):
        """Build the feature extractor and the classification head.

        Args:
            num_classes: Size of the final linear layer's output.
        """
        super().__init__()

        # One tuple per stage: (in_channels, out_channels, kernel_size, dropout_p).
        stage_specs = [
            (1, 16, 7, 0.1),
            (16, 32, 5, 0.2),
            (32, 64, 3, 0.3),
        ]
        final_stage = len(stage_specs) - 1

        layers = []
        for position, (c_in, c_out, kernel, drop_p) in enumerate(stage_specs):
            layers.extend(
                [
                    # padding = kernel // 2 keeps the sequence length unchanged.
                    nn.Conv1d(c_in, c_out, kernel_size=kernel, padding=kernel // 2),
                    nn.BatchNorm1d(c_out),
                    nn.ReLU(),
                    nn.Dropout(drop_p),
                ]
            )
            if position == final_stage:
                # Collapse the remaining time axis to a single value per channel.
                layers.append(nn.AdaptiveAvgPool1d(1))
            else:
                layers.append(nn.MaxPool1d(kernel_size=2))
        self.feature_extractor = nn.Sequential(*layers)

        head_layers = [
            nn.Flatten(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits for a batch of ``(N, 1, L)`` inputs."""
        return self.classifier(self.feature_extractor(x))

# Instantiate the network with its default two output classes and move it to `device`.
model = TimeSeriesCNN().to(device)
# Bare expression: shows the module structure as the notebook cell's output.
model


## 4. 训练、验证与评估

- 使用交叉熵损失与 Adam 优化器。
- 记录训练/验证损失与准确率，便于绘制学习曲线。
- 可根据课件要求增加早停、学习率调度、更多指标等。

In [None]:
from collections import defaultdict

def train_one_epoch(model, loader, criterion, optimizer, device=None):
    """Run one optimisation pass over ``loader``; return mean loss and accuracy.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        criterion: Callable mapping ``(logits, targets)`` to a scalar loss.
            The value is weighted by batch size when averaged, which assumes
            the criterion returns a per-batch mean.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to. When ``None`` (default) the
            device of the model's first parameter is used, falling back to the
            CPU for a parameter-free model, so the function does not rely on
            any global variable.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen in this pass. Accuracy
        is measured on the training-mode forward pass, before each update.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    total_loss, correct, total = 0.0, 0, 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()

        batch_size = y.size(0)
        # Undo the per-batch mean so a smaller final batch is weighted correctly.
        total_loss += loss.item() * batch_size
        preds = logits.argmax(dim=1)
        correct += (preds == y).sum().item()
        total += batch_size
    return total_loss / total, correct / total

def evaluate(model, loader, criterion, device=None):
    """Compute mean loss and accuracy over ``loader`` without updating weights.

    Args:
        model: Network to evaluate; it is switched to eval mode and left there.
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        criterion: Callable mapping ``(logits, targets)`` to a scalar loss.
            The value is weighted by batch size when averaged, which assumes
            the criterion returns a per-batch mean.
        device: Device the batches are moved to. When ``None`` (default) the
            device of the model's first parameter is used, falling back to the
            CPU for a parameter-free model, so the function does not rely on
            any global variable.

    Returns:
        ``(mean_loss, accuracy)`` over all samples in ``loader``.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    total_loss, correct, total = 0.0, 0, 0
    # Gradients are not needed for evaluation; skip building the autograd graph.
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            logits = model(x)
            loss = criterion(logits, y)

            batch_size = y.size(0)
            # Undo the per-batch mean so a smaller final batch is weighted correctly.
            total_loss += loss.item() * batch_size
            preds = logits.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += batch_size
    return total_loss / total, correct / total

def run_training(model, train_loader, val_loader, epochs=20, lr=1e-3):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    Args:
        model: Network to optimise in place.
        train_loader: Batches used for the optimisation pass of each epoch.
        val_loader: Batches used for the evaluation pass of each epoch.
        epochs: Number of train/validate rounds to run.
        lr: Learning rate passed to Adam.

    Returns:
        ``defaultdict(list)`` with one entry per epoch under the keys
        ``train_loss``, ``val_loss``, ``train_acc`` and ``val_acc``.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(model.parameters(), lr=lr)
    history = defaultdict(list)

    for completed in range(epochs):
        epoch = completed + 1  # report epochs 1-based
        train_loss, train_acc = train_one_epoch(model, train_loader, loss_fn, adam)
        val_loss, val_acc = evaluate(model, val_loader, loss_fn)

        epoch_metrics = {
            "train_loss": train_loss,
            "val_loss": val_loss,
            "train_acc": train_acc,
            "val_acc": val_acc,
        }
        for key, value in epoch_metrics.items():
            history[key].append(value)

        print(
            f"Epoch {epoch:02d}: train_loss={train_loss:.4f} val_loss={val_loss:.4f} "
            f"train_acc={train_acc:.3f} val_acc={val_acc:.3f}"
        )
    return history

# Train the global model for 20 epochs; `history` holds the per-epoch loss/accuracy lists.
history = run_training(model, train_loader, val_loader, epochs=20, lr=1e-3)


### 学习曲线

观察损失与准确率随 epoch 的变化趋势，检查是否过拟合或欠拟合。

In [None]:
# Left panel: loss curves; right panel: accuracy curves (train vs. val in each).
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
panels = (("loss", "Loss"), ("acc", "Accuracy"))
for ax, (metric, title) in zip(axes, panels):
    for split in ("train", "val"):
        ax.plot(history[f"{split}_{metric}"], label=split)
    ax.set_title(title)
    ax.legend()
plt.show()


### 测试集评估与混淆矩阵

输出分类报告和混淆矩阵，结合实验要求撰写分析（如优势、可能的改进方向）。

In [None]:
# Report aggregate loss and accuracy on the held-out test split.
criterion = nn.CrossEntropyLoss()
test_loss, test_acc = evaluate(model, test_loader, criterion)
print(f"Test loss: {test_loss:.4f}, Test acc: {test_acc:.3f}")

# Fixed class list: the report and the matrix keep one row per class even when
# a class is missing from both the targets and the predictions.
class_names = ["sine", "square"]
class_ids = list(range(len(class_names)))

# Collect per-sample predictions and targets for the detailed metrics below.
all_labels = []
all_preds = []
model.eval()
with torch.no_grad():
    for x, y in test_loader:
        x = x.to(device)
        logits = model(x)
        preds = logits.argmax(dim=1).cpu().numpy()
        all_preds.extend(preds)
        # .cpu() is a no-op for CPU tensors and keeps this working for GPU ones.
        all_labels.extend(y.cpu().numpy())

print(
    classification_report(
        all_labels,
        all_preds,
        labels=class_ids,
        target_names=class_names,
        zero_division=0,
    )
)
cm = confusion_matrix(all_labels, all_preds, labels=class_ids)

# Draw the confusion matrix: rows are true classes, columns are predictions.
fig, ax = plt.subplots(figsize=(4, 4))
im = ax.imshow(cm, cmap="Blues")
ax.set_xticks(class_ids)
ax.set_yticks(class_ids)
ax.set_xticklabels(class_names)
ax.set_yticklabels(class_names)
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
for (i, j), val in np.ndenumerate(cm):
    # imshow puts the column index on x and the row index on y.
    ax.text(j, i, int(val), ha="center", va="center", color="black")
fig.colorbar(im)
plt.show()


## 5. 思考与拓展（与课件实验要求对应）

- **网络结构对比**：尝试不同卷积核大小/层数，记录验证集表现。
- **正则化与泛化**：对比 Dropout/BatchNorm 关闭或调整后的变化。
- **数据与窗口**：修改序列长度或增加噪声，观察鲁棒性。
- **实验结论**：总结 CNN 在时序分类中的优势、存在问题及改进思路。