<a href="https://colab.research.google.com/github/JohnYechanJo/Novo-Nordisk_Anomaly-Detection/blob/classifier/simple_classifier_finished.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Image generation

In [None]:
# ViT-style preprocessing
def img_transform():
    """Build the preprocessing pipeline applied to each PIL image.

    Steps, in order: crop the box (0, 100, 768, 400), resize to 224x224,
    convert to a tensor, then normalize each channel with the fixed
    mean/std constants below.

    Returns:
        A ``transforms.Compose`` object; for an RGB input it yields a
        tensor of shape (3, 224, 224).
    """
    def crop_band(img):
        # (left, upper, right, lower) box passed straight to PIL's crop.
        return img.crop((0, 100, 768, 400))

    steps = [
        transforms.Lambda(crop_band),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        # NOTE(review): these look like the usual ImageNet statistics - confirm.
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    return transforms.Compose(steps)

# Load the images of a directory and convert them to tensors
def load_trans(path, pic_num=640):
    """Load up to ``pic_num`` images from ``path`` as preprocessed tensors.

    Directory entries are visited in sorted name order so that repeated runs
    select the same images. Only names ending in .png/.jpg/.jpeg (case
    insensitive) are considered. Images that fail to open or transform are
    reported and skipped; neither they nor non-image files count toward
    ``pic_num``.

    Args:
        path: Directory to scan (not recursive).
        pic_num: Number of images after which scanning stops.

    Returns:
        A list holding one tensor per successfully loaded image, each
        produced by the pipeline from ``img_transform()``.
    """
    trans_toTensor = img_transform()
    image_list = []
    for filename in sorted(os.listdir(path)):
        # Budget is measured in loaded images, not in directory entries.
        if len(image_list) == pic_num:
            break
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue
        file_path = os.path.join(path, filename)
        try:
            # The context manager closes the file handle once the pixel data
            # has been copied by convert().
            with Image.open(file_path) as raw:
                img = raw.convert("RGB")
            tensor_img = trans_toTensor(img)
        except Exception as e:
            # Best effort: one corrupt file must not abort the whole load.
            print(f"Skip: {filename}, Error: {e}")
            continue
        image_list.append(tensor_img)
    return image_list

# Cell 10: Generate Synthetic CNV Images
def generate_synthetic_images():
    """Generate synthetic CNV images and store them as a tensor dataset.

    Loads the base Stable Diffusion pipeline, swaps in the fine-tuned UNet,
    renders 640 PNG files into ``/content/synthetic_cnv/``, converts them
    with ``load_trans`` and saves the resulting list to
    ``synthetic_cnv_dataset.pt``. Relies on ``device``, ``clear_memory`` and
    ``load_trans`` being defined elsewhere in the notebook.
    """
    base_model_id = "nota-ai/bk-sdm-small"
    finetuned_unet_dir = "/content/models/sd_cnv_finetuned/final_unet"
    out_dir = "/content/synthetic_cnv/"
    dataset_file = 'synthetic_cnv_dataset.pt'
    total = 640
    text_prompt = "OCT scan shows CNV"

    # Base pipeline from the original model, in half precision.
    sd_pipe = StableDiffusionPipeline.from_pretrained(
        base_model_id,
        torch_dtype=torch.float16,
        use_auth_token=False,
    )
    sd_pipe = sd_pipe.to(device)

    # Fine-tuned UNet replaces the one shipped with the base pipeline.
    finetuned_unet = UNet2DConditionModel.from_pretrained(
        finetuned_unet_dir,
        torch_dtype=torch.float16,
    )
    sd_pipe.unet = finetuned_unet.to(device)

    os.makedirs(out_dir, exist_ok=True)

    for idx in range(total):
        rendered = sd_pipe(text_prompt, num_inference_steps=50)
        target = os.path.join(out_dir, f"synthetic_cnv_{idx}.png")
        rendered.images[0].save(target)
        if idx % 50 == 0:
            print(f"Generated {idx}/{total} images")
        # Called after every image, as in the training notebook's other cells.
        clear_memory()

    # Convert the rendered files to tensors (one tensor per image).
    tensors = load_trans(out_dir, pic_num=total)

    # Replace any dataset file left over from an earlier run.
    if os.path.exists(dataset_file):
        os.remove(dataset_file)
    torch.save(tensors, dataset_file)
    print("Synthetic CNV dataset saved to synthetic_cnv_dataset.pt")

# Run the generation step when this cell executes, then call clear_memory()
# (defined elsewhere; presumably frees cached GPU/host memory - confirm).
generate_synthetic_images()
clear_memory()

Classifier

In [1]:
# Unpack the train/val/test splits stored in Dataset.pt into module-level names.
data_dic = torch.load('Dataset.pt')
mixed_train, mixed_train_label = data_dic['train_data'], data_dic['train_label']
mixed_val, mixed_val_label = data_dic['val_data'], data_dic['val_label']
mixed_test, mixed_test_label = data_dic['test_data'], data_dic['test_label']

class SimpleCNN(nn.Module):
    """Three-stage convolutional network with a two-way linear head.

    Each stage is Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2), so a
    (3, 224, 224) input becomes (16, 112, 112), then (32, 56, 56), then
    (64, 28, 28) before being flattened for the classifier head.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels) of the three stages, in order.
        stage_channels = ((3, 16), (16, 32), (32, 64))
        stages = []
        for c_in, c_out in stage_channels:
            stages.append(nn.Conv2d(c_in, c_out, kernel_size=3, padding=1))
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool2d(2))  # halves height and width
        self.conv_block = nn.Sequential(*stages)

        flat_features = 64 * 28 * 28
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_features, 128),
            nn.ReLU(),
            nn.Linear(128, 2),  # 2-class classification
        )

    def forward(self, x):
        """Return the (batch, 2) logits for a batch of (3, 224, 224) images."""
        return self.fc(self.conv_block(x))
class Classifier(nn.Module):
    """Training and evaluation harness around ``SimpleCNN``.

    ``train_val_test`` reads the train/val/test splits from a ``.pt`` file,
    trains the network, reports validation accuracy after every epoch and
    returns a scikit-learn classification report for the test split.
    """

    def __init__(self):
        super().__init__()
        # Network being trained.
        self.model = SimpleCNN()
        # Highest validation accuracy observed so far (fraction, not percent).
        self.best_acc = 0

    def _predict(self, inputs, device, batch_size=128):
        """Return the predicted class index for every row of ``inputs``.

        Inference runs in chunks of ``batch_size`` without gradient tracking
        so a whole split never has to sit on the device at once.
        """
        predictions = []
        with torch.no_grad():
            for start in range(0, len(inputs), batch_size):
                chunk = inputs[start:start + batch_size].to(device)
                predictions.extend(self.model(chunk).argmax(dim=1).cpu().tolist())
        return predictions

    def train_val_test(self, data_path='Dataset.pt'):
        """Train for 15 epochs, validate each epoch, then evaluate on test.

        Args:
            data_path: File written with ``torch.save`` containing the keys
                ``train_data``, ``train_label``, ``val_data``, ``val_label``,
                ``test_data`` and ``test_label``. It is re-read on every call
                so each run sees the splits currently on disk.

        Returns:
            The dict produced by ``classification_report(..., output_dict=True)``
            for the test split, or a zero-filled fallback dict with the keys
            ``accuracy`` and ``macro avg`` if the report raises ``ValueError``.
        """
        # Use the GPU when present; model and data must live on the same device.
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        data_dic = torch.load(data_path, map_location='cpu')
        train_x, train_y = data_dic['train_data'], data_dic['train_label']
        val_x, val_y = data_dic['val_data'], data_dic['val_label']
        test_x, test_y = data_dic['test_data'], data_dic['test_label']
        val_true = torch.as_tensor(val_y).cpu().tolist()
        test_true = torch.as_tensor(test_y).cpu().tolist()

        self.model.to(device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
        train_loader = DataLoader(TensorDataset(train_x, train_y), batch_size=128, shuffle=True)
        criterion = nn.CrossEntropyLoss()
        epochs = 15
        total = len(train_loader)
        for epoch in range(epochs):
            print(f"\nEpoch {epoch + 1}/{epochs}")
            # train
            self.model.train()
            for i, (batch_x, batch_y) in enumerate(train_loader):
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                self.optimizer.zero_grad()
                logits = self.model(batch_x)
                batch_loss = criterion(logits, batch_y)
                batch_loss.backward()
                self.optimizer.step()
                # .item() yields a plain int so the log line stays readable.
                corrects = (logits.argmax(dim=1) == batch_y).sum().item()
                accuracy = 100 * corrects / len(batch_y)
                print(f'Batch[{i + 1}/{total}] - loss: {batch_loss.item():.6f}  accuracy: {accuracy:.4f}%({corrects}/{batch_y.size(0)})')
            # val
            self.model.eval()
            y_pred = self._predict(val_x, device)
            acc = accuracy_score(val_true, y_pred)
            print(f"Validation Accuracy: {acc:.4f}")
            if acc > self.best_acc:
                self.best_acc = acc
            print("Best val set acc:", self.best_acc)
        # test
        self.model.eval()
        y_pred = self._predict(test_x, device)
        try:
            # NOTE(review): target_names 'NR'/'FR' are attached to labels 0/1;
            # confirm they match the intended class naming of the dataset.
            res = classification_report(test_true, y_pred, labels=[0, 1], target_names=['NR', 'FR'], digits=3, output_dict=True)
            for k, v in res.items():
                print(k, v)
            print(f"result: {res['accuracy']:.4f}")
        except ValueError as e:
            # Keep the sweep alive: report the failure and return zeros.
            print(f"Error in classification_report: {e}")
            res = {'accuracy': 0, 'macro avg': {'f1-score': 0, 'precision': 0, 'recall': 0}}
        return res




def runable(model):
    """Run ``model.train_val_test()`` and hand back whatever it returns.

    Args:
        model: Any object exposing a zero-argument ``train_val_test`` method.
    """
    return model.train_val_test()



NameError: name 'torch' is not defined

Train, validate, and test with varying synthetic-image ratios

In [None]:
import torch
import numpy as np
import pandas as pd
import os
import kagglehub

def _stack_images(images):
    """Return ``images`` as one tensor, stacking a sequence of tensors on dim 0."""
    if isinstance(images, torch.Tensor):
        return images
    return torch.stack(list(images), dim=0)


def _binary_labels(num_positive, num_negative):
    """Return a long tensor of ``num_positive`` ones followed by ``num_negative`` zeros."""
    return torch.cat([torch.ones(num_positive, dtype=torch.long),
                      torch.zeros(num_negative, dtype=torch.long)], dim=0)


def train_classifier_with_ratios():
    """Train one classifier per synthetic/real CNV mixing ratio and log metrics.

    For every ratio from 10% to 90% the function builds a training set of
    10 batches, each made of 64 CNV images (a ``ratio`` share synthetic, the
    rest real) and 64 real NORMAL images, plus 80+80 validation and 80+80
    test images taken from real data only. The splits are written to
    ``Dataset.pt``, a ``Classifier`` is trained through ``runable`` and the
    test metrics are collected. Labels: NORMAL -> 0, CNV -> 1.

    The collected metrics are printed and written to
    ``classifier_results.csv``. Relies on ``load_trans``, ``Classifier``,
    ``runable`` and ``clear_memory`` being defined elsewhere in the notebook.
    """
    # load_trans and the saved synthetic dataset both give lists of per-image
    # tensors; stack them so slices can be concatenated with torch.cat.
    synthetic_images = _stack_images(torch.load('synthetic_cnv_dataset.pt'))
    path = kagglehub.dataset_download("paultimothymooney/kermany2018")
    loadpath = os.path.join(path, "OCT2017 /train")
    train_path_cnv = os.path.join(loadpath, "CNV")
    train_path_normal = os.path.join(loadpath, "NORMAL")
    # each: (count, 3, 224, 224)
    cnv_images = _stack_images(load_trans(train_path_cnv, pic_num=800))
    normal_images = _stack_images(load_trans(train_path_normal, pic_num=800))

    # ratio : 10%~90%
    ratios = [i / 10 for i in range(1, 10)]
    # batch_size = 128 -> 64 CNV + 64 NORMAL per batch
    half_batch = 64
    batch_num = 10
    eval_size = 80
    results = []
    for ratio in ratios:
        num_synthetic_batch = int(half_batch * ratio)
        num_real_cnv_batch = half_batch - num_synthetic_batch

        # get train dataset
        train_parts = []
        label_parts = []
        for b in range(batch_num):
            s_cnv = synthetic_images[b * num_synthetic_batch:(b + 1) * num_synthetic_batch]
            n_cnv = cnv_images[b * num_real_cnv_batch:(b + 1) * num_real_cnv_batch]
            n_norm = normal_images[b * half_batch:(b + 1) * half_batch]
            train_parts.extend([s_cnv, n_cnv, n_norm])
            # Labels follow the actual slice sizes so data and labels always
            # have the same length, even if a source ran short of images.
            label_parts.append(_binary_labels(len(s_cnv) + len(n_cnv), len(n_norm)))
        mixed_train = torch.cat(train_parts, dim=0)
        mixed_train_label = torch.cat(label_parts, dim=0)

        # get val dataset: start right after the images used for training.
        # CNV and NORMAL consume different amounts, so each has its own offset.
        cnv_start = batch_num * num_real_cnv_batch
        normal_start = batch_num * half_batch
        n_cnv = cnv_images[cnv_start:cnv_start + eval_size]
        n_norm = normal_images[normal_start:normal_start + eval_size]
        mixed_val = torch.cat([n_cnv, n_norm], dim=0)
        val_labels = _binary_labels(len(n_cnv), len(n_norm))

        # get test dataset: the next 80 images of each class
        cnv_start += eval_size
        normal_start += eval_size
        n_cnv = cnv_images[cnv_start:cnv_start + eval_size]
        n_norm = normal_images[normal_start:normal_start + eval_size]
        mixed_test = torch.cat([n_cnv, n_norm], dim=0)
        test_labels = _binary_labels(len(n_cnv), len(n_norm))

        # save norm->0 / cnv->1 (torch.save overwrites an existing file)
        # NOTE(review): the splits reach the classifier through this file;
        # confirm Classifier re-reads Dataset.pt for every ratio.
        torch.save({
            "train_data": mixed_train,
            "train_label": mixed_train_label,
            "val_data": mixed_val,
            "val_label": val_labels,
            "test_data": mixed_test,
            "test_label": test_labels
        }, "Dataset.pt")

        # Train and evaluate model
        model = Classifier()
        res = runable(model)
        results.append({
            'ratio': ratio,
            'accuracy': res['accuracy'],
            'f1_score': res['macro avg']['f1-score'],
            'precision': res['macro avg']['precision'],
            'recall': res['macro avg']['recall']
        })
        clear_memory()

    print("\nResults for Ratio 10% to 90%:")
    for res in results:
        if res['accuracy'] is not None:
            print(f"Ratio: {res['ratio']*100:.0f}% | Accuracy: {res['accuracy']:.4f} | F1 Score: {res['f1_score']:.4f} | Precision: {res['precision']:.4f} | Recall: {res['recall']:.4f}")

    pd.DataFrame(results).to_csv('classifier_results.csv', index=False)
    print("Results saved to classifier_results.csv")

# Run the full ratio sweep when this cell executes.
train_classifier_with_ratios()