In [1]:
import numpy
import torch
import os
import re

from torch import nn
from tqdm import tqdm
from PIL import Image
from thop import profile

# 读取数据模块
## 加载训练数据

In [2]:
FILE_TRAIN_DATA = "dataset/train/data"
FILE_TRAIN_LABEL = "dataset/train/label"
files_train_data = os.listdir(FILE_TRAIN_DATA)
files_train_label = os.listdir(FILE_TRAIN_LABEL)
vaule, labels = [], []
for file in files_train_data:
    FILE_DATA = os.path.join(FILE_TRAIN_DATA, file)
    for file_data in os.listdir(FILE_DATA):
        image = numpy.array(Image.open(os.path.join(FILE_DATA, file_data)))
        image = numpy.transpose(image, (2, 0, 1))
        time_match = re.search(r'\d+', file_data)
        time_value = int(time_match.group()) if time_match else 0
        time_factor = time_value / 1000.0
        time_matrix = numpy.ones((3, 256, 256), dtype=numpy.float32) * time_factor
        image = image + time_matrix
        vaule.append(image)
for file in files_train_label:
    FILE_LABEL = numpy.load(os.path.join(FILE_TRAIN_LABEL, file))
    for file_data in FILE_LABEL:
        labels.append(file_data)
LABELS_TRAIN = torch.as_tensor(numpy.array(labels), dtype = torch.float32)
VAULE_TRAIN = torch.as_tensor(numpy.array(vaule), dtype = torch.float32)
print(LABELS_TRAIN.shape)
print(VAULE_TRAIN.shape)

torch.Size([1290])
torch.Size([1290, 3, 256, 256])


## 加载测试数据

In [3]:
FILE_TEST_DATA = "dataset/test/data" 
FILE_TEST_LABEL = "dataset/test/label"
files_test_vaule = os.listdir(FILE_TEST_DATA)
files_test_label = os.listdir(FILE_TEST_LABEL)
vaule, labels = [], []
for file in files_test_vaule:
    FILE_DATA = os.path.join(FILE_TEST_DATA, file)
    for file_data in os.listdir(FILE_DATA):
        image = numpy.array(Image.open(os.path.join(FILE_DATA, file_data)))
        image = numpy.transpose(image, (2, 0, 1))
        time_match = re.search(r'\d+', file_data)
        time_value = int(time_match.group()) if time_match else 0
        time_factor = time_value / 1000.0
        time_matrix = numpy.ones((3, 256, 256), dtype=numpy.float32) * time_factor
        image = image + time_matrix
        vaule.append(image)
for file in files_test_label:
    FILE_LABEL = numpy.load(os.path.join(FILE_TEST_LABEL, file))
    for file_data in FILE_LABEL:
        labels.append(file_data)
LABELS_TEST = torch.as_tensor(numpy.array(labels), dtype = torch.float32)
VAULE_TEST = torch.as_tensor(numpy.array(vaule), dtype = torch.float32)
print(LABELS_TEST.shape)
print(VAULE_TEST.shape)

torch.Size([1239])
torch.Size([1239, 3, 256, 256])


## 打乱数据

In [4]:
My_Dataset = torch.utils.data.TensorDataset(VAULE_TRAIN, LABELS_TRAIN)
train, val = torch.utils.data.random_split(dataset=My_Dataset, lengths=[round(0.9*VAULE_TRAIN.shape[0]), round(0.1*VAULE_TRAIN.shape[0])], generator=torch.Generator().manual_seed(0))
train = torch.utils.data.DataLoader(train, batch_size =32, shuffle = True)
val = torch.utils.data.DataLoader(val, batch_size = 32, shuffle = True)
print("Data shuffled successfully!")

My_Dataset = torch.utils.data.TensorDataset(VAULE_TEST, LABELS_TEST)
test, _ = torch.utils.data.random_split(dataset=My_Dataset, lengths=[round(0.9*VAULE_TEST.shape[0]), round(0.1*VAULE_TEST.shape[0])], generator=torch.Generator().manual_seed(0))
test = torch.utils.data.DataLoader(test, batch_size =32, shuffle = True)
print("Data shuffled successfully!")

Data shuffled successfully!
Data shuffled successfully!


# 模型

In [5]:
from SMNet import LDMamba

# Model initialization and profiling
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL = LDMamba(in_channels=257, out_channels1=128, out_channels2=128, kernel_size1=5, kernel_size2=3, padding=2, stride=1, linear=64).to(device)

# Generate a sample input tensor for profiling
sample_input = torch.randn(1, 3, 256, 256).to(device)
flops, params = profile(MODEL, inputs=(sample_input,))
print(f'MFLOPs: {flops / 1e6:.2f} M')
print(f'Parameters: {params / 1e3:.2f} K')

# Model inference
output = MODEL(sample_input)
print(f"Output Shape: {output.shape}")

  from .autonotebook import tqdm as notebook_tqdm


[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv2d'>.
[INFO] Register count_normalization() for <class 'torch.nn.modules.batchnorm.BatchNorm1d'>.
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv1d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.container.Sequential'>.
[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
[INFO] Register count_softmax() for <class 'torch.nn.modules.activation.Softmax'>.
[INFO] Register count_relu() for <class 'torch.nn.modules.activation.LeakyReLU'>.
[INFO] Register count_adap_avgpool() for <class 'torch.nn.modules.pooling.AdaptiveAvgPool1d'>.
MFLOPs: 23.18 M
Parameters: 214.47 K
Output Shape: torch.Size([1, 2])
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv2d'>.
[INFO] Register count_normalization() for <class 'torch.nn.modules.batchnorm.BatchNorm1d'>.
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv1d'>.
[INFO] Register zero_ops() fo

# 优化器设计

In [6]:
LEARNING_RATE = 0.001
optimizer = torch.optim.AdamW(MODEL.parameters(), lr=LEARNING_RATE)
Learn_rate_limit = LEARNING_RATE / 50
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=LEARNING_RATE, eta_min=Learn_rate_limit)

# 训练模块

In [7]:
class data_prefetcher():
    def __init__(self, loader):
        self.loader = iter(loader)
        self.stream = torch.cuda.Stream()
        self.preload()

    def preload(self):
        try:
            self.next_input, self.next_target = next(self.loader)
        except StopIteration:
            self.next_input = None
            self.next_target = None
            return
        with torch.cuda.stream(self.stream):
            self.next_input = self.next_input.cuda(non_blocking=True)
            self.next_target = self.next_target.cuda(non_blocking=True)

    def next(self):
        torch.cuda.current_stream().wait_stream(self.stream)
        signal, label = self.next_input, self.next_target
        self.preload()
        return signal, label

def TRAIN_FUNCTION(function, optimizer, train):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    LOSS_FUNCTION = torch.nn.CrossEntropyLoss().to(device)
    losses = []
    perfetcher = data_prefetcher(train)
    signal, label = perfetcher.next()
    while signal is not None:
        signal, label = signal.to(device), label.to(device)

        with torch.cuda.amp.autocast():

            output = function(signal)
            loss = LOSS_FUNCTION(output, label.long())

        losses.append(loss.item())
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        signal, label = perfetcher.next()
        mean_loss = sum(losses) / len(losses)

    return mean_loss

def TEST_FUNCTION(function, test):
    ACCH = []
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    for signal, label in test:
        signal, label = signal.to(device), label.to(device)
        output = function(signal)
        ACCURACY = (output.argmax(1) == label).sum() / len(label)
    ACCH.append(ACCURACY)
    mean_accuracy = sum(ACCH) / len(ACCH)
    return mean_accuracy
    

In [8]:
loop = tqdm(range(500), ncols=100, colour='cyan', desc='Training Progress', bar_format='{desc}: {percentage:3.0f}%|{bar}| {postfix}', leave = True)
MAX_ACCURACY = []
for _ in loop:
    mean_loss = TRAIN_FUNCTION(MODEL, optimizer, train)
    mean_accuracy = TEST_FUNCTION(MODEL, test)
    MAX_ACCURACY.append(mean_accuracy)
    loop.set_postfix(
    loss = f"{mean_loss:.5f}",
    accuracy =  f"{max(MAX_ACCURACY).item():.4f}"
    )


Training Progress: 100%|[36m███████████████████████████████████████████[0m| , accuracy=0.8889, loss=0.01941[0m
