In [2]:
import pandas as pd
import numpy as np
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torchvision.models as models
from torch.utils.data import DataLoader, random_split
from dataset import ElevatorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from losses import FocalLoss
from models.MLP import MLP
from models.CNN import CNN
from models.CNN import DeepCNN
from models.resnet1 import ResNet
from models.resnet2 import resnet34

from torch.optim.lr_scheduler import StepLR
from tqdm import tqdm
from imblearn.over_sampling import SMOTE

# Run on the GPU when CUDA is usable; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


# Data preprocessing

In [3]:
# Build the dataset from the processed CSV
# (the file name suggests this is the class-imbalanced variant — TODO confirm).
DATA_CSV = '../processed_data/data_imbal.csv'
dataset = ElevatorDataset(DATA_CSV)

class
0    5324
1    1819
2     343
3     233
4     108
5      72
Name: count, dtype: int64


# Split into train, validation, and test sets

In [6]:
# Fractions of the full dataset assigned to each subset. `test_ratio` is kept
# for documentation only: the test subset receives whatever is left once the
# train/validation sizes have been truncated to integers.
dataset_size = len(dataset)
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

train_size = int(train_ratio * dataset_size)
val_size = int(val_ratio * dataset_size)
test_size = dataset_size - train_size - val_size

# Seed the split so that every kernel restart yields the same partition.
# Without this, a checkpoint trained in an earlier session (such as the one the
# evaluation cell loads from disk) may have been trained on samples that end up
# in the current test subset, which silently inflates the reported metrics.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)

# Two-stage split: carve off the training subset first, then divide the
# remainder into validation and test subsets.
train_dataset, remaining_dataset = random_split(
    dataset, [train_size, dataset_size - train_size], generator=split_generator
)
val_dataset, test_dataset = random_split(
    remaining_dataset, [val_size, test_size], generator=split_generator
)

batch_size = 4096  # Adjust the batch size according to your needs
# Only the training loader shuffles; evaluation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [None]:
# # SMOTE augmentation
# X = np.array(train_val_df['data'].tolist())

# sm = SMOTE(sampling_strategy={0:4259,1:2500,2:2500,3:2500,4:2500,5:2500}, random_state=42)
# X_res, y_res = sm.fit_resample(X, train_val_df['class'])
# X_ = pd.Series([row for row in X_res])

# # Convert the list to a Pandas Series
# X_series = pd.Series(X_)
# df_aug=pd.DataFrame({'data':X_series,'class':y_res})
# df_aug['class'].value_counts()

class
0    4259
1    2500
5    2500
4    2500
3    2500
2    2500
Name: count, dtype: int64

In [7]:
# Validation helper
def validate(model, dataloader, num_classes):
    """Compute the per-class accuracy of ``model`` over ``dataloader``.

    Args:
        model: ``torch.nn.Module`` that returns one row of class scores per
            sample; the predicted class is the arg-max along dimension 1.
        dataloader: Iterable yielding ``(inputs, labels)`` batches. Inputs are
            cast to ``float32`` before the forward pass.
        num_classes: Number of class ids to report on (``0 .. num_classes - 1``).
            Labels outside that range are ignored.

    Returns:
        A list of length ``num_classes`` whose ``i``-th entry is the fraction
        of samples labelled ``i`` that were also predicted as ``i``. Classes
        with no samples in ``dataloader`` are reported as ``0``.

    The model is put in eval mode for the duration of the pass and its previous
    train/eval mode is restored afterwards, so calling this from inside a
    training loop does not leave the model stuck in eval mode.
    """
    # Run on whatever device the model lives on; fall back to the notebook-level
    # `device` for modules that have no parameters.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else device

    was_training = model.training
    model.eval()
    correct = [0] * num_classes
    total = [0] * num_classes

    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs = inputs.to(torch.float32).to(run_device)
                labels = labels.to(run_device)
                predicted = model(inputs).argmax(dim=1)
                hits = predicted == labels

                for i in range(num_classes):
                    class_mask = labels == i
                    total[i] += class_mask.sum().item()
                    correct[i] += (hits & class_mask).sum().item()
    finally:
        # Restore the caller's mode even if the forward pass raised.
        model.train(was_training)

    accuracies = [correct[i] / total[i] if total[i] != 0 else 0 for i in range(num_classes)]
    return accuracies

print('device:', device)

# Input dimensionality used by the MLP/CNN variants, and the number of classes.
input_size = 900
# hidden_sizes = [2048, 1024, 512, 256, 128, 64, 16, 8]
num_classes = 6

# Alternative architectures that were tried (kept for reference):
# input_size = 1
# hidden_size = 64
# model = LSTM(input_size, hidden_size, num_classes)
# model = MLP(input_size, hidden_sizes, num_classes)
# model = DeepCNN(input_size, num_classes)
# model = ResNet(BasicBlock, [2, 2, 2, 2], out_channel=4)
model = ResNet(num_classes=num_classes)
# model = resnet34()

model.to(device)

# Loss function, optimizer and learning-rate schedule
# (the learning rate is multiplied by 0.1 every 1000 epochs).
criterion = FocalLoss()
optimizer = optim.SGD(model.parameters(), lr=1e-3)
scheduler = StepLR(optimizer, step_size=1000, gamma=0.1)

# Train the model
num_epochs = 2000
train_losses = []  # one entry per epoch: sample-weighted mean training loss

for epoch in tqdm(range(num_epochs)):
    # `validate` switches the model to eval mode, so training mode must be
    # (re-)enabled at the start of every epoch.
    model.train()
    train_loss = 0.0
    # with torch.autograd.detect_anomaly():
    for inputs, labels in train_loader:
        # Forward pass
        inputs = inputs.to(torch.float32).to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a short final batch does not skew
        # the epoch average (assumes `criterion` returns a per-batch mean —
        # TODO confirm against FocalLoss).
        train_loss += loss.item() * inputs.size(0)
    scheduler.step()

    # Record the average loss for this epoch
    train_loss = train_loss / len(train_dataset)
    train_losses.append(train_loss)

    if (epoch+1) % 200 == 0:
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss}")
        accuracies = validate(model, val_loader, num_classes)
        for i in range(num_classes):
            print(f"Validation Accuracy (Class {i}): {accuracies[i]}")

timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
# Build the checkpoint file name
filename = f"model/Resnet_{timestamp}.pth"
# Save the whole model object (architecture + parameters); the evaluation cell
# loads it back with `torch.load` and uses the result directly as a module.
torch.save(model, filename)

device: cuda


  0%|          | 0/2000 [00:00<?, ?it/s]


KeyError: 563

In [None]:
# Load a previously saved model object and evaluate it on the test subset.
# NOTE(review): `torch.load` unpickles arbitrary Python objects — only load
# checkpoints that come from a trusted source.
# NOTE(review): recent PyTorch releases default `torch.load` to
# `weights_only=True`, which rejects fully pickled modules; pass
# `weights_only=False` if the installed version requires it — confirm.
# `map_location` places the weights on the active device, so a checkpoint
# saved from a CUDA session still loads on a CPU-only kernel.
model = torch.load('model/Resnet_2023-07-04-13-59-36-tag1.pth', map_location=device)
model.eval()
predictions = []
true_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(torch.float32).to(device)
        # The outputs are already on the model's device; no extra move needed.
        outputs = model(inputs)
        predicted_labels = outputs.argmax(dim=1)

        predictions.extend(predicted_labels.tolist())
        true_labels.extend(labels.tolist())

# Compute evaluation metrics (macro average: every class counts equally,
# regardless of how many samples it has).
accuracy = accuracy_score(true_labels, predictions)
precision = precision_score(true_labels, predictions, average='macro')
recall = recall_score(true_labels, predictions, average='macro')
f1 = f1_score(true_labels, predictions, average='macro')

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)

RuntimeError: GET was unable to find an engine to execute this computation

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

# Compute the confusion matrix with an explicit label order, so that row and
# column `k` are known to belong to `class_labels[k]`. Reporting the bare row
# index as the class id would be wrong whenever some class id is missing from
# both the true labels and the predictions.
class_labels = sorted(set(true_labels) | set(predictions))
cm = confusion_matrix(true_labels, predictions, labels=class_labels)

# Per-class accuracy: correctly predicted samples of a class divided by the
# number of true samples of that class. A separate loop variable is used so the
# notebook-level `num_classes` is not overwritten by this cell.
class_accuracy = {}

for row, label in enumerate(class_labels):
    support = cm[row, :].sum()
    # A class that was predicted but never appears among the true labels has no
    # defined accuracy; report NaN explicitly instead of dividing 0 by 0.
    class_accuracy[label] = cm[row, row] / support if support else float("nan")

# Print the accuracy of every class
for label in class_labels:
    print(f"Class {label} accuracy: {class_accuracy[label]:.4f}")

# Compute the overall accuracy
overall_accuracy = accuracy_score(true_labels, predictions)
print("Overall accuracy:", overall_accuracy)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Recompute the confusion matrix from the collected labels and predictions.
cm = confusion_matrix(true_labels, predictions)

# Draw it as an annotated heat map on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_xlabel("Predicted labels")
ax.set_ylabel("True labels")
ax.set_title("Confusion Matrix")
# `timestamp` is the string created in the training cell for the checkpoint
# file name, so that cell must have run to completion before this one.
fig.savefig(f'imgs/cm_{timestamp}.png')
plt.show()