In [None]:
import os
import multiprocessing
import random
import pandas as pd
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split

from torchvision import transforms, models
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim

In [None]:
# Reproducibility setup: one fixed seed (42) for every random number source used below.

# Note: Python reads PYTHONHASHSEED at interpreter start-up, so this assignment
# is only seen by processes launched after this point.
os.environ["PYTHONHASHSEED"] = '42'

# Ask cuDNN for deterministic kernels and disable its autotuner.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Seed the Python, NumPy and PyTorch (CPU + all CUDA devices) generators.
torch.cuda.manual_seed_all(42)
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Data preparation

In [None]:
def create_dataframe(data_dir):
    """Index the image files found under ``<data_dir>/<split>/<label>/``.

    Scans the ``train`` and ``test`` splits and, inside each, the ``benign``
    and ``malignant`` class folders.

    Args:
        data_dir: Root directory that contains the ``train``/``test`` folders.

    Returns:
        pandas.DataFrame with one row per image and the columns
        ``path`` (file path), ``label`` (0 for benign, 1 for malignant) and
        ``split`` (``'train'`` or ``'test'``). The three columns are present
        even when no image is found.

    Raises:
        OSError: if one of the expected class folders cannot be listed.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    records = []
    for split in ['train', 'test']:
        for label in ['benign', 'malignant']:
            folder = os.path.join(data_dir, split, label)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # row order (and every seeded split derived from it) reproducible.
            for filename in sorted(os.listdir(folder)):
                # Keep image files only; compare case-insensitively so that
                # names such as "IMG_1.JPG" are not silently dropped.
                if filename.lower().endswith(image_extensions):
                    records.append({
                        'path': os.path.join(folder, filename),
                        'label': 0 if label == 'benign' else 1,
                        'split': split
                    })
    # Explicit columns so that an empty scan still yields a usable schema.
    return pd.DataFrame(records, columns=['path', 'label', 'split'])

In [None]:
# Root folder handed to create_dataframe, which looks for train/ and test/ inside it.
data_dir = "data"
dataframe = create_dataframe(data_dir)
# Quick look at the first indexed rows.
print(dataframe.head())

In [None]:
class ImageDataset(Dataset):
    """Dataset that reads images from disk using a dataframe of paths and labels.

    Args:
        dataframe: Frame with a ``path`` column (image file path) and a
            ``label`` column. Rows are addressed by position.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the dataframe."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        row = self.dataframe.iloc[idx]

        # Image.open is lazy and keeps the file open; the context manager
        # closes it as soon as convert() has produced an independent RGB copy.
        with Image.open(row['path']) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, row['label']

In [None]:
# Preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalize each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

def get_dataloader(dataframe, transform, batch_size=32, shuffle=True):
    """Wrap ``dataframe`` in an ImageDataset and return a DataLoader over it.

    Args:
        dataframe: Frame forwarded to ImageDataset.
        transform: Transform forwarded to ImageDataset.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the data.
    """
    return DataLoader(
        ImageDataset(dataframe, transform=transform),
        batch_size=batch_size,
        shuffle=shuffle,
    )

# Separate the predefined splits; each gets a fresh 0..n-1 index.
train_df, test_df = (
    dataframe[dataframe['split'] == split_name].copy().reset_index(drop=True)
    for split_name in ('train', 'test')
)

# Hold out 20% of the training rows for validation (fixed seed).
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)

# Only the training loader shuffles.
train_loader = get_dataloader(
    train_df, transform=transform, batch_size=192, shuffle=True
)
val_loader = get_dataloader(
    val_df, transform=transform, batch_size=192, shuffle=False
)
test_loader = get_dataloader(
    test_df, transform=transform, batch_size=64, shuffle=False
)

In [None]:
# Cache every preprocessed image as a tensor file named after its dataframe
# index, so later cells can load tensors instead of decoding images again.
# torch.save does not create missing directories, so make sure it exists.
os.makedirs("processed", exist_ok=True)

for idx, row in dataframe.iterrows():
    # Close the image file as soon as the transformed tensor has been built.
    with Image.open(row['path']) as raw_image:
        image = transform(raw_image.convert('RGB'))
    torch.save(image, f"processed/{idx}.pt")

# FlexibleResNet

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by batch norm.

    Args:
        in_planes: Number of input channels.
        planes: Number of output channels.
        stride: Stride of the first convolution (and of the projection shortcut).
        activation: Callable returning the activation module; called with no arguments.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1, activation=nn.ReLU):
        super().__init__()
        self.activation = activation()

        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)

        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        # Identity shortcut unless the spatial size or channel count changes,
        # in which case a 1x1 convolution + batch norm projects the input.
        projection = []
        if stride != 1 or in_planes != planes:
            projection = [
                nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes),
            ]
        self.shortcut = nn.Sequential(*projection)

    def forward(self, x):
        """Apply conv-bn-act, conv-bn, add the shortcut, then the activation."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.activation(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += self.shortcut(x)
        return self.activation(out)


In [None]:
class Bottleneck(nn.Module):
    """Residual block with a 1x1 -> 3x3 -> 1x1 convolution stack.

    The last convolution outputs ``planes * expansion`` channels.

    Args:
        in_planes: Number of input channels.
        planes: Channel width of the first two convolutions.
        stride: Stride of the 3x3 convolution (and of the projection shortcut).
        activation: Callable returning the activation module; called with no arguments.
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1, activation=nn.ReLU):
        super().__init__()
        self.activation = activation()
        out_planes = planes * self.expansion

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)

        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Identity shortcut unless the spatial size or channel count changes,
        # in which case a 1x1 convolution + batch norm projects the input.
        projection = []
        if stride != 1 or in_planes != out_planes:
            projection = [
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            ]
        self.shortcut = nn.Sequential(*projection)

    def forward(self, x):
        """Run the three conv-bn stages, add the shortcut, then the activation."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.activation(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.activation(out)

        out = self.conv3(out)
        out = self.bn3(out)

        out += self.shortcut(x)
        return self.activation(out)

In [None]:
class FlexibleResNet(nn.Module):
    """ResNet-style classifier with configurable block, depth, width and activation.

    Layout: 7x7 stride-2 convolution -> batch norm -> activation -> 3x3 max
    pool -> four residual stages -> global average pool -> linear layer.
    """

    def __init__(self, block_type, num_blocks_list,
                 num_classes=2,
                 activation_name="relu",
                 base_channels=64):
        """
        Args:
            block_type: Residual block class (e.g. BasicBlock or Bottleneck).
                It is built as ``block_type(in_planes, planes, stride,
                activation=...)`` and may define a class attribute
                ``expansion`` (treated as 1 when absent).
            num_blocks_list: ``[n1, n2, n3, n4]`` - number of blocks in each
                of the four stages.
            num_classes: Output size of the final linear layer.
            activation_name: ``"relu"``, ``"tanh"`` or ``"sigmoid"``. Any other
                value falls back to ReLU and emits a warning.
            base_channels: Number of channels produced by the first
                convolution; the stages use 1x, 2x, 4x and 8x this width.
        """
        super().__init__()

        act_map = {
            "relu": nn.ReLU,
            "tanh": nn.Tanh,
            "sigmoid": nn.Sigmoid
        }
        if activation_name not in act_map:
            # Keep the historical ReLU fallback, but make it visible: a typo in
            # the name would otherwise train a different network unnoticed.
            import warnings
            warnings.warn(
                f"Unknown activation_name {activation_name!r}; falling back to "
                f"'relu'. Supported values: {sorted(act_map)}",
                stacklevel=2,
            )
        self.activation = act_map.get(activation_name, nn.ReLU)

        # Running channel count fed to the next block; updated by _make_layer.
        self.in_planes = base_channels
        self.block = block_type

        self.conv1 = nn.Conv2d(3, self.in_planes, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_planes)
        self.act1 = self.activation()
        self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(self.block, base_channels,      num_blocks_list[0], stride=1)
        self.layer2 = self._make_layer(self.block, base_channels*2,    num_blocks_list[1], stride=2)
        self.layer3 = self._make_layer(self.block, base_channels*4,    num_blocks_list[2], stride=2)
        self.layer4 = self._make_layer(self.block, base_channels*8,    num_blocks_list[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        final_planes = (base_channels*8) * getattr(block_type, 'expansion', 1)
        self.fc = nn.Linear(final_planes, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage of ``num_blocks`` blocks.

        Only the first block uses ``stride``; the remaining ones use stride 1.
        ``self.in_planes`` is advanced to the output width of each block.
        """
        layers = []

        strides = [stride] + [1]*(num_blocks-1)
        for s in strides:
            layers.append(block(self.in_planes, planes, s, activation=self.activation))
            self.in_planes = planes * getattr(block, 'expansion', 1)
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the ``(batch, num_classes)`` output of the final linear layer."""
        x = self.pool1(self.act1(self.bn1(self.conv1(x))))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        # Flatten the pooled (batch, channels, 1, 1) map to (batch, channels).
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [None]:
def resnet34(num_classes=1000, activation_name="relu"):
    """Build a FlexibleResNet of BasicBlocks with stage depths [3, 4, 6, 3].

    Args:
        num_classes: Output size of the final linear layer.
        activation_name: Activation name forwarded to FlexibleResNet.

    Returns:
        A new FlexibleResNet with 64 base channels.
    """
    stage_depths = [3, 4, 6, 3]
    return FlexibleResNet(
        BasicBlock,
        stage_depths,
        num_classes=num_classes,
        activation_name=activation_name,
        base_channels=64,
    )

# Train and test functions

In [None]:
def train_model(model, criterion, optimizer, train_loader, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network to optimize; switched to training mode.
        criterion: Loss callable ``criterion(outputs, labels)``. The reported
            epoch loss assumes it returns the mean loss over the batch (the
            default reduction of nn.CrossEntropyLoss).
        optimizer: Optimizer that updates ``model``'s parameters.
        train_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean loss per sample and accuracy in
        percent over all samples seen.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Statistics. Weight each batch loss by its size: the last batch is
        # usually smaller, so a plain mean of batch means would be biased.
        batch_size = labels.size(0)
        running_loss += loss.item() * batch_size
        _, predicted = outputs.max(1)
        total += batch_size
        correct += predicted.eq(labels).sum().item()

    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

In [None]:
def evaluate_model(model, criterion, test_loader, device):
    """Evaluate ``model`` on every batch of ``test_loader`` without gradients.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        criterion: Loss callable ``criterion(outputs, labels)``. The reported
            loss assumes it returns the mean loss over the batch (the default
            reduction of nn.CrossEntropyLoss).
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean loss per sample and accuracy in
        percent over all samples seen.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            # Weight each batch loss by its size so that a smaller final
            # batch does not skew the reported average.
            batch_size = labels.size(0)
            running_loss += loss.item() * batch_size
            _, predicted = outputs.max(1)
            total += batch_size
            correct += predicted.eq(labels).sum().item()

    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

In [None]:
from sklearn.metrics import roc_auc_score

def calc_roc_auc_score(model, test_loader, device):
    """Compute ROC-AUC of ``model`` over ``test_loader``.

    The score of each sample is the softmax probability of class index 1.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        The value returned by sklearn's ``roc_auc_score``.
    """
    model.eval()
    targets, positive_scores = [], []

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)

            # Column 1 of the softmax output is the class-1 probability.
            class1_prob = logits.softmax(dim=1)[:, 1]
            positive_scores.extend(class1_prob.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    return roc_auc_score(targets, positive_scores)

In [None]:
from sklearn.metrics import precision_recall_curve, auc

def calc_pr_auc_score(model, test_loader, device):
    """Compute the area under the precision-recall curve over ``test_loader``.

    The score of each sample is the softmax probability of class index 1.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``auc(recall, precision)`` for the curve returned by sklearn's
        ``precision_recall_curve``.
    """
    model.eval()
    targets, positive_scores = [], []

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_images)

            # Column 1 of the softmax output is the class-1 probability.
            class1_prob = logits.softmax(dim=1)[:, 1]

            positive_scores.extend(class1_prob.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    precision, recall, _ = precision_recall_curve(targets, positive_scores)
    return auc(recall, precision)


# Train resnet34 with different optimizers

In [None]:
def initialize_model(num_classes=2):
    """Return torchvision's pretrained ResNet-34 with a new ``num_classes``-way head."""
    backbone = models.resnet34(pretrained=True)
    # Swap the final linear layer for one sized to this task.
    backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
    return backbone

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Experiment: custom resnet34 trained with SGD (lr=0.01).
model = resnet34(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Best validation accuracy seen so far and where that checkpoint is stored.
best_test_acc = 0.0
best_model_path = "best_model_sgd.pth"

for epoch in range(1, 26):
    train_loss, train_acc = train_model(model, criterion, optimizer, train_loader, device)
    # Per-epoch metrics come from the validation loader, not the test set.
    val_loss, val_acc = evaluate_model(model, criterion, val_loader, device)

    if val_acc > best_test_acc:
        best_test_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Лучшее состояние модели сохранено с точностью валидации {best_test_acc:.2f}%")

    print(f"Epoch {epoch}:\n"
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%\n"
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

# Evaluates the weights of the last epoch; the best-validation checkpoint is
# in best_model_path.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Experiment: custom resnet34 trained with SGD + Nesterov momentum (lr=0.01, momentum=0.9).
model = resnet34(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, nesterov=True, momentum=0.9)

# Best validation accuracy seen so far and where that checkpoint is stored.
best_test_acc = 0.0
best_model_path = "best_model_nag.pth"

for epoch in range(1, 26):
    train_loss, train_acc = train_model(model, criterion, optimizer, train_loader, device)
    # Per-epoch metrics come from the validation loader, not the test set.
    val_loss, val_acc = evaluate_model(model, criterion, val_loader, device)

    if val_acc > best_test_acc:
        best_test_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Лучшее состояние модели сохранено с точностью валидации {best_test_acc:.2f}%")

    print(f"Epoch {epoch}:\n"
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%\n"
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

# Evaluates the weights of the last epoch; the best-validation checkpoint is
# in best_model_path.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Experiment: custom resnet34 trained with Adagrad (lr=0.005).
model = resnet34(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adagrad(model.parameters(), lr=0.005)

# Best validation accuracy seen so far and where that checkpoint is stored.
best_test_acc = 0.0
best_model_path = "best_model_adagrad.pth"

# 50 epochs: the upper bound is exclusive, so it must be 51 (the other
# experiments in this notebook use 26 / 51 / 76 the same way).
for epoch in range(1, 51):
    train_loss, train_acc = train_model(model, criterion, optimizer, train_loader, device)
    # Per-epoch metrics come from the validation loader, not the test set.
    val_loss, val_acc = evaluate_model(model, criterion, val_loader, device)

    if val_acc > best_test_acc:
        best_test_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Лучшее состояние модели сохранено с точностью валидации {best_test_acc:.2f}%")

    print(f"Epoch {epoch}:\n"
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%\n"
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

# Evaluates the weights of the last epoch; the best-validation checkpoint is
# in best_model_path.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Experiment: custom resnet34 trained with RMSprop (lr=0.003).
model = resnet34(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.003)

# Best validation accuracy seen so far and where that checkpoint is stored.
best_test_acc = 0.0
best_model_path = "best_model_rmsprop.pth"

for epoch in range(1, 76):
    train_loss, train_acc = train_model(model, criterion, optimizer, train_loader, device)
    # Per-epoch metrics come from the validation loader, not the test set.
    val_loss, val_acc = evaluate_model(model, criterion, val_loader, device)

    if val_acc > best_test_acc:
        best_test_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Лучшее состояние модели сохранено с точностью валидации {best_test_acc:.2f}%")

    print(f"Epoch {epoch}:\n"
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%\n"
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

# Evaluates the weights of the last epoch; the best-validation checkpoint is
# in best_model_path.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Experiment: custom resnet34 trained with Adam (lr=0.001).
model = resnet34(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Best validation accuracy seen so far and where that checkpoint is stored.
best_test_acc = 0.0
best_model_path = "best_model_adam.pth"

for epoch in range(1, 51):
    train_loss, train_acc = train_model(model, criterion, optimizer, train_loader, device)
    # Per-epoch metrics come from the validation loader, not the test set.
    val_loss, val_acc = evaluate_model(model, criterion, val_loader, device)

    if val_acc > best_test_acc:
        best_test_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Лучшее состояние модели сохранено с точностью валидации {best_test_acc:.2f}%")

    print(f"Epoch {epoch}:\n"
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%\n"
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

# Evaluates the weights of the last epoch; the best-validation checkpoint is
# in best_model_path.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

# Metrics on resnet34

In [None]:
# Restore the checkpoint saved by the Adam run. map_location lets the file
# load even if it was written on another device (e.g. CUDA -> CPU-only).
model.load_state_dict(torch.load('best_model_adam.pth', map_location=device))
model.to(device)
# Trailing None so the cell ends on a value with nothing to display.
None

In [None]:
# ROC-AUC, then PR-AUC, on the test loader for the weights currently held by `model`.
print(calc_roc_auc_score(model, test_loader, device))
print(calc_pr_auc_score(model, test_loader, device))

In [None]:
# Restore the checkpoint saved by the SGD run. map_location lets the file
# load even if it was written on another device (e.g. CUDA -> CPU-only).
model.load_state_dict(torch.load('best_model_sgd.pth', map_location=device))
model.to(device)
# Trailing None so the cell ends on a value with nothing to display.
None

In [None]:
# ROC-AUC, then PR-AUC, on the test loader for the weights currently held by `model`.
print(calc_roc_auc_score(model, test_loader, device))
print(calc_pr_auc_score(model, test_loader, device))

In [None]:
# Restore the checkpoint saved by the Nesterov-momentum run. map_location lets
# the file load even if it was written on another device (e.g. CUDA -> CPU-only).
model.load_state_dict(torch.load('best_model_nag.pth', map_location=device))
model.to(device)
# Trailing None so the cell ends on a value with nothing to display.
None

In [None]:
# ROC-AUC, then PR-AUC, on the test loader for the weights currently held by `model`.
print(calc_roc_auc_score(model, test_loader, device))
print(calc_pr_auc_score(model, test_loader, device))

In [None]:
# Restore the checkpoint saved by the Adagrad run. map_location lets the file
# load even if it was written on another device (e.g. CUDA -> CPU-only).
model.load_state_dict(torch.load('best_model_adagrad.pth', map_location=device))
model.to(device)
# Trailing None so the cell ends on a value with nothing to display.
None

In [None]:
# ROC-AUC, then PR-AUC, on the test loader for the weights currently held by `model`.
print(calc_roc_auc_score(model, test_loader, device))
print(calc_pr_auc_score(model, test_loader, device))

In [None]:
# Restore the checkpoint saved by the RMSprop run. map_location lets the file
# load even if it was written on another device (e.g. CUDA -> CPU-only).
model.load_state_dict(torch.load('best_model_rmsprop.pth', map_location=device))
model.to(device)
# Trailing None so the cell ends on a value with nothing to display.
None

In [None]:
# ROC-AUC, then PR-AUC, on the test loader for the weights currently held by `model`.
print(calc_roc_auc_score(model, test_loader, device))
print(calc_pr_auc_score(model, test_loader, device))

# Genetic Algorithm

In [None]:
class ImageDataset(Dataset):
    """Dataset that serves already-preprocessed images from an in-memory cache.

    Args:
        dataframe: Frame with a ``label`` column; its index values are the
            keys used to look images up in ``images_dict``.
        images_dict: Mapping from dataframe index value to the cached image.
        transform: Stored on the instance but not applied by ``__getitem__``;
            cached items are returned as they are.
    """

    def __init__(self, dataframe, images_dict, transform=None):
        self.dataframe = dataframe
        # Keep the cache that was passed in; reading a module-level
        # `images_dict` instead would ignore this argument entirely.
        self.images_dict = images_dict
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the dataframe."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        label = self.dataframe.iloc[idx]['label']
        # Translate the position into the row's index label, which is the cache key.
        index = self.dataframe.index[idx]
        image = self.images_dict[index]

        return image, label
    
# Load every cached tensor from processed/ into memory, keyed by dataframe index.
images_dict = {
    row_id: torch.load(f'processed/{row_id}.pt')
    for row_id in dataframe.index
}
    
# Preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalize each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

def get_dataloader(dataframe, images_dict, transform, batch_size=32, shuffle=True):
    """Wrap ``dataframe`` and the image cache in an ImageDataset and return a DataLoader.

    Args:
        dataframe: Frame forwarded to ImageDataset.
        images_dict: Image cache forwarded to ImageDataset.
        transform: Transform forwarded to ImageDataset.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the data.
    """
    return DataLoader(
        ImageDataset(dataframe, images_dict, transform=transform),
        batch_size=batch_size,
        shuffle=shuffle,
    )

# Separate the predefined splits. The original index is kept on purpose:
# the dataset uses it as the key into images_dict.
train_df, test_df = (
    dataframe[dataframe['split'] == split_name].copy()
    for split_name in ('train', 'test')
)

# Hold out 20% of the training rows for validation (fixed seed).
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)

# Only the training loader shuffles.
train_loader = get_dataloader(
    train_df, images_dict, transform=transform, batch_size=64, shuffle=True
)
val_loader = get_dataloader(
    val_df, images_dict, transform=transform, batch_size=64, shuffle=False
)
test_loader = get_dataloader(
    test_df, images_dict, transform=transform, batch_size=64, shuffle=False
)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Experiment: Bottleneck-based FlexibleResNet with stage depths [2, 3, 2, 4],
# trained with SGD + Nesterov momentum (lr=0.005, momentum=0.9).
model = FlexibleResNet(
    block_type=Bottleneck,
    num_blocks_list=[2, 3, 2, 4],
    num_classes=2,
    activation_name='relu',
    base_channels=64
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.005, nesterov=True, momentum=0.9)

# Best validation accuracy seen so far and where that checkpoint is stored.
best_test_acc = 0.0
best_model_path = "resnet_ga_nag.pth"

for epoch in range(1, 26):
    train_loss, train_acc = train_model(model, criterion, optimizer, train_loader, device)
    # Per-epoch metrics come from the validation loader, not the test set.
    val_loss, val_acc = evaluate_model(model, criterion, val_loader, device)

    if val_acc > best_test_acc:
        best_test_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Лучшее состояние модели сохранено с точностью валидации {best_test_acc:.2f}%")

    print(f"Epoch {epoch}:\n"
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%\n"
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

# Final test evaluation runs once, after training (it used to sit inside the
# loop and touched the test set every epoch). It evaluates the weights of the
# last epoch; the best-validation checkpoint is in best_model_path.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

In [None]:
# Restore the checkpoint saved by the FlexibleResNet run. map_location lets the
# file load even if it was written on another device (e.g. CUDA -> CPU-only).
model.load_state_dict(torch.load('resnet_ga_nag.pth', map_location=device))
model.to(device)
# Trailing None so the cell ends on a value with nothing to display.
None

In [None]:
# ROC-AUC, then PR-AUC, on the test loader for the weights currently held by `model`.
print(calc_roc_auc_score(model, test_loader, device))
print(calc_pr_auc_score(model, test_loader, device))

In [None]:
# Test loss and accuracy for the weights currently held by `model`.
test_loss, test_acc = evaluate_model(model, criterion, test_loader, device)
print(f'\n\nFinal Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')