In [4]:
# import gdown
# file_id = '1Y2cTYR_t_10NAbznspE5bBjuATPdTgtq'
# gdown.download(f'https://drive.google.com/uc?export=download&id={file_id}', 'file.zip', quiet=False)

In [5]:
# import zipfile
# import os

# zip_path = 'file.zip'
# with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#     extract_path = 'extracted_files'
#     zip_ref.extractall(extract_path)

# extracted_files = os.listdir(extract_path)
# print(f"Extracted files and directories: {extracted_files}")

<h2>Importing data</h2>

In [1]:
# Importing data 

import os
import numpy as np
from collections import Counter
from sklearn.model_selection import train_test_split


data_dir = "AISContest_Data"
classes = [0, 1, 2, 3, 4]


data = []
labels = []

for label in classes:
    class_dir = os.path.join(data_dir, str(label))
    files = os.listdir(class_dir)
    for file in files:
        file_path = os.path.join(class_dir, file)
        data.append(np.load(file_path))
        labels.append(label)


data = np.array(data)
labels = np.array(labels)

print(len(labels))

class_counts = Counter(labels)
print("Count of samples in each class:")
for cls, count in class_counts.items():
    print(f"class {cls}: {count} samples")


X_train, X_temp, y_train, y_temp = train_test_split(data, labels, test_size=0.3, stratify=labels, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)


from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
X_val = scaler.transform(X_val.reshape(-1, X_val.shape[-1])).reshape(X_val.shape)
X_test = scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)

# X_train = X_train / 255.0
# X_val = X_val / 255.0
# X_test = X_test / 255.0

import cv2

X_train = np.array([cv2.resize(img, (224, 224)) for img in X_train])
X_val = np.array([cv2.resize(img, (224, 224)) for img in X_val])
X_test = np.array([cv2.resize(img, (224, 224)) for img in X_test])


print(f"\nTraining data: {len(X_train)} samples")
print(f"Validation data: {len(X_val)} samples")
print(f"Test data: {len(X_test)} samples")





5813
Count of samples in each class:
class 0: 2906 samples
class 1: 237 samples
class 2: 1755 samples
class 3: 497 samples
class 4: 418 samples

Training data: 4069 samples
Validation data: 872 samples
Test data: 872 samples


<h2>Calculating weights for each class</h2>

In [18]:
from imblearn.over_sampling import RandomOverSampler

# تغییر شکل داده‌ها به شکل 2D برای resampling
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_train_flat = X_train_flat.astype(np.float32)

# انجام oversampling
ros = RandomOverSampler()
X_resampled, y_resampled = ros.fit_resample(X_train_flat, y_train)

# بازگرداندن به شکل اصلی
X_train = X_resampled.reshape(-1, X_train.shape[1], X_train.shape[2])
y_train = y_resampled

print(f"Oversampling انجام شد - داده‌های جدید: {Counter(y_train)}")

Oversampling انجام شد - داده‌های جدید: Counter({np.int64(0): 2034, np.int64(2): 2034, np.int64(3): 2034, np.int64(4): 2034, np.int64(1): 2034})


In [19]:
# clculating weights for each class

import torch

class_counts = Counter(y_train)
num_classes = len(classes)
total_samples = len(y_train)
class_weights = {cls: total_samples / (num_classes * count) for cls, count in class_counts.items()}


weights = torch.tensor([class_weights[i] for i in range(num_classes)], dtype=torch.float)

print("Weights for each class:")
for cls, weight in class_weights.items():
    print(f"Class {cls}: {weight:.4f}")     

Weights for each class:
Class 0: 1.0000
Class 2: 1.0000
Class 3: 1.0000
Class 4: 1.0000
Class 1: 1.0000


<h2>Creating Model</h2>

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet50
from torchvision.models import ResNet50_Weights


class SleepStageClassifier(nn.Module):
    def __init__(self, num_classes=5):
        super(SleepStageClassifier, self).__init__()
        self.pretrained_model = resnet50(weights='ResNet50_Weights.DEFAULT')
        self.pretrained_model.fc = nn.Linear(self.pretrained_model.fc.in_features, num_classes)
        self.pretrained_model.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        for param in self.pretrained_model.parameters():
            param.requires_grad = False
        
        # فاین‌تیون کردن لایه‌های آخر
        for param in self.pretrained_model.layer4.parameters():
            param.requires_grad = True
            
        for param in self.pretrained_model.layer3.parameters():
            param.requires_grad = True
    
        
        # تغییر لایه Fully Connected نهایی
        self.pretrained_model.fc = nn.Sequential(
            nn.Linear(self.pretrained_model.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.7),
            nn.Linear(512, num_classes)
        )
    def forward(self, x):
        return self.pretrained_model(x)
    


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SleepStageClassifier(num_classes=len(classes)).to(device)

def init_weights(m):
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
    elif isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        nn.init.zeros_(m.bias)

model.apply(init_weights)


class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.ce = nn.CrossEntropyLoss()

    def forward(self, inputs, targets):
        ce_loss = self.ce(inputs, targets)
        p_t = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - p_t) ** self.gamma * ce_loss
        return focal_loss.mean()

class_weights = torch.tensor([0.4, 4.9, 0.6, 2.3, 2.7], dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)

criterion = FocalLoss(alpha=0.5, gamma=2)

optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)

#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
#scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5, verbose=True)
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=5, T_mult=2)


# def init_weights(m):
#     if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
#         nn.init.xavier_uniform_(m.weight)
#         if m.bias is not None:
#             nn.init.zeros_(m.bias)

# model.apply(init_weights)


print(model)

from torchvision import transforms

train_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.RandomResizedCrop(224, scale=(0.7, 1.0)),  # افزایش تغییر سایز
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # تغییر بیشتر رنگ
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),  # Translation
    transforms.ToTensor()
])






SleepStageClassifier(
  (pretrained_model): ResNet(
    (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): 

In [21]:
from torch.utils.data import DataLoader, TensorDataset
import torch


X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)

X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.long)

X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)


train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)


batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
from torch.utils.data import WeightedRandomSampler


class_sample_counts = Counter(y_train)
weights = np.array([1.0 / class_sample_counts[y] for y in y_train])
sample_weights = torch.tensor(weights, dtype=torch.float)
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)


print(f"Number of batches in train_loader: {len(train_loader)}")
print(f"Number of batches in val_loader: {len(val_loader)}")
print(f"Number of batches in test_loader: {len(test_loader)}")

Number of batches in train_loader: 318
Number of batches in val_loader: 28
Number of batches in test_loader: 28


In [22]:
from sklearn.metrics import f1_score

# def mixup_data(x, y, alpha=1.0):
#     lam = np.random.beta(alpha, alpha)
#     index = torch.randperm(x.size(0)).to(x.device)
#     mixed_x = lam * x + (1 - lam) * x[index, :]
#     y_a, y_b = y, y[index]
#     return mixed_x, y_a, y_b, lam

# def mixup_criterion(criterion, pred, y_a, y_b, lam):
#     return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

from sklearn.metrics import f1_score

num_epochs = 25

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    # --- Training Loop ---
    for inputs, labels in train_loader:
        inputs, labels = inputs.squeeze(-1).unsqueeze(1).to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    # --- Validation Loop ---
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.squeeze(-1).unsqueeze(1).to(device), labels.to(device)
            outputs = model(inputs)
            
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # --- Calculate F1-Score ---
    f1 = f1_score(all_labels, all_preds, average=None)  # F1 for each class
    f1_macro = f1_score(all_labels, all_preds, average='macro')  # Macro Average
    f1_weighted = f1_score(all_labels, all_preds, average='weighted')  # Weighted Average
    
    # --- Print Results ---
    print(f"Epoch [{epoch+1}/{num_epochs}]")
    print(f"Loss: {running_loss/len(train_loader):.4f}")
    print("F1-Score per class:", f1)
    print(f"Macro F1-Score: {f1_macro:.4f}")
    print(f"Weighted F1-Score: {f1_weighted:.4f}")
    print("-" * 50)


# num_epochs = 25

# for epoch in range(num_epochs):
#     model.train()
#     running_loss = 0.0
    
#     # --- Training Loop ---
#     for inputs, labels in train_loader:
#         inputs, labels = inputs.squeeze(-1).unsqueeze(1).to(device), labels.to(device)
    
#         #inputs, targets_a, targets_b, lam = mixup_data(inputs, labels)
    
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         loss = mixup_criterion(criterion, outputs, targets_a, targets_b, lam)
    
#         loss.backward()
#         optimizer.step()

    
#     # --- Validation Loop ---
#     model.eval()
#     all_preds = []
#     all_labels = []
    
#     with torch.no_grad():
#         for inputs, labels in val_loader:
#             inputs, labels = inputs.squeeze(-1).unsqueeze(1).to(device), labels.to(device)
#             outputs = model(inputs)
            
#             _, preds = torch.max(outputs, 1)
#             all_preds.extend(preds.cpu().numpy())
#             all_labels.extend(labels.cpu().numpy())
    
#     # --- Calculate F1-Score ---
#     f1 = f1_score(all_labels, all_preds, average=None)  # F1 for each class
#     f1_macro = f1_score(all_labels, all_preds, average='macro')  # Macro Average
#     f1_weighted = f1_score(all_labels, all_preds, average='weighted')  # Weighted Average
    
#     # --- Print Results ---
#     print(f"Epoch [{epoch+1}/{num_epochs}]")
#     print(f"Loss: {running_loss/len(train_loader):.4f}")
#     print("F1-Score per class:", f1)
#     print(f"Macro F1-Score: {f1_macro:.4f}")
#     print(f"Weighted F1-Score: {f1_weighted:.4f}")
#     print("-" * 50)


Epoch [1/25]
Loss: 0.6389
F1-Score per class: [0.01360544 0.03174603 0.45410628 0.09929078 0.        ]
Macro F1-Score: 0.1197
Weighted F1-Score: 0.1540
--------------------------------------------------
Epoch [2/25]
Loss: 0.5160
F1-Score per class: [0.06823028 0.03333333 0.4613918  0.08163265 0.        ]
Macro F1-Score: 0.1289
Weighted F1-Score: 0.1821
--------------------------------------------------
Epoch [3/25]
Loss: 0.5151
F1-Score per class: [0.03090508 0.         0.45123153 0.11976048 0.        ]
Macro F1-Score: 0.1204
Weighted F1-Score: 0.1622
--------------------------------------------------
Epoch [4/25]
Loss: 0.5121
F1-Score per class: [0.62575627 0.         0.02166065 0.13930348 0.        ]
Macro F1-Score: 0.1573
Weighted F1-Score: 0.3313
--------------------------------------------------
Epoch [5/25]
Loss: 0.5028
F1-Score per class: [0.64141414 0.05333333 0.07920792 0.16363636 0.        ]
Macro F1-Score: 0.1875
Weighted F1-Score: 0.3607
------------------------------------

KeyboardInterrupt: 

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in val_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()


RuntimeError: Given groups=1, weight of size [64, 1, 7, 7], expected input[1, 256, 224, 224] to have 1 channels, but got 256 channels instead

In [None]:
num_epochs = 25

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.squeeze(-1).unsqueeze(1).to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()  # تنظیم نرخ یادگیری
        
        running_loss += loss.item()
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}")


KeyboardInterrupt: 