In [1]:
import torch
from transformers import ResNetConfig
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
import transformers
from torchvision import transforms
from resnet import ResNetForMultiLabel
from resnet import OrganAMNISTDataset, compute_metrics, train_model, evaluate_model
import random
import numpy as np
import os 

In [2]:
#SET SEEDS
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x219085c7530>

#### Import NPZ by concatenating

In [10]:
import os

def import_data(directory, save_path=None, save=False):

    data = []
    for filename in os.listdir(directory):
        if filename.endswith('.npz'):
            file_path = os.path.join(directory, filename)
            loaded_data = np.load(file_path)
            data.append(loaded_data)

    # concatenate the data from all files
    all_data = {}
    for key in data[0].keys():
        all_data[key] = np.concatenate([d[key] for d in data], axis=0)

    # check the shape of the concatenated data
    for key, value in all_data.items():
        print(f"{key}: {value.shape}")  

    if save:
        if save_path is None:
            save_path = f'datasets/{directory}_concatenated_data.npz'
        np.savez(save_path, **all_data)
        print(f"Data saved to {save_path}")

    return all_data

#### After combining the uniform noise + rotation + original, add in CT 

In [11]:
#quick method to add new CT information into the full concat file. 
def combine_npz(npz_data, CT_directory, save_name, file_prefix='train'):

    labels = ['label', 'Ring_Artifact_v1']
    data = []
    for filename in os.listdir(CT_directory):
        if filename.startswith(file_prefix):
                file_path = os.path.join(CT_directory, filename)
                loaded_data = np.load(file_path)
                data.append(loaded_data)
    all_data = {}
    for key in labels:
        all_data[key] = np.concatenate([d[key] for d in data], axis=0)

    for key, value in all_data.items():
        print(f"{key}: {value.shape}")  
    
    npz_data['Ring_Artifact_v1'] = all_data['Ring_Artifact_v1']
    
    for key, value in npz_data.items():
        print(f"{key}: {value.shape}")  
        
    np.savez(save_name, **npz_data)
    print(f"Data saved to {save_name}")
    
    
directory = 'Distorted_OrganAMNIST/RingArtifactv1_npz'
training_data = import_data('Distorted_OrganAMNIST/UniformNoise_Rotate90_npz')
# val_data = import_data('Distorted_OrganAMNIST/UniformNoise_Rotate90_val_dataset')
# test_data = import_data('Distorted_OrganAMNIST/UniformNoise_Rotate90_test_dataset')

combine_npz(training_data, directory, 'training_concatenated_dataset_full.npz', 'train')
    
    


original: (34561, 224, 224)
label: (34561, 1)
Uniform_Noise: (34561, 224, 224)
Rotate_90deg: (34561, 224, 224)
label: (34561, 1)
Ring_Artifact_v1: (34561, 224, 224)
original: (34561, 224, 224)
label: (34561, 1)
Uniform_Noise: (34561, 224, 224)
Rotate_90deg: (34561, 224, 224)
Ring_Artifact_v1: (34561, 224, 224)
Data saved to training_concatenated_dataset_full.npz


### Image normalizer 

In [3]:
def normalize_image(image, mean=0.5, std=0.5):
    """
    Normalize an image tensor to have a mean and standard deviation.
    """
    return (image - mean) / std

def normalize_images(images, mean=0.5, std=0.5):
    """
    Normalize a list of images.
    """
    return [normalize_image(image, mean, std) for image in images]

### Dataset Loader

In [4]:
#Modified CustomImageDataset Loader

class ModifiedCustomImageDataset(Dataset):
    def __init__(self, images, labels1,  transform=None):
        self.images = images 
        self.labels1 = labels1

        if transform is None:
            self.transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Lambda(lambda x: x.repeat(3, 1, 1))  # Grayscale to 3-channel
            ])
        else:
            self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = self.images[idx].astype(np.float32)
        label1 = int(self.labels1[idx])


        if self.transform:
            img = self.transform(img)

        return {
            "pixel_values": img,
            "labels": int(label1) if torch.is_tensor(label1) else label1,
        }
        


#### Preprocessing functions for single distortion and multi.

In [None]:
#Modified

def preprocess_data(data, key='original'):
    """
    Modified from sam's implementation to preprocess 1 set of images. 
    
    key : str (distortion name)
    """
    keys = data.files 
    print(f'\nGenerating {key} set')
    
    images = data[key]
    labels = data['label']
    
    normalized_images = []
    for image in images:
        normalized_images.append(normalize_images(image))
        
    labels = np.array(labels)
    normalized_images = np.array(normalized_images)
    
    
    print(f"Labels shape: {labels.shape}")
    print(f"Images shape: {normalized_images.shape}")

    dataset = ModifiedCustomImageDataset(images=normalized_images, labels1=labels)

    return dataset


#### Preprocess data into distinct sets

In [None]:
val_set_loaded = np.load('val_concatenated_dataset_full.npz')
key_list = [key for key in val_set_loaded.files if key != 'label']
print(key_list)

val_rotate_set = preprocess_data(val_set_loaded, 'Rotate_90deg')
val_original_set = preprocess_data(val_set_loaded, 'original')
val_noise_set = preprocess_data(val_set_loaded, 'Uniform_Noise')
val_ct_set = preprocess_data(val_set_loaded, 'Ring_Artifact_v1')



train_set_loaded = np.load('training_concatenated_dataset_full.npz')

train_rotate_set = preprocess_data(train_set_loaded, 'Rotate_90deg')
train_original_set = preprocess_data(train_set_loaded, 'original')
train_noise_set = preprocess_data(train_set_loaded, 'Uniform_Noise')
train_ct_set = preprocess_data(train_set_loaded, 'Ring_Artifact_v1')



['original', 'Uniform_Noise', 'Rotate_90deg', 'Ring_Artifact_v1']

Generating Rotate_90deg validataion set
Labels shape: (6491, 1)
Images shape: (6491, 224, 224)

Generating original validataion set
Labels shape: (6491, 1)
Images shape: (6491, 224, 224)

Generating Uniform_Noise validataion set
Labels shape: (6491, 1)
Images shape: (6491, 224, 224)

Generating Ring_Artifact_v1 validataion set
Labels shape: (6491, 1)
Images shape: (6491, 224, 224)

Generating Rotate_90deg validataion set
Labels shape: (34561, 1)
Images shape: (34561, 224, 224)

Generating original validataion set
Labels shape: (34561, 1)
Images shape: (34561, 224, 224)

Generating Uniform_Noise validataion set
Labels shape: (34561, 1)
Images shape: (34561, 224, 224)

Generating Ring_Artifact_v1 validataion set


## Train Model

In [None]:
output_path = os.path.join('A1_Original', 'results')


config = ResNetConfig()
model = ResNetForMultiLabel(config)

print("Starting training")
trainer = train_model(
    train_dataset=train_original_set,
    eval_dataset=val_original_set,
    model=model,
    output_dir=output_path,  # Checkpoints will go here
    num_epochs=100,
    batch_size=32
)

print("Saving final model")
trainer.save_model(output_path)

In [None]:
output_path = os.path.join('A4_UNI', 'results')


config = ResNetConfig()
model = ResNetForMultiLabel(config)

print("Starting training")
trainer = train_model(
    train_dataset=train_noise_set,
    eval_dataset=val_noise_set,
    model=model,
    output_dir=output_path,  # Checkpoints will go here
    num_epochs=100,
    batch_size=32
)

print("Saving final model")
trainer.save_model(output_path)

In [None]:
output_path = os.path.join('A2_CT', 'results')


config = ResNetConfig()
model = ResNetForMultiLabel(config)

print("Starting training")
trainer = train_model(
    train_dataset=train_ct_set,
    eval_dataset=val_ct_set,
    model=model,
    output_dir=output_path,  # Checkpoints will go here
    num_epochs=100,
    batch_size=32
)

print("Saving final model")
trainer.save_model(output_path)

In [None]:

if torch.cuda.is_available():
    print("CUDA is available!")
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
    device = torch.device("cuda")
else:
    print("CUDA is not available. Using CPU.")
    device = torch.device("cpu")

CUDA is available!
Using GPU: NVIDIA GeForce RTX 3090


In [None]:
output_path = os.path.join('A3_ROT', 'results')


config = ResNetConfig()
model = ResNetForMultiLabel(config)

print("Starting training")
trainer = train_model(
    train_dataset=train_rotate_set,
    eval_dataset=val_rotate_set,
    model=model,
    output_dir=output_path,  # Checkpoints will go here
    num_epochs=100,
    batch_size=32
)

print("Saving final model")
trainer.save_model(output_path)

In [22]:
from resnet import eval_model
checkpoint_directory= 'A1_Original/results/checkpoint-1081'
# Load config and model
config = ResNetConfig.from_pretrained(checkpoint_directory)
model = ResNetForMultiLabel.from_pretrained(checkpoint_directory, config=config)

eval_model(eval_dataset=val_rotate_set, model=model, output_dir="A1_Original/val_results", num_epochs=0, batch_size=32)






ImportError: cannot import name 'eval_model' from 'resnet' (c:\Users\Calvin\Desktop\DG\domain-generalization-ct\resnet.py)

In [6]:
model.evaluate(eval_dataset =val_rotate_set )

AttributeError: 'ResNetForMultiLabel' object has no attribute 'evaluate'