In [None]:
from pathlib import Path
import pandas as pd
from pandarallel import pandarallel
import wfdb
from wfdb.io import Record
from typing import List
import shutil
import datetime
from datetime import timedelta
import numpy as np

In [None]:
# Initialise pandarallel so that DataFrame.parallel_apply (used by the
# segmentation, preprocessing and window-expansion cells) is available.
pandarallel.initialize(progress_bar=True)

# Create folder for dataset
# One parquet file per CTG record is written into this directory later on.
dataset_path = Path("../ctg/dataset/ctgs")
dataset_path.mkdir(exist_ok=True, parents=True)

In [None]:
# Read the metadata file
# Its "record" column is what read_associated_outcome uses to locate each
# record's header under ../ctg/ctu-chb/.
dataset_records = pd.read_csv("../ctg/ctu-chb/RECORDS.csv")
dataset_records.head()

In [None]:
# Collect all outcomes into a single dataframe
def read_associated_outcome(row):
    """Parse the outcome and clinical fields from one record's WFDB header.

    The header of ``../ctg/ctu-chb/<record>`` is read with ``wfdb.rdheader``.
    Every field is taken from a fixed position in the header comments and is
    the last space-separated token of that comment line.

    Args:
        row: Mapping-like row (for example a row of ``dataset_records``) whose
            ``"record"`` entry is convertible to ``int``.

    Returns:
        dict: ``"record_no"`` followed by one entry per parsed field.

    Raises:
        ValueError: If an expected comment line is missing or its value cannot
            be converted. The message names the record and the field, and the
            underlying error is chained as ``__cause__``.
    """
    # (output key, position in the header comments, converter)
    field_specs = (
        ("ph", 2, float),
        ("bcef", 3, float),
        ("pco2", 4, float),
        ("be", 5, float),
        ("apgar1", 6, int),
        ("apgar5", 7, int),
        ("fetus_age_weeks", 16, int),
        ("fetus_weight_grams", 17, float),
        ("fetus_sex", 18, int),
        ("mother_age_years", 20, int),
        ("mother_gravidity", 21, float),
        ("mother_parity", 22, int),
        ("mother_diabetes", 23, int),
        ("mother_hypertension", 24, int),
        ("mother_preeclampsia", 25, int),
        ("mother_praecox", 26, int),
        ("mother_pyrexia", 27, int),
        ("mother_meconim", 28, int),
    )

    record_no: int = int(row["record"])
    record_comments: List[str] = wfdb.rdheader(f"../ctg/ctu-chb/{record_no}").comments

    outcome = {"record_no": record_no}
    for field_name, comment_index, convert in field_specs:
        try:
            raw_value = record_comments[comment_index].split(" ")[-1]
            outcome[field_name] = convert(raw_value)
        except Exception as e:
            # Exception (not BaseException) so that KeyboardInterrupt and
            # SystemExit are not turned into a ValueError.
            raise ValueError(
                f"Record {record_no}: cannot parse '{field_name}' "
                f"from header comment {comment_index}: {e!r}"
            ) from e
    return outcome


# Parse every record's header into one dict per record, assemble the dicts
# into a single table and persist it for the later cells.
outcome_df = dataset_records.apply(read_associated_outcome, axis=1)
outcome_df = pd.DataFrame.from_records(outcome_df.to_list())
outcome_df.to_parquet("../ctg/dataset/outcome.parquet")
outcome_df.head()

In [None]:
# Convert each record to parquet
def convert_record_to_parquet(row):
    """Store one WFDB record as a parquet file, keeping only in-range FHR rows.

    Reads ``../ctg/ctu-chb/<record_no>`` with ``wfdb.rdrecord``, keeps the rows
    whose ``FHR`` value is strictly between 0 and 250, and writes the result
    to ``../ctg/dataset/ctgs/<record_no>.parquet``. Returns ``None``.

    NOTE(review): a later cell defines another function under this same name,
    which replaces this one once that cell has run.
    """
    record_no: int = int(row["record_no"])
    signals = wfdb.rdrecord(f"../ctg/ctu-chb/{record_no}").to_dataframe()
    fhr = signals["FHR"]
    # Both bounds are exclusive, matching the two successive filters this replaces.
    in_range = (fhr > 0) & (fhr < 250)
    signals[in_range].to_parquet(f"../ctg/dataset/ctgs/{record_no}.parquet")


# Writes one parquet file per record; the function returns nothing, so the
# resulting Series is not kept.
outcome_df.apply(convert_record_to_parquet, axis=1)

In [None]:
# Add CTG length to outcomes
# Reload the outcomes table written earlier rather than reusing the in-memory frame.
outcome_df = pd.read_parquet("../ctg/dataset/outcome.parquet")
# NOTE(review): this is not the last expression of the cell, so the preview is not displayed.
outcome_df.head()


# Add length of CTG
def convert_record_to_parquet(row):
    """Attach the number of stored CTG rows to an outcome row.

    Rows that already contain ``"no_of_points"`` are returned untouched.
    Otherwise the record's parquet file under ``../ctg/dataset/ctgs/`` is read
    and its row count is stored in ``row["no_of_points"]``.

    NOTE(review): this reuses the name of the earlier WFDB-to-parquet helper
    and shadows it from this cell onwards, although it converts nothing.
    Consider renaming it together with its call site.

    Returns:
        The same ``row`` object, possibly with ``"no_of_points"`` added.
    """
    if "no_of_points" in row:
        return row

    record_no: int = int(row["record_no"])
    ctg_path = f"../ctg/dataset/ctgs/{record_no}.parquet"
    row["no_of_points"] = pd.read_parquet(ctg_path).shape[0]
    return row


# Here convert_record_to_parquet is the definition from this cell: it adds the
# no_of_points column (rows that already have it are left alone). The outcomes
# file is then overwritten in place.
outcome_df = outcome_df.apply(convert_record_to_parquet, axis=1)
outcome_df.to_parquet("../ctg/dataset/outcome.parquet")

In [None]:
# Constants
# Window duration and the step between consecutive window starts.
# preprocess_and_split_ctgs uses WINDOW_LENGTH as the minimum segment length;
# expand_outcome_normal uses both to lay out the windows.
WINDOW_LENGTH = timedelta(minutes=10)
WINDOW_STRIDE = timedelta(minutes=10)
# NOTE(review): PH_THRESHOLD, MAX_DATETIME_DIFFERENCE_CTG,
# CALCULATED_Z_SCORE_PARAMS and Z_SCORE_OFFSET are not referenced by any cell
# visible to this review — confirm they are still needed and document what
# each of the six z-score values stands for.
PH_THRESHOLD = 7.05
MAX_DATETIME_DIFFERENCE_CTG = timedelta(days=270)
CALCULATED_Z_SCORE_PARAMS = [
    1016201603,
    138.06500900606565,
    324618849082.29016,
    1016201462,
    21.75170609426872,
    606363058472.4089,
]
Z_SCORE_OFFSET = 2
# Min-max normalisation bounds in the order [FHR min, FHR max, UC min, UC max],
# as consumed by preprocess_and_split_ctgs.
MIN_MAX_PARAMS = [0, 240, 0, 127]

In [None]:
# Find Segments in CTGs
from typing import List


def find_segments_in_ctg(
    ctg: pd.DataFrame, threshold: timedelta = timedelta(seconds=5)
):
    """Locate the border samples of the gap-free runs in a CTG recording.

    Two consecutive samples belong to different runs when their index values
    are more than ``threshold`` apart. The first and last sample of every run
    are border candidates. A candidate is kept only if at least one sample
    lies strictly between it and the previous or the next candidate, so runs
    of one or two samples contribute no borders.

    Args:
        ctg: Recording indexed by time (the index must support ``diff`` and
            label slicing, e.g. a sorted ``DatetimeIndex``).
        threshold: Largest index difference still treated as continuous.

    Returns:
        pandas.Index: The kept border labels in index order. Callers pair
        them up as (start, end). The result is empty when ``ctg`` has no rows.
    """
    # An empty recording has no borders; without this guard the
    # ``is_border[0] = True`` assignment below would raise IndexError.
    if len(ctg) == 0:
        return ctg.index

    # True on the first sample after a gap.
    gap_before: List[bool] = (ctg.index.to_series().diff() > threshold).values.tolist()
    # True on the last sample before a gap; the final sample always closes a run.
    gap_after = gap_before[1:] + [True]
    is_border = [before or after for before, after in zip(gap_before, gap_after)]
    is_border[0] = True  # the very first sample always opens a run
    borders = ctg[is_border].index

    last_position = len(borders) - 1
    keep_borders = []
    for idx, border in enumerate(borders):
        # Label slices include both borders, hence the "- 2".
        points_in_prev_segment = 0
        if idx != 0:
            # The first border has no predecessor; do not wrap to borders[-1].
            points_in_prev_segment = ctg.loc[borders[idx - 1] : border].shape[0] - 2
        points_in_next_segment = 0
        if idx != last_position:
            points_in_next_segment = ctg.loc[border : borders[idx + 1]].shape[0] - 2
        keep_borders.append(points_in_prev_segment >= 1 or points_in_next_segment >= 1)

    return borders[keep_borders]

In [None]:
# Add segment information to outcomes
import traceback

# Gap threshold handed to add_segment_information_to_outcome_file below.
# NOTE(review): that function's own default is 5 minutes, not 5 seconds.
MAX_ALLOWED_GAP_IN_SEGMENT = timedelta(seconds=5)


def add_segment_information_to_outcome_file(
    max_allowed_gap_in_segment: timedelta = timedelta(minutes=5),
):
    """Compute per-record segment metadata and write ``outcome_segment.parquet``.

    Reads ``../ctg/dataset/outcome.parquet``, runs ``find_segments_in_ctg`` on
    every record's parquet file and adds ``no_of_segments``,
    ``max_segment_length`` and ``segments_information`` to the row. Records
    without any complete segment are dropped from the written file.

    Args:
        max_allowed_gap_in_segment: Passed to ``find_segments_in_ctg`` as its
            ``threshold``.

    Raises:
        ValueError: If processing any record fails. The message names the
            record and the original error is chained as ``__cause__``.
    """
    data_dir = Path("../ctg/dataset/")
    outcomes = pd.read_parquet(data_dir / "outcome.parquet")
    # Remove columns left over from a previous run. ``columns=`` is required:
    # without it DataFrame.drop looks for row labels and, combined with
    # errors="ignore", silently removes nothing.
    outcomes = outcomes.drop(
        columns=["no_of_segments", "max_segment_length", "segments_information", "keep"],
        errors="ignore",
    )

    def get_segment_information_for_ctg(row):
        # Bound before the try block so the handler can always report it.
        identifier = None
        try:
            identifier = int(row["record_no"])
            ctg = pd.read_parquet(data_dir / "ctgs" / f"{identifier}.parquet")
            borders = find_segments_in_ctg(
                ctg=ctg, threshold=max_allowed_gap_in_segment
            )
            # Borders are consumed as consecutive (start, end) pairs; an
            # unpaired trailing border is ignored.
            total_segments = len(borders) // 2
            if total_segments > 0:
                segments_information = [
                    {
                        "start": borders[segment * 2],
                        "end": borders[segment * 2 + 1],
                        "length": borders[segment * 2 + 1] - borders[segment * 2],
                    }
                    for segment in range(total_segments)
                ]

                row["no_of_segments"] = total_segments
                row["max_segment_length"] = max(
                    info["length"] for info in segments_information
                )
                row["segments_information"] = {"segments": segments_information}
                row["keep"] = True
            else:
                # No complete segment (also covers a single stray border,
                # which previously crashed on max() of an empty list).
                row["keep"] = False
            return row
        except Exception as e:
            # Exception rather than BaseException: do not convert
            # KeyboardInterrupt / SystemExit into a ValueError.
            print(identifier)
            print(e)
            traceback.print_exc()
            raise ValueError(
                f"Failed to compute segments for record {identifier}: {e!r}"
            ) from e

    outcomes: pd.DataFrame = outcomes.parallel_apply(
        get_segment_information_for_ctg, axis=1
    )
    outcomes = outcomes[outcomes["keep"]]
    outcomes = outcomes.drop(["keep"], axis=1)
    outcomes.to_parquet(data_dir / "outcome_segment.parquet")


# Run the segmentation with the notebook-level gap threshold instead of the
# function's default.
add_segment_information_to_outcome_file(
    max_allowed_gap_in_segment=MAX_ALLOWED_GAP_IN_SEGMENT
)

In [None]:
# Preprocess CTG
# 1. Load the CTG
# 2. Extract each segment
# 3. Preprocess segment: (Resample, Interpolate, Normalize)
# 4. Save segment
# 5. Delete Main CTG


def preprocess_and_split_ctgs():
    """Split every CTG into its segments, preprocess them and save each one.

    For each row of ``../ctg/dataset/outcome_segment.parquet`` the record's
    parquet file is loaded and every listed segment is:

    1. resampled to a 250 ms grid (mean of ``FHR`` and ``UC`` per bin),
    2. rejected if the resampled span is shorter than ``WINDOW_LENGTH``,
    3. linearly interpolated,
    4. min-max scaled with ``MIN_MAX_PARAMS``,
    5. written to ``ctgs/<record_no>_<k>.parquet``, where ``k`` counts the
       accepted segments of that record from 0.

    The record's original parquet file is deleted afterwards. The rows, with
    ``segments_information`` reduced to the accepted segments and with the
    ``preprocessed_no_of_segments`` / ``orig_no_of_segments`` counts added,
    are written to ``outcomes_preprocessed.parquet``.

    NOTE(review): because the originals are deleted, this cell cannot be
    re-run without regenerating the per-record parquet files first.
    """
    data_dir = Path("../ctg/dataset/")
    outcome = pd.read_parquet(data_dir / "outcome_segment.parquet")
    filtered_ctg_dir = data_dir / "ctgs"
    filtered_ctg_dir.mkdir(parents=True, exist_ok=True)

    def split_ctg_and_preprocess(row):
        segment_info_dict = row["segments_information"]
        segments = segment_info_dict["segments"]
        identifier: int = int(row["record_no"])
        ctg_filepath = data_dir / "ctgs" / f"{identifier}.parquet"
        ctg = pd.read_parquet(ctg_filepath)
        fhr_min, fhr_max, uc_min, uc_max = MIN_MAX_PARAMS
        filtered_segments = []

        for segment in segments:
            orig_ctg_segment = ctg[segment["start"] : segment["end"]]

            # Resample. "250ms" is the same frequency as the deprecated
            # "0.25S" alias, and the string "mean" replaces the deprecated
            # practice of passing the np.mean callable.
            ctg_segment = orig_ctg_segment.resample("250ms").agg(
                {"FHR": "mean", "UC": "mean"}
            )
            ctg_segment_length = ctg_segment.index.max() - ctg_segment.index.min()

            # An empty segment yields NaT here, which compares False and is
            # therefore rejected as well.
            if ctg_segment_length >= WINDOW_LENGTH:
                # Interpolate
                ctg_segment["FHR"] = ctg_segment["FHR"].interpolate(method="linear")
                ctg_segment["UC"] = ctg_segment["UC"].interpolate(method="linear")

                # Normalize
                ctg_segment["FHR"] = (ctg_segment["FHR"] - fhr_min) / (fhr_max - fhr_min)
                ctg_segment["UC"] = (ctg_segment["UC"] - uc_min) / (uc_max - uc_min)

                # Accepted segments are numbered consecutively from 0 so the
                # file suffix matches the position in filtered_segments.
                ctg_segment.to_parquet(
                    filtered_ctg_dir / f"{identifier}_{len(filtered_segments)}.parquet"
                )
                filtered_segments.append(segment)

        # Delete the main CTG now that its segments are stored separately.
        ctg_filepath.unlink()

        segment_info_dict["segments"] = filtered_segments
        row["segments_information"] = segment_info_dict
        row["preprocessed_no_of_segments"] = len(filtered_segments)
        row["orig_no_of_segments"] = len(segments)
        return row

    outcome = outcome.parallel_apply(split_ctg_and_preprocess, axis=1)
    outcome.to_parquet(data_dir / "outcomes_preprocessed.parquet")


# Destructive step: each original ../ctg/dataset/ctgs/<record_no>.parquet is
# deleted after its segments are written, so a second run fails on the read
# unless those files are regenerated first.
preprocess_and_split_ctgs()

In [None]:
import datetime


def expand_outcome_normal():
    """Expand every preprocessed record into one row per analysis window.

    Reads ``../ctg/dataset/outcomes_preprocessed.parquet``. For every segment
    of every record, windows of ``WINDOW_LENGTH`` are laid out from the
    segment start in steps of ``WINDOW_STRIDE`` for as long as the window end
    does not pass the segment end. Each window becomes one row of
    ``outcome_expanded.parquet`` with the columns ``identifier``, ``start``,
    ``end``, ``fetus_age_weeks``, ``ns_art_ph``, ``no_of_points``,
    ``segment_number`` and ``window_number`` (1-based within its segment).

    The shapes of the input table and of the expanded table are printed.
    """
    data_dir = Path("../ctg/dataset/")
    outcome = pd.read_parquet(data_dir / "outcomes_preprocessed.parquet")
    print(outcome.shape)

    def expand_outcome_row(row):
        record_id = int(row["record_no"])
        age_weeks = row["fetus_age_weeks"]
        ph_value = row["ph"]

        # One tuple per window: (start, end, n_points, segment_no, window_no)
        windows = []
        for segment_no, segment in enumerate(row["segments_information"]["segments"]):
            seg_start = segment["start"]
            seg_end = segment["end"]
            samples = pd.read_parquet(
                data_dir / "ctgs" / f"{record_id}_{segment_no}.parquet"
            )

            window_no = 0
            start = seg_start
            end = start + WINDOW_LENGTH
            while end <= seg_end:
                window_no += 1
                n_points = samples[start:end].shape[0]
                windows.append((start, end, n_points, segment_no, window_no))

                # Next window start is measured from the segment start.
                start = seg_start + (window_no * WINDOW_STRIDE)
                end = start + WINDOW_LENGTH

        n_windows = len(windows)
        return {
            "identifier": [record_id] * n_windows,
            "start": [w[0] for w in windows],
            "end": [w[1] for w in windows],
            "fetus_age_weeks": [age_weeks] * n_windows,
            "ns_art_ph": [ph_value] * n_windows,
            "no_of_points": [w[2] for w in windows],
            "segment_number": [w[3] for w in windows],
            "window_number": [w[4] for w in windows],
        }

    per_record = outcome.parallel_apply(expand_outcome_row, axis=1)

    # Concatenate the per-record column lists, keeping the column order.
    expanded_dict = {
        "identifier": [],
        "start": [],
        "end": [],
        "fetus_age_weeks": [],
        "ns_art_ph": [],
        "no_of_points": [],
        "segment_number": [],
        "window_number": [],
    }
    for record_windows in per_record:
        for column, values in expanded_dict.items():
            values.extend(record_windows[column])

    expanded_df = pd.DataFrame.from_dict(expanded_dict)
    print(expanded_df.shape)
    expanded_df.to_parquet(data_dir / "outcome_expanded.parquet")


# Build ../ctg/dataset/outcome_expanded.parquet (one row per window).
expand_outcome_normal()

In [None]:
# Read expanded outcome
# Load the per-window table written by expand_outcome_normal and report its
# (rows, columns) shape.
data_dir = Path("../ctg/dataset/")
outcome = pd.read_parquet(data_dir / "outcome_expanded.parquet")
print(outcome.shape)