In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory
import torch
import torch.nn as nn
import os
from sklearn.model_selection import train_test_split
import torchvision 
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torch.autograd import Variable
import torch.nn.functional as F
import torch.nn.init as init

# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
if torch.cuda.is_available():
    torch.cuda.set_device(0)  # Set the current device to the first GPU
    print("Using GPU")
else:
    print("Using CPU")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class VGG16_MRI(nn.Module):
    def __init__(self, num_classes=2):
        super(VGG16_MRI, self).__init__()
        # Load a pre-trained VGG16 model with batch normalization
        model = torchvision.models.vgg16_bn(pretrained=True)
        
        # Change the first convolutional layer to accept single-channel (grayscale) input
        model.features[0] = nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        
        # Retain the feature extraction layers
        self.feature = model.features
        
        # Define the feature dimension based on output size for 240x240 input
        # VGG16 feature output will be (512, 7, 7) for 224x224, so we calculate for 240x240
        self.feat_dim = 512 * 7 * 7  # Update this if output size changes with input size
        
        # Adjust the number of classes for binary classification (0 or 1)
        self.num_classes = num_classes
        
        # Batch normalization layer
        self.bn = nn.BatchNorm1d(self.feat_dim)
        self.bn.bias.requires_grad_(False)  # no shift
        
        # Fully connected layer to map features to the number of classes
        self.fc_layer = nn.Linear(self.feat_dim, self.num_classes)
        
        self.model = model
            
    def forward(self, x):
        # Pass input through feature extraction layers
        feature = self.feature(x)
        feature = feature.view(feature.size(0), -1)  # Flatten the feature map
        feature = self.bn(feature)  # Apply batch normalization
        res = self.fc_layer(feature)  # Output class scores
        
        return feature, res

    def predict(self, x):
        # Pass input through feature extraction layers
        feature = self.feature(x)
        feature = feature.view(feature.size(0), -1)  # Flatten the feature map
        feature = self.bn(feature)  # Apply batch normalization
        res = self.fc_layer(feature)  # Output class scores

        return res

Using GPU


In [3]:
# Dataset Class
class MRIDataset(Dataset):
    def __init__(self, df, data_dir, transform=None):
        self.df = df
        self.data_dir = data_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_name = os.path.join(self.data_dir, self.df.iloc[idx]['filename'])
        image = Image.open(img_name).convert('L')  # Convert to grayscale
        label = int(self.df.iloc[idx]['label'])

        if self.transform:
            image = self.transform(image)

        return image, label

# Define image transformations (normalization can be adjusted based on data needs)
transform = transforms.Compose([
    transforms.Resize((240, 240)),  # Ensure image size is 240x240
    transforms.ToTensor()
])

In [4]:
def load_pretrained_classifier(path=None):
    if path is None:
        path = "/kaggle/input/brats23-classifier/pytorch/default/1/classifier.pt"
    model = VGG16_MRI(num_classes=2)
    model.load_state_dict(torch.load(path))
    model.eval()
    return model

In [5]:
def create_directory_if_not_exists(directory_path):
    try:
        os.makedirs(directory_path, exist_ok=True)
        print(f"Directory created successfully: {directory_path}")
    except OSError as error:
        print(f"Error creating directory: {error}")

In [6]:
# utils.py
def freeze(net):
    for p in net.parameters():
        p.requires_grad_(False) 

def unfreeze(net):
    for p in net.parameters():
        p.requires_grad_(True)

In [7]:
import torch.nn as nn

class Generator(nn.Module):
    def __init__(self, in_dim=100, dim=64):
        super(Generator, self).__init__()
        
        def dconv_bn_relu(in_dim, out_dim):
            return nn.Sequential(
                nn.ConvTranspose2d(in_dim, out_dim, 5, 2, padding=2, output_padding=1, bias=False),
                nn.BatchNorm2d(out_dim),
                nn.ReLU())
        
        # Fully connected layer to expand noise to a larger size
        self.l1 = nn.Sequential(
            nn.Linear(in_dim, dim * 8 * 15 * 15, bias=False),
            nn.BatchNorm1d(dim * 8 * 15 * 15),
            nn.ReLU())

        # Deconvolutional layers for upsampling to 240x240
        self.l2_5 = nn.Sequential(
            dconv_bn_relu(dim * 8, dim * 4),   # 15x15 -> 30x30
            dconv_bn_relu(dim * 4, dim * 2),   # 30x30 -> 60x60
            dconv_bn_relu(dim * 2, dim),       # 60x60 -> 120x120
            nn.ConvTranspose2d(dim, 1, 5, 2, padding=2, output_padding=1),  # 120x120 -> 240x240
            nn.Sigmoid())  # Output pixel values in range [0, 1]

    def forward(self, x):
        y = self.l1(x)
        y = y.view(y.size(0), -1, 15, 15)
        y = self.l2_5(y)
        return y

In [8]:
# Discriminator discri.py 
class MinibatchDiscrimination(nn.Module):
    def __init__(self, in_features, out_features, kernel_dims, mean=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.kernel_dims = kernel_dims
        self.mean = mean
        self.T = nn.Parameter(torch.Tensor(in_features, out_features, kernel_dims))
        init.normal(self.T, 0, 1)

    def forward(self, x):
        # x is NxA
        # T is AxBxC
        matrices = x.mm(self.T.view(self.in_features, -1))
        matrices = matrices.view(-1, self.out_features, self.kernel_dims)

        M = matrices.unsqueeze(0)  # 1xNxBxC
        M_T = M.permute(1, 0, 2, 3)  # Nx1xBxC
        norm = torch.abs(M - M_T).sum(3)  # NxNxB
        expnorm = torch.exp(-norm)
        o_b = (expnorm.sum(0) - 1)   # NxB, subtract self distance
        if self.mean:
            o_b /= x.size(0) - 1

        x = torch.cat([x, o_b], 1)
        return x

class MinibatchDiscriminator(nn.Module):
    def __init__(self,in_dim=1, dim=64, n_classes=1000):
        super(MinibatchDiscriminator, self).__init__()
        self.n_classes = n_classes

        def conv_ln_lrelu(in_dim, out_dim, k, s, p):
            return nn.Sequential(
                nn.Conv2d(in_dim, out_dim, k, s, p),
                # Since there is no effective implementation of LayerNorm,
                # we use InstanceNorm2d instead of LayerNorm here.
                nn.InstanceNorm2d(out_dim, affine=True),
                nn.LeakyReLU(0.2))

        self.layer1 = conv_ln_lrelu(in_dim, dim, 5, 2, 2)
        self.layer2 = conv_ln_lrelu(dim, dim*2, 5, 2, 2)
        self.layer3 = conv_ln_lrelu(dim*2, dim*4, 5, 2, 2)
        self.layer4 = conv_ln_lrelu(dim*4, dim*4, 3, 2, 1)
        self.mbd1 = MinibatchDiscrimination(57600, 64, 50)
        self.fc_layer = nn.Linear(57600+64, self.n_classes)

    def forward(self, x):
        out = []
        bs = x.shape[0]
        feat1 = self.layer1(x)
        out.append(feat1)
        feat2 = self.layer2(feat1)
        out.append(feat2)
        feat3 = self.layer3(feat2)
        out.append(feat3)
        feat4 = self.layer4(feat3)
        out.append(feat4)
        feat = feat4.view(bs, -1)
        # print('feat:', feat.shape)
        mb_out = self.mbd1(feat)   # Nx(A+B)
        y = self.fc_layer(mb_out)
        
        return feat, y

In [9]:
def get_GAN(n_classes, z_dim, Pretrained = False):

    # if Pretrained :
        # G= 
    # else :
    G = Generator(z_dim)
    D = MinibatchDiscriminator(n_classes=n_classes)
    
    G = torch.nn.DataParallel(G).to(device)
    D = torch.nn.DataParallel(D).to(device)
    if Pretrained:
#         root_path = "/kaggle/working/attack_results"
        dataset_name = "BraTS23"
#         mode_name_T = "VGG16_MRI"
#         path = os.path.join(root_path, os.path.join(dataset_name, model_name_T))
#         path = "/kaggle/input/brats23-gan-epoch25/pytorch/default/1"
        path = "/kaggle/input/brats23-gan-epoch75/pytorch/default/1/attack_results/BraTS23/VGG16_MRI"
        # path = os.path.join(os.path.join(gan_model_dir, dataset), target_model)
        path_G = os.path.join(path, "ep75_improved_{}_G.pt".format(dataset_name))
        path_D = os.path.join(path, "ep75_improved_{}_D.pt".format(dataset_name))
        ckp_G = torch.load(path_G)
        G.load_state_dict(ckp_G['state_dict'], strict=True)
        ckp_D = torch.load(path_D)
        D.load_state_dict(ckp_D['state_dict'], strict=True)
        print("Loaded Pretrained Model (Specific GAN)")
    
    return G, D

In [10]:
def get_augmodel():
    # model = pretrained_VGG_MRI_model
    model = load_pretrained_classifier()
    model = torch.nn.DataParallel(model).cuda()
    return model

In [11]:
import time

def init_dataloader(df = None, data_dir="/kaggle/input/preprocessed-brats23/Images", batch_size=64, mode="gan", transform=None, iterator=False):
    tf = time.time()
    if df is None : 
        df = pd.read_csv("/kaggle/input/preprocessed-brats23/labels.csv")
        df,_ = train_test_split(df, test_size=0.4, stratify=df['label'], random_state=42)
    # Define shuffle based on mode (assuming "attack" mode does not shuffle data)
    shuffle_flag = False if mode == "attack" else True

    # Initialize the dataset with the MRIDataset class
    # Define image transformations (normalization can be adjusted based on data needs)
    transform = transforms.Compose([
        transforms.Resize((240, 240)),  # Ensure image size is 240x240
        transforms.ToTensor()
    ])

    dataset = MRIDataset(df=df, data_dir=data_dir, transform=transform)

    # Create the DataLoader
    if iterator:
        data_loader = DataLoader(dataset,
                                 batch_size=batch_size,
                                 shuffle=shuffle_flag,
                                 drop_last=True,
                                 num_workers=0,
                                 pin_memory=True).__iter__()
    else:
        data_loader = DataLoader(dataset,
                                 batch_size=batch_size,
                                 shuffle=shuffle_flag,
                                 drop_last=True,
                                 num_workers=2,
                                 pin_memory=True)

    interval = time.time() - tf
    print(f'Initializing data loader took {interval:.2f} seconds')
    
    return dataset, data_loader


In [12]:
import torchvision.utils as tvls
def save_tensor_images(images, filename, nrow = None, normalize = True):
    if not nrow:
        tvls.save_image(images, filename, normalize = normalize, padding=0)
    else:
        tvls.save_image(images, filename, normalize = normalize, nrow=nrow, padding=0)

class HLoss(nn.Module):
    def __init__(self):
        super(HLoss, self).__init__()

    def forward(self, x):
        b = F.softmax(x, dim=1) * F.log_softmax(x, dim=1)
        b = -1.0 * b.sum()
        return b

# define "soft" cross-entropy with pytorch tensor operations
def softXEnt (input, target):
    targetprobs = nn.functional.softmax (target, dim = 1)
    logprobs = nn.functional.log_softmax (input, dim = 1)
    return  -(targetprobs * logprobs).sum() / input.shape[0]

def log_sum_exp(x, axis = 1):
    m = torch.max(x, dim = 1)[0]
    return m + torch.log(torch.sum(torch.exp(x - m.unsqueeze(1)), dim = axis))

def frequency_penalty_loss(f_imgs):
    # Apply 2D Fourier Transform and shift zero-frequency component to center
    f_imgs_fft = torch.fft.fft2(f_imgs)
    f_imgs_fft_shifted = torch.fft.fftshift(f_imgs_fft)
    
    # Mask to isolate high-frequency components (outer regions)
    h, w = f_imgs_fft_shifted.shape[-2:]
    mask = torch.ones_like(f_imgs_fft_shifted, dtype=torch.bool)
    mask[:, :, h//4:-h//4, w//4:-w//4] = 0  # Central low-frequency area masked out
    
    # Calculate the mean magnitude of high-frequency components
    high_freq_magnitude = torch.abs(f_imgs_fft_shifted[mask]).mean()
    return -high_freq_magnitude  # Negate to reward higher high-frequency content

def train_specific_gan():

    # Hyperparams
    file_path = None
    model_name_T = "VGG16_MRI"
    lr = 0.0002
    batch_size = 64
    z_dim = 100
    epochs = 10
    n_critic = 5
    dataset_name = "BraTS23"
    

    # Create save folders
    root_path = "/kaggle/working/attack_results"
    save_model_dir = os.path.join(root_path, os.path.join(dataset_name, model_name_T))
    save_img_dir = os.path.join(save_model_dir, "imgs")
    os.makedirs(save_model_dir, exist_ok=True)
    os.makedirs(save_img_dir, exist_ok=True)


    # Load target model
    T = get_augmodel()

    # Dataset
    data_dir = "/kaggle/input/preprocessed-brats23/Images"
    dataset, dataloader = init_dataloader(df=None,data_dir=data_dir,batch_size=batch_size)
    
    # Start Training
    print("Training GAN for %s" % model_name_T)

#     G = Generator(z_dim)
#     DG = MinibatchDiscriminator(n_classes = 2)
    
#     G = torch.nn.DataParallel(G).cuda()
#     DG = torch.nn.DataParallel(DG).cuda()
    G,DG = get_GAN(2,100,True)
    dg_optimizer = torch.optim.Adam(DG.parameters(), lr=0.0002, betas=(0.5, 0.999))
    g_optimizer = torch.optim.Adam(G.parameters(), lr=0.0002, betas=(0.5, 0.999))

    entropy = HLoss()

    step = 0
    for epoch in range(epochs):
        start = time.time()
        _, unlabel_loader1 = init_dataloader(df=None,data_dir = data_dir,batch_size = batch_size, mode="gan",iterator=True)
        _,unlabel_loader2 = init_dataloader(df=None,data_dir =data_dir, batch_size =batch_size, mode="gan",iterator=True)
        for i, (imgs,label) in enumerate(dataloader):
            current_iter = epoch * len(dataloader) + i + 1

            step += 1
            imgs = imgs.cuda()
            bs = imgs.size(0)
            x_unlabel_t = next(unlabel_loader1)
            x_unlabel2_t = next(unlabel_loader2)
            
            freeze(G)
            unfreeze(DG)

            z = torch.randn(bs, z_dim).cuda()
            f_imgs = G(z)

            y_prob = T(imgs)[-1]
            y = torch.argmax(y_prob, dim=1).view(-1)
            
            x_unlabel = x_unlabel_t[0]
            x_unlabel2 = x_unlabel2_t[0]
            _, output_label = DG(imgs)
            _, output_unlabel = DG(x_unlabel)
            _, output_fake =  DG(f_imgs)

            loss_lab = softXEnt(output_label, y_prob)
            loss_unlab = 0.5*(torch.mean(F.softplus(log_sum_exp(output_unlabel)))-torch.mean(log_sum_exp(output_unlabel))+torch.mean(F.softplus(log_sum_exp(output_fake))))
            dg_loss = loss_lab + loss_unlab
            
            acc = torch.mean((output_label.max(1)[1] == y).float())
            
            dg_optimizer.zero_grad()
            dg_loss.backward()
            dg_optimizer.step()

            # train G
            if step % n_critic == 0:
                freeze(DG)
                unfreeze(G)
                z = torch.randn(bs, z_dim).cuda()
                f_imgs = G(z)
                mom_gen, output_fake = DG(f_imgs)
                mom_unlabel, _ = DG(x_unlabel2)

                mom_gen = torch.mean(mom_gen, dim = 0)
                mom_unlabel = torch.mean(mom_unlabel, dim = 0)

                Hloss = entropy(output_fake)
                f_loss = sobel_edge_loss(f_imgs)
                g_loss = torch.mean((mom_gen - mom_unlabel).abs()) + 1e-4 * Hloss + 1e-4 *f_loss

                g_optimizer.zero_grad()
                g_loss.backward()
                g_optimizer.step()
#                 torch.cuda.empty_cache()

        end = time.time()
        interval = end - start
        
        print("Epoch:%d \tTime:%.2f\tG_loss:%.2f\t train_acc:%.2f" % (epoch, interval, g_loss, acc))

        torch.save({'state_dict':G.state_dict()}, os.path.join(save_model_dir, "ep85_improved_loss_{}_G.pt".format(dataset_name)))
        torch.save({'state_dict':DG.state_dict()}, os.path.join(save_model_dir, "ep85_improved_loss_{}_D.pt".format(dataset_name)))

        if (epoch+1) % 5 == 0:
            z = torch.randn(32, z_dim).cuda()
            fake_image = G(z)
            save_tensor_images(fake_image.detach(), os.path.join(save_img_dir, "improved_BraTS23_img_{}.png".format(epoch)), nrow = 8)

def sobel_edge_loss(f_imgs):
    sobel_x = torch.tensor([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=torch.float32, device=f_imgs.device).unsqueeze(0).unsqueeze(0)
    sobel_y = torch.tensor([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=torch.float32, device=f_imgs.device).unsqueeze(0).unsqueeze(0)
    
    # Apply Sobel filter to extract edges in both x and y directions
    edges_x = F.conv2d(f_imgs, sobel_x, padding=1)
    edges_y = F.conv2d(f_imgs, sobel_y, padding=1)
    
    # Combine edge intensities
    edge_magnitude = torch.sqrt(edges_x * 2 + edges_y * 2)
    
    # The higher the edge magnitude, the sharper the image details
    return -torch.mean(edge_magnitude)  # Negate to encourage sharper details

In [13]:
train_specific_gan()

Downloading: "https://download.pytorch.org/models/vgg16_bn-6c64b313.pth" to /root/.cache/torch/hub/checkpoints/vgg16_bn-6c64b313.pth
100%|██████████| 528M/528M [00:02<00:00, 225MB/s]
  model.load_state_dict(torch.load(path))


Initializing data loader took 0.15 seconds
Training GAN for VGG16_MRI


  init.normal(self.T, 0, 1)
  ckp_G = torch.load(path_G)
  ckp_D = torch.load(path_D)


Loaded Pretrained Model (Specific GAN)
Initializing data loader took 0.10 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):
  return F.linear(input, self.weight, self.bias)


Epoch:0 	Time:1836.44	G_loss:nan	 train_acc:0.73
Initializing data loader took 0.09 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:1 	Time:1621.85	G_loss:nan	 train_acc:0.50
Initializing data loader took 0.10 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:2 	Time:1630.79	G_loss:nan	 train_acc:0.75
Initializing data loader took 0.09 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:3 	Time:1608.68	G_loss:nan	 train_acc:0.67
Initializing data loader took 0.10 seconds
Initializing data loader took 0.10 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:4 	Time:1622.05	G_loss:nan	 train_acc:0.58
Initializing data loader took 0.09 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:5 	Time:1633.28	G_loss:nan	 train_acc:0.73
Initializing data loader took 0.10 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:6 	Time:1625.04	G_loss:nan	 train_acc:0.59
Initializing data loader took 0.10 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:7 	Time:1608.31	G_loss:nan	 train_acc:0.62
Initializing data loader took 0.10 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:8 	Time:1604.99	G_loss:nan	 train_acc:0.70
Initializing data loader took 0.11 seconds
Initializing data loader took 0.09 seconds


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch:9 	Time:1591.89	G_loss:nan	 train_acc:0.64


In [14]:
def get_GAN2(n_classes,z_dims):
    G = Generator(z_dims)
    D = MinibatchDiscriminator(n_classes=n_classes)
    
    G = torch.nn.DataParallel(G).to(device)
    D = torch.nn.DataParallel(D).to(device)
#     root_path = "/kaggle/working/attack_results"
#     save_model_dir = os.path.join(root_path, os.path.join("BraTS23", "VGG16_MRI"))
    path = "/kaggle/input/brats23-gan-epoch50/pytorch/default/1"
    path_G = os.path.join(path, "new_improved_{}_G.pt".format("BraTS23"))
    path_D = os.path.join(path, "new_improved_{}_D.pt".format("BraTS23"))
    ckp_G = torch.load(path_G)
    G.load_state_dict(ckp_G['state_dict'], strict=True)
    ckp_D = torch.load(path_D)
    D.load_state_dict(ckp_D['state_dict'], strict=True)
    return G,D
    

In [15]:
# Assuming the modified Generator class is already defined as `Generator240x240`
def test_gan():
#         generator = GeneratorMRI(in_dim=100, dim=64)
        generator,_ = get_GAN2(2,100)
        generator.eval()
        noise = torch.randn(1, 100)
        with torch.no_grad():
            generated_image = generator(noise)
        generated_image = generated_image.squeeze(0).cpu().numpy()
        print(generated_image.shape)
        # Convert the generated image to a 2D array
        generated_image = np.squeeze(generated_image)  # Remove the channel dimension for grayscale

        # Plot the generated image
        plt.imshow(generated_image, cmap='gray')
        plt.axis('off')  # Turn off axis labels
        plt.show()
# test_gan()

In [16]:
# G,D = get_GAN2(2,100)

In [17]:
def test_models(G):
        generator = G
        generator.eval()
        noise = torch.randn(1, 100)
        with torch.no_grad():
            generated_image = generator(noise)
        generated_image = generated_image.squeeze(0).cpu().numpy()
#         print(generated_image.shape)
        # Convert the generated image to a 2D array
        generated_image = np.squeeze(generated_image)  # Remove the channel dimension for grayscale

        # Plot the generated image
        plt.imshow(generated_image, cmap='gray')
        plt.axis('off')  # Turn off axis labels
        plt.show()

In [18]:
# for i in range(5):
#     test_models(G)

In [19]:
# from PIL import Image
# from IPython.display import display

# image_path = '/kaggle/input/preprocessed-brats23/Images/BraTS-GLI-00000-000_slice083.png'
# img = Image.open(image_path)
# display(img)