# header

In [18]:
import os
import json
import datetime
import logging
import warnings

import pandas as pd
import numpy as np
from tqdm import tqdm
from numba import cuda
from collections import Counter

from sklearn.metrics import accuracy_score, precision_score, recall_score
import config as CONFIG
import utils

import wandb

warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset, Dataset, SubsetRandomSampler
from sklearn.metrics import precision_score, recall_score
import torch.nn as nn
import torch.nn.functional as F


# generate features

In [23]:
# single process, go "feats.py" for multi process
max_row = 131072
max_col = 1024
row_block = 32
col_block = 16
row_section_size = max_row // row_block
col_section_size = max_col // col_block

def process_one(i):
    df = pd.read_csv(os.path.join(CONFIG.PATH_PROCESSED, f"mcelog_{i}.csv"))
    data = []
    labels = []
    hosts = df["sid"].drop_duplicates().to_list()

    for host in tqdm(hosts):
        host_matrix = np.zeros([row_block, col_block])
        host_df = df[df["sid"]==host]
        host_df = host_df.fillna(-1)
        
        label = (int(host_df["failure_type"].sum() < 0) + 1) % 2
        labels.append(label)

        host_df["row_section"] = host_df["row"] // row_section_size
        host_df["col_section"] = host_df["col"] // col_section_size

        host_df_simple = host_df[["row_section", "col_section"]].drop_duplicates()

        for _, row in host_df_simple.iterrows():
            host_matrix[int(row["row_section"]), int(row["col_section"])] = 1
        data.append(host_matrix)
    
    np.save(os.path.join(CONFIG.PATH_PROCESSED, f"feats_{i}.npy"), data)
    np.save(os.path.join(CONFIG.PATH_PROCESSED, f"labels_{i}.npy"), labels)
    
    
# for i in range(0, 31):
#     if i == 22:
#         continue
#     process_one(i)

# load data

In [41]:
host_matrix = np.zeros([3, 3])

host_matrix[1,1] += 1
host_matrix[1,1] += 1.2

host_matrix

array([[0. , 0. , 0. ],
       [0. , 2.2, 0. ],
       [0. , 0. , 0. ]])

In [16]:

dataset_list = []


for i in range(31):
    if i == 22:
        continue
    data_np = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"feats_64x32_with_times_{i}.npy"))
    label_np = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"labels_64x32_{i}.npy"))
    
    data_np = 2 / (1 + np.exp(-data_np)) - 1

    data = torch.tensor(data_np, dtype=torch.float32)
    label = torch.tensor(label_np, dtype=torch.int64)
     
    dataset = TensorDataset(data, label)
    dataset_list.append(dataset)

datasets = torch.utils.data.ConcatDataset(dataset_list)

In [17]:
data_np[0]

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

## check ratio

In [19]:

dataset_list = []

xgb_data_list = []
xgb_label_list = []

cnt = 0
error = 0
enrich_ratio = 1
under_sampling_ratio = 8

for i in range(31):
    if i == 22:
        continue
    data_np = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"feats_64x32_with_times_{i}.npy"))
    label_np = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"labels_64x32_{i}.npy"))
    
    data_list = data_np.tolist()
    label_list = label_np.tolist()
    
    tmp_data_list = []
    tmp_label_list = []
    for idx in range(len(data_list)):
        curdata = data_list[idx]
        curlabel = label_list[idx]
        if curlabel == 1:
            for _ in range(enrich_ratio):
                tmp_data_list.append(curdata)
                tmp_label_list.append(curlabel)
        else:
            if idx % under_sampling_ratio == 0:
                tmp_data_list.append(curdata)
                tmp_label_list.append(curlabel)
    cnt += len(label_list)   
    error += sum(label_list)
    
    
    data_list = tmp_data_list
    label_list = tmp_label_list
    
    xgb_data_list.extend(data_list)
    xgb_label_list.extend(label_list)
    
    data_np = np.array(data_list)
    label_np = np.array(label_list)
    
    data_np = 2 / (1 + np.exp(-data_np)) - 1
    
    data = torch.tensor(data_np, dtype=torch.float32)
    label = torch.tensor(label_np, dtype=torch.int64)
     
    dataset = TensorDataset(data, label)
    dataset_list.append(dataset)

datasets = torch.utils.data.ConcatDataset(dataset_list)

In [20]:
len(xgb_data_list)

4296

In [21]:
sum(xgb_label_list)

566

In [22]:
data.shape

torch.Size([2, 1, 64, 32])

In [23]:
data_np.shape

(2, 1, 64, 32)

In [30]:
datasets[0].len()

AttributeError: 'tuple' object has no attribute 'len'

In [35]:
ret_data = None
ret_label = None

for i in range(31):
    if i == 22:
        continue
    data_np = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"feats_64x32_with_times_{i}.npy"))
    label_np = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"labels_64x32_{i}.npy"))
    if ret_data is None:
        ret_data = data_np
        ret_label = label_np
    else:
        ret_data = np.concatenate((ret_data, data_np))
        ret_label = np.concatenate((ret_label, label_np))

In [36]:
ret_data.shape

(30264, 1, 64, 32)

In [37]:
ret_label.shape

(30264,)

In [39]:
ret_label[256]

0

In [None]:
ret_data.

# model

## single testing

In [31]:
torch.manual_seed(42)
np.random.seed(42)

import torch.nn as nn
import torch.nn.functional as F
import random
random.seed(42)

# class CNNModel(nn.Module):
#     def __init__(self):
#         super(CNNModel, self).__init__()
#         self.channels = 64
        
#         self.conv1 = nn.Conv2d(1, self.channels, kernel_size=9, padding=4)
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
#         self.fc1 = nn.Linear(self.channels * 16 * 8, 128)  
#         self.dropout = nn.Dropout(0.1)
#         self.fc2 = nn.Linear(128, 2)  

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = x.view(-1, self.channels * 16 * 8)
#         x = F.relu(self.fc1(x))
#         x = self.dropout(x)
#         x = self.fc2(x)
#         return x

# class CNNModel(nn.Module):
#     def __init__(self):
#         super(CNNModel, self).__init__()
#         self.channels1 = 32
#         self.channels2 = 64
        
#         self.conv1 = nn.Conv2d(1, self.channels1, kernel_size=3, padding=1)
#         self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        
#         self.conv2 = nn.Conv2d(self.channels1, self.channels2, kernel_size=3, padding=1)
#         self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        
#         self.fc1 = nn.Linear(self.channels2 * 7 * 7, 128)
        
#         self.fc2 = nn.Linear(128, 2)  # 输出2类，根据你的任务调整输出维度

#         self.dropout = nn.Dropout(0.1)

#     def forward(self, x):
#         x = self.pool1(F.relu(self.conv1(x)))
#         x = self.pool2(F.relu(self.conv2(x)))
#         x = x.view(-1, self.channels2 * 7 * 7)
#         x = F.relu(self.fc1(x))
#         x = self.dropout(x)
#         x = self.fc2(x)
#         return x

# import torch
# import torch.nn as nn

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=9, padding=4)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=7, padding=3)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        
        # self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        # self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        # self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        # self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        
        # 全连接层
        self.fc1 = nn.Linear(256 * 2 * 1, 256) 
        self.dropout = nn.Dropout(0.1)
        self.fc2 = nn.Linear(256, 2)  

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))
        x = x.view(-1, 256 * 2 * 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

########################################################################
# 定义基本的ResNet块
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=9, stride=stride, padding=4, bias=False)
        # self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        # self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        # out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        # out = self.bn2(out)
        if self.downsample:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out

# 定义ResNet网络
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=2):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        # self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        # x = self.bn1(x)
        x = self.relu(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# 创建ResNet模型
def ResNetCustom():
    return ResNet(BasicBlock, [2, 2, 2])  # 这里定义了一个3层的ResNet
########################################################################

# wandb.init(
#     project="dram",
    
#     config={

#     }
# )
    
    

batch_size = 64
data_loader = DataLoader(datasets, shuffle=True)


train_ratio = 0.8
test_ratio = 1 - train_ratio  

total_samples = len(data_loader.dataset)

train_size = int(train_ratio * total_samples)
test_size = total_samples - train_size

indices = list(range(total_samples))

random.shuffle(indices)

train_indices = indices[:train_size]
test_indices = indices[train_size:]

train_sampler = SubsetRandomSampler(train_indices)
test_sampler = SubsetRandomSampler(test_indices)

train_loader = DataLoader(datasets, batch_size=batch_size, sampler=train_sampler)
test_loader = DataLoader(datasets, batch_size=batch_size, sampler=test_sampler)


############################################################################



#######################################################################################

# # model = CNNModel()
# model = ResNetCustom()


# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print("当前设备:", device)
# model.to(device)

# criterion = nn.CrossEntropyLoss() 
# optimizer = optim.Adam(model.parameters(), lr=0.001) 

# num_epochs = 100
# for epoch in range(num_epochs):
#     running_loss = 0.0
#     for _inputs, _labels in train_loader:
#         _inputs, _labels = _inputs.to(device), _labels.to(device)
#         optimizer.zero_grad()
#         outputs = model(_inputs.unsqueeze(1))  # 在通道维度上增加1
#         loss = criterion(outputs, _labels)
#         loss.backward()
#         optimizer.step()
#         running_loss += loss.item()
#         # wandb.log({"loss": running_loss})
#     print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(data_loader)}')

# print('Finished Training')


# model.eval()



# predictions = []
# true_labels = []

# with torch.no_grad():
#     for _inputs, _labels in train_loader:
#         _inputs, _labels = _inputs.to(device), _labels.to(device)
#         outputs = model(_inputs.unsqueeze(1))
#         _, predicted = torch.max(outputs.data, 1)
#         predictions.extend(predicted.tolist())
#         true_labels.extend(_labels.tolist())

# precision = precision_score(true_labels, predictions)
# recall = recall_score(true_labels, predictions)

# print(f"all: {len(true_labels)}, error_labels: {sum(true_labels)}")
# print(f'Precision: {precision}')
# print(f'Recall: {recall}')

# # wandb.log({"train precision": precision, "train recall": recall})


# predictions = []
# true_labels = []

# with torch.no_grad():
#     for _inputs, _labels in test_loader:
#         _inputs, _labels = _inputs.to(device), _labels.to(device)
#         outputs = model(_inputs.unsqueeze(1))
#         _, predicted = torch.max(outputs.data, 1)
#         predictions.extend(predicted.tolist())
#         true_labels.extend(_labels.tolist())

# print(f"all: {len(true_labels)}, error_labels: {sum(true_labels)}")
        
# precision = precision_score(true_labels, predictions)
# recall = recall_score(true_labels, predictions)

# print(f'Precision: {precision}')
# print(f'Recall: {recall}')

# # wandb.log({"test precision": precision, "test recall": recall})

# # wandb.finish()
#######################################################################################


model = CNNModel()
# wandb.watch(model)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("当前设备:", device)
model.to(device)

criterion = nn.CrossEntropyLoss() 
optimizer = optim.Adam(model.parameters(), lr=0.001) 

num_epochs = 100
for epoch in range(num_epochs):
    running_loss = 0.0
    for _inputs, _labels in train_loader:
        _inputs, _labels = _inputs.to(device), _labels.to(device)
        optimizer.zero_grad()
        outputs = model(_inputs.unsqueeze(0))  # 在通道维度上增加1
        loss = criterion(outputs, _labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # wandb.log({"loss": running_loss})
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(data_loader)}')

print('Finished Training')


model.eval()



predictions = []
true_labels = []

with torch.no_grad():
    for _inputs, _labels in train_loader:
        _inputs, _labels = _inputs.to(device), _labels.to(device)
        outputs = model(_inputs.unsqueeze(0))
        _, predicted = torch.max(outputs.data, 1)
        predictions.extend(predicted.tolist())
        true_labels.extend(_labels.tolist())

precision = precision_score(true_labels, predictions)
recall = recall_score(true_labels, predictions)

print(f"all: {len(true_labels)}, error_labels: {sum(true_labels)}")
print(f'Precision: {precision}')
print(f'Recall: {recall}')

# wandb.log({"train precision": precision, "train recall": recall})


predictions = []
true_labels = []

with torch.no_grad():
    for _inputs, _labels in test_loader:
        _inputs, _labels = _inputs.to(device), _labels.to(device)
        outputs = model(_inputs.unsqueeze(0))
        _, predicted = torch.max(outputs.data, 1)
        predictions.extend(predicted.tolist())
        true_labels.extend(_labels.tolist())

print(f"all: {len(true_labels)}, error_labels: {sum(true_labels)}")
        
precision = precision_score(true_labels, predictions)
recall = recall_score(true_labels, predictions)

print(f'Precision: {precision}')
print(f'Recall: {recall}')

# wandb.log({"test precision": precision, "test recall": recall})

# wandb.finish()

当前设备: cuda


RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [1, 64, 1, 64, 32]

## lightgbm

In [130]:
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# 假设你有N个样本，每个样本是一个3x2的矩阵
# N = 1000
data = np.array(xgb_data_list)
labels = np.array(xgb_label_list)

# 将数据整形为(N, 6)，以便传递给LightGBM
data = data.reshape(len(data), -1)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)

# 创建LightGBM数据集
train_data = lgb.Dataset(X_train, label=y_train)
test_data = lgb.Dataset(X_test, label=y_test, reference=train_data)

# 定义LightGBM参数
params = {
    'objective': 'binary',  # 二分类问题
    'metric': 'binary_error',  # 评估指标为二分类错误率
    'boosting_type': 'gbdt',  # 使用梯度提升树
    'num_leaves': 128,
    'learning_rate': 0.05,
    'feature_fraction': 0.3
}

# 训练LightGBM模型
num_round = 100  # 迭代次数
bst = lgb.train(params, train_data, num_round, valid_sets=[test_data])

# 预测
y_pred = bst.predict(X_test, num_iteration=bst.best_iteration)
y_pred_binary = [1 if x >= 0.2 else 0 for x in y_pred]

# 评估模型

accuracy = accuracy_score(y_test, y_pred_binary)
print(f'Accuracy: {accuracy * 100:.2f}%')

precision = precision_score(y_test, y_pred_binary)
recall = recall_score(y_test, y_pred_binary)

print(f'Precision: {precision}')
print(f'Recall: {recall}')

[LightGBM] [Info] Number of positive: 447, number of negative: 2989
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 5459
[LightGBM] [Info] Number of data points in the train set: 3436, number of used features: 506
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.130093 -> initscore=-1.900136
[LightGBM] [Info] Start training from score -1.900136
Accuracy: 81.51%
Precision: 0.3684210526315789
Recall: 0.47058823529411764


## pattern

In [92]:
ptn_data = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"feats_with_times_{0}.npy"))
ptn_label = np.load(os.path.join(CONFIG.PATH_PROCESSED, f"labels_{0}.npy"))

ptn_data = 2/(1+np.exp(-ptn_data))-1

In [93]:
ptn_data[45]

array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,

## xgboost

In [131]:
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# 假设你有N个样本，每个样本是一个3x2的矩阵
# N = 1000
data = np.array(xgb_data_list)
labels = np.array(xgb_label_list)

# 将数据整形为(N, 6)，以便传递给LightGBM
data = data.reshape(len(data), -1)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)

# model = xgb.XGBClassifier(max_depth=6,
#     learning_rate=0.01,
#     n_estimators=100,
#     objective='binary:logistic',
#     eval_metric='logloss',
#     random_state=42)
model = xgb.XGBClassifier(objective='binary:logistic')
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

# 评估模型

accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy * 100:.2f}%')

precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)

print(f'Precision: {precision}')
print(f'Recall: {recall}')

Accuracy: 85.12%
Precision: 0.40816326530612246
Recall: 0.16806722689075632


## Random Forest 

In [132]:
from sklearn.ensemble import RandomForestClassifier


data = np.array(xgb_data_list)
labels = np.array(xgb_label_list)

data = data.reshape(len(data), -1)

X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=50, 
                            random_state=42,
                            criterion="gini")
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy * 100:.2f}%')

precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)

print(f'Precision: {precision}')
print(f'Recall: {recall}')

Accuracy: 84.42%
Precision: 0.3972602739726027
Recall: 0.24369747899159663


# test

In [98]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# 创建示例数据（请根据您的数据替换这部分代码）
N = 1000
data = np.random.rand(N, 1, 32, 16)
labels = np.random.randint(0, 2, N)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)

# 将数据转换为 PyTorch 张量
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# 创建数据加载器
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# 定义基本的ResNet块
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out

# 定义ResNet网络
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=2):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# 创建ResNet模型
def ResNetCustom():
    return ResNet(BasicBlock, [2, 2, 2])  # 这里定义了一个3层的ResNet

# 训练模型
model = ResNetCustom()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# 模型评估
model.eval()
y_pred = []
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        y_pred.extend(predicted.cpu().numpy())

accuracy = accuracy_score(y_train, y_pred)
print(f"训练准确率: {accuracy:.2f}")


训练准确率: 0.51


In [33]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np

# 生成模拟数据
N = 1000
data = np.random.randn(N, 1, 64, 32).astype(np.float32)
labels = np.random.randint(0, 2, N)  # 生成随机标签

# 划分训练集和测试集
split_ratio = 0.8
split = int(N * split_ratio)
train_data, test_data = data[:split], data[split:]
train_labels, test_labels = labels[:split], labels[split:]

# 创建自定义数据集类
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = {'data': self.data[idx], 'label': self.labels[idx]}
        return sample

# 创建数据集和数据加载器
batch_size = 64
train_dataset = CustomDataset(train_data, train_labels)
test_dataset = CustomDataset(test_data, test_labels)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 定义一个两层CNN模型
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.fc = nn.Linear(32 * 16 * 8, 2)  # 输出2个类别

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# 创建模型实例
model = SimpleCNN()

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        inputs, labels = batch['data'], batch['label']
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}')

# 在测试集上验证模型
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for batch in test_loader:
        inputs, labels = batch['data'], batch['label']
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy on test set: {(correct / total) * 100:.2f}%')


Epoch 1/10, Loss: 0.7720351815223694
Epoch 2/10, Loss: 0.6970367385790899
Epoch 3/10, Loss: 0.6913520051882818
Epoch 4/10, Loss: 0.6891163358321557
Epoch 5/10, Loss: 0.6859682110639719
Epoch 6/10, Loss: 0.681311194713299
Epoch 7/10, Loss: 0.672152775984544
Epoch 8/10, Loss: 0.662941758449261
Epoch 9/10, Loss: 0.641889397914593
Epoch 10/10, Loss: 0.6274937574680035
Accuracy on test set: 47.00%
