<a href="https://colab.research.google.com/github/davidarvai/DIPLOMADOLGOZAT/blob/main/Xgboost.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import math
import numpy as np
import nibabel as nib
import pandas as pd
import xgboost as xgb
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Functions for confusion matrix and metrics
def get_custom_confusion_matrix(tumor_type, tn, fp, fn, tp):
    if tumor_type == "Whole Tumor":
        custom_matrix = np.array([[tn, fp, fp, fp],
                                  [fn, tp, tp, tp],
                                  [fn, tp, tp, tp],
                                  [fn, tp, tp, tp]])
    elif tumor_type == "Edema":
        custom_matrix = np.array([[tn, tn, fp, tn],
                                  [tn, tn, fp, tn],
                                  [fn, fn, tp, fn],
                                  [tn, tn, fp, tn]])
    elif tumor_type == "Tumor Core":
        custom_matrix = np.array([[tn, fp, tn, fp],
                                  [fn, tp, fn, tp],
                                  [tn, fp, tn, fp],
                                  [fn, tp, fn, tp]])
    elif tumor_type == "Enhancing Core":
        custom_matrix = np.array([[tn, tn, tn, fp],
                                  [tn, tn, tn, fp],
                                  [tn, tn, tn, fp],
                                  [fn, fn, fn, tp]])
    else:
        custom_matrix = None
    return custom_matrix

# Calculating the elements of the confusion matrix based on the ground truth and the prediction

def compute_confusion(gt_mask, pred_mask):
    tn = np.sum((gt_mask == False) & (pred_mask == False))
    tp = np.sum((gt_mask == True)  & (pred_mask == True))
    fp = np.sum((gt_mask == False) & (pred_mask == True))
    fn = np.sum((gt_mask == True)  & (pred_mask == False))
    return tn, fp, fn, tp

def compute_metrics(tn, fp, fn, tp):
    TPR = tp / (tp + fn) if (tp+fn) > 0 else 0  # True Positive Rate
    TNR = tn / (tn + fp) if (tn+fp) > 0 else 0  # True Negative Rate
    PPV = tp / (tp + fp) if (tp+fp) > 0 else 0  # Precision
    NPV = tn / (tn + fn) if (tn+fn) > 0 else 0  # Negative Predictive Value
    ACC = (tp + tn) / (tp + tn + fp + fn) if (tp+tn+fp+fn) > 0 else 0  # Pontosság
    DS  = (2 * tp) / (2 * tp + fp + fn) if (2 * tp + fp + fn) > 0 else 0  # Dice Score
    return TPR, TNR, PPV, NPV, ACC, DS

# Data loading and preprocessing
def remap_segmentation(seg): # Copy of the input array
    seg_new = np.copy(seg) # A copy of the input array
    seg_new[seg == 4] = 3  # Converting voxels with a value of 4 to 3
    return seg_new

def load_subject_data(subject_path):
    files = os.listdir(subject_path)
    subject_data = {}
    for file in files:
        if file.endswith('.nii') or file.endswith('.nii.gz'):
            lower = file.lower()
            if 'seg' in lower:
                subject_data['seg'] = os.path.join(subject_path, file)
            else:
                for mod in ['t1', 't1ce', 't2', 'flair']:
                    if mod in lower:
                        subject_data[mod] = os.path.join(subject_path, file)
    return subject_data

def load_data_from_dir(data_dir):
    X_list = []
    Y_list = []
    subject_names = []
    subject_dirs = [os.path.join(data_dir, d) for d in os.listdir(data_dir)
                    if os.path.isdir(os.path.join(data_dir, d))]
    subject_dirs.sort()
    for subject_path in subject_dirs:
        data_files = load_subject_data(subject_path)
        if all(mod in data_files for mod in ['t1', 't1ce', 't2', 'flair']) and 'seg' in data_files:
            modality_imgs = []
            for mod in ['t1', 't1ce', 't2', 'flair']:
                img = nib.load(data_files[mod]).get_fdata()
                modality_imgs.append(img)
            X = np.stack(modality_imgs, axis=-1)
            seg = nib.load(data_files['seg']).get_fdata()
            seg = remap_segmentation(seg)
            X_list.append(X)
            Y_list.append(seg)
            subject_names.append(os.path.basename(subject_path))
        else:
            print("Hiányos adatok:", subject_path)
    return X_list, Y_list, subject_names

# Normalizing the input volume to the [0,1] range
def normalize_volume(vol):
    vol = vol.astype(np.float32)
    vol = (vol - np.min(vol)) / (np.max(vol) - np.min(vol) + 1e-8)
    return vol

# Extracting slices (2D)
def extract_slices(volume, seg, slice_axis=2, include_bg_ratio=0.3):
    slices_x = []
    slices_y = []
    D = volume.shape[slice_axis]
    for i in range(D):
        img_slice = volume[:, :, i, :]
        seg_slice = seg[:, :, i]
        if np.sum(seg_slice > 0) > 0.01 * (seg_slice.shape[0] * seg_slice.shape[1]):
            slices_x.append(img_slice)
            slices_y.append(seg_slice)
        else:
            if np.random.rand() < include_bg_ratio:
                slices_x.append(img_slice)
                slices_y.append(seg_slice)
    return slices_x, slices_y

# Pixel sampling from a slice
def sample_pixels_from_slice(img_slice, seg_slice, num_samples=1000, tumor_ratio=0.5):
    H, W, C = img_slice.shape
    tumor_idx = np.argwhere(seg_slice > 0)
    bg_idx = np.argwhere(seg_slice == 0)
    n_tumor = int(num_samples * tumor_ratio)
    n_bg = num_samples - n_tumor

    if len(tumor_idx) < n_tumor:
        tumor_sample = tumor_idx
        n_bg = num_samples - len(tumor_sample)
    else:
        indices = np.random.choice(len(tumor_idx), n_tumor, replace=False)
        tumor_sample = tumor_idx[indices]

    if len(bg_idx) < n_bg:
        bg_sample = bg_idx
    else:
        indices = np.random.choice(len(bg_idx), n_bg, replace=False)
        bg_sample = bg_idx[indices]

    all_idx = np.concatenate([tumor_sample, bg_sample], axis=0)
    np.random.shuffle(all_idx)

    features = []
    labels = []
    for idx in all_idx:
        i, j = idx
        features.append(img_slice[i, j, :])
        labels.append(int(seg_slice[i, j]))
    return np.array(features), np.array(labels)

# Generating training dataset at the pixel level
def create_training_dataset(volumes, segmentations, num_samples_per_slice=1000):
    X_samples = []
    y_samples = []
    for vol, seg in zip(volumes, segmentations):
        slices_img, slices_seg = extract_slices(vol, seg, slice_axis=2, include_bg_ratio=0.3)
        for img_slice, seg_slice in zip(slices_img, slices_seg):
            if img_slice.shape[0] < 10 or img_slice.shape[1] < 10:
                continue
            Xp, yp = sample_pixels_from_slice(img_slice, seg_slice, num_samples=num_samples_per_slice, tumor_ratio=0.5)
            X_samples.append(Xp)
            y_samples.append(yp)
    if len(X_samples) == 0:
        raise ValueError("Nincsenek érvényes szelet minták!")
    X_all = np.concatenate(X_samples, axis=0)
    y_all = np.concatenate(y_samples, axis=0)
    return X_all, y_all

# Function for predicting the entire slice on test data
def predict_volume(model, volume):
    H, W, D, _ = volume.shape
    pred_vol = np.zeros((H, W, D), dtype=np.int32)
    for i in range(D):
        x_slice = volume[:, :, i, :]
        x_flat = x_slice.reshape(-1, x_slice.shape[-1])
        pred_flat = model.predict(x_flat)
        pred_slice = pred_flat.reshape(H, W)
        pred_vol[:, :, i] = pred_slice
    return pred_vol

# Converting the obtained 3D result to a binary mask based on tumor type
def get_binary_mask_3d(segmentation, tumor_type):
    if tumor_type == "Whole Tumor":
        return np.isin(segmentation, [1,2,3])
    elif tumor_type == "Edema":
        return (segmentation == 2)
    elif tumor_type == "Tumor Core":
        return np.isin(segmentation, [1,3])
    elif tumor_type == "Enhancing Core":
        return (segmentation == 3)
    else:
        raise ValueError("Ismeretlen tumor típus!")


if __name__ == "__main__":

    output_dir = "/content/drive/My Drive/Allamvizsga/OutputXgboost"
    os.makedirs(output_dir, exist_ok=True)


    train_dir = "/content/drive/My Drive/Allamvizsga/Data/Teszt/Train"
    test_dir  = "/content/drive/My Drive/Allamvizsga/Data/Teszt/Teszt"

    print("Train adatok betöltése...")
    X_train_vols, Y_train_vols, train_subject_names = load_data_from_dir(train_dir)
    if len(X_train_vols) == 0:
        raise ValueError("Nincsenek betöltött train adatok!")
    X_train_vols = [normalize_volume(vol) for vol in X_train_vols]

    print("Tréning pixel adatok előállítása...")
    X_pixels, y_pixels = create_training_dataset(X_train_vols, Y_train_vols, num_samples_per_slice=1000)
    print("Összes minta:", X_pixels.shape, y_pixels.shape)

    # Train/Validation split
    X_train, X_val, y_train, y_val = train_test_split(X_pixels, y_pixels, test_size=0.2, random_state=42)

    # XGBoost classifier – 4 classes (0, 1, 2, 3)
    model = xgb.XGBClassifier(
        objective='multi:softmax',
        num_class=4,
        use_label_encoder=False,
        eval_metric=['mlogloss', 'merror'],
        verbosity=1,
        random_state=42
    )

    print("XGBoost tréning...")
    eval_set = [(X_train, y_train), (X_val, y_val)]
    model.fit(X_train, y_train, eval_set=eval_set, verbose=True)

    # Extracting metrics collected during training
    evals_result = model.evals_result()

    # Saving learning curves
    # Loss Curve: mlogloss
    plt.figure(figsize=(8,6))
    plt.plot(evals_result['validation_0']['mlogloss'], label='Train Loss')
    plt.plot(evals_result['validation_1']['mlogloss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.ylabel('mlogloss')
    plt.xlabel('Epochs')
    plt.legend()
    plt.savefig(os.path.join(output_dir, 'loss_curve.png'))
    plt.close()

    # Accuracy Curve:
    plt.figure(figsize=(8,6))
    train_acc = [1 - err for err in evals_result['validation_0']['merror']]
    val_acc = [1 - err for err in evals_result['validation_1']['merror']]
    plt.plot(train_acc, label='Train Accuracy')
    plt.plot(val_acc, label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epochs')
    plt.legend()
    plt.savefig(os.path.join(output_dir, 'accuracy_curve.png'))
    plt.close()

    # Testing: Predicting the entire 3D map slice by slice
    print("Teszt adatok betöltése...")
    X_test_vols, Y_test_vols, test_subject_names = load_data_from_dir(test_dir)
    if len(X_test_vols) == 0:
        raise ValueError("Nincsenek betöltött teszt adatok!")
    X_test_vols = [normalize_volume(vol) for vol in X_test_vols]

    metrics_rows = []
    output_txt_lines = []
    tumor_types = ["Whole Tumor", "Edema", "Tumor Core", "Enhancing Core"]

    for vol, seg, subj_name in zip(X_test_vols, Y_test_vols, test_subject_names):
        print("Előrejelzés a", subj_name, "subjectön...")
        pred_vol = predict_volume(model, vol)


        for tumor in tumor_types:
            gt_mask = get_binary_mask_3d(seg, tumor)
            pred_mask = get_binary_mask_3d(pred_vol, tumor)
            tn, fp, fn, tp = compute_confusion(gt_mask, pred_mask)
            TPR, TNR, PPV, NPV, ACC, DS = compute_metrics(tn, fp, fn, tp)
            cm = get_custom_confusion_matrix(tumor, tn, fp, fn, tp)

            metrics_rows.append({
                "Name": subj_name,
                "TumorType": tumor,
                "TP": tp,
                "TN": tn,
                "FP": fp,
                "FN": fn,
                "TPR": round(TPR, 3),
                "TNR": round(TNR, 3),
                "PPV": round(PPV, 3),
                "NPV": round(NPV, 3),
                "ACC": round(ACC, 3),
                "DS": round(DS, 3)
            })

            txt_block = f"Mapa neve: {subj_name}\nTumor típus: {tumor}\nKonfúziós mátrix:\n{cm}\n"
            txt_block += f"True Positive Rate (TPR): {round(TPR,3)}\n"
            txt_block += f"True Negative Rate (TNR): {round(TNR,3)}\n"
            txt_block += f"Positive Predictive Value (PPV): {round(PPV,3)}\n"
            txt_block += f"Negative Predictive Value (NPV): {round(NPV,3)}\n"
            txt_block += f"Accuracy (ACC): {round(ACC,3)}\n"
            txt_block += f"Dice Score (DS): {round(DS,3)}\n\n"
            output_txt_lines.append(txt_block)

    # Saving results to a CSV file
    metrics_df = pd.DataFrame(metrics_rows, columns=["Name", "TumorType", "TP", "TN", "FP", "FN", "TPR", "TNR", "PPV", "NPV", "ACC", "DS"])
    csv_path = os.path.join(output_dir, "metrics_output.csv")
    metrics_df.to_csv(csv_path, index=False)
    print("A metrics_output.csv fájl elmentve:", csv_path)

    # Saving results to a TXT file
    txt_path = os.path.join(output_dir, "output.txt")
    with open(txt_path, "w") as f:
        f.write("".join(output_txt_lines))
    print("Az output.txt fájl elmentve:", txt_path)


Train adatok betöltése...
Tréning pixel adatok előállítása...
Összes minta: (757000, 4) (757000,)
XGBoost tréning...


Parameters: { "use_label_encoder" } are not used.



[0]	validation_0-mlogloss:0.95471	validation_0-merror:0.09208	validation_1-mlogloss:0.95530	validation_1-merror:0.09293
[1]	validation_0-mlogloss:0.72646	validation_0-merror:0.08901	validation_1-mlogloss:0.72757	validation_1-merror:0.09030
[2]	validation_0-mlogloss:0.58154	validation_0-merror:0.08782	validation_1-mlogloss:0.58308	validation_1-merror:0.08890
[3]	validation_0-mlogloss:0.48277	validation_0-merror:0.08728	validation_1-mlogloss:0.48474	validation_1-merror:0.08862
[4]	validation_0-mlogloss:0.41408	validation_0-merror:0.08646	validation_1-mlogloss:0.41632	validation_1-merror:0.08764
[5]	validation_0-mlogloss:0.36248	validation_0-merror:0.08515	validation_1-mlogloss:0.36496	validation_1-merror:0.08651
[6]	validation_0-mlogloss:0.32510	validation_0-merror:0.08421	validation_1-mlogloss:0.32778	validation_1-merror:0.08536
[7]	validation_0-mlogloss:0.29800	validation_0-merror:0.08353	validation_1-mlogloss:0.30086	validation_1-merror:0.08476
[8]	validation_0-mlogloss:0.27745	valida