In [None]:
import os
# Print every directory that exists under the Kaggle input mount.
for walk_entry in os.walk('/kaggle/input'):
    # os.walk yields (dirpath, dirnames, filenames); only the path is shown.
    print(walk_entry[0])

In [None]:
# Fetch the project sources into the Kaggle working directory and enter the checkout.
%cd /kaggle/working
!git clone https://github.com/Harito97/HaritoProduction
# NOTE(review): the clone URL ends in "HaritoProduction" while the next line enters
# "ThyroidCancerClassifier" - confirm that this directory actually exists on a
# fresh session, otherwise the later `backend.*` imports cannot resolve.
%cd ThyroidCancerClassifier
%ls
# Bring the checkout up to date with the remote.
!git pull

In [None]:
import wandb
# SECURITY: never paste the W&B API key into the notebook. A literal key was
# previously committed in a comment on this line - revoke it and supply the
# credential from outside the notebook (TODO confirm the secret source in use).
wandb.login()

# Create data

In [None]:
from backend.app.models.H97 import H97_ANN
from backend.app.app import App
# Both model slots are given the same checkpoint file.
# NOTE(review): confirm that loading H97_ANN.pth twice is intended and that the
# second path is not meant to point at a different checkpoint.
model1_path = model2_path = '/kaggle/input/thyroidcancerclassifier/H97_ANN.pth'
app = App(model1_path, model2_path)

In [None]:
import os
import json

data_dir = '/kaggle/input/thyroidcancer-ver1/dataver1/ver1'
# Class folders in label-index order (position in this list is the class id).
# NOTE(review): the first folder is spelled '.B2' with a leading dot, unlike
# 'B5' and 'B6' - confirm against the dataset layout.
class_folders = ['.B2', 'B5', 'B6']

input_info = {}
for dataset in ['train', 'valid', 'test']:
    results, labels = [], []
    for index_label, label in enumerate(class_folders):
        for filename in os.listdir(f'{data_dir}/{dataset}/{label}'):
            # Only JPEG files are processed; everything else is skipped.
            if not filename.endswith('.jpg'):
                continue
            image_path = f'{data_dir}/{dataset}/{label}/{filename}'
            print(f'Processing {image_path}')
            results.append(app.get_images(image_path))
            # One label entry per image, repeated 18 times.
            # NOTE(review): presumably get_images yields 18 items per image - confirm.
            labels.append([index_label] * 18)
    input_info[dataset] = {'results': results, 'labels': labels}

# Persist the collected inputs to disk ...
with open('input_info.json', 'w') as out_file:
    json.dump(input_info, out_file)

# ... and read them straight back, so later cells work on the JSON-decoded form.
with open('input_info.json', 'r') as in_file:
    input_info = json.load(in_file)

# Expose the three splits under their own names.
train_dataset, valid_dataset, test_dataset = (
    input_info[split] for split in ('train', 'valid', 'test')
)

In [None]:
import os
import json
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from sklearn.metrics import (
    confusion_matrix,
    classification_report,
    f1_score,
    roc_curve,
    auc,
)
from sklearn.preprocessing import label_binarize
import wandb

class Tool:
    """Static helpers that render evaluation artefacts (plots) to image files."""

    @staticmethod
    def save_confusion_matrix(y_true, y_score, target_names, filename, normalize=False):
        """
        Computes the confusion matrix, plots it as a heatmap and saves the plot.

        Args:
            y_true (array-like): True labels.
            y_score (array-like): Predicted class labels (the parameter keeps its
                historical name; it is passed to ``confusion_matrix`` as the
                predictions).
            target_names (list): Tick labels for the heatmap axes.
            filename (str): Path to save the confusion matrix plot.
            normalize (bool, optional): Whether to normalize each row by the
                number of true samples of that class. Defaults to False.
        Returns:
            cm (numpy.ndarray | None): The (optionally normalized) confusion
            matrix, or None if a ValueError occurred while building it.
        """
        try:
            cm = confusion_matrix(y_true, y_score)
            print("Confusion Matrix:\n", cm)
            if normalize:
                # Rows without any true sample stay at 0 instead of becoming
                # NaN through a 0/0 division.
                row_totals = cm.sum(axis=1, keepdims=True)
                cm = np.divide(
                    cm.astype("float"),
                    row_totals,
                    out=np.zeros(cm.shape, dtype=float),
                    where=row_totals != 0,
                )
                title = "Normalized Confusion Matrix"
            else:
                title = "Confusion Matrix, Without Normalization"

            fig, ax = plt.subplots(figsize=(8, 6))
            try:
                sns.heatmap(
                    cm,
                    annot=True,
                    fmt=".2f" if normalize else "d",
                    cmap="Blues",
                    xticklabels=target_names,
                    yticklabels=target_names,
                    ax=ax,
                )
                ax.set_ylabel("True label")
                ax.set_xlabel("Predicted label")
                ax.set_title(title)
                fig.savefig(filename)
            finally:
                # Always release the figure, even when plotting/saving fails.
                plt.close(fig)
            return cm
        except ValueError as e:
            print(f"Error creating confusion matrix: {e}")
            return None

    @staticmethod
    def save_classification_report(y_true, y_score, filename):
        """
        Builds the classification report, plots it as a bar chart and saves it.

        Args:
            y_true (array-like): True labels.
            y_score (array-like): Predicted class labels (the parameter keeps its
                historical name; it is passed to ``classification_report`` as
                the predictions).
            filename (str): Path to save the classification report plot.
        Returns:
            cr (dict | None): The classification report as a dict, or None if a
            ValueError occurred.
        """
        try:
            cr = classification_report(y_true, y_score, output_dict=True)
            print("Classification Report:\n", cr)
            # The "support" column is dropped so only the score columns are plotted.
            report_df = pd.DataFrame(cr).transpose().drop(columns="support")
            ax = report_df.plot(kind="bar", figsize=(10, 6))
            fig = ax.get_figure()
            try:
                ax.set_title("Classification Report")
                ax.set_ylabel("Score")
                ax.tick_params(axis="x", labelrotation=45)
                fig.tight_layout()
                fig.savefig(filename)
            finally:
                # Always release the figure, even when saving fails.
                plt.close(fig)
            return cr
        except ValueError as e:
            print(f"Error creating DataFrame from classification report: {e}")
            return None

    @staticmethod
    def save_roc_auc_plot(y_true, y_score, n_classes, filename):
        """
        Calculates ROC curves / AUC values and saves the ROC plot to a file.

        Args:
            y_true (array-like): True labels (1-D class indices).
            y_score (array-like): Either a 2-D array of per-class scores with
                shape (n_samples, n_classes), or a 1-D array. A 1-D array is
                treated as hard class predictions when ``n_classes > 2`` (it is
                one-hot encoded) and as the positive-class score otherwise.
            n_classes (int): Number of classes.
            filename (str): Path to save the plot.
        Returns:
            fpr (dict): False positive rates for each class.
            tpr (dict): True positive rates for each class.
            roc_auc (dict): ROC AUC scores for each class.
            All three are None if a ValueError occurred. For ``n_classes <= 2``
            the dicts hold a single entry under key 1.
        """
        try:
            y_true = np.asarray(y_true)
            y_score = np.asarray(y_score)

            fpr = dict()
            tpr = dict()
            roc_auc = dict()

            # One flag drives both the computation and the plotting branch so
            # the two can never disagree about which dict keys exist.
            multiclass = n_classes > 2
            class_ids = [*range(n_classes)]

            if multiclass:
                # One-vs-rest: binarize the labels into one column per class.
                y_true = label_binarize(y_true, classes=class_ids)
                if y_score.ndim == 1:
                    # Hard predictions were passed instead of scores; one-hot
                    # encode them so that per-class columns can be indexed.
                    y_score = label_binarize(y_score, classes=class_ids)
                for i in class_ids:
                    fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_score[:, i])
                    roc_auc[i] = auc(fpr[i], tpr[i])
            else:
                positive_score = y_score if y_score.ndim == 1 else y_score[:, 1]
                fpr[1], tpr[1], _ = roc_curve(y_true, positive_score)
                roc_auc[1] = auc(fpr[1], tpr[1])

            fig, ax = plt.subplots(figsize=(8, 6))
            try:
                if multiclass:
                    for i in class_ids:
                        ax.plot(
                            fpr[i],
                            tpr[i],
                            lw=2,
                            label="ROC curve of class {0} (area = {1:0.2f})".format(
                                i, roc_auc[i]
                            ),
                        )
                else:
                    ax.plot(
                        fpr[1],
                        tpr[1],
                        lw=2,
                        label="ROC curve (area = {0:0.2f})".format(roc_auc[1]),
                    )

                # Chance-level diagonal for reference.
                ax.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
                ax.set_xlim([0.0, 1.0])
                ax.set_ylim([0.0, 1.05])
                ax.set_xlabel("False Positive Rate")
                ax.set_ylabel("True Positive Rate")
                ax.set_title("Receiver Operating Characteristic (ROC)")
                ax.legend(loc="lower right")
                fig.savefig(filename)
            finally:
                # Always release the figure, even when plotting/saving fails.
                plt.close(fig)
            return fpr, tpr, roc_auc
        except ValueError as e:
            print(f"Error creating ROC AUC plot: {e}")
            return None, None, None

In [None]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.metrics import f1_score
import json

# Number of samples available for each class
num_samples_B2 = 103
num_samples_B5 = 541
num_samples_B6 = 777

# Number of classes and total sample count
num_classes = 3
total_samples = sum((num_samples_B2, num_samples_B5, num_samples_B6))

# Per-class loss weight: total_samples / (num_classes * class_count),
# so classes with fewer samples receive a larger weight.
class_weight_B2, class_weight_B5, class_weight_B6 = (
    total_samples / (num_classes * class_count)
    for class_count in (num_samples_B2, num_samples_B5, num_samples_B6)
)

# Model, weighted loss and optimizer, all placed on the selected device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = H97_ANN()
class_weights = torch.tensor([class_weight_B2, class_weight_B5, class_weight_B6])
criterion = nn.CrossEntropyLoss(weight=class_weights).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
model.to(device)


# Def to train the H97_ANN model
def train():
    """
    Trains the global ``model`` on ``train_dataset`` with early stopping on the
    validation loss, logging metrics to Weights & Biases.

    Side effects:
        * writes ``history.json`` after every epoch,
        * writes ``model_best.pt`` whenever the validation loss improves,
        * writes ``model_last.pt`` on every epoch without improvement,
        * opens and closes one wandb run named ``H97_ANN_train``.

    Raises:
        ValueError: if the train or validation split is empty.

    Notes:
        NOTE(review): ``model.fix_batch`` is defined outside this notebook. This
        function assumes it returns ``(loss, output)`` where ``output`` has the
        class dimension at axis 1 and yields one prediction per label entry -
        confirm against H97_ANN.
    """
    batch_size = 32
    epochs = 100
    patience = 20

    train_inputs = train_dataset["results"]
    train_labels = train_dataset["labels"]
    n_train = len(train_inputs)
    n_valid = len(valid_dataset["results"])
    if n_train == 0 or n_valid == 0:
        raise ValueError("train() needs non-empty train and valid datasets")

    wandb.init(
        project="ThyroidCancer",
        entity="harito",
        name="H97_ANN_train",
        config={"batch_size": batch_size, "epochs": epochs, "patience": patience},
    )

    # Number of mini-batches per epoch; ceil division keeps the final partial
    # batch. The datasets are plain lists after the JSON round-trip, so the
    # count comes from len() rather than a tensor shape.
    step = (n_train + batch_size - 1) // batch_size

    # Labels are stored as one list per image; flatten once for the metrics.
    valid_targets = np.asarray(valid_dataset["labels"]).reshape(-1)

    history_file_path = "history.json"
    model_best = "model_best.pt"
    model_last = "model_last.pt"

    print("Initializing history ...")
    history = {
        "train_loss": [],
        "train_acc": [],
        "train_f1": [],
        "val_loss": [],
        "val_acc": [],
        "val_f1": [],
    }

    print("Training classification model...")
    best_loss = float("inf")
    patience_counter = 0
    try:
        for i in range(epochs):
            running_loss = 0.0
            train_preds = []
            train_targets = []
            for j in range(step):
                # Slicing past the end simply yields a shorter final batch.
                start = j * batch_size
                x = train_inputs[start : start + batch_size]
                y = train_labels[start : start + batch_size]

                loss, output = model.fix_batch(x, y, criterion, optimizer)
                running_loss += loss.item()
                _, preds = torch.max(output, 1)
                # Accumulate over all batches so the epoch metrics cover the
                # whole training split, not just the last batch.
                train_preds.extend(preds.reshape(-1).cpu().numpy())
                train_targets.extend(np.asarray(y).reshape(-1))

            train_preds = np.asarray(train_preds)
            train_targets = np.asarray(train_targets)

            # Kept as "summed batch loss / number of samples", matching how the
            # validation loss below is scaled.
            train_loss = running_loss / n_train
            train_acc = float(np.mean(train_preds == train_targets))
            train_f1 = float(f1_score(train_targets, train_preds, average="weighted"))

            history["train_loss"].append(train_loss)
            history["train_acc"].append(train_acc)
            history["train_f1"].append(train_f1)

            # NOTE(review): fix_batch is called without criterion/optimizer
            # here; presumably that makes it an evaluation-only pass - confirm.
            valid_loss, valid_output = model.fix_batch(
                valid_dataset["results"], valid_dataset["labels"]
            )
            val_loss = valid_loss.item() / n_valid
            _, preds = torch.max(valid_output, 1)
            valid_preds = preds.reshape(-1).cpu().numpy()
            val_acc = float(np.mean(valid_preds == valid_targets))
            val_f1 = float(f1_score(valid_targets, valid_preds, average="weighted"))

            history["val_loss"].append(val_loss)
            history["val_acc"].append(val_acc)
            history["val_f1"].append(val_f1)

            wandb.log(
                {
                    "train_loss": train_loss,
                    "train_acc": train_acc,
                    "train_f1": train_f1,
                    "val_loss": val_loss,
                    "val_acc": val_acc,
                    "val_f1": val_f1,
                    "epoch": i,
                }
            )

            with open(history_file_path, "w") as history_file:
                json.dump(history, history_file)

            # A LOWER validation loss is an improvement.
            if val_loss < best_loss:
                best_loss = val_loss
                patience_counter = 0
                torch.save(model.state_dict(), model_best)
                print(f"Model saved with val_loss: {val_loss}")
            else:
                patience_counter += 1
                torch.save(model.state_dict(), model_last)
                print(f"Patience counter: {patience_counter}")

            if patience_counter >= patience:
                print("Early stopping")
                break
    finally:
        # Close the wandb run even when training aborts with an exception.
        wandb.finish()


def test():
    """
    Evaluates the best checkpoint on ``test_dataset`` and logs the results.

    Loads ``model_best.pt`` into the global ``model``, runs it on the test
    split, prints loss / accuracy / weighted F1, writes
    ``confusion_matrix.png``, ``classification_report.png`` and ``roc_auc.png``
    via ``Tool``, and logs metrics plus those images to a wandb run named
    ``H97_ANN_test``.

    Notes:
        NOTE(review): ``model.fix_batch`` is defined outside this notebook. This
        function assumes it returns ``(loss, output)`` where ``output`` holds
        raw class scores with the class dimension at axis 1 - confirm against
        H97_ANN.
    """
    class_names = ["B2", "B5", "B6"]
    roc_path = "roc_auc.png"

    print("Loading the best model ...")
    # map_location lets a checkpoint saved on GPU load on a CPU-only session.
    model.load_state_dict(torch.load("model_best.pt", map_location=device))
    model.eval()
    test_loss, test_output = model.fix_batch(
        test_dataset["results"], test_dataset["labels"]
    )

    # Labels are stored as one list per image; flatten to 1-D for the metrics.
    y_true = np.asarray(test_dataset["labels"]).reshape(-1)
    _, preds = torch.max(test_output, 1)
    y_pred = preds.reshape(-1).cpu().numpy()

    # ROC curves need per-class scores, not hard predictions. Softmax over the
    # class axis, then move that axis last and flatten to (n_samples, n_classes)
    # so the rows line up with the flattened predictions above.
    probs = torch.softmax(test_output.detach().float(), dim=1)
    y_prob = probs.movedim(1, -1).reshape(-1, probs.shape[1]).cpu().numpy()

    test_loss = test_loss.item() / len(test_dataset["results"])
    test_acc = float(np.mean(y_pred == y_true))
    test_f1 = float(f1_score(y_true, y_pred, average="weighted"))

    # Save confusion matrix
    Tool.save_confusion_matrix(
        y_true,
        y_pred,
        class_names,
        "confusion_matrix.png",
    )
    # Save classification report
    Tool.save_classification_report(
        y_true,
        y_pred,
        "classification_report.png",
    )
    # Save ROC AUC plot
    Tool.save_roc_auc_plot(y_true, y_prob, len(class_names), roc_path)

    print(f"Test loss: {test_loss}, Test accuracy: {test_acc}, Test f1: {test_f1}")
    wandb.init(
        project="ThyroidCancer",
        entity="harito",
        name="H97_ANN_test",
        config={"test_loss": test_loss, "test_acc": test_acc, "test_f1": test_f1},
    )
    try:
        wandb.log(
            {
                "test_loss": test_loss,
                "test_acc": test_acc,
                "test_f1": test_f1,
                "confusion_matrix": wandb.Image("confusion_matrix.png"),
                "classification_report": wandb.Image("classification_report.png"),
                # Must be the same file that save_roc_auc_plot wrote above.
                "roc_auc_plot": wandb.Image(roc_path),
            }
        )
    finally:
        # Close the run so a later wandb.init() starts from a clean state.
        wandb.finish()