<a href="https://colab.research.google.com/github/andrecoimbra/CTG_RP_PC_2025/blob/main/CTG_Train_Models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Config Colab Instance

Install packages and download libraries

In [None]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

## Download config file from GitHub Repo

Include any default source files

In [None]:
! rm config_local.py
! wget https://raw.githubusercontent.com/andrecoimbra/CTG_RP_PC_2025/main/src/config_local.py

In [None]:
from config_local import get_default_github_src_files

### Download other specified files from src folder

In [None]:
get_default_github_src_files()

In [None]:
! ls

In [None]:
# test code
import test as test
test.test()

## Install Package Dependencies

In [None]:
! pip install wfdb

In [None]:
! pip install pyts

## Download CTU-UHB Database

In [None]:
#!rsync -Cavz physionet.org::ctu-uhb-ctgdb  /content/ctu-uhb-ctgdb
# !wget -r -N -c -np -nv -P /content/ctu-uhb-ctgdb https://physionet.org/files/ctu-uhb-ctgdb/1.0.0/
!gdown 1h_qlULLpSR9fAJvzeE1Zkq2C2aIuB-N9
!unzip "/content/ctu-uhb-ctgdb.zip" -d "/content/"

In [None]:
# !mv /content/ctu-uhb-ctgdb/physionet.org/files/ctu-uhb-ctgdb/1.0.0/* /content/ctu-uhb-ctgdb
# !rm -r /content/ctu-uhb-ctgdb/physionet.org

# Generate Recurrence Plots and Datasets

## Import libraries

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
import gc #garbage collector

from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix

from fastai.vision.all import *

import torch
from torch import nn
import torch.nn.init as init

from config_local import RECORDINGS_DIR, THRESHOLD_PH
from ctg_utils import balance_files

from compute_metadata import generate_list, save_label_file
from generate_recurrence_images import generate_rp_images_segment, gen_recurrence_params

## Balance Dataset

In [None]:
RECORDINGS_DIR, THRESHOLD_PH

In [None]:
random.seed(123)

balance_files(RECORDINGS_DIR, threshold=THRESHOLD_PH, verbose=True)

## Config
Configure Recurrence Plot Parameters

In [None]:
# Seed both NumPy's and Python's global RNGs
np.random.seed(1234)
random.seed(1234)

# Segment selection policy: 'best_quality', 'early_valid' or 'late_valid'
POLICY = 'late_valid'
SEG_LENGTH = 15

# Output folder name encodes the policy and the segment length
IMAGES_DIR = f'/content/images_rp_{POLICY}_{SEG_LENGTH}min'
print(IMAGES_DIR)

# Colormap used for the generated images ("plasma" is an alternative)
CMAP = "binary"

# Larger parameter sweep, kept for reference:
# rp_params = gen_recurrence_params(dimensions=[2, 3], time_delays=list(range(1,11)), percentages=list(range(1,11)), use_clip_vals=[False])
rp_params = gen_recurrence_params(
    dimensions=[2],
    time_delays=[1],
    percentages=[1, 3, 10],
    use_clip_vals=[False],
)
len(rp_params), rp_params

In [None]:
tfms=[]
size=224
bs=64
workers=4

## Generate Recurrence Plots

In [None]:
# !rm -R '{IMAGES_DIR}'

Making sure images are generated correctly

In [None]:
generate_rp_images_segment(RECORDINGS_DIR, images_dir=IMAGES_DIR, rp_params=rp_params,
                           policy=POLICY, show_signal=False, show_image=True, verbose=True, cmap=CMAP,
                           limit=5, max_seg_min=SEG_LENGTH, n_dec=4)

Generating the images from all randomly selected samples (balanced dataset)

In [None]:
generate_rp_images_segment(RECORDINGS_DIR, images_dir=IMAGES_DIR, rp_params=rp_params,
                           policy=POLICY, show_signal=False, show_image=False, verbose=False,
                           cmap=CMAP, max_seg_min=SEG_LENGTH, n_dec=4)

Checking the amount of images in the folder

In [None]:
!find '{IMAGES_DIR}' -type f -name '*.tif' | wc -l

Checking the size of the image folder

In [None]:
! du -sh '{IMAGES_DIR}'

## Generate Train and Test Label Files

In [None]:
# Reset Python's RNG before building the listing
random.seed(1234)

# generate_list / save_label_file come from compute_metadata
recordings, outcomes, results = generate_list(
    image_dir=IMAGES_DIR,
    image_file='rp_images_index.json',
    thresh=THRESHOLD_PH,
    key='pH',
    verbose=False,
)

save_label_file(results, image_dir=IMAGES_DIR, csv_file='labels.csv')

# Preview the first 5 entries
recordings[:5]

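Ensure there is no data leakage between training and test sets: all images whose file names share the same prefix (the text before the first `_`) are kept in the same set.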

In [None]:
# Load the labels file
df = pd.read_csv(f'{IMAGES_DIR}/labels.csv')

# Extract the prefix before the first '_'
df["prefix"] = df["fname"].apply(lambda x: x.split("_")[0])

# Get the unique prefixes
unique_prefixes = df["prefix"].unique()

# Create the folds ensuring that the same prefix stays in the same set
kf = KFold(n_splits=10, shuffle=True, random_state=42)
folds = {}

# Distribute the prefixes across the folds
for fold, (train_idx, test_idx) in enumerate(kf.split(unique_prefixes)):
    train_prefixes = unique_prefixes[train_idx]
    test_prefixes = unique_prefixes[test_idx]

    # Select the samples corresponding to the chosen prefixes
    train_set = df[df["prefix"].isin(train_prefixes)]
    test_set = df[df["prefix"].isin(test_prefixes)]

    folds[fold] = {"train": train_set, "test": test_set}

# # Display the size of each set in the first fold as an example
folds[0]["train"].shape, folds[0]["test"].shape, folds[0]["train"].head(), folds[0]["test"].head()

In [None]:
# Function to display the training parameters
def print_training_params(learn):
    """Print the optimizer, learning rate and loss function of a learner.

    Args:
        learn: object exposing ``opt_func``, ``lr`` and ``loss_func``
            attributes (a fastai ``Learner`` in this notebook).
    """
    opt_func = learn.opt_func
    # functools.partial objects and callable instances carry no __name__:
    # unwrap a partial to its underlying callable, then fall back to the
    # class name instead of raising AttributeError.
    opt_func = getattr(opt_func, "func", opt_func)
    opt_name = getattr(opt_func, "__name__", type(opt_func).__name__)

    print("Training parameters in Fastai:\n")
    print(f"- Optimization function: {opt_name}")
    print(f"- Learning rate: {learn.lr}")
    print(f"- Loss function: {learn.loss_func}\n")

## Convolutional Neural Network [CNN] - RP

### Models

####  Original Model (64x64)

Model described in the article "Computer-Aided Diagnosis System of Fetal Hypoxia Incorporating Recurrence Plot With Convolutional Neural Network"

In [None]:
class ModelArticle(nn.Module):
    """Two-convolution CNN for 3-channel inputs that produces 2 output scores.

    The first Linear layer expects 1152 flattened features, which is what the
    convolution/pooling stack yields for a 64 × 64 input (see the shape
    comments below).
    """

    def __init__(self):
        super().__init__()
        feature_layers = [
            nn.Conv2d(3, 8, 5),           # 64 -> 60 × 60 × 8
            nn.ReLU(),
            nn.AvgPool2d(3, stride=2),    # 29 × 29 × 8
            nn.Conv2d(8, 8, 5),           # 25 × 25 × 8
            nn.ReLU(),
            nn.AvgPool2d(3, stride=2),    # 12 × 12 × 8
        ]
        classifier_layers = [
            nn.Flatten(),                 # 12 * 12 * 8 = 1152
            nn.Linear(1152, 144),
            nn.ReLU(),
            nn.Dropout(p=0.8),
            nn.Linear(144, 2),
        ]
        # A single Sequential keeps the layer indices (and state_dict keys) flat
        self.model = nn.Sequential(*feature_layers, *classifier_layers)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return its output."""
        return self.model(x)


def get_dataloaders(fold, path_to_images, bs=32, size=64, seed=None):
    """Build fastai ``ImageDataLoaders`` from the training rows of one fold.

    Args:
        fold: key into the module-level ``folds`` dict.
        path_to_images: directory the ``fname`` column is resolved against.
        bs: batch size.
        size: value passed to ``Resize``; defaults to 64, the value this
            definition previously hard-coded.
        seed: optional seed forwarded to the random train/validation split;
            ``None`` (default) leaves the split unseeded, as before.

    Returns:
        The ``ImageDataLoaders`` created from ``folds[fold]["train"]``, with
        20% of its rows held out for validation.
    """
    train_df = folds[fold]["train"]
    # NOTE(review): valid_pct splits individual rows at random, so images that
    # share a recording prefix can land in both the training and validation
    # subsets; validation metrics may therefore be optimistic.
    dls = ImageDataLoaders.from_df(
        train_df,
        path=path_to_images,
        fn_col='fname',
        label_col='label',
        valid_pct=0.2,
        seed=seed,
        item_tfms=Resize(size),
        bs=bs
    )
    return dls

####  Our Model (224x224)

The input is adapted to 224x224, since this is a common input size for pretrained models.

In [None]:
class MyModel(nn.Module):
    """Four Conv-BatchNorm-ReLU-MaxPool stages followed by a 3-layer classifier.

    The first Linear layer expects 14 * 14 * 128 features, i.e. a 224 × 224
    input halved by each of the four pooling layers. The network returns 2
    output scores per sample.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, kernel_size, padding) for each stage;
        # the padding keeps the spatial size, only the pooling halves it
        stage_specs = [
            (3, 16, 5, 2),
            (16, 32, 3, 1),
            (32, 64, 3, 1),
            (64, 128, 3, 1),
        ]

        layers = []
        for in_ch, out_ch, kernel, pad in stage_specs:
            layers.extend([
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=1, padding=pad),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ])

        # Classifier head
        layers.extend([
            nn.Flatten(),
            nn.Linear(14 * 14 * 128, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(128, 2),
        ])

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return its output."""
        return self.model(x)


def get_dataloaders(fold, path_to_images, bs=32, size=224, seed=None):
    """Build fastai ``ImageDataLoaders`` from the training rows of one fold.

    Args:
        fold: key into the module-level ``folds`` dict.
        path_to_images: directory the ``fname`` column is resolved against.
        bs: batch size.
        size: value passed to ``Resize``; defaults to 224, the value this
            definition previously hard-coded.
        seed: optional seed forwarded to the random train/validation split;
            ``None`` (default) leaves the split unseeded, as before.

    Returns:
        The ``ImageDataLoaders`` created from ``folds[fold]["train"]``, with
        20% of its rows held out for validation.
    """
    train_df = folds[fold]["train"]
    # NOTE(review): valid_pct splits individual rows at random, so images that
    # share a recording prefix can land in both the training and validation
    # subsets; validation metrics may therefore be optimistic.
    dls = ImageDataLoaders.from_df(
        train_df,
        path=path_to_images,
        fn_col='fname',
        label_col='label',
        valid_pct=0.2,
        seed=seed,
        item_tfms=Resize(size),
        bs=bs
    )
    return dls

#### ResNet18

In [None]:
class ResNet18(nn.Module):
    """``resnet18(pretrained=True)`` with its ``fc`` layer replaced by a 2-output Linear."""

    def __init__(self):
        super().__init__()
        backbone = resnet18(pretrained=True)
        # Replace the final fully connected layer for the binary output
        backbone.fc = nn.Linear(backbone.fc.in_features, 2)
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        return self.model(x)

def get_dataloaders(fold, path_to_images, bs=32, size=224, seed=None):
    """Build fastai ``ImageDataLoaders`` from the training rows of one fold.

    Args:
        fold: key into the module-level ``folds`` dict.
        path_to_images: directory the ``fname`` column is resolved against.
        bs: batch size.
        size: value passed to ``Resize``; defaults to 224, the value this
            definition previously hard-coded.
        seed: optional seed forwarded to the random train/validation split;
            ``None`` (default) leaves the split unseeded, as before.

    Returns:
        The ``ImageDataLoaders`` created from ``folds[fold]["train"]``, with
        20% of its rows held out for validation.
    """
    train_df = folds[fold]["train"]
    # NOTE(review): valid_pct splits individual rows at random, so images that
    # share a recording prefix can land in both the training and validation
    # subsets; validation metrics may therefore be optimistic.
    dls = ImageDataLoaders.from_df(
        train_df,
        path=path_to_images,
        fn_col='fname',
        label_col='label',
        valid_pct=0.2,
        seed=seed,
        item_tfms=Resize(size),
        bs=bs
    )
    return dls

#### EfficientNet B0

In [None]:
class EfficientNetB0(nn.Module):
    """``models.efficientnet_b0(pretrained=True)`` with ``classifier[1]`` replaced by a 2-output Linear."""

    def __init__(self):
        super().__init__()
        backbone = models.efficientnet_b0(pretrained=True)
        # Replace the last classifier layer for the binary output
        head_inputs = backbone.classifier[1].in_features
        backbone.classifier[1] = nn.Linear(head_inputs, 2)
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        return self.model(x)

def get_dataloaders(fold, path_to_images, bs=32, size=224, seed=None):
    """Build fastai ``ImageDataLoaders`` from the training rows of one fold.

    Args:
        fold: key into the module-level ``folds`` dict.
        path_to_images: directory the ``fname`` column is resolved against.
        bs: batch size.
        size: value passed to ``Resize``; defaults to 224, the value this
            definition previously hard-coded.
        seed: optional seed forwarded to the random train/validation split;
            ``None`` (default) leaves the split unseeded, as before.

    Returns:
        The ``ImageDataLoaders`` created from ``folds[fold]["train"]``, with
        20% of its rows held out for validation.
    """
    train_df = folds[fold]["train"]
    # NOTE(review): valid_pct splits individual rows at random, so images that
    # share a recording prefix can land in both the training and validation
    # subsets; validation metrics may therefore be optimistic.
    dls = ImageDataLoaders.from_df(
        train_df,
        path=path_to_images,
        fn_col='fname',
        label_col='label',
        valid_pct=0.2,
        seed=seed,
        item_tfms=Resize(size),
        bs=bs
    )
    return dls


### Training and Testing

In [None]:
# Dictionary to store metrics (one value appended per fold)
metrics = {"accuracy": [], "precision": [], "recall": [], "specificity": [], "f1_score": []}

# Architecture evaluated in this run: ModelArticle, MyModel, ResNet18 or EfficientNetB0
MODEL_CLS = ModelArticle
# ModelArticle's Linear(1152, ...) layer only fits 64x64 inputs; the other
# architectures in this notebook are paired with Resize(224).
INPUT_SIZE = 64 if MODEL_CLS is ModelArticle else 224

# Training and testing loop
for fold in range(len(folds)):
    # Build the loaders here with the size that matches MODEL_CLS. Every model
    # cell redefines get_dataloaders, so after "Run all" the surviving
    # definition resizes to 224 and ModelArticle fails with a shape mismatch.
    dls = ImageDataLoaders.from_df(
        folds[fold]["train"],
        path=IMAGES_DIR,
        fn_col='fname',
        label_col='label',
        valid_pct=0.2,
        item_tfms=Resize(INPUT_SIZE),
        bs=bs
    )

    # Reset the model for each fold
    model = MODEL_CLS()

    learn = Learner(dls, model, loss_func=CrossEntropyLossFlat(), metrics=[accuracy])
    learn.fine_tune(10)

    # Evaluation on the test set
    test_df = folds[fold]["test"]
    test_dl = dls.test_dl(test_df["fname"].apply(lambda x: Path(IMAGES_DIR)/x))
    preds, _ = learn.get_preds(dl=test_dl)

    # Convert predictions to binary labels
    pred_labels = preds.argmax(dim=1).cpu().numpy()
    true_labels = test_df["label"].values

    # Calculate the confusion matrix. labels=[0, 1] pins it to 2x2 even when
    # only one class shows up in this fold, so the unpacking below cannot fail.
    cm = confusion_matrix(true_labels, pred_labels, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    # Manually calculate metrics.
    # NOTE(review): the positive class here is label 1, which the legend
    # printed at the end of this cell describes as "normal"; the reported
    # sensitivity is therefore for the normal class, not for hypoxia - confirm
    # this is the intended convention.
    acc = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    # Store metrics
    metrics["accuracy"].append(acc)
    metrics["precision"].append(precision)
    metrics["recall"].append(recall)
    metrics["specificity"].append(specificity)
    metrics["f1_score"].append(f1_score)

    # Display metrics for the fold
    print(f"Fold {fold}:")
    print(f"  Accuracy: {acc:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Sensitivity (Recall): {recall:.4f}")
    print(f"  Specificity: {specificity:.4f}")
    print(f"  F1-score: {f1_score:.4f}")
    print("-" * 40)

    # Only the last fold prints the training setup and a sample of results
    if fold == len(folds) - 1:
        print_training_params(learn)
        learn.show_results(max_n=6)

    # Free the model and GPU memory (if using CUDA)
    del learn           # Delete the Learner object
    del model         # Delete the model
    torch.cuda.empty_cache()  # Free GPU memory, if necessary

    # Garbage collect to free unreferenced objects
    gc.collect()

# Calculate average performance across folds
total_metrics = {key: np.mean(values) for key, values in metrics.items()}

print("-" * 40)
print("-> AVERAGE MODEL PERFORMANCE: \n")
for key, value in total_metrics.items():
    metric_name = key.replace('_', ' ').capitalize()
    print(f"Avg. {metric_name}: {value:.4f}")

print('\n1 - normal (pH >= 7.15)')
print('0 - hypoxia (pH < 7.15)')

# Poincaré Plot

In [None]:
from generate_poincare_images import generate_pc_images_segment

In [None]:
# Seed both NumPy's and Python's global RNGs
np.random.seed(1234)
random.seed(1234)

# Segment selection policy: 'best_quality', 'early_valid' or 'late_valid'
POLICY = 'early_valid'
SEG_LENGTH = 30

# No colormap is passed for these images
CMAP = None

# Lags 1..10 (10 lags); single-lag alternative kept for reference:
# pc_lags = [1]
pc_lags = list(range(1, 11))

# Output folder name encodes the policy and the segment length
IMAGES_DIR = f'/content/images_pc_{POLICY}_{SEG_LENGTH}min'
print(IMAGES_DIR)

In [None]:
tfms=[]
size=224
bs=64
workers=4

Making sure images are generated correctly

In [None]:
# Preview run: limit=1 and show_image=True, writing into the same IMAGES_DIR as the full run.
# NOTE(review): this call passes n_dec=1 while the full generation call in this section
# passes n_dec=4 (the recurrence-plot preview uses the same n_dec as its full run).
# Confirm whether the preview is meant to use different settings from the images
# that are actually used for training.
generate_pc_images_segment(RECORDINGS_DIR, images_dir=IMAGES_DIR, pc_lags=pc_lags,
                           policy=POLICY, show_signal=False, show_image=True, verbose=False, cmap=CMAP,
                           limit=1, max_seg_min=SEG_LENGTH, n_dec=1)

Generating the images from all randomly selected samples (balanced dataset)

In [None]:
generate_pc_images_segment(RECORDINGS_DIR, images_dir=IMAGES_DIR, pc_lags=pc_lags,
                           policy=POLICY, show_signal=False, show_image=False, verbose=False,
                           cmap=CMAP, max_seg_min=SEG_LENGTH, n_dec=4)

Checking the amount of images in the folder

In [None]:
!find '{IMAGES_DIR}' -type f -name '*.tif' | wc -l

Checking the size of the image folder

In [None]:
!du -sh '{IMAGES_DIR}'

## Generate Train and Test Label Files

In [None]:
# Reset Python's RNG before building the listing
random.seed(1234)

# Use the shared THRESHOLD_PH constant (also passed to balance_files and to the
# recurrence-plot labelling) instead of a hard-coded 7.15, so the labels cannot
# drift from the threshold the dataset was balanced with.
recordings, outcomes, results = generate_list(image_dir=IMAGES_DIR, image_file='pc_images_index.json',
                                              thresh=THRESHOLD_PH, key='pH', verbose=True)

save_label_file(results, image_dir=IMAGES_DIR, csv_file='labels.csv')

# Checking the first 5 entries
recordings[0:5]

In [None]:
# Load the labels file
df = pd.read_csv(f'{IMAGES_DIR}/labels.csv')

# Prefix = everything in the file name before the first '_'
df["prefix"] = [name.split("_")[0] for name in df["fname"]]

# The folds are drawn over unique prefixes, not over individual images,
# so rows sharing a prefix always end up on the same side of a split
unique_prefixes = df["prefix"].unique()
kf = KFold(n_splits=10, shuffle=True, random_state=42)

folds = {}
for fold, (train_idx, test_idx) in enumerate(kf.split(unique_prefixes)):
    train_prefixes = unique_prefixes[train_idx]
    test_prefixes = unique_prefixes[test_idx]

    # Pull every row whose prefix belongs to the respective side
    train_set = df[df["prefix"].isin(train_prefixes)]
    test_set = df[df["prefix"].isin(test_prefixes)]

    folds[fold] = {"train": train_set, "test": test_set}

# Show the sizes and the first rows of fold 0 as an example
folds[0]["train"].shape, folds[0]["test"].shape, folds[0]["train"].head(), folds[0]["test"].head()

## Convolutional Neural Network [CNN] - PC

### Our Model (224x224)

In [None]:
class MyModel(nn.Module):
    """Four Conv-BatchNorm-ReLU-MaxPool stages followed by a 3-layer classifier.

    The first Linear layer expects 14 * 14 * 128 features, i.e. a 224 × 224
    input halved by each of the four pooling layers. The network returns 2
    output scores per sample.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, kernel_size, padding) for each stage;
        # the padding keeps the spatial size, only the pooling halves it
        stage_specs = [
            (3, 16, 5, 2),
            (16, 32, 3, 1),
            (32, 64, 3, 1),
            (64, 128, 3, 1),
        ]

        layers = []
        for in_ch, out_ch, kernel, pad in stage_specs:
            layers.extend([
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=1, padding=pad),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ])

        # Classifier head
        layers.extend([
            nn.Flatten(),
            nn.Linear(14 * 14 * 128, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(128, 2),
        ])

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return its output."""
        return self.model(x)

def get_dataloaders(fold, path_to_images, bs=32, size=224, seed=None):
    """Create ImageDataLoaders from the specified fold.

    Args:
        fold: key into the module-level ``folds`` dict.
        path_to_images: directory the ``fname`` column is resolved against.
        bs: batch size.
        size: value passed to ``Resize``; defaults to 224, the value this
            definition previously hard-coded.
        seed: optional seed forwarded to the random train/validation split;
            ``None`` (default) leaves the split unseeded, as before.

    Returns:
        The ``ImageDataLoaders`` created from ``folds[fold]["train"]``, with
        20% of its rows held out for validation.
    """
    train_df = folds[fold]["train"]
    # NOTE(review): valid_pct splits individual rows at random, so images that
    # share a recording prefix can land in both the training and validation
    # subsets; validation metrics may therefore be optimistic.
    dls = ImageDataLoaders.from_df(
        train_df,
        path=path_to_images,
        fn_col='fname',
        label_col='label',
        valid_pct=0.2,
        seed=seed,
        item_tfms=Resize(size),
        bs=bs
    )
    return dls

### Training and Testing

In [None]:
# Dictionary to store metrics (one value appended per fold)
metrics = {"accuracy": [], "precision": [], "recall": [], "specificity": [], "f1_score": []}

# Training and testing loop
for fold in range(len(folds)):
    dls = get_dataloaders(fold, IMAGES_DIR, bs)

    # Reset the model for each fold
    model = MyModel() # ModelArticle, MyModel, ResNet18, EfficientNetB0

    learn = Learner(dls, model, loss_func=CrossEntropyLossFlat(), metrics=[accuracy])
    learn.fine_tune(3)

    # Evaluation on the test set
    test_df = folds[fold]["test"]
    test_dl = dls.test_dl(test_df["fname"].apply(lambda x: Path(IMAGES_DIR)/x))
    preds, _ = learn.get_preds(dl=test_dl)

    # Convert predictions to binary labels
    pred_labels = preds.argmax(dim=1).cpu().numpy()
    true_labels = test_df["label"].values

    # Calculate the confusion matrix. labels=[0, 1] pins it to 2x2 even when
    # only one class shows up in this fold, so the unpacking below cannot fail.
    cm = confusion_matrix(true_labels, pred_labels, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    # Manually calculate metrics.
    # NOTE(review): the positive class here is label 1, which the legend
    # printed at the end of this cell describes as "normal"; the reported
    # sensitivity is therefore for the normal class, not for hypoxia - confirm
    # this is the intended convention.
    acc = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    # Store metrics
    metrics["accuracy"].append(acc)
    metrics["precision"].append(precision)
    metrics["recall"].append(recall)
    metrics["specificity"].append(specificity)
    metrics["f1_score"].append(f1_score)

    # Display metrics for the fold
    print(f"Fold {fold}:")
    print(f"  Accuracy: {acc:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Sensitivity (Recall): {recall:.4f}")
    print(f"  Specificity: {specificity:.4f}")
    print(f"  F1-score: {f1_score:.4f}")
    print("-" * 40)

    # Only the last fold prints the training setup and a sample of results
    if fold == len(folds) - 1:
        print_training_params(learn)
        learn.show_results(max_n=6)

    # Free the model and GPU memory (if using CUDA)
    del learn           # Delete the Learner object
    del model         # Delete the model
    torch.cuda.empty_cache()  # Free GPU memory, if necessary

    # Garbage collect to free unreferenced objects
    gc.collect()

# Calculate average performance across folds
total_metrics = {key: np.mean(values) for key, values in metrics.items()}

print("-" * 40)
print("-> AVERAGE MODEL PERFORMANCE: \n")
for key, value in total_metrics.items():
    metric_name = key.replace('_', ' ').capitalize()
    print(f"Avg. {metric_name}: {value:.4f}")

print('\n1 - normal (pH >= 7.15)')
print('0 - hypoxia (pH < 7.15)')

# Additional info

In [None]:
!python --version

In [None]:
import fastai; fastai.__version__