In [2]:
import glob
import os
import pathlib
import random
import subprocess
import sys
import math

from collections import OrderedDict
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchaudio
from sklearn.metrics import ConfusionMatrixDisplay, accuracy_score, confusion_matrix
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import DataLoader, Dataset, random_split
from torchaudio.transforms import AmplitudeToDB, MelSpectrogram

AUDIO_DIR = "/kaggle/input/free-spoken-digit-dataset-fsdd/recordings/"

# torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

class Dataset(Dataset):
    def __init__(self, targ_dir: str, transform=None, max_length: int = 16) -> None:
        self.paths = list(pathlib.Path(targ_dir).glob("*.wav"))
        self.labels = [int(path.stem.split("_")[0]) for path in self.paths]  # Convert labels to int
        self.transform = transform
        self.max_length = max_length

    def load_audio(self, index: int):
        audio_path = self.paths[index]
        waveform, sample_rate = torchaudio.load(audio_path, normalize=True)  # 关键修改
        return waveform

    def __len__(self) -> int:
        return len(self.paths)

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        waveform = self.load_audio(index)
        label = self.labels[index]

        if self.transform:
            # mel_spec_transform = MelSpectrogram(sample_rate=32000, n_fft=1024, hop_length=512, n_mels=128)
            mel_spec_transform = MelSpectrogram(sample_rate=32000, n_fft=256, hop_length=128, n_mels=32)
            amp2db_transform = AmplitudeToDB()
            waveform = mel_spec_transform(waveform)
            waveform = amp2db_transform(waveform)
            
            # 可选：对梅尔频谱进一步标准化
            waveform = (waveform - waveform.mean()) / waveform.std()

            # 长度调整
            if waveform.shape[-1] < self.max_length:
                padding = self.max_length - waveform.shape[-1]
                waveform = torch.nn.functional.pad(waveform, (0, padding))
            elif waveform.shape[-1] > self.max_length:
                waveform = waveform[:, :, :self.max_length]

        return waveform.squeeze(), label

dataset = Dataset(AUDIO_DIR, transform=True)

train_size = int(0.9 * len(dataset))
test_size = len(dataset) - train_size


train_dataset, test_dataset = random_split(dataset, [train_size, test_size])


train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=100)

cpu


In [3]:
#SCELL TEST
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

lens = 0.5
thresh = 0.875
decay = 0.5 # decay constants(Leak)
gamma = 0.5

# surroguate_type = 'sigmoid'
# surroguate_type = 'MG'
# surroguate_type = 'slayer'
surroguate_type = 'G'
print('surroguate_type: ', surroguate_type)

def gaussian(x, mu=0., sigma=0.5):
    return torch.exp(-((x - mu) ** 2) / (2 * sigma ** 2)) / torch.sqrt(2 * torch.tensor(math.pi)) / sigma


class AcFun_adp(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):  # input = memberane potential-threshold
        ctx.save_for_backward(input)
        return input.gt(0).float()

    @staticmethod
    def backward(ctx, grad_output):  # approximate the gradients
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()

        scale = 6.0
        height = 0.15
        if surroguate_type == 'G':
            temp = torch.exp(-(input ** 2) / (2 * lens ** 2)) / torch.sqrt(2 * torch.tensor(math.pi)) / lens
        elif surroguate_type == 'MG':
            temp = gaussian(input, mu=0., sigma=lens) * (1.0 + height) \
                   - gaussian(input, mu=lens, sigma=scale * lens) * height \
                   - gaussian(input, mu=-lens, sigma=scale * lens) * height
        elif surroguate_type == 'linear':
            temp = F.relu(1 - input.abs())
        elif surroguate_type == 'slayer':
            temp = torch.exp(-5 * input.abs())
        elif surroguate_type == 'sigmoid':
            temp = torch.exp(-input) / (1 + torch.exp(-input)) ** 2
        return grad_input * temp.float() * gamma

act_fun_adp = AcFun_adp.apply

def LIF_mem_update(inputs, mem, spike):
    # if(mem.shape[1] == 3):
    #     print(mem*32768)
    # mem = mem * decay * (1. - spike) + inputs
    
    n = float(2 ** 14)
    mem = mem * decay * (1. - spike)
    mem = torch.clip(torch.round(mem * n) / n , -14.99993896484375, 14.99993896484375)
    # print(mem.shape)
    # print(inputs.shape)
    mem = mem  + inputs
    mem = torch.clip(torch.round(mem * n) / n , -14.99993896484375, 14.99993896484375)

    
    temp_mem = mem - thresh
    spike = act_fun_adp(temp_mem)
    # spike = act_fun_adp(mem)

    return mem, spike

def Quantize(tensor, n_bit):
    n = float(2 ** (n_bit-2))
    # n = float(2 ** (n_bit-1))

    return torch.clip(torch.round(tensor * n) / n , -1.99993896484375,1.99993896484375)
    # return torch.clip(torch.round(tensor * n) / n , -0.9921875, 0.9921875)


def Binarize(tensor):
    return Quantize(tensor,16)


class BinarizeLinear(nn.Linear):

    def __init__(self, *kargs, **kwargs):
        super(BinarizeLinear, self).__init__(*kargs, **kwargs)

    def forward(self, input):

        if not hasattr(self.weight,'org'):
            self.weight.org=self.weight.data.clone()
        self.weight.data=Binarize(self.weight.org)
        out = nn.functional.linear(input, self.weight)
        if not self.bias is None:
            self.bias.org=self.bias.data.clone()
            out += self.bias.view(1, -1).expand_as(out)

        return out

def Get_HexData(binary_tensor):
    binary_tensor = binary_tensor.to(torch.int)

    # 将张量展平为一维
    flattened = binary_tensor.view(-1)

    # 确保长度是4的倍数
    if len(flattened) % 4 != 0:
        padding = torch.zeros(4 - len(flattened) % 4, dtype=torch.int)
        flattened = torch.cat((flattened, padding))

    grouped = flattened.view(-1, 4)
    grouped = torch.flip(grouped, dims=[-1])

    # 将每4位二进制数转换为16进制字符
    hex_data = []
    for group in grouped:
        binary_str = ''.join([str(bit.item()) for bit in group])  # 将4位二进制转换为字符串
        hex_value = hex(int(binary_str, 2))[2:].upper()  # 转换为16进制并去掉前缀 '0x'，转为大写
        hex_data.append(hex_value)

    # 打印结果
    # print("16进制数据:", hex_data)
    
    # 倒序打印 hex_data
    reversed_hex_data = hex_data[::-1]
    print("倒序后的16进制数据:", reversed_hex_data)

print("Load LIF Done!!!!")

surroguate_type:  G
Load LIF Done!!!!


In [4]:
# GRUCELL TEST
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class GRU(nn.Module):

    def __init__(self, input_size, hidden_size, output_size, bias=False, dropout=0.0):
        super(GRU, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.dropout = dropout
        
        # self.Encoding = nn.Linear(input_size, input_size, bias=bias)
        self.RZGate = nn.Linear(input_size + hidden_size, hidden_size, bias=bias)
        self.h2o = nn.Linear(hidden_size, output_size, bias=bias)
        self.relu = nn.ReLU()

        self.dropout_layer = nn.Dropout(dropout)
        self.reset_parameters()

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, hidden_spike=None):
        # 初始化隐藏状态
        if hidden_spike is None:
            hidden_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        else:
            # 确保hidden是三维的(num_layers, batch_size, hidden_size)
            hidden_spike = hidden_spike.squeeze(0)

        # 对序列中的每个时间步进行处理
        Encoding_mem = Encoding_spike = (torch.zeros(x.size(0),x.size(1), x.size(2))).to(device)
        
        Encoding_mem, Encoding_spike = LIF_mem_update(x, Encoding_mem, Encoding_spike)
        outputs = []
        for t in range(x.size(1)):
            # 获取当前时间步的输入
            X_spike = Encoding_spike[:, t, :]
            # X_spike = x[:, t, :]
            # 计算门控
            # print(xt)
            # print(hidden_spike)
            # xt = EncodingBinarize(xt)
            # X_spike = self.relu(self.Encoding(xt))
            
            x_in = torch.cat((X_spike, hidden_spike), dim=1).to(device)
            hidden_spike = torch.sigmoid(self.RZGate(x_in))

            # print(hidden_spike)
            o_input = self.h2o(hidden_spike)

            # print(output_spike_sum)

        # print(h)
        return o_input


print("Network!")
# 使用示例
# 假设输入序列x的形状是(batch_size, seq_length, input_size)
# 初始化GRUCell
gru_cell = GRU(input_size=10, hidden_size=20, output_size = 2).to(device)

# 假设的输入序列
x = torch.randn(5, 3, 10).to(device)  # batch_size=5, seq_length=3, input_size=10

# 调用GRUCell，得到所有时间步的隐藏状态
out= gru_cell(x)

# h的形状是(batch_size, seq_length, hidden_size)
print(out.shape)  # 应该输出torch.Size([5, 2])


Network!
torch.Size([5, 2])


In [5]:
# GRUCELL TEST
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class SGRU(nn.Module):

    def __init__(self, input_size, hidden_size, output_size, bias=False, dropout=0.0):
        super(SGRU, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.dropout = dropout
        
        # self.RZGate = BinarizeLinear(input_size + hidden_size, hidden_size, bias=bias)
        # self.h2o = BinarizeLinear(hidden_size, output_size, bias=bias)
        self.relu = nn.ReLU()
        self.RZGate = nn.Linear(input_size + hidden_size, hidden_size, bias=bias)
        self.h2o = nn.Linear(hidden_size, output_size, bias=bias)

        self.dropout_layer = nn.Dropout(dropout)
        self.reset_parameters()

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, hidden_spike=None):
        # 初始化隐藏状态
        if hidden_spike is None:
            hidden_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        else:
            # 确保hidden是三维的(num_layers, batch_size, hidden_size)
            hidden_spike = hidden_spike.squeeze(0)

        # 对序列中的每个时间步进行处理
        Encoding_mem = Encoding_spike = (torch.zeros(x.size(0),x.size(1), x.size(2))).to(device)
        RZgate_mem = RZgate_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        
        Encoding_mem, Encoding_spike = LIF_mem_update(x, Encoding_mem, Encoding_spike)
        outputs = []
        for t in range(x.size(1)):
            # 获取当前时间步的输入
            X_spike = Encoding_spike[:, t, :]
            # X_spike = x[:, t, :]
            # 计算门控
            # print(xt)
            # print(hidden_spike)
            # xt = EncodingBinarize(xt)
            # X_spike = self.relu(self.Encoding(xt))
            
            x_in = torch.cat((X_spike, hidden_spike), dim=1)

            RZgate_x =self.RZGate(x_in)
            RZgate_mem, RZgate_spike = LIF_mem_update(RZgate_x, RZgate_mem, RZgate_spike)

            # print(hidden_spike)
            hidden_spike = RZgate_spike
            o_input = self.h2o(hidden_spike)

            # print(output_spike_sum)

        # print(h)
        return o_input


print("Network!")
# 使用示例
# 假设输入序列x的形状是(batch_size, seq_length, input_size)
# 初始化GRUCell
gru_cell = SGRU(input_size=10, hidden_size=20, output_size = 2).to(device)

# 假设的输入序列
x = torch.randn(5, 3, 10).to(device)  # batch_size=5, seq_length=3, input_size=10

# 调用GRUCell，得到所有时间步的隐藏状态
out= gru_cell(x)

# h的形状是(batch_size, seq_length, hidden_size)
print(out.shape)  # 应该输出torch.Size([5, 2])


Network!
torch.Size([5, 2])


In [6]:
# GRUCELL TEST
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class LSGRU(nn.Module):

    def __init__(self, input_size, hidden_size, output_size, bias=False, dropout=0.0):
        super(LSGRU, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.dropout = dropout
        
        self.RZGate = BinarizeLinear(input_size + hidden_size, hidden_size, bias=bias)
        self.h2o = BinarizeLinear(hidden_size, output_size, bias=bias)
        

        self.dropout_layer = nn.Dropout(dropout)
        self.reset_parameters()

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, hidden_spike=None):
        # 初始化隐藏状态
        if hidden_spike is None:
            hidden_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        else:
            # 确保hidden是三维的(num_layers, batch_size, hidden_size)
            hidden_spike = hidden_spike.squeeze(0)

        # 对序列中的每个时间步进行处理
        Encoding_mem = Encoding_spike = (torch.zeros(x.size(0),x.size(1), x.size(2))).to(device)
        RZgate_mem = RZgate_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        
        Encoding_mem, Encoding_spike = LIF_mem_update(x, Encoding_mem, Encoding_spike)
        outputs = []
        for t in range(x.size(1)):
            # 获取当前时间步的输入
            X_spike = Encoding_spike[:, t, :]
            # X_spike = x[:, t, :]
            # 计算门控
            # print(xt)
            # print(hidden_spike)
            # xt = EncodingBinarize(xt)
            # X_spike = self.relu(self.Encoding(xt))
            
            x_in = torch.cat((X_spike, hidden_spike), dim=1)
            # print("input Spiking!!!")
            # print(hidden_spike)
            # print(x_in)

            RZgate_x =self.RZGate(x_in)
            RZgate_mem, RZgate_spike = LIF_mem_update(RZgate_x, RZgate_mem, RZgate_spike)

            # print("hidden Spiking!!!")
            # print(RZgate_spike)
            hidden_spike = RZgate_spike
            o_input = self.h2o(hidden_spike)

            # print(output_spike_sum)

        # print(h)
        return o_input


print("Network!")
# 使用示例
# 假设输入序列x的形状是(batch_size, seq_length, input_size)
# 初始化GRUCell
gru_cell = LSGRU(input_size=10, hidden_size=20, output_size = 2).to(device)

# 假设的输入序列
x = torch.randn(5, 3, 10).to(device)  # batch_size=5, seq_length=3, input_size=10

# 调用GRUCell，得到所有时间步的隐藏状态
out= gru_cell(x)

# h的形状是(batch_size, seq_length, hidden_size)
print(out.shape)  # 应该输出torch.Size([5, 2])


Network!
torch.Size([5, 2])


In [16]:
# GRUCELL TEST
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class OLSGRU(nn.Module):

    def __init__(self, input_size, hidden_size, output_size, bias=False, dropout=0.0):
        super(OLSGRU, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.dropout = dropout
        
        self.RZGate = BinarizeLinear(input_size + hidden_size, hidden_size, bias=bias)
        self.h2o = BinarizeLinear(hidden_size, output_size, bias=bias)
        

        self.dropout_layer = nn.Dropout(dropout)
        self.reset_parameters()

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, hidden_spike=None):
        # 初始化隐藏状态
        if hidden_spike is None:
            hidden_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        else:
            # 确保hidden是三维的(num_layers, batch_size, hidden_size)
            hidden_spike = hidden_spike.squeeze(0)

        # 对序列中的每个时间步进行处理
        Encoding_mem = Encoding_spike = (torch.zeros(x.size(0),x.size(1), x.size(2))).to(device)
        RZgate_mem = RZgate_spike = torch.zeros(x.size(0), self.hidden_size, device=x.device)
        
        Encoding_mem, Encoding_spike = LIF_mem_update(x, Encoding_mem, Encoding_spike)
        
        o_input = Encoding_spike

            # print(output_spike_sum)

        # print(h)
        return o_input


print("Network!")
# 使用示例
# 假设输入序列x的形状是(batch_size, seq_length, input_size)
# 初始化GRUCell
gru_cell = OLSGRU(input_size=10, hidden_size=20, output_size = 2).to(device)

# 假设的输入序列
x = torch.randn(5, 3, 10).to(device).to(device)  # batch_size=5, seq_length=3, input_size=10

# 调用GRUCell，得到所有时间步的隐藏状态
out= gru_cell(x)

# h的形状是(batch_size, seq_length, hidden_size)
print(out.shape)  # 应该输出torch.Size([5, 2])


Network!
torch.Size([5, 3, 10])


In [17]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # Create a matrix of shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)  # Apply sine to even indices
        pe[:, 1::2] = torch.cos(position * div_term)  # Apply cosine to odd indices
        pe = pe.unsqueeze(0)  # Add a batch dimension (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, time_steps, d_model)
        # Add positional encoding to the input embeddings
        # print(self.pe.device)
        return x + self.pe[:, :x.size(1)]

class SpeechTransformer(nn.Module):
    def __init__(self, num_classes=10, d_model=32, nhead=4, num_layers=4):
        super(SpeechTransformer, self).__init__()
        # self.input_layer = nn.Linear(128, d_model)  # Map mel bins to d_model
        self.positional_encoding = PositionalEncoding(d_model)  # Add positional encoding
        
        # self.classifier = GRU(d_model, 64, 10)
        # self.classifier = SGRU(d_model, 64, 10)
        # self.classifier = LSGRU(d_model, 64, 10)
        self.classifier = OLSGRU(d_model, 64, 10)


    def forward(self, x):
        # print(x.shape)
        # x shape: (batch_size, mel_bins, time_steps)
        x = x.transpose(1, 2)  # (batch_size, time_steps, mel_bins)
        # x = self.input_layer(x)  # Map to d_model
        x = self.positional_encoding(x)  # Add positional encoding
        # print(x[0][0])
        # xx
        # print(x.shape)
        # x = self.transformer_encoder(x)  # Pass through transformer encoder
        # x = x.mean(dim=1)  # Aggregate time dimension (average over time)
        
        x = self.classifier(x)  # Final classifier
        
        # _, x_, s_ = self.classifier(x)  # Final classifier
        # print(x.shape)
        # xx
        return x

In [15]:
from torch.optim.lr_scheduler import StepLR
# Prepare for saving models
os.makedirs("models", exist_ok=True)

# torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

model = SpeechTransformer().to(device)
model.load_state_dict(torch.load("/kaggle/working/models/srnn_best_model.pth"))
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = StepLR(optimizer, step_size=80, gamma=0.1)

# Training
epochs = 5
train_loss_list = []
val_loss_list = []
best_val_accuracy = 0.0  # Track the best validation accuracy

for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        # print(inputs[0][0])
        inputs, labels = inputs.to(device), labels.to(device)
        # print(inputs.device, labels.device)  
        optimizer.zero_grad()
        outputs = model(inputs)
        # print(outputs.device)
        # xx
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Validation
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            # print(inputs[0][0])
            # xx
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    # Calculate metrics
    train_loss_avg = train_loss / len(train_loader)
    val_loss_avg = val_loss / len(test_loader)
    val_accuracy = 100 * correct / total
    train_loss_list.append(train_loss_avg)
    val_loss_list.append(val_loss_avg)

    print(
        f"Epoch {epoch + 1}, Train Loss: {train_loss_avg:.4f}, Val Loss: {val_loss_avg:.4f}, Val Accuracy: {val_accuracy:.2f}%"
    )

    # Save the model if it's the best so far
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), os.path.join("models", "lsrnn_best_model.pth"))
        print(f"Best model saved with Val Accuracy: {val_accuracy:.2f}%")

cpu
Epoch 1, Train Loss: 1.5346, Val Loss: 1.6366, Val Accuracy: 43.00%
Best model saved with Val Accuracy: 43.00%
Epoch 2, Train Loss: 1.5346, Val Loss: 1.6366, Val Accuracy: 43.00%
Epoch 3, Train Loss: 1.5346, Val Loss: 1.6366, Val Accuracy: 43.00%
Epoch 4, Train Loss: 1.5346, Val Loss: 1.6366, Val Accuracy: 43.00%
Epoch 5, Train Loss: 1.5346, Val Loss: 1.6366, Val Accuracy: 43.00%


In [18]:
import numpy as np
import os

# 初始化存储容器
all_features = []
all_labels = []

Tran_Model = SpeechTransformer()
Tran_Model.load_state_dict(torch.load("/kaggle/working/models/lsrnn_best_model.pth"))

# 确保保存路径存在
os.makedirs("save_dir", exist_ok=True)

# 训练循环
model.eval()
with torch.no_grad():
    # for inputs, labels in train_loader:
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        feature = Tran_Model(inputs)
        # print(feature[0][0])
        # xxx
        # 转换为 NumPy 并累积
        feature_np = feature.cpu().detach().numpy()
        labels_np = labels.cpu().detach().numpy()
        
        all_features.append(feature_np)
        all_labels.append(labels_np)

# 合并并保存
all_features = np.concatenate(all_features, axis=0)
all_labels = np.concatenate(all_labels, axis=0)
print(all_features.shape)
print(all_labels.shape)
print(all_labels)
# xxx
# np.save("save_dir/FSDD_train_data.npy", all_features)
# np.save("save_dir/FSDD_train_label.npy", all_labels)
np.save("save_dir/FSDD_test_data.npy", all_features)
np.save("save_dir/FSDD_test_label.npy", all_labels)

(300, 16, 32)
(300,)
[3 1 6 2 4 1 0 2 0 8 9 8 0 3 3 7 2 4 4 8 6 8 4 7 6 4 0 9 1 4 5 6 1 0 8 7 0
 0 6 5 4 9 0 5 5 6 8 5 6 9 3 5 3 7 9 5 0 8 3 3 3 6 6 7 2 3 4 1 6 1 1 7 3 1
 2 2 0 7 5 5 6 9 7 4 6 0 3 1 7 5 9 9 9 6 2 4 5 6 2 4 6 3 6 7 5 1 6 5 0 4 5
 4 7 2 9 8 1 3 3 3 9 2 1 8 4 1 4 7 4 3 9 9 4 2 9 4 5 6 7 2 8 1 8 8 3 8 8 5
 8 3 3 8 2 2 9 7 9 3 2 2 8 8 4 6 9 8 9 5 7 3 3 5 6 4 1 0 7 3 9 3 4 7 2 4 4
 4 9 4 0 5 9 6 4 5 3 3 3 5 3 3 0 8 2 6 3 9 5 1 4 5 8 1 7 2 7 8 4 9 9 1 0 7
 9 7 7 6 7 0 1 3 8 9 8 9 1 2 8 5 3 8 5 6 4 1 1 1 6 7 2 1 9 3 0 7 9 7 5 4 1
 6 7 1 2 5 2 6 1 3 9 0 0 2 9 6 7 8 5 9 7 7 6 6 9 7 2 3 2 0 7 3 3 3 5 8 0 7
 0 0 9 6]


In [19]:
###------------------------------------------------TEST--------------------------------------------

In [20]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class CustomTestDataset(Dataset):
    def __init__(self, data_path, label_path, transform=None):
        self.data = np.load(data_path)  # 加载数据文件 
        self.labels = np.load(label_path)  # 加载标签文件 
        self.transform = transform  # 可选的数据预处理（如转为Tensor）

    def __len__(self):
        return len(self.data)  # 确保数据与标签数量一致 

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.labels[idx]
        if self.transform:
            sample = self.transform(sample)  # 应用预处理（如归一化）
        return sample, label


# 定义预处理流程
test_transform = transforms.Compose([
    transforms.ToTensor(),  # 将numpy数组转为Tensor，并自动归一化到[0,1] [[1]]
    # transforms.Normalize(mean=[0.5], std=[0.5])  # 可选标准化
])

# 初始化Dataset
test_dataset = CustomTestDataset(
    data_path='/kaggle/working/save_dir/FSDD_test_data.npy',
    label_path='/kaggle/working/save_dir/FSDD_test_label.npy',
    transform=test_transform
)

# 创建DataLoader
test_loader = DataLoader(
    test_dataset,
    batch_size=64,  # 按需设置批次大小
    shuffle=False,  # 测试时通常不洗牌 
    num_workers=2   # 多进程加载加速
)

In [21]:
def Get_HexData(binary_tensor):
    binary_tensor = binary_tensor.to(torch.int)

    # 将张量展平为一维
    flattened = binary_tensor.view(-1)

    # 确保长度是4的倍数
    if len(flattened) % 4 != 0:
        padding = torch.zeros(4 - len(flattened) % 4, dtype=torch.int)
        flattened = torch.cat((flattened, padding))

    grouped = flattened.view(-1, 4)
    grouped = torch.flip(grouped, dims=[-1])

    # 将每4位二进制数转换为16进制字符
    hex_data = []
    for group in grouped:
        binary_str = ''.join([str(bit.item()) for bit in group])  # 将4位二进制转换为字符串
        hex_value = hex(int(binary_str, 2))[2:].upper()  # 转换为16进制并去掉前缀 '0x'，转为大写
        hex_data.append(hex_value)

    # 打印结果
    # print("16进制数据:", hex_data)
    
    # 倒序打印 hex_data
    reversed_hex_data = hex_data[::-1]
    print("倒序后的16进制数据:", reversed_hex_data)

In [22]:
# 在创建Dataset之前检查
data = np.load('/kaggle/working/save_dir/FSDD_test_data.npy')
labels = np.load('/kaggle/working/save_dir/FSDD_test_label.npy')

print(f"整体数据形状: {data.shape}")
print(f"标签形状: {labels.shape}")
print(f"第一个样本形状: {data[0].shape}")
print(f"第一个样本维度: {data[0].ndim}")

整体数据形状: (300, 16, 32)
标签形状: (300,)
第一个样本形状: (16, 32)
第一个样本维度: 2


In [23]:
import os
from torch.optim.lr_scheduler import StepLR
# Prepare for saving models
os.makedirs("models", exist_ok=True)
from collections import OrderedDict

# torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

model = LSGRU(input_size=32, hidden_size=64, output_size=10).to(device)
# model.load_state_dict(torch.load("/kaggle/input/fffffsdd/FSDD_SRNN/best_model2 (1).pth"))
# 加载原始 state_dict
criterion = torch.nn.CrossEntropyLoss()
pretrained_dict = torch.load('/kaggle/working/models/lsrnn_best_model.pth')
# 创建新字典，过滤并重命名键
new_state_dict = OrderedDict()

for k, v in pretrained_dict.items():
    if k.startswith("classifier."):
        # 移除 "classifier." 前缀
        name = k.replace("classifier.", "", 1)
        new_state_dict[name] = v
    # 可添加其他条件过滤不需要的键（如 positional_encoding.pe）
    elif k == "positional_encoding.pe":
        continue  # 直接跳过此键
    else:
        new_state_dict[k] = v  # 保留其他键

# 加载调整后的 state_dict
model.load_state_dict(new_state_dict, strict=False)
print(model)
# xxx
val_loss_list = []
model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        print(inputs.shape)
        # xx
        inputs = inputs.squeeze(dim=1) 
        print(inputs.shape)
        print(labels.shape)
        # print(inputs[0][0])
        # print(labels)
        # xxx
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        # outputs = torch.clamp(outputs, min= 0)
        n = float(2 ** 3)
        outputs = torch.clip(torch.round(outputs * n) / n , -14.875, 14.875)
        print(outputs.shape)

        # 非全负数的个数
        # non_negative_mask = outputs >= 0
        # 沿行方向（dim=1）判断是否存在至少一个非负元素
        # has_non_negative = torch.any(non_negative_mask, dim=1)
        # 统计满足条件的行数
        # count = torch.sum(has_non_negative).item()
        # print(count)

        # 判断每行是否全为负数
        # all_negative_mask = torch.all(outputs < 0, dim=1)  # 使用dim=1沿行方向检查
        # 获取全负数行的索引
        # invalid_indices = torch.nonzero(all_negative_mask, as_tuple=True)[0].tolist()
        # print("全负数行的索引：", invalid_indices)
        
        # print("------------------")
        # for i in range(64):
        #     print(outputs[i])
        print(outputs.max())
        print(outputs.min())
        loss = criterion(outputs, labels)
        val_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

# Calculate metrics
val_loss_avg = val_loss / len(test_loader)
val_accuracy = 100 * correct / total
val_loss_list.append(val_loss_avg)

print(f"Val Loss: {val_loss_avg:.4f}, Val Accuracy: {val_accuracy:.2f}%")
torch.save(model.state_dict(), os.path.join("models", "best_model.pth"))


cpu
LSGRU(
  (RZGate): BinarizeLinear(in_features=96, out_features=64, bias=False)
  (h2o): BinarizeLinear(in_features=64, out_features=10, bias=False)
  (dropout_layer): Dropout(p=0.0, inplace=False)
)
torch.Size([64, 1, 16, 32])
torch.Size([64, 16, 32])
torch.Size([64])
torch.Size([64, 10])
tensor(5.5000)
tensor(-6.)
torch.Size([64, 1, 16, 32])
torch.Size([64, 16, 32])
torch.Size([64])
torch.Size([64, 10])
tensor(4.6250)
tensor(-6.)
torch.Size([64, 1, 16, 32])
torch.Size([64, 16, 32])
torch.Size([64])
torch.Size([64, 10])
tensor(6.6250)
tensor(-6.)
torch.Size([64, 1, 16, 32])
torch.Size([64, 16, 32])
torch.Size([64])
torch.Size([64, 10])
tensor(4.5000)
tensor(-6.5000)
torch.Size([44, 1, 16, 32])
torch.Size([44, 16, 32])
torch.Size([44])
torch.Size([44, 10])
tensor(4.7500)
tensor(-6.1250)
Val Loss: 1.6384, Val Accuracy: 42.33%


In [24]:
print(model.RZGate.weight.shape)
print(model.h2o.weight.shape)

torch.Size([64, 96])
torch.Size([10, 64])


In [25]:
import torch

# 加载权重文件
state_dict = torch.load("/kaggle/working/models/best_model.pth")

# 遍历字典中的每个权重
for name, weight in state_dict.items():
    # 将点分隔的层名转换为下划线分隔的文件名
    filename = "/kaggle/working/" + name.replace('.', '_') + '.pth'
    # 创建一个包含名称和权重的字典
    weight_dict = {name: weight}
    # 保存权重到文件
    torch.save(weight_dict, filename)
    print(f'Saved {name} to {filename}')
    print("shape is:",weight.shape)

Saved RZGate.weight to /kaggle/working/RZGate_weight.pth
shape is: torch.Size([64, 96])
Saved h2o.weight to /kaggle/working/h2o_weight.pth
shape is: torch.Size([10, 64])


In [27]:
os.makedirs("Weight_npy", exist_ok=True)
load_dict = torch.load(r'/kaggle/working/RZGate_weight.pth',map_location='cpu')
# load_dict = torch.load(r'/kaggle/working/h2o_weight.pth',map_location='cpu')
# print(load_dict)
i = 0

pre = 1

for k, v in load_dict.items():
    print(type(v))
    a = v.cpu().numpy()
    print(a.shape)
    # print(a)
    print(type(a))
    if k[-6:] == 'weight':
        print("save weight")
        np.save(r"Weight_npy/{}.npy".format(k), a)

<class 'torch.Tensor'>
(64, 96)
<class 'numpy.ndarray'>
save weight


In [28]:
def bin2hex_fan(x):
# 调用input216bit函数，函数将输入的数转换为16位的二进制字符串。
  x = input216bit(x)

  z = x
  new_hex = []
# 循环4次，因为16位二进制可以分成4组，每组4位。
# 将每组4位二进制转换为十六进制，并移除前缀0x。
# 将转换后的十六进制字符添加到列表中。
# 将列表中的十六进制字符连接成字符串，并转换为大写。
  for i in range(4):
      temp = hex(int(z[i*4:(i+1)*4], 2)).replace("0x", "")
      new_hex.append(temp)
  return "".join(new_hex).upper()

def dec2bin(x):
# 这个函数将浮点数的小数部分转换为二进制表示，最多保留15位。
# 移除x的整数部分，仅保留小数部分。
  int_X = int(x)
  x -= int(x)
  bins = []
# 当x不为0且二进制位数少于15位时，继续循环。
# *2相对于进行左移位，将小数变成二进制的整数，判断是否为1，之后再减去该值，继续后续的循环操作
# 返回二进制字符串，并在前面补足0，以确保总共有15位。
  bins.append(str(1) if int_X >= 1. else str(0))
  while x and len(bins)<15:
    x *= 2
    bins.append(str(1) if x>=1. else str(0))
    x -= int(x)
  return "".join(bins) + (15-len(bins))*str(0)

# 将整数转换为3位的二进制表示，返回格式化的字符串，确保长度为3位。
def int2bin(x):
  return '{0:03b}'.format(x)

def input216bit(x):
# 函数将输入的数转换为16位的二进制字符串。
# 取x的绝对值。
  new_x = abs(x)

  # int_bin = int2bin(int(new_x))
# 调用dec2bin函数，获取x的小数部分二进制表示。（15位小数）
  dec_bin = dec2bin(new_x)

# 判断他的正负号
  if x > 0:
    return "0" + dec_bin
  elif x == 0:
    return  16 * "0"
  else:
    return "1" + dec_bin

test1 = 1.99993896484375
quan1 = bin2hex_fan(test1)
print(test1)
print(quan1)

test1 = 0.99993896484375
quan1 = bin2hex_fan(test1)
print(test1)
print(quan1)

test1 = -1.99993896484375
quan1 = bin2hex_fan(test1)
print(test1)
print(quan1)

test1 = -0.99993896484375
quan1 = bin2hex_fan(test1)
print(test1)
print(quan1)

1.99993896484375
7FFF
0.99993896484375
3FFF
-1.99993896484375
FFFF
-0.99993896484375
BFFF


In [29]:
FC1_Weight = np.load(r'/kaggle/working/Weight_npy/RZGate.weight.npy').T
FC3_Weight = np.load(r'/kaggle/working/Weight_npy/h2o.weight.npy').T

# print(FC2_Weight*32768)
# print(FC1_Weight[0][0]*32768)
print(np.max(FC1_Weight), np.min(FC1_Weight))
print(np.max(FC3_Weight), np.min(FC3_Weight))

print(FC1_Weight.shape)
print(FC3_Weight.shape)
# xxxx
with open("FC_FSDD_Weight.coe", "w") as f:
    f.writelines("MEMORY_INITIALIZATION_RADIX=16;\n")
    f.writelines("MEMORY_INITIALIZATION_VECTOR=\n")
    for i in range(96):
        for j in range(1):
            Weight = FC1_Weight[i, 64*j:64*j+64]
            # print(Weight.shape)
            for k in range(64):
                # print(Weight.shape)
                f.writelines(bin2hex_fan(Weight[63-k]))
            f.writelines('\n')


    for i in range(64):
        Weight = FC3_Weight[i]
        for k in range(54):
        # for k in range(62):
            f.writelines('0000')
        for k in range(10):
        # for k in range(2):
            f.writelines(bin2hex_fan(Weight[9-k]))
        #     f.writelines(bin2hex_fan(Weight[1 - k]))

        f.writelines('\n')

1.4175415 -1.999939
1.2242432 -1.999939
(96, 64)
(64, 10)


In [34]:
model = LSGRU(input_size=32, hidden_size=64, output_size=10).to(device)
model.load_state_dict(torch.load("/kaggle/working/models/best_model.pth"))

for inputs, labels in test_loader:
    inputs = inputs.squeeze(dim=1) 
    # print(inputs.shape)
    # print(labels.shape)
    # print(inputs[0][0])
    # print(labels)
    one_input = inputs[3].unsqueeze(dim=0) 
    one_label = labels[3]
    print(one_input.shape)
    print(one_label)
    # xx
    one_output = model(one_input)
    one_output = torch.clamp(one_output, min= 0)
    n = float(2 ** 3)
    one_output = torch.clip(torch.round(one_output * n) / n , 0, 14.875)
    print(one_output)
    print(one_output*16384)
    print(one_output*8)
    _, predicted = torch.max(one_output, 1)
    print(predicted)
    break

torch.Size([1, 16, 32])
tensor(2)
tensor([[0.7500, 0.7500, 3.1250, 0.8750, 0.0000, 0.0000, 0.3750, 0.6250, 0.0000,
         0.0000]], grad_fn=<ClampBackward1>)
tensor([[12288., 12288., 51200., 14336.,     0.,     0.,  6144., 10240.,     0.,
             0.]], grad_fn=<MulBackward0>)
tensor([[ 6.,  6., 25.,  7.,  0.,  0.,  3.,  5.,  0.,  0.]],
       grad_fn=<MulBackward0>)
tensor([2])


In [53]:
test_Data = torch.tensor([[0., 1., 1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.,
         0., 0., 1., 1., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         1., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 1., 1.,
         0., 1., 0., 0., 0., 1., 0., 0., 0., 0.]])

Get_HexData(test_Data)

倒序后的16进制数据: ['0', '8', 'B', '0', '8', '1', '5', '0', '0', '1', '7', '0', '2', '0', '8', '6']


In [35]:
import numpy as np
import os
import re

num_index = int(3)

file_path = "/kaggle/working/save_dir/FSDD_test_data.npy"
label_file_path = "/kaggle/working/save_dir/FSDD_test_label.npy"
data = np.load(file_path)
reshaped_data = data
label = np.load(label_file_path).reshape(300)
print(label.shape)
print(reshaped_data.shape)
print(label[num_index])
One_data_list = reshaped_data[num_index]
One_target = label[num_index]
int_one_data = One_data_list.reshape((1, 512)).astype("int")
print(int_one_data)
int_list = [int(re.sub(r'[\[\]]', '', str(int_one_data[0][i: i + 8])).replace(" ", ""), 2) for i in range(0, len(int_one_data[0]), 8)]
np_array = np.array(int_list).reshape((1, len(int_list)))
print(np_array)
# xxxx
data_len = len(int_list)
print(data_len)
os.makedirs("txt_file", exist_ok=True)
print(str(int(One_target)))

print('txt_file/' + str(int(One_target)) + '.coe')
# xx
with open('txt_file/' + str(int(One_target)) + '.coe', 'w') as f:
    # f.writelines("initial begin\n")
    f.writelines("MEMORY_INITIALIZATION_RADIX=16;\n")
    f.writelines("MEMORY_INITIALIZATION_VECTOR=\n")
    for i in range(data_len):
        # a = hex(data[0][i]).replace('0x','')
        # print(a)
        # f.writelines("image1[{0}] = {1};\n".format(i,data[0][i]))
        x = hex(int_list[i]).replace('0x', '')
        f.writelines('0' * (2 - len(x)) + x + '\n')



(300,)
(300, 16, 32)
2
[[0 0 0 0 0 0 0 0 0 1 0 1 0 1 0 0 0 0 0 0 0 1 0 0 0 1 0 1 0 1 1 1 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 1 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 1 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0
  0 0 0 0 0 0 0 0 0 1 0 1 1 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 1 0 1 1 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 1 0 1 0 1 0 1 0 0 0 0 1 0 1 1 1 1 1 1 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 1
  0 1 0 1 0 0 0 0 1 0 1 1 1 1 1 1 1 1 0 1 0 0 0 0 0 0 0 0 0 1 0 1 0 1 0 1
  0 0 0 1 1 0 1 1 1 1 1 1 1 1 0 1 0 1 0 0 0 0 0 1 1 1 0 1 0 1 0 1 0 0 0 1
  1 0 1 1 1 1 1 1 1 1 0 1 0 1 0 1 0 0 0 1 1 1 0 1 0 1 0 1 0 0 0 1 1 0 1 1
  1 1 1 1 1 1 0 1 0 1 0 0 0 1 0 1 0 1 0 1 0 1 0 1 0 0 0 1 0 0 1 1 1 1 1 1
  1 1 0 1 0 1 0 0 0 1 0 1 1 1 0 1 0 1 0 1 0 0 0 1 0 0 1 0 1 1 1 1 1 1 0 1
  0 0 0 0 0 1 0 1 1 1 0 1 0 0 0 1 0 0 1 1 0 1 1 0 1 1 1 1 1 1 0 1 0 0 0 0
  0 1 0 1 1 1 0