## N = 15

In [None]:
import numpy as np
import torch

print(torch.cuda.is_available())

# load date
X = np.load("Datasets/kryptonite-24-X.npy")
X.shape

In [None]:
y = np.load("Datasets/kryptonite-24-y.npy")
y.shape

In [None]:
import numpy as np
import torch
from tqdm import tqdm
from torch.utils.data import TensorDataset, DataLoader, random_split
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import models

# Convert numpy arrays to PyTorch tensors
X_tensor = torch.tensor(X, dtype=torch.float)
y_tensor = torch.tensor(y, dtype=torch.float)

# create a TensorDataset
dataset = TensorDataset(X_tensor, y_tensor)

# define split sizes (60% train, 20% validation, 20% test)
train_size = int(0.6 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size

# Split the dataset into train, validation, and test
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Create DataLoaders for each subset
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# check data loader output
for X_batch, y_batch in tqdm(train_loader):
    # Reshape X_batch to have a single channel  # Add a channel dimension
    print(X_batch.shape, y_batch.shape)  # X_batch is of shape [batch_size, 1, 15]
    break

class CustomResNet18(nn.Module):
    def __init__(self):
        super(CustomResNet18, self).__init__()
        
        # 加载resnet18
        self.model = models.resnet18(pretrained=False)
        
        # 修改第一个卷积层，将输入通道从3改为1
        self.model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        
        # 修改最后的全连接层，输出改为1个值
        num_features = self.model.fc.in_features
        self.model.fc = nn.Linear(num_features, 1)
        
        # 添加sigmoid层来得到0-1之间的值
        self.sigmoid = nn.Sigmoid()
        
        # 添加reshape层来调整输入维度
        self.reshape_layer = ReshapeLayer()

    def forward(self, x):
        # x shape: [batch_size, 1, 24]
        x = self.reshape_layer(x)  # 转换为 [batch_size, 1, H, W]
        x = self.model(x)
        x = self.sigmoid(x)
        return x

class ReshapeLayer(nn.Module):
    def __init__(self):
        super(ReshapeLayer, self).__init__()
    
    def forward(self, x):
        batch_size = x.size(0)
        x = x.view(batch_size, 1, 24)
        x = torch.nn.functional.pad(x, (0, 1), "constant", 0)
        x = x.view(batch_size, 1, 5, 5)
        return x

# initialize the model, loss function, and optimizer
# Initialize the ResNet18 model
# model = CustomResNet18()
# criterion = nn.BCELoss()  # 使用二元交叉熵损失
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    # 使用tqdm显示进度条
    train_pbar = tqdm(train_loader, desc='Training')
    for batch_idx, (inputs, targets) in enumerate(train_pbar):
        inputs, targets = inputs.to(device), targets.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs).squeeze()
        loss = criterion(outputs, targets)
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        # 计算准确率
        predictions = (outputs >= 0.5).float()
        correct += (predictions == targets).sum().item()
        total += targets.size(0)
        
        # 更新进度条信息
        train_pbar.set_postfix({
            'loss': f'{total_loss/(batch_idx+1):.4f}',
            'acc': f'{100.*correct/total:.2f}%'
        })
    
    return total_loss / len(train_loader), correct / total

def validate(model, val_loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        val_pbar = tqdm(val_loader, desc='Validation')
        for batch_idx, (inputs, targets) in enumerate(val_pbar):
            inputs, targets = inputs.to(device), targets.to(device)
            
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, targets)
            
            total_loss += loss.item()
            predictions = (outputs >= 0.5).float()
            correct += (predictions == targets).sum().item()
            total += targets.size(0)
            
            val_pbar.set_postfix({
                'loss': f'{total_loss/(batch_idx+1):.4f}',
                'acc': f'{100.*correct/total:.2f}%'
            })
    
    return total_loss / len(val_loader), correct / total

torch.manual_seed(42)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CustomResNet18().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
epochs = 100
best_acc = 0
for epoch in range(epochs):
    print(f'\nEpoch {epoch+1}/{epochs}')
    
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    
    print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%')
    print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc*100:.2f}%')
    
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_model_24.pth')
        print('Saved best model!')


# set the number of epochs
# epochs = 100
# best_acc = 0

# for epoch in range(epochs):
#     """ Training """
#     model.train()

#     # forward pass
#     correct = 0
#     total = 0
#     train_loss = 0
#     for X_batch, y_batch in train_loader:
#         # ResNet18 expects 3-channel images, so we need to expand our data
#         print(X_batch.shape)
#         output = model(X_batch)
#         print("!!!!!!!!")
#         print(output.shape)
#         print(output)
#         y_preds = output.squeeze()

#         correct += torch.eq(y_preds, y_batch).sum().item()
#         total += len(y_batch)

#         loss = loss_fn(output, y_batch)
#         train_loss += loss.item()
#         # zero the optimizer
#         optimizer.zero_grad()
#         # backpropagation
#         loss.backward()
#         # Gradient Descent
#         optimizer.step()

#     """ Validation """
#     model.eval()
#     correct = 0
#     total = 0
#     val_loss = 0
#     with torch.inference_mode():
#         for X_batch, y_batch in val_loader:
#             # ResNet18 expects 3-channel images, so we need to expand our data
#             output = model(X_batch).squeeze()
#             y_preds = torch.round(torch.sigmoid(output))

#             correct += torch.eq(y_preds, y_batch).sum().item()
#             total += len(y_batch)

#             loss = loss_fn(output, y_batch)
#             val_loss += loss.item()
        
#         val_acc = (correct / total) * 100
#         if val_acc > best_acc:
#             best_acc = val_acc
#             torch.save(model.state_dict(), "n-15best.pth")

#     if epoch % 10 == 0:
#         print(f"Epoch: {epoch} | Train Loss: {train_loss:.5f} | Acc: {train_acc:.2f}% | Learning Rate: {scheduler.get_last_lr()[0]:.7f} | Val loss: {val_loss:.5f} | Val Acc: {val_acc:.2f}%")
#     train_acc = (correct / total) * 100