In [1]:
import torch
import torch.nn as nn
from torchtext.vocab import build_vocab_from_iterator
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import random_split
from torch.utils.data import Dataset, DataLoader
from torch.nn import CrossEntropyLoss
import torch.optim as optim
import os
import numpy as np
from nltk.translate.bleu_score import sentence_bleu
import jieba



In [2]:
#参数列表


#模型参数
input_dim = 42 #输入词汇表大小(等于原词汇表大小+2，+2加的是结束符号和填充符号）
emb_dim=256       # 词向量维度
hidden_dim=256  # LSTM隐藏层维度
output_dim=181   # 输出词汇表大小（需你确认）
n_layers=1
OUTPUT_DIM=181 # 输出词汇表大小（需你确认）

savepath = '../model/xuanmen_km40' #模型保存地址
savename = 'lstm_kme40_emb256_hid256_frame1_双手合并.pth' #模型保存名称

device = 'cuda' if torch.cuda.is_available() else 'cpu'

#数据集参数 
data_dir = "../SLR_dataset/kmeans_40_seq_双手合并_数据去重_三次滑动/"#数据集源文件根目录
max_length = 80  # 源序列最大长度
end_token = (input_dim - 2)     # 源序列结束符号
pad_token = (input_dim - 1)     # 源填充符号

In [3]:
npy_files = sorted([f for f in os.listdir(data_dir) if f.endswith(".npy")])
labels = open("../SLR_dataset/corpus.txt").read().splitlines()  # 假设每行是一个标签
labels = [i.split()[1] for i in labels]
labels = [i.replace('\ufeff','') for i in labels]
samples = [np.load(os.path.join(data_dir, f),allow_pickle=True) for f in npy_files]

# 中文按字符分词（如需分词需修改为jieba等）
tokenizer = lambda x: list(jieba.cut(x)) 

# 构建词表（添加特殊标记）
def yield_tokens(texts):
    for text in texts:
        yield tokenizer(text)

vocab = build_vocab_from_iterator(
    yield_tokens(labels), 
    specials=["<start>","<pad>", "<unk>", "<end>"]
)
vocab.set_default_index(vocab["<unk>"])

# 转换为序列并添加<end>标记
sequences = [torch.tensor([vocab["<start>"]]+vocab(tokenizer(text)) + [vocab["<end>"]]) for text in labels]

# 统一填充长度（填充<pad>）
padded_sequences = pad_sequence(
    sequences, 
    batch_first=True, 
    padding_value=vocab["<pad>"]
)
idx2word = vocab.get_itos()

all_data = []
for i in range(len(samples)):
    input_seq = padded_sequences[i]  # 获取对应的输入序列
    
    for j in samples[i]:
        # 将j转为tensor（如果不是）
        j_tensor = torch.tensor(j) if not isinstance(j, torch.Tensor) else j.clone().detach()
        
        # 1. 先添加结束符41（计入1500长度内）
        j_with_end = torch.cat([j_tensor, torch.tensor([end_token], dtype=j_tensor.dtype)])
        
        # 2. 处理长度
        if len(j_with_end) > max_length:
            # 如果超长：截断到1499再加结束符
            j_processed = torch.cat([j_with_end[:max_length-1], 
                                   torch.tensor([end_token], dtype=j_tensor.dtype)])
        elif len(j_with_end) < max_length:
            pad_needed = max_length - len(j_with_end)
            padding = torch.full((pad_needed,), pad_token, dtype=j_tensor.dtype)
            j_processed = torch.cat([j_with_end, padding])
        else:
            # 刚好1500
            j_processed = j_with_end
        
        # 验证长度
        assert len(j_processed) == max_length, f"长度错误：{len(j_processed)} != {max_length}"
        
        # 添加到最终数据
        all_data.append([input_seq, j_processed])
        
print(f'词表大小：{len(vocab.get_stoi())}')

Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache
Loading model cost 0.883 seconds.
Prefix dict has been built successfully.


词表大小：181


In [4]:
class CustomDataset(Dataset):
    def __init__(self):
        self.data =all_data 

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx][1]
        label = self.data[idx][0]
        return sample, label

In [5]:
dataset = CustomDataset()
# 定义划分比例（例如80%训练，20%测试）
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

# 随机划分
train_dataset, test_dataset = random_split(
    dataset, 
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42)  # 固定随机种子确保可复现
)

# 创建DataLoader
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128)

In [6]:
class seq2seq(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim,output_dim,n_layers):
        super().__init__()
        self.encode_embedding = nn.Embedding(input_dim, emb_dim) #将每个词扩充为emb_dim维
        self.decode_embedding = nn.Embedding(output_dim, emb_dim)
        self.encode = nn.LSTM(emb_dim, hidden_dim, n_layers)
        self.decode = nn.LSTM(emb_dim,hidden_dim, n_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)
    def forward(self, src, tar):
        # src: [batch_size, src_len]
        # tar: [batch_size, trg_len]
        
        # 编码器部分
        encode_embedded = self.encode_embedding(src)  # [batch_size, src_len, emb_dim]
        encode_embedded = encode_embedded.permute(1, 0, 2)  # [src_len, batch_size, emb_dim]
        _, (hidden, cell) = self.encode(encode_embedded)
        
        # 解码器部分
        batch_size = tar.shape[0] #3
        trg_len = tar.shape[1] #9
        output_dim = self.fc.out_features #181
        # print(output_dim)
        
        # 准备输出张量
        outputs = torch.zeros(trg_len, batch_size, output_dim).to(src.device)#9x3x181
        
        # 初始输入是<sos> token，这里假设tar已经包含<sos>作为第一个token
        input = tar[:, 0]  # 取第一个token作为初始输入 [batch_size]
        
        for t in range(1, trg_len):
            # 嵌入输入
            embedded = self.decode_embedding(input).unsqueeze(0)  # [1, batch_size, emb_dim]
            # print(f'embedded:{embedded.size()}')
            # print(f'hidden:{hidden.size()}')
            # 通过解码器
            output, (hidden, cell) = self.decode(embedded, (hidden, cell))
            
            # 预测下一个token
            pred = self.fc(output.squeeze(0))
            outputs[t] = pred
            
            # 下一个输入是真实目标(teacher forcing)或预测结果
            # 这里使用teacher forcing，传入真实目标
            input = tar[:, t]
        
        return outputs.permute(1, 0, 2)  # [batch_size, trg_len, output_dim]
    def predict(self, src, sos_token_idx=0, eos_token_idx=1, max_len=9):
        """
        自回归预测（不需要输入tar）
        :param src: 输入序列 [batch_size, src_len]
        :param sos_token_idx: <sos>的索引
        :param eos_token_idx: <eos>的索引（可选）
        :param max_len: 最大生成长度
        :return: 预测序列 [batch_size, max_len]
        """
        # 编码器部分
        encode_embedded = self.encode_embedding(src).permute(1, 0, 2)
        _, (hidden, cell) = self.encode(encode_embedded)
        
        # 解码器初始化
        batch_size = src.size(0)
        outputs = torch.zeros(batch_size, max_len).long().to(src.device)
        input = torch.full((batch_size,), sos_token_idx, dtype=torch.long).to(src.device)
        
        # 自回归解码
        for t in range(max_len):
            embedded = self.decode_embedding(input).unsqueeze(0)  # [1, batch_size, emb_dim]
            output, (hidden, cell) = self.decode(embedded, (hidden, cell))
            pred = self.fc(output.squeeze(0)).argmax(-1)  # [batch_size]
            
            outputs[:, t] = pred
            input = pred  # 使用预测结果作为下一输入
            
            # 如果所有序列都生成<eos>则提前停止
            if eos_token_idx is not None and (pred == eos_token_idx).all():
                break
        
        return outputs

In [7]:
def calculate_f1(reference, candidate):
    # 统计匹配词数
    common_terms = set(reference[0]) & set(candidate)
    tp = len(common_terms)  # True Positives
    fp = len(candidate) - tp  # False Positives
    fn = len(reference[0]) - tp  # False Negatives

    # 计算精确度、召回率、F1
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1

# precision, recall, f1 = calculate_f1(reference, candidate)
# print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

In [8]:
def train(model, train_loader, optimizer, criterion, epochs, device,test_bool=False):
    model.train()
    model.to(device)
    max_bleu = 0
    for epoch in range(epochs):
        epoch_loss = 0
        
        for batch_idx, (src, trg) in enumerate(train_loader):
            src = src.to(device)  # [batch_size, 424]
            trg = trg.to(device)  # [batch_size, 10]
            
            optimizer.zero_grad()
            
            # 前向传播（模型自动处理teacher forcing）
            output = model(src, trg)  # [batch_size, 10, OUTPUT_DIM]
            
            # 计算损失（忽略<sos>和padding）
            output = output[:, 1:].reshape(-1, OUTPUT_DIM)  # 忽略<sos>，形状变为[batch_size*9, OUTPUT_DIM]
            trg = trg[:, 1:].reshape(-1)                    # 忽略<sos>，形状变为[batch_size*9]
            loss = criterion(output, trg)
            # print(output)
            # print(output.size())
            # print(trg)
            # print(trg.size())
            # return 0
            
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            
            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch+1:03d} | Batch: {batch_idx:03d} | Loss: {loss.item():.4f}')
        test_num = 0
        test_bleu = float(0)
        test_loss = 0
        test_jingque = 0
        test_zhaohui = 0
        test_f1 = 0
        if test_bool:
            for batch_idx , (src,trg) in enumerate(test_loader):
                src = src.to(device)
                output = model.predict(src)
                output = output[0].tolist()
                # print([0]+output[:8])
                # print(trg.tolist())
                score = sentence_bleu(trg.tolist(), [0]+output[:8], weights=(0.5, 0.5)) 
                precision, recall, f1 = calculate_f1(trg.tolist(), [0]+output[:8])
                test_jingque += precision
                test_zhaohui += recall
                test_f1 += f1
                test_num += 1
                test_bleu += score
        print(f'Epoch: {epoch+1:03d} | Avg Loss: {epoch_loss/len(train_loader):.4f}')
        if test_bool:
            print(f'test bleu = {test_bleu/test_num},精确率: {(test_jingque/test_num):.4f}, 召回率: {(test_zhaohui/test_num):.4f}, F1: {(test_f1/test_num):.4f}')
            if (test_bleu/test_num) > max_bleu:
                max_bleu = (test_bleu/test_num)
        print(f'max_bleu={max_bleu}')

In [9]:
model = seq2seq(
    input_dim=input_dim,      # 输入词汇表大小
    emb_dim=emb_dim,       # 词向量维度
    hidden_dim=hidden_dim,    # LSTM隐藏层维度
    output_dim=output_dim,     # 输出词汇表大小（需你确认）
    n_layers=n_layers
).to(device)

In [10]:
# model=torch.load(os.path.join(savepath,savename))

In [11]:
print(model)
train(
    model=model,
    train_loader=train_loader,  # 你的DataLoader
    optimizer=optim.Adam(model.parameters(), lr=0.0001),
    criterion=CrossEntropyLoss(ignore_index=1),  # 假设填充符index=0
    # criterion=CrossEntropyLoss(), 
    epochs=300,
    device=device,
    test_bool=True
)

seq2seq(
  (encode_embedding): Embedding(42, 256)
  (decode_embedding): Embedding(181, 256)
  (encode): LSTM(256, 256)
  (decode): LSTM(256, 256)
  (fc): Linear(in_features=256, out_features=181, bias=True)
)
Epoch: 001 | Batch: 000 | Loss: 5.1931
Epoch: 001 | Batch: 100 | Loss: 3.2830
Epoch: 001 | Avg Loss: 3.9403
test bleu = 0.4419945217707983,精确率: 0.4023, 召回率: 0.4023, F1: 0.4023
max_bleu=0.4419945217707983
Epoch: 002 | Batch: 000 | Loss: 3.0901
Epoch: 002 | Batch: 100 | Loss: 2.4496
Epoch: 002 | Avg Loss: 2.7141
test bleu = 0.4419945217707983,精确率: 0.4023, 召回率: 0.4023, F1: 0.4023
max_bleu=0.4419945217707983
Epoch: 003 | Batch: 000 | Loss: 2.3950
Epoch: 003 | Batch: 100 | Loss: 1.9183
Epoch: 003 | Avg Loss: 2.1632
test bleu = 0.5603699344254983,精确率: 0.4061, 召回率: 0.4061, F1: 0.4061
max_bleu=0.5603699344254983
Epoch: 004 | Batch: 000 | Loss: 1.9467
Epoch: 004 | Batch: 100 | Loss: 1.6350
Epoch: 004 | Avg Loss: 1.7568
test bleu = 0.5448466598578322,精确率: 0.4061, 召回率: 0.4061, F1: 0.4061
max

The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


Epoch: 035 | Avg Loss: 0.4171
test bleu = 0.7001184957830698,精确率: 0.5709, 召回率: 0.5709, F1: 0.5709
max_bleu=0.7319679660665068
Epoch: 036 | Batch: 000 | Loss: 0.3897
Epoch: 036 | Batch: 100 | Loss: 0.3967
Epoch: 036 | Avg Loss: 0.4037
test bleu = 0.7025673983869972,精确率: 0.5556, 召回率: 0.5556, F1: 0.5556
max_bleu=0.7319679660665068
Epoch: 037 | Batch: 000 | Loss: 0.3810
Epoch: 037 | Batch: 100 | Loss: 0.3932
Epoch: 037 | Avg Loss: 0.3916
test bleu = 0.7163573924959556,精确率: 0.5785, 召回率: 0.5785, F1: 0.5785
max_bleu=0.7319679660665068
Epoch: 038 | Batch: 000 | Loss: 0.3758
Epoch: 038 | Batch: 100 | Loss: 0.3994
Epoch: 038 | Avg Loss: 0.3802
test bleu = 0.6677374801125538,精确率: 0.5632, 召回率: 0.5632, F1: 0.5632
max_bleu=0.7319679660665068
Epoch: 039 | Batch: 000 | Loss: 0.3887
Epoch: 039 | Batch: 100 | Loss: 0.3647
Epoch: 039 | Avg Loss: 0.3683
test bleu = 0.664626961628843,精确率: 0.5632, 召回率: 0.5632, F1: 0.5632
max_bleu=0.7319679660665068
Epoch: 040 | Batch: 000 | Loss: 0.3589
Epoch: 040 | Batch: 

In [None]:
0.739260

In [16]:
if not os.path.exists(savepath):
    os.makedirs(savepath)
    print(f"目录已创建：{savepath}")
else:
    print(f"目录已存在：{savepath}")
torch.save(model,os.path.join(savepath,savename))
print(os.path.join(savepath,savename))

目录已存在：../model/xuanmen_km40
../model/xuanmen_km40/lstm_kme40_emb256_hid256_frame1_左右手合并.pth


In [17]:
for src, trg in test_loader:
    src = src.to(device)
    output = model.predict(src)
    output = output[0].tolist()
    text = ''
    for i in output:
        text+=idx2word[i]
    print(text)

我同学的妈妈是保姆<end><end><end>
引导他人成功<end><end><end><end><end><end>
他哥哥的同学是医生<end><end><end>
我表哥的邻居是记者<end><end><end>
他的邻居是残疾人<end><end><end><end>
结果圆满成功<end><end><end><end><end><end><end>
他哥哥的同学是医生<end><end><end>
我的毛毯是新的<end><end><end>
他哥哥的目标是解放军<end><end><end>
你妹妹是会计<end><end><end><end><end>
我们是自由恋爱<end><end><end><end><end><end>
紧张的工作气氛<end><end><end><end><end>
他的前途事业成功<end><end><end><end><end>
妈妈有项链<end><end><end><end><end><end>
扭转局面是困难的<end><end><end><end><end>
他的工作是美容<end><end><end><end>
我有打火机<end><end><end><end><end><end>
颜色是丰富的<end><end><end><end><end>
我同学的妹妹是护士<end><end><end>
紧张的工作气氛<end><end><end><end><end>
我同学的妈妈是保姆<end><end><end>
天空没有星星<end><end><end><end><end><end>
他妈妈的同学是公务员<end><end><end>
月亮是地球的卫星<end><end><end><end>
社会地位是平等的<end><end><end><end>
他们的国家摆脱贫苦<end><end><end><end>
他儿子是弱智人<end><end><end><end><end>
我公公是门卫<end><end><end><end><end>
工作环境的改善<end><end><end><end><end>
社会地位的提高<end><end><end><end><end>
他儿子是弱智人<end><end><end><end><end>
事情有改善<end><end><end><end><end><end>
他放弃目标<

In [19]:
class seq2seq(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim,output_dim,n_layers):
        super().__init__()
        self.encode_embedding = nn.Embedding(input_dim, emb_dim) #将每个词扩充为emb_dim维
        self.decode_embedding = nn.Embedding(output_dim, emb_dim)
        self.encode = nn.LSTM(emb_dim, hidden_dim, n_layers)
        self.decode = nn.LSTM(emb_dim,hidden_dim, n_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
#     def forward(self, src,tar):
#         # src: [batch_size, src_len]
#         # trg: [batch_size, trg_len]
#         encode_embedded = self.encode_embedding(src)  # [batch_size, src_len, emb_dim]
#         encode_embedded = encode_embedded.permute(1, 0, 2) 
#         print(f'embedded:{encode_embedded.size()}')
#         outputs, (hidden, cell) = self.encode(encode_embedded)
#         print(f'encode outputs:{outputs.size()}')
#         print(f'encode hidden:{hidden.size()}')
#         decode_embedded = self.decode_embedding(tar) 
#         print(f'decode embedden:{decode_embedded.size()}')
#         # outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        
#         # outputs,_ = self.decode(hidden)
#         return hidden
    def forward(self, src, tar):
        # src: [batch_size, src_len]
        # tar: [batch_size, trg_len]
        
        # 编码器部分
        encode_embedded = self.encode_embedding(src)  # [batch_size, src_len, emb_dim]
        encode_embedded = encode_embedded.permute(1, 0, 2)  # [src_len, batch_size, emb_dim]
        _, (hidden, cell) = self.encode(encode_embedded)
        
        # 解码器部分
        batch_size = tar.shape[0] #3
        trg_len = tar.shape[1] #9
        output_dim = self.fc.out_features #181
        print(output_dim)
        
        # 准备输出张量
        outputs = torch.zeros(trg_len, batch_size, output_dim).to(src.device)#9x3x181
        
        # 初始输入是<sos> token，这里假设tar已经包含<sos>作为第一个token
        input = tar[:, 0]  # 取第一个token作为初始输入 [batch_size]
        
        for t in range(1, trg_len):
            # 嵌入输入
            embedded = self.decode_embedding(input).unsqueeze(0)  # [1, batch_size, emb_dim]
            print(f'embedded:{embedded.size()}')
            print(f'hidden:{hidden.size()}')
            # 通过解码器
            output, (hidden, cell) = self.decode(embedded, (hidden, cell))
            
            # 预测下一个token
            pred = self.fc(output.squeeze(0))
            outputs[t] = pred
            
            # 下一个输入是真实目标(teacher forcing)或预测结果
            # 这里使用teacher forcing，传入真实目标
            input = tar[:, t]
        
        return outputs.permute(1, 0, 2)  # [batch_size, trg_len, output_dim]

In [54]:
tar.size()

torch.Size([3, 9])

In [59]:
tar[:, 1]

tensor([47,  6, 74])

#### import torch
from torch.utils.data import Dataset, DataLoader

class CustomDataset(Dataset):
    def __init__(self):
        self.data =all_data 

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx][1]
        label = self.data[idx][0]
        return sample, label

In [213]:
# 定义划分比例（例如80%训练，20%测试）
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

# 随机划分
train_dataset, test_dataset = random_split(
    dataset, 
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42)  # 固定随机种子确保可复现
)

# 创建DataLoader
train_loader = DataLoader(train_dataset, batch_size=3, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

In [13]:
dataset[0]

(tensor([18, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18,
         26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18,
         26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18,
         26, 26, 18, 18, 18, 18, 24, 18, 24, 24, 18, 18, 18, 18, 18, 18, 28, 18,
         35, 18, 38, 18, 38, 18, 38, 18, 38, 38, 38, 18, 38, 38, 38, 18, 38, 38,
         38, 18, 38, 38, 38, 18, 38, 38, 38, 18, 38, 38, 38, 18, 38, 38, 38, 18,
         38, 18, 38, 18, 38, 18, 35, 18, 31, 35, 31, 35, 28, 35, 28, 35, 35, 19,
          4, 18,  4, 18, 28,  4, 35,  4, 35, 18,  6, 18, 38,  5,  1, 18,  1, 18,
          1, 18,  1,  1,  1, 18,  1, 18,  1, 18,  1,  1,  1, 18,  1,  1, 34, 18,
          6, 34, 34, 18, 38, 34, 34, 18,  1, 18, 16, 18, 18, 18, 18, 18, 18, 18,
         18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18,
         18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 32, 18, 27, 18, 12, 18, 12, 18,
         12, 18, 12,  1,  1,

In [267]:
import torch.nn as nn

# class Seq2SeqTransformer(nn.Module):
#     def __init__(self, input_dim, output_dim, d_model=512, nhead=8, num_layers=6):
#         super().__init__()
#         self.encoder = nn.Linear(input_dim, d_model)
#         self.decoder = nn.Linear(d_model, output_dim)
#         self.transformer = nn.Transformer(
#             d_model=d_model,
#             nhead=nhead,
#             num_encoder_layers=num_layers,
#             num_decoder_layers=num_layers
#         )
#         self.pos_encoder = PositionalEncoding(d_model)  # 需自定义

#     def forward(self, src, tgt):
#         # src: (seq_len, batch, input_dim)
#         src = self.encoder(src)  # (seq_len, batch, d_model)
#         src = self.pos_encoder(src)
#         tgt = self.pos_encoder(tgt)  # 假设tgt是decoder输入
#         output = self.transformer(src, tgt)
#         return self.decoder(output)

class Seq2SeqTransformer(nn.Module):
    def __init__(self, vocab_size, output_size, d_model=128, nhead=4, num_layers=3):
        super().__init__()
        self.src_embedding = nn.Embedding(vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(output_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )
        self.decoder = nn.Linear(d_model, output_size)

    def forward(self, src, tgt):
        # 确保输入至少有3维
        if src.dim() == 2:
            src = src.unsqueeze(1)  # (seq_len, 1, input_dim)
        if tgt.dim() == 2:
            tgt = tgt.unsqueeze(1)  # (seq_len, 1, output_dim)

        src = self.pos_encoder(self.src_embedding(src))
        tgt = self.pos_encoder(self.tgt_embedding(tgt))
        output = self.transformer(src, tgt)
        return self.decoder(output)

# 位置编码（Transformer需要）
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        pe = self.pe[:x.size(0)].unsqueeze(1)
        x = x + pe
        return x
    
    
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2Seq, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.encoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x)
        embedded = embedded.permute(1, 0, 2) 
        _, (hidden, cell) = self.encoder(embedded)
        output, _ = self.decoder(embedded, (hidden, cell))
        output = self.fc(output)
        return output

In [268]:
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2Seq, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.encoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x)
        embedded = embedded.permute(1, 0, 2) 
        _, (hidden, cell) = self.encoder(embedded)
        output, _ = self.decoder(embedded, (hidden, cell))
        # output = self.fc(output)
        return output

In [None]:
import torch
import torch.nn as nn

class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, max_output_len=8):
        super(Seq2Seq, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.encoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.max_output_len = max_output_len  # 固定输出长度8
        self.output_size = output_size

    def forward(self, x):
        # Encoder
        embedded = self.embedding(x)  # [batch, 424, hidden_size]
        _, (hidden, cell) = self.encoder(embedded)  # hidden: [1, batch, hidden_size]

        # Decoder初始化
        batch_size = x.size(0)
        decoder_input = torch.zeros(batch_size, 1, dtype=torch.long).to(x.device)  # 初始输入<SOS>（假设0是<SOS>）
        outputs = torch.zeros(batch_size, self.max_output_len, self.output_size).to(x.device)

        # 自回归生成（逐步预测）
        for t in range(self.max_output_len):
            decoder_embedded = self.embedding(decoder_input)  # [batch, 1, hidden_size]
            decoder_output, (hidden, cell) = self.decoder(decoder_embedded, (hidden, cell))
            output = self.fc(decoder_output.squeeze(1))  # [batch, output_size]
            outputs[:, t, :] = output

            # 下一步输入是当前预测的token（Teacher Forcing可选）
            decoder_input = output.argmax(-1).unsqueeze(1)  # [batch, 1]

        return outputs  # [batch, 8, output_size]

In [126]:
# vocab_size = 40  # 词汇表大小（根据你的token ID最大值38，建议取稍大的值如50）
# output_size = 180  # 输出维度（与词汇表大小一致，如果是分类任务）

# model = Seq2SeqTransformer(vocab_size=vocab_size, output_size=output_size)

In [269]:
input_size = 40
hidden_size = 64
output_size = 180

In [270]:
model = Seq2Seq(input_size, hidden_size, output_size)

In [271]:
output = model(src)  # (8, 1, 50)
print(output.size())

RuntimeError: permute(sparse_coo): number of dimensions in the tensor input does not match the length of the desired ordering of dimensions i.e. input.dim() = 2 is not equal to len(dims) = 3

In [131]:
model = Seq2Seq(input_size, hidden_size, output_size)
for input_tensor, target_tensor in train_loader:
        print(input_tensor.size())
        print(target_tensor.size())
        output = model(input_tensor)
        print(output.size())
        break

torch.Size([2, 424])
torch.Size([2, 8])
torch.Size([2, 424, 180])


In [114]:
src = torch.tensor([18, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18,
         26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18,
         26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18, 26, 18,
         26, 26, 18, 18, 18, 18, 24, 18, 24, 24, 18, 18, 18, 18, 18, 18, 28, 18,
         35, 18, 38, 18, 38, 18, 38, 18, 38, 38, 38, 18, 38, 38, 38, 18, 38, 38,
         38, 18, 38, 38, 38, 18, 38, 38, 38, 18, 38, 38, 38, 18, 38, 38, 38, 18,
         38, 18, 38, 18, 38, 18, 35, 18, 31, 35, 31, 35, 28, 35, 28, 35, 35, 19,
          4, 18,  4, 18, 28,  4, 35,  4, 35, 18,  6, 18, 38,  5,  1, 18,  1, 18,
          1, 18,  1,  1,  1, 18,  1, 18,  1, 18,  1,  1,  1, 18,  1,  1, 34, 18,
          6, 34, 34, 18, 38, 34, 34, 18,  1, 18, 16, 18, 18, 18, 18, 18, 18, 18,
         18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18,
         18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 32, 18, 27, 18, 12, 18, 12, 18,
         12, 18, 12,  1,  1, 18,  1,  1,  1, 12,  1, 12, 13, 12, 13, 12, 13, 12,
         13, 12, 13, 12, 13, 12, 13, 12, 13, 12,  1, 12,  1, 12,  1, 12,  1, 12,
          1, 12,  1, 12,  1, 12,  1, 12,  1, 12,  1, 12,  1,  1,  1,  1, 20, 18,
         20, 18, 20, 36, 17, 18, 16, 18, 16, 18, 27, 18, 31, 18, 13, 18, 13, 18,
         13, 18, 31, 18, 38, 18, 38, 18, 38, 18, 35, 18, 34, 18, 34, 18, 34, 34,
         34, 18, 34, 18, 34, 18, 34, 18, 34, 18, 34, 18, 34, 34, 34, 18, 34, 34,
         34, 18, 34, 20, 20, 18, 34, 20, 20, 18, 34, 20, 20, 18, 16,  7,  7, 18,
          7,  7, 18, 18, 18, 18, 18, 18,  7, 18,  7, 18,  7, 18,  7, 18,  7, 18,
          7, 18,  7, 18,  7, 18,  7, 18,  7, 18,  7, 18, 20, 18, 20, 18, 20, 18,
         20, 18, 20, 18, 20, 18, 20, 18,  7, 20, 20, 18, 16, 20, 18, 18, 18, 18,
         18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18,
         18, 18, 18, 18, 18, 18, 18, 18, 18, 18])
tgt = torch.tensor([ 5,  3,  9,  4, 49,  2,  0,  0])

In [108]:
src = torch.tensor([list(src),list(src)])
tgt = torch.tensor([list(tgt),list(tgt)])

In [109]:
output = model(src)  # (8, 1, 50)

In [110]:
output.size()

torch.Size([2, 424, 180])

In [66]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = model.to(device)

In [76]:
output.size()

torch.Size([8, 180])

In [59]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # 忽略padding值（假设0是padding）

In [60]:
n_epochs = 40
for epoch in range(n_epochs):
    # 训练阶段
    model.train()
    train_loss = 0
    for src, tgt in tqdm(train_loader, desc=f'Epoch {epoch+1}'):
        src, tgt = src.to(device), tgt.to(device)
        print(len(src))
        print(len(src[0]))
        print(tgt)

        # 准备decoder输入（shifted right）
        tgt_input = tgt[:-1, :]  # 去掉最后一个token
        tgt_output = tgt[1:, :]   # 去掉第一个token

        optimizer.zero_grad()
        output = model(src, tgt_input)  # (seq_len, batch, vocab_size)

        # 计算损失（忽略padding）
        loss = criterion(output.view(-1, output.size(-1)), 
                       tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # 验证阶段
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for src, tgt in val_loader:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:-1, :]
            tgt_output = tgt[1:, :]

            output = model(src, tgt_input)
            loss = criterion(output.view(-1, output.size(-1)), 
                           tgt_output.reshape(-1))
            val_loss += loss.item()

    # 打印日志
    avg_train_loss = train_loss / len(train_loader)
    avg_val_loss = val_loss / len(val_loader)
    print(f'Epoch {epoch+1}: Train Loss={avg_train_loss:.4f}, Val Loss={avg_val_loss:.4f}')

    # 保存最佳模型
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), 'best_model.pth')

Epoch 1:   0%|          | 0/464 [00:00<?, ?it/s]

32
424
tensor([[  5,  87,   3,  26,   4, 129,   2,   0],
        [  5,  20,   3,   9,   4, 106,   2,   0],
        [105,  28,   3,  19,   2,   0,   0,   0],
        [  7,  27,   4, 151,   2,   0,   0,   0],
        [  8, 115,  15,   2,   0,   0,   0,   0],
        [ 23,   3, 168,   4, 152,   3,   2,   0],
        [ 13, 121,  28,  22,   2,   0,   0,   0],
        [ 48,  10, 175,   2,   0,   0,   0,   0],
        [  5, 154,   4, 162,   2,   0,   0,   0],
        [112,   4,  32,   3,   2,   0,   0,   0],
        [ 55,   3,  12, 119, 173,   2,   0,   0],
        [  7,  27,   4, 151,   2,   0,   0,   0],
        [ 68,  12,   3,  59,   2,   0,   0,   0],
        [ 13, 121,  28,  22,   2,   0,   0,   0],
        [  5,   3,  80,   4,  76,   2,   0,   0],
        [  6,   3,  23,  95,   4, 132,   3,   2],
        [  6,   3,  17,   4,  93,   2,   0,   0],
        [ 67,   4, 176,   3,   2,   0,   0,   0],
        [  8,   3,  38,   4,  21,   3,   2,   0],
        [  5,  98,   4,  44,   2,   0,   0,




RuntimeError: The size of tensor a (424) must match the size of tensor b (32) at non-singleton dimension 1

#### import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm  # 进度条工具

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # 忽略padding值（假设0是padding）
    
    # 开始训练
    trained_model = train_model(
        model, train_data, val_data, 
        optimizer, criterion,
        n_epochs=10, batch_size=2, device='cuda' if torch.cuda.is_available() else 'cpu'
    )

In [134]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(Encoder, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        
    def forward(self, x):
        _, (hidden, cell) = self.lstm(x)
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim):
        super(Decoder, self).__init__()
        self.lstm = nn.LSTM(output_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x, hidden, cell):
        output, (hidden, cell) = self.lstm(x, (hidden, cell))
        prediction = self.fc(output)
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        
    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = target.shape[0]
        target_len = target.shape[1]
        target_dim = target.shape[2]
        
        # 存储输出
        outputs = torch.zeros(batch_size, target_len, target_dim).to(self.device)
        
        # 编码器处理
        hidden, cell = self.encoder(source)
        
        # 第一个输入是起始token (全零)
        input = torch.zeros(batch_size, 1, target_dim).to(self.device)
        
        for t in range(target_len):
            # 解码器一步
            output, hidden, cell = self.decoder(input, hidden, cell)
            
            # 存储预测
            outputs[:, t:t+1] = output
            
            # 决定是否使用teacher forcing
            teacher_force = np.random.random() < teacher_forcing_ratio
            
            # 如果使用teacher forcing，下一个输入是真实值；否则使用预测值
            input = target[:, t:t+1] if teacher_force else output
            
        return outputs

In [133]:
# 参数设置
input_dim = 424  # 输入特征维度
output_dim = 8  # 输出特征维度
hidden_dim = 256
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 初始化模型
encoder = Encoder(input_dim, hidden_dim)
decoder = Decoder(output_dim, hidden_dim)
model = Seq2Seq(encoder, decoder, device).to(device)

# 定义优化器和损失函数
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.MSELoss()  # 对于回归任务

# 训练函数
def train(model, dataloader, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    
    for i, (src, trg) in enumerate(train_loader):
        src, trg = src.to(device), trg.to(device)
        
        optimizer.zero_grad()
        
        output = model(src, trg)
        
        loss = criterion(output, trg)
        
        loss.backward()
        
        # 梯度裁剪防止爆炸
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        
        optimizer.step()
        
        epoch_loss += loss.item()
        
    return epoch_loss / len(dataloader)

# 验证函数
def evaluate(model, dataloader, criterion):
    model.eval()
    epoch_loss = 0
    
    with torch.no_grad():
        for i, (src, trg) in enumerate(test_loader):
            src, trg = src.to(device), trg.to(device)
            
            output = model(src, trg, 0)  # 关闭teacher forcing
            
            loss = criterion(output, trg)
            
            epoch_loss += loss.item()
            
    return epoch_loss / len(dataloader)

# 训练循环
n_epochs = 100
clip = 1
best_valid_loss = float('inf')

for epoch in range(n_epochs):
    train_loss = train(model, train_loader, optimizer, criterion, clip)
    valid_loss = evaluate(model, val_loader, criterion)
    
    # 保存最佳模型
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')
    
    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f}')
    print(f'\tVal. Loss: {valid_loss:.3f}')

IndexError: tuple index out of range