In [None]:
import json
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

from dataset import *
# class SimpleCNN(nn.Module):
#     def __init__(self):
#         super(SimpleCNN, self).__init__()
#         # 假设输入图像大小为28x28，且有一个颜色通道（灰度图像）
#         self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)  # 第一个卷积层
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # 池化层
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)  # 第二个卷积层
#         self.fc1 = nn.Linear(64 * 7 * 7, 128)  # 全连接层
#         self.fc2 = nn.Linear(128, num_classes)  # 最后的全连接层，num_classes是类别数

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))  # 通过第一个卷积层和池化层
#         x = self.pool(F.relu(self.conv2(x)))  # 通过第二个卷积层和池化层
#         x = x.view(-1, 64 * 7 * 7)  # 展平层
#         x = F.relu(self.fc1(x))  # 全连接层
#         x = self.fc2(x)  # 输出层
#         return x
    

class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_layer_size: Width of the hidden state of every LSTM layer.
        output_size: Number of values produced by the final linear layer.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_layer_size, output_size, num_layers):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.num_layers = num_layers
        # batch_first=True: the LSTM consumes and returns (batch, seq_len, features).
        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers, batch_first=True)
        # Linear head mapping the last hidden output to `output_size` values.
        self.fc = nn.Linear(hidden_layer_size, output_size)

    def forward(self, x):
        """Run the LSTM over `x` and project the final time step.

        Args:
            x: Tensor laid out as (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # new_zeros allocates the initial states directly with x's device AND
        # dtype, avoiding a CPU allocation + copy and keeping the model usable
        # after .double()/.half() conversions.
        state_shape = (self.num_layers, x.size(0), self.hidden_layer_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        seq_out, _ = self.lstm(x, (h0, c0))

        # Only the output of the last time step feeds the prediction head.
        return self.fc(seq_out[:, -1, :])
    

# Model hyperparameters.
input_size = 13          # number of features per time step fed to the LSTM
hidden_layer_size = 200  # hidden-state width of each LSTM layer
output_size = 12         # length of the vector produced by the final linear layer
num_layers = 2           # number of stacked LSTM layers

model = LSTMModel(
    input_size=input_size,
    hidden_layer_size=hidden_layer_size,
    output_size=output_size,
    num_layers=num_layers,
)

In [None]:


# # 假设你已经有了一个PyTorch兼容的Dataset类
# train_dataset = YourDataset(train_data, train_labels, transform=train_transform)
# val_dataset = YourDataset(val_data, val_labels, transform=val_transform)

# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


# Settings shared by every dataset/loader; this cell reads the "batchsize" entry.
# The encoding is pinned so the file is decoded the same way on every platform.
with open('config.json', 'r', encoding='utf-8') as config_file:
    config = json.load(config_file)


# Training split: reshuffled on every pass over the loader.
path = get_path('train')
train_label = get_labels(path)
train_dataset = WavDataset(path, train_label, config)
train_loader = DataLoader(train_dataset, batch_size=config["batchsize"], shuffle=True)

# Validation split: kept in a fixed order. Shuffling gives no benefit when no
# weights are updated, and a fixed order makes the per-batch averaged
# validation loss reproducible from run to run.
path = get_path('val')
val_label = get_labels(path)
val_dataset = WavDataset(path, val_label, config)
val_loader = DataLoader(val_dataset, batch_size=config["batchsize"], shuffle=False)

In [None]:
# Loss function and optimizer handed to the training loop.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
import os
def _target_classes(labels):
    """Return class indices for labels given as indices (1-D) or per-class scores (2-D+)."""
    return labels.argmax(dim=1) if labels.dim() > 1 else labels


def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over `loader`, updating weights only when `optimizer` is given.

    Returns:
        Tuple (mean loss per batch, fraction of correctly classified samples).

    Raises:
        ValueError: If `loader` yields no samples.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is the same as eval()
    loss_sum = 0.0
    num_batches = 0
    num_correct = 0
    num_samples = 0
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            loss_sum += loss.item()
            num_batches += 1

            preds = outputs.argmax(dim=1)
            num_correct += preds.eq(_target_classes(labels)).sum().item()
            num_samples += labels.size(0)
    if num_batches == 0 or num_samples == 0:
        # Fail with a clear message instead of a bare ZeroDivisionError.
        raise ValueError('data loader produced no samples')
    return loss_sum / num_batches, num_correct / num_samples


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, save_path):
    """Train `model`, validating and checkpointing after every epoch.

    For each epoch this prints the train/validation loss and accuracy, saves
    the model's state dict to `<save_path>/<epoch:02>_model.pth` (epoch is
    zero-based in the file name) and appends the same metrics to
    `<save_path>/checkpoint.txt`.

    Args:
        model: Module to optimise; called as `model(batch_inputs)`.
        train_loader: Iterable of `(inputs, labels)` batches used for training.
        val_loader: Iterable of `(inputs, labels)` batches used for validation.
        criterion: Loss callable invoked as `criterion(outputs, labels)`.
        optimizer: Optimizer already bound to the model's parameters.
        num_epochs: Number of passes over `train_loader`.
        save_path: Directory for checkpoints and the log; created if missing.

    Raises:
        ValueError: If either loader yields no samples.
    """
    # torch.save/open do not create parent directories, so make sure it exists.
    os.makedirs(save_path, exist_ok=True)
    log_file = os.path.join(save_path, 'checkpoint.txt')

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, optimizer)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss}')
        print(f'Train Acc: {train_acc}')

        # No optimizer => evaluation mode without gradient tracking.
        val_loss, val_acc = _run_epoch(model, val_loader, criterion)
        print(f'Validation Loss: {val_loss}')
        print(f'Validation Acc: {val_acc}')

        torch.save(model.state_dict(), os.path.join(save_path, f'{epoch:02}_model.pth'))
        with open(log_file, 'a', encoding='utf-8') as f:
            lines = [
                f'Epoch {epoch+1}/{num_epochs}',
                f'Train Loss: {train_loss}',
                f'Train Acc: {train_acc}',
                f'Validation Loss: {val_loss}',
                f'Validation Acc: {val_acc}',
                '\n\n',  # blank lines separating consecutive epochs in the log
            ]
            f.write('\n'.join(lines))

        

# Training run configuration: number of epochs and checkpoint directory.
num_epochs = 20
save_path = 'model/LSTM'
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
    save_path=save_path,
)

In [None]:
# Evaluate a saved checkpoint on the test split.
model = LSTMModel(input_size, hidden_layer_size, output_size, num_layers)
# os.path.join yields a valid path on every OS and points at the same
# 'model/LSTM' directory the training cell writes its checkpoints to.
checkpoint_file = os.path.join('model', 'LSTM', '19_model.pth')
# map_location='cpu' lets the weights load even if they were saved from a GPU.
model.load_state_dict(torch.load(checkpoint_file, map_location='cpu'))

model.eval()  # evaluation mode
test_loss = 0.0
correct = 0
total = 0

path = get_path('test')
test_label = get_labels(path)
test_dataset = WavDataset(path, test_label, config)
# Fixed order: shuffling is pointless for evaluation and makes the
# per-batch averaged loss vary between runs.
test_loader = DataLoader(test_dataset, batch_size=config["batchsize"], shuffle=False)

with torch.no_grad():  # no gradients are needed for evaluation
    for images, labels in test_loader:
        outputs = model(images)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Predicted class = index of the largest output value.
        preds = outputs.argmax(dim=1)

        # Labels are reduced with argmax as well, i.e. they are expected to
        # carry one score per class along dim 1.
        correct += preds.eq(labels.argmax(dim=1)).sum().item()
        total += labels.size(0)

print(f'Test Loss: {test_loss/len(test_loader)}')
print(f'Accuracy: {correct/total:.4f}')
print(f'Accuracy: {correct,total}')

In [None]:
# Persist the weights of the current `model` to a file in the working directory.
torch.save(obj=model.state_dict(), f='model.pth')

In [None]:
# import torch
# import torch.nn as nn

# class LSTMModel(nn.Module):
#     def __init__(self, input_size, hidden_layer_size, num_layers, output_size):
#         super(LSTMModel, self).__init__()
#         self.hidden_layer_size = hidden_layer_size
#         self.num_layers = num_layers
#         self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers, batch_first=True)
#         self.fc = nn.Linear(hidden_layer_size, output_size)

#     def forward(self, x):
#         # 初始化隐藏状态和细胞状态
#         batch_size = x.size(0)  # 批量大小
#         h0 = torch.zeros(self.num_layers, batch_size, self.hidden_layer_size).to(x.device)
#         c0 = torch.zeros(self.num_layers, batch_size, self.hidden_layer_size).to(x.device)

#         # 前向传播LSTM
#         out, _ = self.lstm(x, (h0, c0))
        
#         # 取最后一个时间步的隐藏状态用于预测
#         out = self.fc(out[:, -1, :])
#         return out

# # 创建模型实例
# input_size = 1  # 假设每个时间步的特征维度是1
# hidden_layer_size = 50
# num_layers = 2
# output_size = 1
# model = LSTMModel(input_size, hidden_layer_size, num_layers, output_size)

# # 假设images是二维张量，形状为[batch_size, sequence_length]
# images = torch.randn(16, 16000, 1)  # sequence_length, batch_size, feature_size  # batch_size=16, sequence_length=16000

# # 调用模型
# output = model(images)
# print(output.shape)