## Download dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!unzip 'drive/MyDrive/emg_imu_data.zip'


In [1]:
# Import the c2net helpers
from c2net.context import prepare, upload_output
# Initialise the context: brings the dataset and pretrained model into the container
c2net_context = prepare()



       ___                _            _            _          
      |__ \              | |          | |          | |         
  ___    ) | _ __    ___ | |_  ______ | |__    ___ | |_   __ _ 
 / __|  / / | '_ \  / _ \| __||______|| '_ \  / _ \| __| / _` |
| (__  / /_ | | | ||  __/| |_         | |_) ||  __/| |_ | (_| |
 \___||____||_| |_| \___| \__|        |_.__/  \___| \__| \__,_|
                                                               
         

If you have any problems while preparing the data, you can submit an issue in this repository: https://openi.pcl.ac.cn/OpenIOSSG/c2net-pypi
        
Detected .code_cache_file already exists, code has been prepared!
Detected .dataset_cache_file already exists, dataset has been prepared!
please set c2net_context.output_path as the output location


In [2]:
datasetPath = c2net_context.dataset_path
datasetPath

'/tmp/dataset'

In [3]:
# Build the dataset paths from the c2net context
imu_data_path = c2net_context.dataset_path+"/"+"imu_data"
emg_data_path = c2net_context.dataset_path+"/"+"emg_data"

# Output results must be saved in this directory
you_should_save_here = c2net_context.output_path

In [4]:
imu_data_path = c2net_context.dataset_path+"/"+"imu_data"+"/"+"imu_data"
emg_data_path = c2net_context.dataset_path+"/"+"emg_data"+"/"+"emg_data"

## 配置环境

In [None]:
# !pip install scikit-learn
!pip install scikit-learn -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

## Fix Random Seed

In [5]:
import numpy as np
import torch
import random

def set_seed(seed):
    """Seed NumPy, Python's `random` and PyTorch, and pin the cuDNN flags.

    Args:
        seed: Value handed unchanged to every seeding call.
    """
    # CPU-side generators.
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)

    # CUDA generators are only seeded when a CUDA device is available.
    if torch.cuda.is_available():
        for cuda_seed_fn in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            cuda_seed_fn(seed)

    # Disable cuDNN autotuning and request deterministic kernels.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

# set_seed(87)
set_seed(123)

# Data

## Dataset

In [6]:
import torch
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder
import numpy as np
import os

class FusedDataset(Dataset):
    """Paired EMG/IMU recordings, matched by file name.

    A file named ``<key>_emg.txt`` in ``emg_directory`` is paired with
    ``<key>_imu.txt`` in ``imu_directory``. Pairs in which either file is
    empty (zero bytes) are skipped. The class label is the part of ``<key>``
    before the first underscore, encoded to an integer with ``LabelEncoder``.

    Args:
        emg_directory: Directory containing the ``*_emg.txt`` files.
        imu_directory: Directory containing the ``*_imu.txt`` files.
        emg_seq_length: Number of rows every EMG sample is padded/truncated to.
        imu_seq_length: Number of rows every IMU sample is padded/truncated to.
    """

    def __init__(self, emg_directory, imu_directory, emg_seq_length=790, imu_seq_length=205):
        self.emg_seq_length = emg_seq_length
        self.imu_seq_length = imu_seq_length
        self.emg_files = []
        self.imu_files = []
        self.labels = []

        # Map "<key>" -> full path for each modality.
        emg_files = self._index_files(emg_directory, "_emg.txt")
        imu_files = self._index_files(imu_directory, "_imu.txt")

        # os.listdir() order is unspecified; iterating the keys in sorted order
        # keeps sample indices (and therefore any seeded random split) stable
        # across runs and machines.
        for file_key in sorted(emg_files):
            if file_key not in imu_files:
                continue
            emg_filepath = emg_files[file_key]
            imu_filepath = imu_files[file_key]
            if os.path.getsize(emg_filepath) > 0 and os.path.getsize(imu_filepath) > 0:
                self.emg_files.append(emg_filepath)
                self.imu_files.append(imu_filepath)
                self.labels.append(file_key.split('_')[0])

        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.labels)
        self.labels = torch.from_numpy(self.labels).long()

    @staticmethod
    def _index_files(directory, suffix):
        """Return ``{name_without_suffix: full_path}`` for files ending in ``suffix``."""
        return {
            name[: -len(suffix)]: os.path.join(directory, name)
            for name in os.listdir(directory)
            if name.endswith(suffix)
        }

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(emg_signals, imu_signals, label)`` for sample ``idx``."""
        emg_path = self.emg_files[idx]
        imu_path = self.imu_files[idx]
        emg_signals = self.load_and_process_data(emg_path, self.emg_seq_length)
        imu_signals = self.load_and_process_data(imu_path, self.imu_seq_length)

        return emg_signals, imu_signals, self.labels[idx]

    def load_and_process_data(self, filepath, seq_length):
        """Load a text matrix and zero-pad or truncate it to ``seq_length`` rows.

        Args:
            filepath: Path of a whitespace-separated numeric text file.
            seq_length: Number of rows in the returned tensor.

        Returns:
            A float32 tensor with ``seq_length`` rows.
        """
        # ndmin=2 keeps single-row files two-dimensional; without it loadtxt
        # returns a 1-D array and the shape[1] lookup below raises IndexError.
        data = np.loadtxt(filepath, ndmin=2)
        if data.shape[0] < seq_length:
            data = np.vstack([data, np.zeros((seq_length - data.shape[0], data.shape[1]))])
        elif data.shape[0] > seq_length:
            data = data[:seq_length, :]
        return torch.from_numpy(data).float()

    def get_num_classes(self):
        """Return the number of distinct encoded labels in the dataset."""
        return int(torch.unique(self.labels).numel())


In [22]:
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
import numpy as np
import os

class EMGDataset(Dataset):
    """EMG-only dataset built from the ``*_emg.txt`` files of one directory.

    Empty (zero-byte) files are skipped. The class label is the part of the
    file name before the first underscore, encoded with ``LabelEncoder``.

    Args:
        directory: Directory containing the ``*_emg.txt`` files.
        seq_length: Number of rows every sample is padded/truncated to.
        scaler: Optional pre-fitted object with a ``transform`` method that is
            applied to each padded/truncated sample.
    """

    def __init__(self, directory, seq_length=790, scaler=None):
        self.directory = directory
        self.seq_length = seq_length
        self.files = []
        self.labels = []
        self.scaler = scaler  # pre-computed scaler passed in by the caller

        # Sorted listing: os.listdir() order is unspecified, and a stable
        # index order is needed for a seeded random split to be reproducible.
        for f in sorted(os.listdir(directory)):
            if f.endswith("_emg.txt"):
                filepath = os.path.join(directory, f)
                if os.path.getsize(filepath) > 0:
                    self.files.append(filepath)
                    self.labels.append(f.split('_')[0])

        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.labels)
        self.labels = torch.from_numpy(self.labels).long()

        self.num_classes = int(torch.unique(self.labels).numel())

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(emg_signals, label)`` for sample ``idx``."""
        filepath = self.files[idx]
        # ndmin=2 keeps single-row files two-dimensional so the shape[1]
        # lookup used for padding does not raise IndexError.
        emg_signals = np.loadtxt(filepath, ndmin=2)
        if emg_signals.shape[0] < self.seq_length:
            pad = np.zeros((self.seq_length - emg_signals.shape[0], emg_signals.shape[1]))
            emg_signals = np.vstack([emg_signals, pad])
        elif emg_signals.shape[0] > self.seq_length:
            emg_signals = emg_signals[:self.seq_length, :]

        # Explicit None check: a scaler object must not be skipped just
        # because it happens to evaluate as falsy.
        if self.scaler is not None:
            emg_signals = self.scaler.transform(emg_signals)

        emg_signals = torch.from_numpy(emg_signals).float()
        return emg_signals, self.labels[idx]

    def get_num_classes(self):
        """Return the number of distinct labels found at construction time."""
        return self.num_classes



# # Assuming 'path_to_your_directory' is the path to your data
# dataset = EMGDataset('path_to_your_directory')
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
import numpy as np
import os

class IMUDataset(Dataset):
    """IMU-only dataset built from the ``*_imu.txt`` files of one directory.

    Empty (zero-byte) files are skipped. The class label is the part of the
    file name before the first underscore, encoded with ``LabelEncoder``.

    Args:
        directory: Directory containing the ``*_imu.txt`` files.
        seq_length: Number of rows every sample is padded/truncated to.
            NOTE(review): the default is 790, whereas FusedDataset in this
            notebook uses 205 for IMU data - confirm which is intended.
        scaler: Optional pre-fitted object with a ``transform`` method that is
            applied to each padded/truncated sample.
    """

    def __init__(self, directory, seq_length=790, scaler=None):
        self.directory = directory
        self.seq_length = seq_length
        self.files = []
        self.labels = []
        self.scaler = scaler  # Optionally use a pre-fitted scaler

        # Sorted listing: os.listdir() order is unspecified, and a stable
        # index order is needed for a seeded random split to be reproducible.
        for f in sorted(os.listdir(directory)):
            if f.endswith("_imu.txt"):  # Only IMU files
                filepath = os.path.join(directory, f)
                if os.path.getsize(filepath) > 0:  # Skip empty files
                    self.files.append(filepath)
                    label = f.split('_')[0]  # Label is the text before the first underscore
                    self.labels.append(label)

        # Encode labels
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.labels)
        self.labels = torch.from_numpy(self.labels).long()

        self.num_classes = int(torch.unique(self.labels).numel())

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(imu_data, label)`` for sample ``idx``."""
        filepath = self.files[idx]
        # ndmin=2 keeps single-row files two-dimensional so the shape[1]
        # lookup used for padding does not raise IndexError.
        imu_data = np.loadtxt(filepath, ndmin=2)

        # Zero-pad short recordings, truncate long ones
        if imu_data.shape[0] < self.seq_length:
            pad = np.zeros((self.seq_length - imu_data.shape[0], imu_data.shape[1]))
            imu_data = np.vstack([imu_data, pad])
        elif imu_data.shape[0] > self.seq_length:
            imu_data = imu_data[:self.seq_length, :]

        # Explicit None check: a scaler object must not be skipped just
        # because it happens to evaluate as falsy.
        if self.scaler is not None:
            imu_data = self.scaler.transform(imu_data)

        imu_data = torch.from_numpy(imu_data).float()
        return imu_data, self.labels[idx]

    def get_num_classes(self):
        """Return the number of distinct labels found at construction time."""
        return self.num_classes


## 计算scaler

In [None]:
import numpy as np
import os

def incremental_mean_std(directory, seq_length=790, n_features=8, suffix="_emg.txt"):
    """Compute the per-position mean and std over all matching files in one pass.

    Every file is zero-padded or truncated to ``seq_length`` rows and then
    folded into a running (Welford-style) mean / sum of squared deviations,
    so only one file is held in memory at a time.

    Args:
        directory: Directory to scan.
        seq_length: Number of rows each file is padded/truncated to.
        n_features: Expected number of columns per file (default 8).
        suffix: Only files whose name ends with this suffix are used.

    Returns:
        ``(mean, std)``, two arrays of shape ``(seq_length, n_features)``.
        ``std`` is the population standard deviation (divides by ``n``).
        When no file is usable both arrays are all zeros.

    Raises:
        ValueError: If a file does not have ``n_features`` columns.
    """
    mean = np.zeros((seq_length, n_features))
    M2 = np.zeros((seq_length, n_features))
    n = 0
    # Sorted so the accumulation order (and floating-point rounding) is stable.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(suffix):
            continue
        filepath = os.path.join(directory, filename)
        if os.path.getsize(filepath) == 0:
            continue
        # ndmin=2 keeps single-row files two-dimensional so the shape[1]
        # lookup used for padding does not raise IndexError.
        data = np.loadtxt(filepath, ndmin=2)
        if data.size == 0:  # nothing parseable in the file
            continue
        if data.shape[1] != n_features:
            raise ValueError(
                f"{filename}: expected {n_features} columns, found {data.shape[1]}"
            )
        if data.shape[0] < seq_length:
            # Zero-pad short recordings
            pad = np.zeros((seq_length - data.shape[0], data.shape[1]))
            data = np.vstack([data, pad])
        elif data.shape[0] > seq_length:
            # Truncate long recordings
            data = data[:seq_length, :]
        n += 1
        delta = data - mean
        mean += delta / n
        M2 += delta * (data - mean)

    std = np.sqrt(M2 / n) if n > 1 else np.sqrt(M2)
    return mean, std

# directory = 'emg_data'
# mean, std = incremental_mean_std(directory)

# Wrap the computed mean and std in a StandardScaler-like class
class CustomScaler:
    """Minimal standardiser: ``(X - mean) / std``.

    Args:
        mean: Value(s) subtracted from the input.
        std: Value(s) the centred input is divided by.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, X):
        """Return the standardised copy of ``X``.

        Positions whose ``std`` is exactly zero (e.g. positions that are
        zero-padded in every sample) are divided by 1 instead, so they map
        to ``X - mean`` rather than to NaN or inf.
        """
        # Computed here rather than cached in __init__ so that instances
        # restored from an older pickle (which bypasses __init__) still work.
        std = np.asarray(self.std, dtype=float)
        safe_std = np.where(std == 0, 1.0, std)
        return (X - self.mean) / safe_std


In [None]:
directory = 'emg_data'
mean, std = incremental_mean_std(directory)
scaler = CustomScaler(mean, std)

In [6]:
import pickle

# Load the scaler object from a file.
# WARNING: pickle.load can execute arbitrary code contained in the file;
# only load scaler files that come from a trusted source.
# filepath = 'drive/MyDrive/scaler.pkl'
filepath = 'scaler.pkl'
with open(filepath, 'rb') as f:
    loaded_scaler = pickle.load(f)

    scaler = loaded_scaler

Security note: `pickle.load` can execute arbitrary code, so only load scaler files you trust. See https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [None]:
# 使用加载的scaler来转换数据
# transformed_data = loaded_scaler.transform(new_data)

In [None]:
dataset = EMGDataset(directory=directory, scaler=scaler)

In [7]:
# directory='emg_data'
directory=emg_data_path
# scaler = compute_statistics(directory)
scaler = None
dataset = EMGDataset(directory=directory, scaler=scaler)
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [9]:
dataset.get_num_classes()

1059

## 确定seq_length
统计各数据文件的行数和列数，据此确定 seq_length。

In [None]:
import numpy as np
import os

# Module-level result lists; the statistics code in this cell reads them directly.
lengths0 = []
lengths1 = []
def analyze_emg_lengths(directory):
    """Collect the row and column counts of every ``*_imu.txt`` file.

    Despite the function name, the file filter is ``_imu.txt``.

    The module-level lists ``lengths0`` (rows) and ``lengths1`` (columns) are
    cleared and refilled on every call, so re-running the cell does not count
    files twice. Zero-byte files are skipped; files that cannot be read or
    parsed are reported and skipped.

    Args:
        directory: Directory to scan.

    Returns:
        ``(lengths0, lengths1)``, the two module-level lists.
    """
    lengths0.clear()
    lengths1.clear()

    # Sorted for a stable, reproducible order.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith("_imu.txt"):
            continue
        filepath = os.path.join(directory, filename)
        try:
            if os.path.getsize(filepath) > 0:
                # ndmin=2 keeps single-row files two-dimensional so that
                # shape[1] is always available.
                signals = np.loadtxt(filepath, ndmin=2)
                if signals.size == 0:
                    continue
                lengths0.append(signals.shape[0])
                lengths1.append(signals.shape[1])
        except (OSError, ValueError):
            # Unreadable or corrupted file: report it and carry on.
            print(f"Could not load data from {filename}")
    return lengths0, lengths1

# Assuming 'path_to_your_directory' is the path to your data
# directory = 'emg_data'
# The IMU directory is the one analysed here.
directory = imu_data_path
lengths = analyze_emg_lengths(directory)

# Calculate statistics
# Suffix 0 = row counts (sequence length), suffix 1 = column counts.
# The lists are read from the module-level lengths0 / lengths1, not from `lengths`.
if lengths0:
    min_length0 = np.min(lengths0)
    max_length0 = np.max(lengths0)
    avg_length0 = np.mean(lengths0)
    median_length0 = np.median(lengths0)

    min_length1 = np.min(lengths1)
    max_length1 = np.max(lengths1)
    avg_length1 = np.mean(lengths1)
    median_length1 = np.median(lengths1)

    print(f"Minimum length: {min_length0}, {min_length1}")
    print(f"Maximum length: {max_length0}， {max_length1}")
    print(f"Average length: {avg_length0:.2f}, {avg_length1:.2f}")
    print(f"Median length: {median_length0}, {median_length1}")
else:
    print("No data available to analyze.")


## Dataloader
- Split dataset into training dataset(90%) and validation dataset(10%).
- Create dataloader to iterate the data.

In [None]:
# scaler=scaler
scaler=None

In [7]:
# from your_dataset_file import FusedDataset
import torch
from torch.utils.data import DataLoader, random_split

def get_fused_dataloader(emg_dir, imu_dir, batch_size, n_workers):
    """Build the training and validation loaders for the fused EMG/IMU dataset.

    The dataset is split 90% / 10% with ``random_split``, which draws from
    the global torch RNG.

    Args:
        emg_dir: Directory with the ``*_emg.txt`` files.
        imu_dir: Directory with the ``*_imu.txt`` files.
        batch_size: Batch size used by both loaders.
        n_workers: Number of DataLoader worker processes.

    Returns:
        ``(train_loader, valid_loader, num_classes)``.
    """
    dataset = FusedDataset(emg_directory=emg_dir, imu_directory=imu_dir)
    num_classes = dataset.get_num_classes()

    # Splitting dataset into training and validation sets
    train_len = int(0.9 * len(dataset))
    lengths = [train_len, len(dataset) - train_len]
    train_set, valid_set = random_split(dataset, lengths)

    train_loader = DataLoader(
        train_set,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=n_workers,
        pin_memory=True,
    )
    valid_loader = DataLoader(
        valid_set,
        batch_size=batch_size,
        num_workers=n_workers,
        # Keep the last partial batch: dropping it silently excludes up to
        # batch_size - 1 samples from evaluation, and leaves the loader empty
        # when the validation set is smaller than one batch.
        drop_last=False,
        pin_memory=True,
    )

    return train_loader, valid_loader, num_classes


In [17]:
emg_dir = "/tmp/dataset/emg_data/emg_data"
imu_dir = "/tmp/dataset/imu_data/imu_data"
dataset = FusedDataset(emg_directory=emg_dir, imu_directory=imu_dir)
num_classes = dataset.get_num_classes()

# Splitting dataset into training and validation sets
train_len = int(0.9 * len(dataset))
lengths = [train_len, len(dataset) - train_len]
train_set, valid_set = random_split(dataset, lengths)

In [20]:
train_len

950

In [24]:
dataset.__len__()

76213

In [23]:
# directory='emg_data'
directory=emg_dir
# scaler = compute_statistics(directory)
scaler = None
EMGdataset = EMGDataset(directory=directory, scaler=scaler)
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [None]:
EMGdataset.__len__()

In [7]:
import torch
from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence

def emg_collate_batch(batch):
    """Collate ``(signal, label)`` pairs into a stacked tensor and a label tensor."""
    signals, targets = zip(*batch)
    stacked_signals = torch.stack(signals)
    target_tensor = torch.tensor(targets)
    return stacked_signals, target_tensor

def get_emg_dataloader(data_dir, batch_size, n_workers):
    """Build the training and validation loaders for the EMG-only dataset.

    The dataset is split 90% / 10% with ``random_split``, which draws from
    the global torch RNG. The scaler handed to ``EMGDataset`` is read from
    the notebook-level variable ``scaler``, which must be defined before
    this function is called.

    Args:
        data_dir: Directory with the ``*_emg.txt`` files.
        batch_size: Batch size used by both loaders.
        n_workers: Number of DataLoader worker processes.

    Returns:
        ``(train_loader, valid_loader, num_classes)``.
    """
    dataset = EMGDataset(directory=data_dir, scaler=scaler)
    num_classes = dataset.get_num_classes()
    # Split the dataset into training and validation sets
    train_len = int(0.9 * len(dataset))
    lengths = [train_len, len(dataset) - train_len]
    train_set, valid_set = random_split(dataset, lengths)

    train_loader = DataLoader(
        train_set,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=n_workers,
        pin_memory=True,
        collate_fn=emg_collate_batch,
    )
    valid_loader = DataLoader(
        valid_set,
        batch_size=batch_size,
        num_workers=n_workers,
        # Keep the last partial batch: dropping it silently excludes up to
        # batch_size - 1 samples from evaluation, and leaves the loader empty
        # when the validation set is smaller than one batch.
        drop_last=False,
        pin_memory=True,
        collate_fn=emg_collate_batch,
    )

    return train_loader, valid_loader, num_classes


# Model
- TransformerEncoderLayer:
  - Base transformer encoder layer in [Attention Is All You Need](https://arxiv.org/abs/1706.03762)
  - Parameters:
    - d_model: the number of expected features of the input (required).

    - nhead: the number of heads of the multiheadattention models (required).

    - dim_feedforward: the dimension of the feedforward network model (default=2048).

    - dropout: the dropout value (default=0.1).

    - activation: the activation function of intermediate layer, relu or gelu (default=relu).

- TransformerEncoder:
  - TransformerEncoder is a stack of N transformer encoder layers
  - Parameters:
    - encoder_layer: an instance of the TransformerEncoderLayer() class (required).

    - num_layers: the number of sub-encoder-layers in the encoder (required).

    - norm: the layer normalization component (optional).

In [8]:
import torch.nn as nn

class SignLanguageModel(nn.Module):
    """Two-branch LSTM classifier over EMG and IMU sequences.

    Each modality is encoded by its own batch-first LSTM; the final hidden
    states of the last layer are concatenated and fed to one linear layer.
    """

    def __init__(self, num_classes, emg_input_dim=8, imu_input_dim=10, hidden_dim=128):
        super().__init__()
        lstm_options = {"batch_first": True}
        self.emg_encoder = nn.LSTM(emg_input_dim, hidden_dim, **lstm_options)
        self.imu_encoder = nn.LSTM(imu_input_dim, hidden_dim, **lstm_options)
        # The classifier sees both hidden states side by side.
        self.fc = nn.Linear(2 * hidden_dim, num_classes)

    def forward(self, emg_data, imu_data):
        """Return class logits for a batch of EMG and IMU sequences."""
        # nn.LSTM returns (output, (h_n, c_n)); only h_n is used.
        emg_hidden = self.emg_encoder(emg_data)[1][0]
        imu_hidden = self.imu_encoder(imu_data)[1][0]

        fused = torch.cat([emg_hidden[-1], imu_hidden[-1]], dim=1)
        return self.fc(fused)


In [14]:
import torch
import torch.nn as nn

class SignLanguageModel(nn.Module):
    """Two-branch transformer classifier over EMG and IMU sequences.

    Each modality is passed through its own TransformerEncoder operating
    directly on the raw feature dimension, projected to ``hidden_dim`` by a
    linear layer, mean-pooled over time, concatenated, passed through a
    fusion TransformerEncoder (as a length-1 sequence) and classified.

    Args:
        num_classes: Number of output classes.
        emg_input_dim: Number of features per EMG time step.
        imu_input_dim: Number of features per IMU time step.
        hidden_dim: Output size of the per-modality linear projections.
        num_heads: Requested number of attention heads. The fusion encoder
            uses it as-is; each per-modality encoder uses the largest head
            count not exceeding ``num_heads`` that divides its input
            dimension.
        num_layers: Number of layers in each per-modality encoder.
        feature_dim: Half of the fusion encoder's model dimension; must
            equal ``hidden_dim``.
        post_fusion_layers: Number of layers in the fusion encoder.

    Raises:
        ValueError: If ``hidden_dim != feature_dim``.
    """

    def __init__(self, num_classes, emg_input_dim=8, imu_input_dim=10, hidden_dim=128, num_heads=4, num_layers=1, feature_dim=128, post_fusion_layers=1):
        super(SignLanguageModel, self).__init__()
        # forward() concatenates two hidden_dim vectors and feeds them to
        # layers sized for 2 * feature_dim, so the two must agree.
        if hidden_dim != feature_dim:
            raise ValueError(
                f"hidden_dim ({hidden_dim}) must equal feature_dim ({feature_dim})"
            )

        self.emg_embedding = nn.Linear(emg_input_dim, hidden_dim)
        self.imu_embedding = nn.Linear(imu_input_dim, hidden_dim)

        # Multi-head attention requires d_model % nhead == 0. With the
        # defaults (imu_input_dim=10, num_heads=4) using num_heads directly
        # fails at construction, so pick a compatible head count per branch.
        emg_heads = self._compatible_heads(emg_input_dim, num_heads)
        imu_heads = self._compatible_heads(imu_input_dim, num_heads)

        self.emg_transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=emg_input_dim, nhead=emg_heads, dim_feedforward=emg_input_dim*4, batch_first=True),
            num_layers=num_layers
        )
        self.imu_transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=imu_input_dim, nhead=imu_heads, dim_feedforward=imu_input_dim*4, batch_first=True),
            num_layers=num_layers
        )

        self.fusion_transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=feature_dim * 2, nhead=num_heads, dim_feedforward=feature_dim*4, batch_first=True),
            num_layers=post_fusion_layers
        )

        self.fc_dim_reduction = nn.Linear(feature_dim * 2, feature_dim)  # Reduce dimension before final classification
        self.fc_final = nn.Linear(feature_dim, num_classes)  # Final classification layer

    @staticmethod
    def _compatible_heads(dim, requested):
        """Return the largest head count <= ``requested`` that divides ``dim`` (at least 1)."""
        for heads in range(min(requested, dim), 0, -1):
            if dim % heads == 0:
                return heads
        return 1

    def forward(self, emg_data, imu_data):
        """Return class logits for a batch of EMG and IMU sequences."""
        # Encode each modality on its raw feature dimension ...
        emg_features = self.emg_transformer(emg_data)
        imu_features = self.imu_transformer(imu_data)

        # ... then project to hidden_dim.
        emg_features = self.emg_embedding(emg_features)
        imu_features = self.imu_embedding(imu_features)

        # Mean-pool over dim=1 (the sequence axis for batch-first inputs).
        emg_pooled = torch.mean(emg_features, dim=1)
        imu_pooled = torch.mean(imu_features, dim=1)

        combined_features = torch.cat((emg_pooled, imu_pooled), dim=1)

        # Fusion encoder is applied to a length-1 sequence per sample.
        combined_features = self.fusion_transformer(combined_features.unsqueeze(1)).squeeze(1)

        reduced_features = self.fc_dim_reduction(combined_features)

        output = self.fc_final(reduced_features)
        return output


In [18]:
class SignLanguageModel(nn.Module):
    """Two-branch transformer classifier over EMG and IMU sequences.

    The per-modality encoders always use 2 attention heads; ``num_heads``
    only configures the fusion encoder. Defining this class replaces any
    earlier ``SignLanguageModel`` in the notebook namespace.
    """

    def __init__(self, num_classes, emg_input_dim=8, imu_input_dim=10, hidden_dim=128, num_heads=2, num_layers=1, feature_dim=128, post_fusion_layers=1):
        super().__init__()

        def build_encoder(width, heads, feedforward, depth):
            # One batch-first encoder stack; all stacks here share this recipe.
            layer = nn.TransformerEncoderLayer(
                d_model=width, nhead=heads, dim_feedforward=feedforward, batch_first=True
            )
            return nn.TransformerEncoder(layer, num_layers=depth)

        fused_width = feature_dim * 2

        # Linear projections from the raw feature size to hidden_dim.
        self.emg_embedding = nn.Linear(emg_input_dim, hidden_dim)
        self.imu_embedding = nn.Linear(imu_input_dim, hidden_dim)

        # Branch encoders work on the raw feature size with a fixed 2 heads.
        self.emg_transformer = build_encoder(emg_input_dim, 2, emg_input_dim * 4, num_layers)
        self.imu_transformer = build_encoder(imu_input_dim, 2, imu_input_dim * 4, num_layers)

        # Fusion encoder over the concatenated pooled features.
        self.fusion_transformer = build_encoder(fused_width, num_heads, feature_dim * 4, post_fusion_layers)

        # Classification head: reduce, then classify.
        self.fc_dim_reduction = nn.Linear(fused_width, feature_dim)
        self.fc_final = nn.Linear(feature_dim, num_classes)

    def forward(self, emg_data, imu_data):
        """Return class logits for a batch of EMG and IMU sequences."""
        # Encode, project, then average over dim=1 for each modality.
        emg_pooled = self.emg_embedding(self.emg_transformer(emg_data)).mean(dim=1)
        imu_pooled = self.imu_embedding(self.imu_transformer(imu_data)).mean(dim=1)

        # The fusion encoder sees each sample as a length-1 sequence.
        fused = torch.cat((emg_pooled, imu_pooled), dim=1).unsqueeze(1)
        fused = self.fusion_transformer(fused).squeeze(1)

        return self.fc_final(self.fc_dim_reduction(fused))


In [8]:
import torch.nn as nn

class TransformerModel(nn.Module):
    """Single-stream transformer classifier.

    The input is projected to ``d_model`` features, run through a 3-layer
    TransformerEncoder, average-pooled over the sequence and classified.
    ``ff_dim`` is accepted but not read by this constructor; the
    feed-forward width is ``d_model * 2``.
    """

    def __init__(self, num_classes, input_dim=8, d_model=224, ff_dim=256, num_heads=2, dropout=0.1):
        super().__init__()
        # Lift the raw input features into the d_model-wide space.
        self.prenet = nn.Linear(input_dim, d_model)

        layer_options = dict(
            d_model=d_model,
            dim_feedforward=d_model * 2,
            nhead=num_heads,
            dropout=dropout,
            batch_first=True,
        )
        self.encoder_layer = nn.TransformerEncoderLayer(**layer_options)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=3)

        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x):
        """Return class logits for a batch-first input ``x``."""
        encoded = self.encoder(self.prenet(x))
        # (batch, seq_len, features) -> (batch, features, seq_len) for 1-D pooling.
        pooled = self.pool(encoded.transpose(1, 2)).squeeze(-1)
        return self.fc(pooled)


# Assuming 8 features per timestep and the sequence length is the same for all samples
# model = TransformerModel(input_dim=8, num_heads=2, ff_dim=256, num_classes=len(np.unique(dataset.labels)))


# Learning rate schedule
- For transformer architecture, the design of learning rate schedule is different from that of CNN.
- Previous works show that the warmup of learning rate is useful for training models with transformer architectures.
- The warmup schedule
  - Set learning rate to 0 in the beginning.
  - The learning rate increases linearly from 0 to initial learning rate during warmup period.

In [9]:
import math

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR


def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float = 0.5,
    last_epoch: int = -1,
):
    """
    Build a LambdaLR schedule: linear warmup followed by cosine decay.

    The learning-rate multiplier rises linearly from 0 to 1 over the first
    ``num_warmup_steps`` steps, then follows a cosine curve (clamped at 0)
    over the remaining steps.

    Args:
        optimizer (:class:`~torch.optim.Optimizer`):
            The optimizer for which to schedule the learning rate.
        num_warmup_steps (:obj:`int`):
            The number of steps for the warmup phase.
        num_training_steps (:obj:`int`):
            The total number of training steps.
        num_cycles (:obj:`float`, `optional`, defaults to 0.5):
            The number of cosine waves in the decay phase (the default is a
            half-cosine from the maximum value down to 0).
        last_epoch (:obj:`int`, `optional`, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        :obj:`torch.optim.lr_scheduler.LambdaLR` with the appropriate schedule.
    """
    # Both denominators are loop-invariant; max(1, ...) guards against division by zero.
    warmup_span = float(max(1, num_warmup_steps))
    decay_span = float(max(1, num_training_steps - num_warmup_steps))

    def lr_lambda(current_step):
        if current_step >= num_warmup_steps:
            # Decay phase
            progress = float(current_step - num_warmup_steps) / decay_span
            cosine = math.cos(math.pi * float(num_cycles) * 2.0 * progress)
            return max(0.0, 0.5 * (1.0 + cosine))
        # Warmup phase
        return float(current_step) / warmup_span

    return LambdaLR(optimizer, lr_lambda, last_epoch)

# Model Function
- Model forward function.

In [10]:
def model_fn(batch, model, criterion, device):
    """Run one forward pass and return ``(loss, accuracy)`` for the batch.

    Args:
        batch: Triple ``(emg_data, imu_data, labels)``.
        model: Callable taking ``(emg_data, imu_data)`` and returning logits.
        criterion: Loss callable taking ``(logits, labels)``.
        device: Device the three tensors are moved to before the forward pass.
    """
    emg_data, imu_data, labels = batch
    emg_data, imu_data, labels = (
        tensor.to(device) for tensor in (emg_data, imu_data, labels)
    )

    logits = model(emg_data, imu_data)
    loss = criterion(logits, labels)

    # Fraction of samples whose highest-scoring class equals the label.
    correct = logits.argmax(dim=1) == labels
    return loss, correct.float().mean()


# Validate
- Calculate accuracy of the validation set.

In [11]:
from tqdm import tqdm
import torch

def valid(dataloader, model, criterion, device):
    """Evaluate on the validation loader and return the mean per-batch accuracy.

    The model is switched to eval mode for the pass and back to train mode
    before returning.
    """
    model.eval()
    loss_total = 0.0
    accuracy_total = 0.0
    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit="sample")

    for batches_done, batch in enumerate(dataloader, start=1):
        # No gradients are needed for evaluation.
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
        loss_total += loss.item()
        accuracy_total += accuracy.item()

        pbar.update(dataloader.batch_size)
        pbar.set_postfix(
            loss=f"{loss_total / batches_done:.2f}",
            accuracy=f"{accuracy_total / batches_done:.2f}",
        )

    pbar.close()
    model.train()

    return accuracy_total / len(dataloader)


# Main function

In [None]:
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader

def main(
    emg_dir,
    imu_dir,
    save_path,
    batch_size,
    n_workers,
    valid_steps,
    warmup_steps,
    total_steps,
    save_steps,
    pretrained_path=None,
):
    """Train SignLanguageModel on the fused EMG/IMU data and plot the metrics.

    Args:
        emg_dir: Directory with the ``*_emg.txt`` files.
        imu_dir: Directory with the ``*_imu.txt`` files.
        save_path: File the best model weights are written to.
        batch_size: Batch size for both loaders.
        n_workers: Number of DataLoader worker processes.
        valid_steps: Run validation every this many training steps.
        warmup_steps: Accepted but not used here; the scheduler is plain
            cosine annealing without warmup.
        total_steps: Total number of optimisation steps.
        save_steps: Write the best weights to ``save_path`` every this many steps.
        pretrained_path: Optional state-dict file to initialise the model from.

    Expects ``valid`` to return ``(loss, accuracy)``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Info]: Use {device} now!")

    train_loader, valid_loader, num_classes = get_fused_dataloader(emg_dir, imu_dir, batch_size, n_workers)

    model = SignLanguageModel(num_classes=num_classes).to(device)

    if pretrained_path:
        model.load_state_dict(torch.load(pretrained_path, map_location=device))
        print("[Info]: Pretrained model loaded!")

    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=total_steps)

    # Lists to store metrics
    train_losses = []
    train_accuracies = []
    valid_losses = []
    valid_accuracies = []

    best_accuracy = -1.0
    best_state_dict = None

    # One persistent iterator. Calling iter(train_loader) on every step
    # would restart the loader (and its workers) each time, only ever use
    # the first batch of a fresh shuffle, and never raise StopIteration.
    train_iterator = iter(train_loader)

    pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

    for step in range(total_steps):
        try:
            batch = next(train_iterator)
        except StopIteration:
            # Epoch finished: start a new pass over the training data.
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Store metrics
        train_losses.append(loss.item())
        train_accuracies.append(accuracy.item())

        pbar.update()
        pbar.set_postfix(
            loss=f"{loss.item():.2f}",
            accuracy=f"{accuracy.item():.2f}",
            step=step + 1,
        )

        if (step + 1) % valid_steps == 0:
            pbar.close()
            valid_loss, valid_accuracy = valid(valid_loader, model, criterion, device)
            valid_losses.append(valid_loss)
            valid_accuracies.append(valid_accuracy)

            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                # Snapshot the weights. state_dict() returns references to
                # the live parameters, which later optimiser steps would
                # overwrite, so the "best" weights would really be the latest.
                best_state_dict = {
                    name: tensor.detach().cpu().clone()
                    for name, tensor in model.state_dict().items()
                }

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

        if (step + 1) % save_steps == 0 and best_state_dict is not None:
            torch.save(best_state_dict, save_path)
            print(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

    pbar.close()

    # Plotting. Training metrics exist for every step, validation metrics
    # only every valid_steps steps, so each series gets its own x positions.
    train_x = range(1, len(train_losses) + 1)
    valid_x = [valid_steps * (k + 1) for k in range(len(valid_losses))]

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_x, train_losses, label='Train Loss')
    plt.plot(valid_x, valid_losses, label='Valid Loss')
    plt.title('Loss during training')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(train_x, train_accuracies, label='Train Accuracy')
    plt.plot(valid_x, valid_accuracies, label='Valid Accuracy')
    plt.title('Accuracy during training')
    plt.legend()
    plt.show()

def valid(dataloader, model, criterion, device):
    """Evaluate on the validation loader.

    Returns:
        ``(mean_loss, mean_accuracy)``, both averaged over batches. The model
        is switched to eval mode for the pass and back to train mode before
        returning.
    """
    model.eval()
    loss_total = 0.0
    accuracy_total = 0.0
    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit="sample")

    for batches_done, batch in enumerate(dataloader, start=1):
        # No gradients are needed for evaluation.
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
        loss_total += loss.item()
        accuracy_total += accuracy.item()

        pbar.update(dataloader.batch_size)
        pbar.set_postfix(
            loss=f"{loss_total / batches_done:.2f}",
            accuracy=f"{accuracy_total / batches_done:.2f}",
        )

    pbar.close()
    model.train()

    n_batches = len(dataloader)
    return loss_total / n_batches, accuracy_total / n_batches

In [19]:
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader

# from get_emg_dataloader import get_emg_dataloader
# from your_model_file import TransformerModel, model_fn, valid

def parse_args():
    """Return the hard-coded training configuration as a dict."""
    # Input data locations and checkpoint file.
    paths = dict(
        emg_dir="/tmp/dataset/emg_data/emg_data",
        imu_dir="/tmp/dataset/imu_data/imu_data",
        save_path="model.ckpt",
    )
    # DataLoader settings.
    loading = dict(batch_size=32, n_workers=2)
    # Step counts that drive validation, warmup, checkpointing and training length.
    schedule = dict(
        valid_steps=500,
        warmup_steps=500,
        save_steps=2500,
        total_steps=7500,
    )
    # May be set to the path of a previously trained model.
    resume = dict(pretrained_path=None)

    return {**paths, **loading, **schedule, **resume}

def main(
    emg_dir,
    imu_dir,
    save_path,
    batch_size,
    n_workers,
    valid_steps,
    warmup_steps,
    total_steps,
    save_steps,
    pretrained_path=None,
):
    """Train ``SignLanguageModel`` on the fused EMG/IMU data.

    Args:
        emg_dir: Directory of EMG data, forwarded to ``get_fused_dataloader``.
        imu_dir: Directory of IMU data, forwarded to ``get_fused_dataloader``.
        save_path: File the best ``state_dict`` is written to.
        batch_size: Batch size, forwarded to ``get_fused_dataloader``.
        n_workers: Worker count, forwarded to ``get_fused_dataloader``.
        valid_steps: Run validation every this many training steps.
        warmup_steps: Accepted for interface compatibility; the cosine
            schedule used here does not read it.
        total_steps: Total number of optimizer steps.
        save_steps: Every this many steps, write the best weights seen so far
            to ``save_path`` (skipped until a validation has happened).
        pretrained_path: Optional checkpoint to initialise the model from.
    """
    # Local import: only needed for snapshotting the best weights below.
    import copy

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Info]: Use {device} now!")

    train_loader, valid_loader, num_classes = get_fused_dataloader(emg_dir, imu_dir, batch_size, n_workers)
    # Create the iterator once. Building a new one on every step would restart
    # the loader (and its workers) each time and never reach StopIteration.
    train_iterator = iter(train_loader)
    print("[Info]: Finish loading data!", flush=True)

    model = SignLanguageModel(num_classes=num_classes).to(device)

    if pretrained_path:
        model.load_state_dict(torch.load(pretrained_path, map_location=device))
        print("[Info]: Pretrained model loaded!")

    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=total_steps)

    print("[Info]: Finish creating model!", flush=True)

    best_accuracy = -1.0
    best_state_dict = None

    pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

    for step in range(total_steps):
        try:
            batch = next(train_iterator)
        except StopIteration:
            # Epoch exhausted: start a new pass over the training data.
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        pbar.update()
        pbar.set_postfix(
            loss=f"{loss.item():.2f}",
            accuracy=f"{accuracy:.2f}",
            step=step + 1,
        )

        if (step + 1) % valid_steps == 0:
            pbar.close()
            # valid() returns (mean_loss, mean_accuracy); only the accuracy
            # drives model selection.
            _, valid_accuracy = valid(valid_loader, model, criterion, device)
            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                # state_dict() hands back references to the live parameters;
                # deep-copy so later optimizer steps cannot overwrite the
                # snapshot of the best weights.
                best_state_dict = copy.deepcopy(model.state_dict())

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

        if (step + 1) % save_steps == 0 and best_state_dict is not None:
            torch.save(best_state_dict, save_path)
            print(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

    pbar.close()

# Launch training with the default configuration when this code runs as the
# main module (which is the case when the notebook cell is executed).
if __name__ == "__main__":
    main(**parse_args())



[Info]: Use cuda now!
[Info]: Finish loading data!
[Info]: Finish creating model!



Train:   0% 0/500 [00:00<?, ? step/s][A
Train:   0% 1/500 [00:00<04:32,  1.83 step/s][A
Train:   0% 1/500 [00:00<04:32,  1.83 step/s, accuracy=0.00, loss=7.03, step=1][A
Train:   0% 2/500 [00:01<04:18,  1.93 step/s, accuracy=0.00, loss=7.03, step=1][A
Train:   0% 2/500 [00:01<04:18,  1.93 step/s, accuracy=0.00, loss=6.83, step=2][A
Train:   1% 3/500 [00:01<04:05,  2.03 step/s, accuracy=0.00, loss=6.83, step=2][A
Train:   1% 3/500 [00:01<04:05,  2.03 step/s, accuracy=0.00, loss=6.88, step=3][A
Train:   1% 4/500 [00:01<03:59,  2.07 step/s, accuracy=0.00, loss=6.88, step=3][A
Train:   1% 4/500 [00:01<03:59,  2.07 step/s, accuracy=0.00, loss=6.91, step=4][A
Train:   1% 5/500 [00:02<04:04,  2.02 step/s, accuracy=0.00, loss=6.91, step=4][A
Train:   1% 5/500 [00:02<04:04,  2.02 step/s, accuracy=0.03, loss=7.01, step=5][A
Train:   1% 6/500 [00:02<04:00,  2.05 step/s, accuracy=0.03, loss=7.01, step=5][A
Train:   1% 6/500 [00:02<04:00,  2.05 step/s, accuracy=0.03, loss=6.96, step=6]

RuntimeError: CUDA error: an illegal memory access was encountered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


# Inference

## Dataset of inference

In [None]:
import os
import json
import torch
from pathlib import Path
from torch.utils.data import Dataset


class InferenceDataset(Dataset):
	"""Dataset over the feature files listed in ``<data_dir>/testdata.json``.

	The JSON file must contain an ``"utterances"`` list whose entries each
	carry a ``"feature_path"`` relative to ``data_dir``. Items are returned as
	``(feature_path, loaded_object)`` pairs, where the object is whatever
	``torch.load`` yields for that file.
	"""

	def __init__(self, data_dir):
		"""Read the utterance list from ``testdata.json`` under ``data_dir``."""
		testdata_path = Path(data_dir) / "testdata.json"
		# Context manager so the metadata file handle is closed right away
		# instead of being left for the garbage collector.
		with testdata_path.open() as metadata_file:
			metadata = json.load(metadata_file)
		self.data_dir = data_dir
		self.data = metadata["utterances"]

	def __len__(self):
		"""Number of utterances listed in the metadata."""
		return len(self.data)

	def __getitem__(self, index):
		"""Return ``(feature_path, features)`` for the utterance at ``index``."""
		utterance = self.data[index]
		feat_path = utterance["feature_path"]
		mel = torch.load(os.path.join(self.data_dir, feat_path))

		return feat_path, mel


def inference_collate_batch(batch):
	"""Merge ``(feat_path, mel)`` samples into one batch.

	Returns a tuple of the feature paths and a single tensor made by stacking
	the per-sample tensors along a new leading dimension.
	"""
	# Transpose [(path, mel), ...] into (paths...), (mels...).
	feat_paths, mels = tuple(zip(*batch))
	stacked_mels = torch.stack(mels)
	return feat_paths, stacked_mels

## Main function of Inference

In [None]:
import json
import csv
from pathlib import Path
from tqdm.notebook import tqdm

import torch
from torch.utils.data import DataLoader

def parse_args():
	"""Return the hard-coded inference configuration.

	The keys match the keyword parameters of the inference ``main``.
	"""
	return dict(
		data_dir="./Dataset",
		model_path="./model.ckpt",
		output_path="./output.csv",
	)


def main(
	data_dir,
	model_path,
	output_path,
):
	"""Run inference over ``InferenceDataset`` and write predictions to a CSV.

	Args:
		data_dir: Directory holding ``mapping.json`` and the data read by
			``InferenceDataset``.
		model_path: Checkpoint (a ``state_dict``) to load into the model.
		output_path: Destination CSV; it gets an ``Id,Category`` header
			followed by one row per sample.
	"""
	device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
	print(f"[Info]: Use {device} now!")

	mapping_path = Path(data_dir) / "mapping.json"
	# Context manager so the mapping file handle is closed right away.
	with mapping_path.open() as mapping_file:
		mapping = json.load(mapping_file)

	dataset = InferenceDataset(data_dir)
	dataloader = DataLoader(
		dataset,
		batch_size=1,
		shuffle=False,
		drop_last=False,
		num_workers=8,
		collate_fn=inference_collate_batch,
	)
	print("[Info]: Finish loading data!", flush=True)

	speaker_num = len(mapping["id2speaker"])
	# NOTE(review): `Classifier` is not defined in the visible part of this
	# notebook, while the training cell builds `SignLanguageModel` - confirm
	# which model class this checkpoint belongs to.
	model = Classifier(n_spks=speaker_num).to(device)
	# map_location lets a checkpoint saved on GPU load on a CPU-only host.
	model.load_state_dict(torch.load(model_path, map_location=device))
	model.eval()
	print("[Info]: Finish creating model!", flush=True)

	results = [["Id", "Category"]]
	with torch.no_grad():
		for feat_paths, mels in tqdm(dataloader):
			mels = mels.to(device)
			outs = model(mels)
			preds = outs.argmax(1).cpu().numpy()
			for feat_path, pred in zip(feat_paths, preds):
				# Keys of the id2speaker mapping are looked up as strings.
				results.append([feat_path, mapping["id2speaker"][str(pred)]])

	with open(output_path, 'w', newline='') as csvfile:
		writer = csv.writer(csvfile)
		writer.writerows(results)


# Launch inference with the default configuration when this code runs as the
# main module (which is the case when the notebook cell is executed).
if __name__ == "__main__":
	main(**parse_args())

[Info]: Use cuda now!
[Info]: Finish loading data!
[Info]: Finish creating model!


  0%|          | 0/8000 [00:00<?, ?it/s]