In [1]:
%load_ext autoreload
%autoreload 2

import warnings
# Silence every Python warning so the cell outputs below stay uncluttered.
# NOTE(review): this also hides deprecation / numerical warnings raised by the
# libraries used later in the notebook.
warnings.filterwarnings("ignore")

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import StratifiedShuffleSplit

from datasets import SQDataGenerator
from my_utils.init_utils import set_seed_torch

# Fix the random seed once, before any data splitting or model creation.
# Presumably this seeds torch (and possibly numpy / random) -- confirm in
# my_utils.init_utils.
set_seed_torch(42)



## 1. Load training data and test data
In this part, we load the dataset (SEU or SQ; the code below uses SQ) and split it into training and test sets. In the experiments, we use 30% of the data as the training set. We then normalize the data $\mathbf{x}$ with a standard normalizer and, in addition, compute its high-order difference $\mathbf{x}^{n_\text{df}}$ (both steps are carried out in Section 2.1). The prepared training and test data therefore both consist of $[\mathbf{x}, \mathbf{x}^{n_\text{df}}]$.

Note: the data file must be placed in the `datasets` folder for data loading to work, e.g. `datasets/seu_2speeds_snr-5.npy`. The SEU and SQ data with SNR=-5 are available at https://drive.google.com/drive/folders/1GbioYlKtaTG1KRg9b_Krq3p7z00L7pz7?usp=sharing.

In [2]:
# Demo setting: the SEU/SQ data with SNR = -5.
SNR = -5

# Alternative (disabled): the SEU dataset with 10 classes.
# X, Y = SEUDataGenerator(snr=SNR, x_len=1024).data_gen(True)
# num_classes = 10
# (10, 2, 300, 1, 1024) (10, 2, 300) --> (6000, 1024) (6000, )

# Active choice: the SQ dataset with 7 classes.
num_classes = 7
X, Y = SQDataGenerator(snr=SNR).data_gen(True)
# (7, 144, 1, 2048) (7, 144) --> (1008, 2048) (1008, )

print(X.shape, Y.shape)

# Stratified shuffle split: samples are drawn in proportion to the class
# frequencies, with 30% of the data going to the training side.
kf = StratifiedShuffleSplit(n_splits=10, train_size=0.3, random_state=42)
train_test_indices = [index_pair for index_pair in kf.split(X, Y)]
print(f"Got {len(train_test_indices)} groups of indices for train and test")

# Only the first split is used; this notebook demonstrates a single trial.
train_indices, test_indices = train_test_indices[0]
X_tra = X[train_indices]
Y_tra = Y[train_indices]
X_val = X[test_indices]
Y_val = Y[test_indices]
print(X_tra.shape, Y_tra.shape, X_val.shape, Y_val.shape)

Load SQ data...
(1008, 2048) (1008,)
Got 10 groups of indices for train and test
(302, 2048) (302,) (706, 2048) (706,)


## 2. Generate random wavelet kernels
- Generate random wavelet kernels $\mathbf{w}$ with the size of `num_kernels`. We can use the function `fit_kernels` to generate the kernels. 
- `apply_kernels_multi` is used to apply the kernels to the data $\mathbf{x}$, and transform the data into the low-dimensional feature space. Here we use four indicators to extract features.

### 2.1 RaVEL implemented by several functions
You can use the following code to extract random features by random wavelets.

In [7]:
from models.network.ravel.kernel_fit import fit_kernels
from models.network.ravel.kernel_apply import apply_kernels_multi

# Order of the finite difference taken along the last axis of each signal.
n_diff = 3

# Row-wise z-score: each sample is centred and scaled by its own mean / std.
# The 1e-12 keeps the division finite for a constant row.
X_tra_norm = (X_tra - X_tra.mean(axis=1, keepdims=True)) / (X_tra.std(axis=1, keepdims=True) + 1e-12)
X_val_norm = (X_val - X_val.mean(axis=1, keepdims=True)) / (X_val.std(axis=1, keepdims=True) + 1e-12)

# n_diff-th order difference; each application shortens the signal by one sample.
# NOTE(review): the difference is taken from the raw X_tra / X_val, not from the
# z-scored arrays above -- confirm this is intended.
X_tra_d = np.diff(X_tra, n=n_diff)
X_val_d = np.diff(X_val, n=n_diff)
print(X_tra_norm.shape, X_val_norm.shape, X_tra_d.shape, X_val_d.shape)

num_kernels = 250  # You can try: 1000, 500, 400, 250, 200, 100

# Branch 1: kernels fitted on the normalized training signals, then applied to
# both splits.
kernels = fit_kernels(X_tra_norm, num_kernels, multiwavelet=True)
X_tra_feat = apply_kernels_multi(X_tra_norm, kernels, cosine_pool=True)
X_val_feat = apply_kernels_multi(X_val_norm, kernels, cosine_pool=True)

# Branch 2: a separate kernel set fitted on the differenced training signals.
kernels_d = fit_kernels(X_tra_d, num_kernels, multiwavelet=True)
X_tra_feat_d = apply_kernels_multi(X_tra_d, kernels_d, cosine_pool=True)
X_val_feat_d = apply_kernels_multi(X_val_d, kernels_d, cosine_pool=True)

# Place the features of both branches side by side (along the feature axis).
X_tra_feat = np.hstack((X_tra_feat, X_tra_feat_d))
X_val_feat = np.hstack((X_val_feat, X_val_feat_d))

# Standardize every feature column with statistics of the training split only;
# the same mean / std are reused for the validation split.
f_mean = X_tra_feat.mean(0)
f_std = X_tra_feat.std(0) + 1e-8
X_tra_feat = (X_tra_feat - f_mean) / f_std
X_val_feat = (X_val_feat - f_mean) / f_std

print(f"Num_kernel: {num_kernels}*2, train-{X_tra_feat.shape}, valid-{X_val_feat.shape}")

(302, 2048) (706, 2048) (302, 2045) (706, 2045)
Num_kernel: 250*2, train-(302, 2000), valid-(706, 2000)


### 2.2 RaVEL via Python package

Alternatively, the same random-wavelet feature extraction is available as a Python package, which replaces the function-based code of Section 2.1. Install it with:
`pip install randwave`

In [7]:
# Alternative import from a local source copy of the package (disabled):
# from randwave_source import RandWaveTransform
from randwave import RandWaveTransform  # the downloaded python package

# Same settings as the function-based variant in Section 2.1: 250 kernels and a
# 3rd-order difference.
rwt = RandWaveTransform(num_kernels=250, num_diff=3)
# Fit on the training signals and transform them in one call; the fitted
# transform is then reused unchanged on the validation signals.
# NOTE: this overwrites X_tra_feat / X_val_feat if Section 2.1 was run before.
X_tra_feat = rwt.fit_transform(X_tra)
X_val_feat = rwt.transform(X_val)

print(f"Num_kernel: {rwt.num_kernels}*2, train-{X_tra_feat.shape}, valid-{X_val_feat.shape}")

Num_kernel: 250*2, train-(302, 2000), valid-(706, 2000)


## 3. Train a simple MLP model 

In [8]:
def _evaluate_accuracy(model, valid_data):
    """Return the top-1 accuracy of ``model`` on ``valid_data`` as a Python float.

    ``valid_data`` is an ``(inputs, labels)`` pair. The forward pass runs in eval
    mode without gradient tracking; the model is switched back to training mode
    before returning.
    """
    bx_ind, by_ind = valid_data
    model.eval()
    with torch.no_grad():
        logits_ind = model(bx_ind)
    model.train()
    return (logits_ind.argmax(1) == by_ind).float().mean().item()


def do_train_single_label_cpu(model, optimizer: torch.optim.Optimizer, loss_fn: torch.nn.modules.loss._Loss,
                              train_loader, valid_data, max_epochs, scheduler):
    """Train a single-label classifier and report its accuracy on held-out data.

    No device transfers are performed: batches are fed to the model exactly as
    the loader yields them.

    Args:
        model: Module mapping a batch of inputs to per-class logits.
        optimizer: Optimizer that updates the parameters of ``model``.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar loss.
        train_loader: Iterable yielding ``(inputs, labels)`` batches; it is
            iterated once per epoch.
        valid_data: ``(inputs, labels)`` pair evaluated in a single forward pass.
        max_epochs: Number of passes over ``train_loader``.
        scheduler: Learning-rate scheduler, stepped once at the end of each epoch.

    Returns:
        None. The accuracy is printed every 10th epoch and once more after the
        last epoch. The model is left in training mode.
    """
    model.train()

    for epoch in range(max_epochs):
        for bx, by in train_loader:
            # Clear gradients *before* the backward pass so that gradients left
            # on the parameters by earlier code cannot leak into the first step.
            optimizer.zero_grad()
            logits = model(bx)
            training_loss = loss_fn(logits, by)
            training_loss.backward()
            optimizer.step()

        scheduler.step()

        if (epoch + 1) % 10 == 0:
            acc_ind = _evaluate_accuracy(model, valid_data)
            print(f"[Epoch-{epoch + 1}/{max_epochs}] Test Acc. {acc_ind:.2%}")

    acc_ind = _evaluate_accuracy(model, valid_data)
    print(f"\nTest Acc. {acc_ind:.2%}")

In [9]:
# Two-layer MLP head: 128 hidden units with ReLU6, one output logit per class.
# LazyLinear infers its input width from the first batch it sees.
# NOTE(review): the optimizer below is built before the lazy parameters are
# materialized; the PyTorch docs suggest a dry-run forward pass first -- confirm
# this ordering is intended.
num_out_neurons = num_classes
model = nn.Sequential(
    nn.LazyLinear(128),
    nn.ReLU6(),
    nn.LazyLinear(num_out_neurons),
)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

# Convert features to float32 tensors and labels to int64 tensors.
# The names are rebound in place, so from here on they refer to torch tensors.
X_tra_feat = torch.FloatTensor(X_tra_feat)
Y_tra = torch.LongTensor(Y_tra)
X_val_feat = torch.FloatTensor(X_val_feat)
Y_val = torch.LongTensor(Y_val)
train_loader = DataLoader(TensorDataset(X_tra_feat, Y_tra), batch_size=32, shuffle=True)

max_epochs = 20
do_train_single_label_cpu(
    model=model,
    optimizer=optimizer,
    loss_fn=loss_fn,
    train_loader=train_loader,
    valid_data=(X_val_feat, Y_val),
    max_epochs=max_epochs,
    scheduler=scheduler,
)

[Epoch-10/20] Test Acc. 95.33%
[Epoch-20/20] Test Acc. 95.18%

Test Acc. 95.18%
