In [11]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch import optim
from torch import nn

from torchmetrics import Accuracy
from torchkeras import KerasModel

from matplotlib import pyplot as plt
from scipy import signal

from entity import *
from utils import deg_pol2cart, analysis

## 数据准备

### 数据生成

由于预测是实时的，因此GCC等计算不能在数据集中进行，应作为模型的一层。此处的数据就是原始信号。

In [9]:
# Physical / sampling constants shared by the data pipeline and the model.
c = 1500  # propagation speed used to turn a distance into a delay (presumably m/s — confirm)
fs = 4 * 37500  # sampling rate of the simulated signals
dist_max = 1  # largest path-length difference considered (presumably metres — confirm)
# Maximum delay expressed in samples at the sampling rate `fs`.
# Derived from `c` instead of repeating the literal 1500 so the two cannot drift apart.
n_max = int(np.ceil(dist_max / c * fs))

In [3]:
# !python ./sim_data_generator.py

In [5]:
# Load the simulated training archive and pull out the two arrays used below:
# the raw multi-channel signals and the per-position labels.
dataDict = np.load('train_sim_data.npz')
array_signal, labels = (dataDict[key] for key in ('array_signal', 'labels'))

In [6]:
# Sanity check of the loaded shapes: signals are indexed (channel, time sample, position),
# labels hold one entry per position.
array_signal.shape, labels.shape

((3, 750000, 151), (151,))

进行加窗互相关，每个样本由 $(C, L)$ 形状转为 $(C, L // W, 2\tau + 1)$，其中 $W$ 为窗长，$\tau$ 为最大时延采样点数（即代码中的 `n_max`）。$W$ 暂时选择 1 s 的采样点数。

生成的是3通道，5s的数据，从15度到165度，采样率37500x4，共5x151个样本。

In [3]:
class SigArrDataSet(Dataset):
    """Dataset that cuts long multi-channel recordings into fixed-length samples.

    ``data`` is indexed as ``[channel, time, position]``. The recording of each
    position is split into consecutive, non-overlapping segments of
    ``period * fs`` time steps, and every segment carries the label of its
    position. A trailing partial segment is ignored.

    Args:
        data: NumPy array of raw signals, indexed ``[channel, time, position]``.
        labels: One label per position; stored as ``labels / 90 - 1``.
        period: Length of one sample in units of ``1 / fs``
            (presumably seconds — confirm).
        fs: Sampling rate; ``period * fs`` is the segment length in samples.
    """

    def __init__(self, data, labels, period, fs):
        # NOTE(review): torch.from_numpy keeps the NumPy dtype. If the archive
        # stores float64, a default (float32) model will reject it — confirm
        # and cast upstream if needed.
        self.data = torch.from_numpy(data)
        # One period is one sample; cast so it is always usable as a slice bound.
        self.seq_len = int(period * fs)
        self.labels = labels / 90 - 1  # normalise to (-1, 1)

    def _segments_per_position(self):
        """Number of whole ``seq_len`` segments that fit in one recording."""
        return self.data.shape[1] // self.seq_len

    def __len__(self):
        """Number of samples = (whole segments per recording) * (number of positions).

        Uses the same floor division as ``__getitem__`` so that every index in
        ``range(len(self))`` is valid even when the recording length is not a
        multiple of ``seq_len``.
        """
        return self._segments_per_position() * self.data.shape[2]

    def __getitem__(self, idx):
        """Return ``(signals, label)`` where ``signals`` is ``[channel, seq_len]``."""
        # TODO: next step is to apply random left/right offsets within the
        # 3-channel 5 s recording and generate 10 samples per position.
        div, res = divmod(idx, self._segments_per_position())
        signals = self.data[:, res * self.seq_len:(res + 1) * self.seq_len, div]
        return signals, self.labels[div]

### DataLoader构造

In [7]:
def create_dataloaders(batch_size=10000):
    """Build the training and validation loaders from the loaded simulation data.

    Reads the module-level ``array_signal`` and ``labels``. Along the time
    axis, the first ``train_seconds`` worth of samples of every position go to
    the training set and the remainder to the validation set, so each position
    contributes four training samples and one validation sample.

    Args:
        batch_size: Batch size for both loaders. The default is deliberately
            huge so the (currently small) dataset fits in a single batch.

    Returns:
        Tuple ``(dl_train, dl_val)``.
    """
    sample_rate = 4 * 37500  # samples per second of the simulated recordings
    period = 1  # length of one dataset sample, in seconds
    train_seconds = 4  # leading part of each recording used for training

    split = train_seconds * sample_rate
    # FIXME: this slicing needs to be changed
    train_data = array_signal[:, :split, :]
    val_data = array_signal[:, split:, :]
    ds_train = SigArrDataSet(train_data, labels, period, sample_rate)
    ds_val = SigArrDataSet(val_data, labels, period, sample_rate)

    # There are very few samples for now, so a very large batch_size yields a single batch.
    dl_train = DataLoader(ds_train, batch_size=batch_size, shuffle=True, num_workers=2, drop_last=False)
    # Validation order does not affect the metrics, so it is left unshuffled for reproducibility.
    dl_val = DataLoader(ds_val, batch_size=batch_size, shuffle=False, num_workers=2, drop_last=False)
    return dl_train, dl_val


dl_train, dl_val = create_dataloaders()

## 模型定义

In [None]:
# Custom 1-D causal convolution layer
class CausalConv1d(nn.Module):
    """1-D convolution whose output at step ``t`` only sees inputs at steps ``<= t``.

    The wrapped ``nn.Conv1d`` pads both ends by ``(kernel_size - 1) * dilation``;
    ``forward`` then keeps only the first ``L`` output steps, which are the
    ones computed without touching the right-hand padding.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Length of the convolution kernel.
        dilation: Spacing between kernel taps.
    """

    def __init__(self, in_channels, out_channels, kernel_size, dilation=1):
        super().__init__()
        # Number of past steps each output needs, i.e. receptive field minus one.
        self.padding = dilation * (kernel_size - 1)
        self.conv1d = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size,
            padding=self.padding,
            dilation=dilation,
        )

    def forward(self, x):
        """Map ``(batch, in_channels, L)`` to ``(batch, out_channels, L)``."""
        seq_len = x.size(2)
        padded_out = self.conv1d(x)
        # Drop the trailing steps so the output length matches the input length.
        return padded_out[:, :, :seq_len]


# Custom 1-D causal CNN
# NOTE: the signal is CW; with short pulses, amplitude normalisation would flatten
# the correlation peak even further, so GCC is not used.
class CausalCnn1d(nn.Module):
    """Stack of causal 1-D convolutions ending in a single tanh-squashed channel.

    Layout: one input conv (``in_channels`` -> 1024), ``mid_depth`` middle convs
    (1024 -> 512, then 512 -> 512), and two dilated output convs (512 -> 128 -> 1).
    Every conv preserves the sequence length, so the output is
    ``(batch, 1, L)`` for an input of ``(batch, in_channels, L)``.

    Args:
        in_channels: Number of channels of the input sequence.
    """

    def __init__(self, in_channels):
        super().__init__()
        in_conv_channels = 1024
        mid_conv_channels = 512
        out_conv_channels = 128
        kernel_size = 5
        mid_depth = 4

        self.in_layers = nn.Sequential(
            CausalConv1d(in_channels, in_conv_channels, kernel_size),
            nn.PReLU(in_conv_channels),
        )

        mid_layers = [
            CausalConv1d(in_conv_channels, mid_conv_channels, kernel_size),
            nn.PReLU(mid_conv_channels),
        ]
        # Build a fresh conv/activation pair per depth level. Repeating a list
        # with ``* n`` would insert the *same* module objects n times and make
        # all of those levels share one set of weights.
        for _ in range(mid_depth - 1):
            mid_layers.append(CausalConv1d(mid_conv_channels, mid_conv_channels, kernel_size))
            mid_layers.append(nn.PReLU(mid_conv_channels))
        self.mid_layers = nn.ModuleList(mid_layers)

        self.out_layers = nn.Sequential(
            CausalConv1d(mid_conv_channels, out_conv_channels, kernel_size, dilation=2),
            nn.PReLU(),
            # Only the planar azimuth is estimated, hence a single output channel.
            CausalConv1d(out_conv_channels, 1, kernel_size, dilation=2),
        )

    def forward(self, x):
        """Run the conv stack and squash the result with tanh."""
        x = self.in_layers(x)
        for layer in self.mid_layers:
            x = layer(x)
        x = self.out_layers(x)
        # Presumably used to bring the output into (-1, 1), the same range the
        # labels are normalised to.
        x = torch.tanh(x)
        return x

class Xcorr(nn.Module):
    """Cross-correlate every channel with its cyclic neighbour, limited to ``n_max`` lags.

    Args:
        n_max: Maximum lag (in samples) kept on each side of zero lag.
    """

    def __init__(self, n_max):
        super().__init__()
        self.n_max = n_max

    def forward(self, x):
        """Windowed cross-correlation; the window is currently the whole sequence.

        Channel ``i`` is correlated with channel ``(i + 1) % channels``.

        Args:
            x: Tensor of shape ``(batch_size, channels, seq_len)``.

        Returns:
            Tensor of shape ``(batch_size, channels, 2 * n_max + 1)``.
        """
        batch_size, channels, seq_len = x.size()
        n_pairs = batch_size * channels
        # conv1d cannot apply a different kernel per batch item, so the batch is
        # folded into the channel axis and handled with a grouped convolution.
        # Rolling by -1 pairs channel i with channel i + 1 (cyclically); for three
        # channels this is the order [1, 2, 0], and it works for any channel count.
        kernels = torch.roll(x, shifts=-1, dims=1).reshape(n_pairs, 1, seq_len)
        signals = x.reshape(1, n_pairs, seq_len)
        # The padding caps the lag range of the correlation, which limits the amount of computation.
        corr = nn.functional.conv1d(signals, kernels, groups=n_pairs, padding=self.n_max)
        return corr.reshape(batch_size, channels, -1)

class CausalCnn1d_from_Xcorr(nn.Module):
    """End-to-end model: cross-correlation front end followed by the causal CNN.

    The correlation is a layer of the model (rather than a dataset
    preprocessing step) so that raw signals can be fed in directly.

    Args:
        n_max: Maximum lag, in samples, passed to ``Xcorr``.
        in_channels: Number of input channels passed to ``CausalCnn1d``.
    """

    def __init__(self, n_max, in_channels) -> None:
        super().__init__()
        self.xcorr = Xcorr(n_max)
        self.cnn = CausalCnn1d(in_channels)

    def forward(self, x):
        """Correlate the raw signals, then run the CNN on the correlation sequences."""
        return self.cnn(self.xcorr(x))


# Three-channel array; the lag limit comes from the configuration cell.
net = CausalCnn1d_from_Xcorr(n_max=n_max, in_channels=3)

In [None]:
# a, b, c = np.array((1, 2, 3, 4, 6, 3)), np.array((1, 2, 3, 4, 6, 3)), np.array((1, 3, 5, 6, 3, 0))
# print(np.correlate(a, b, mode='same'), np.correlate(b, c, mode='same'), np.correlate(c, a, mode='same'))
# t = np.stack((np.stack((a, b, c), axis=0),
#      np.stack((c, b, a), axis=0)), axis=0)
# t = torch.from_numpy(t).float()
# x = t.reshape(1, 6, -1)
# y = t[:, [1, 2, 0], :].reshape((6, 1, -1))
# print(nn.functional.conv1d(x, y, groups=6, padding='same').reshape(2, 3, -1))
# model = Xcorr(2)
# print(model(t))

## 模型训练

In [None]:
# The network regresses a continuous, normalised angle (tanh output, MSE loss), so a
# multiclass Accuracy metric does not apply: it expects integer class targets, whereas
# the labels here are floats. Training is therefore tracked with the loss itself.
# NOTE(review): `net` returns a (batch, 1, 2 * n_max + 1) tensor while the dataset
# yields one scalar label per sample — confirm the loss receives matching shapes.
model = KerasModel(
    net,
    loss_fn=nn.MSELoss(),
    optimizer=optim.Adam(net.parameters(), lr=1e-3),
)

model.fit(
    dl_train,
    dl_val,
    epochs=10,
    monitor='val_loss',  # keep the checkpoint with the lowest validation MSE
    mode='min',
)

In [None]:
# def train(model, train_dataset, test_dataset, epochs, batch_size, device, lr, scheduler_kwargs):
#     optimizer = optim.Adam(model.parameters(), lr=lr)  # 可学习参数, 学习率 (超参数)
#     scheduler = optim.lr_scheduler.StepLR(optimizer, **scheduler_kwargs)  # 学习率调整

#     model.train()
#     model.to(device)

#     for epoch in range(epochs):
#         for data, target in train_dataset:
#             optimizer.zero_grad()
#             output = model(data)
#             loss = criterion(output, target)
#             loss.backward()
#             optimizer.step()
#         model.eval()
#         test_loss = 0
#         correct = 0
#         with torch.no_grad():
#             for data, target in test_dataset:
#                 output = model(data)
#                 test_loss += criterion(output, target).item()
#                 pred = output.argmax(dim=1, keepdim=True)
#                 correct += pred.eq(target.view_as(pred)).sum().item()
#         test_loss /= len(test_dataset.dataset)
#         # print(f"Epoch {epoch}: Test set: Average loss: {test_loss}, Accuracy: {correct}/{len(test_dataset.dataset)} ({100. * correct / len(test_dataset.dataset)}%)")
#         scheduler.step() # 更新学习率
#     print('训练结束')

In [None]:
# 参数设置

# EPOCHS = 30
# BATCH_SIZE = 32
# LR = 1e-3
# print(device := torch.device("cuda:0" if torch.cuda.is_available() else "cpu"))

In [None]:
# model = Cnn_from_GCC()

# train(
#     model=model,
#     train_dataset=train_dataset,
#     test_dataset=test_dataset,
#     epochs=EPOCHS,
#     batch_size=BATCH_SIZE,
#     device=device,
#     lr=LR,
#     scheduler_kwargs={'step_size': 10, 'gamma': 0.1},  # 每10个epoch学习率乘0.1, 避免后期学习率过大导致损失震荡
# )

# torch.save(model.state_dict(), 'model.pth')

## 模型评估

In [None]:
# def detect():
#     pass

In [None]:
# model = Cnn_from_GCC()
# model.load_state_dict(torch.load('model.pth'))

In [None]:
# 识别

## 预测