# miRNA Analysis v2

This notebook performs miRNA analysis with updated methods and additional steps as outlined in the miRNA-Manuscript-Revision.md file.

In [None]:
# Import Required Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import roc_curve, auc
import os


In [None]:
import os
import pandas as pd


import numpy as np


from sklearn.model_selection import train_test_split


from sklearn.preprocessing import RobustScaler


import pickle


# Preprocessing pipeline: load the raw Ct table, encode the categorical
# columns, split into stratified train/test sets and write raw and
# robust-scaled versions of every split to data/processed.
#
# The raw frame `df` is never overwritten with scaled values. The original
# cell scaled `df[targets]` in place *before* splitting, which meant that
#   * the files named X_*_raw.csv actually contained scaled values,
#   * the test rows influenced the scaling statistics (data leakage), and
#   * the "scaled" train/test sets were scaled twice.

# 1. Load the raw data
data_path = (
    "data/raw/Kasim2024-son-veri.csv"  # Correct path based on repository structure
)
df = pd.read_csv(data_path)

# 2. Check for missing data
missing_data = df.isnull().sum()
print(f"Missing data per column:\n{missing_data}")  # Report findings

if missing_data.sum() > 0:
    # No imputation strategy has been chosen for this dataset, so nothing is
    # imputed here and the message says so (the previous message claimed an
    # imputation that never happened). If imputation becomes necessary, fit it
    # on the training split only, after the split below, e.g.
    #   train_median = X_train.median(numeric_only=True)
    #   X_train = X_train.fillna(train_median); X_test = X_test.fillna(train_median)
    print(
        "WARNING: missing values found but NOT imputed; "
        "choose and apply an imputation method before modelling."
    )
else:
    print("No missing values found.")

# 3. Convert 'SEX' and 'GROUP'
df["SEX"] = df["SEX"].map({"F": 0, "M": 1}).astype("category")  # Female: 0, Male: 1
df["GROUP"] = df["GROUP"].astype("category")
print(f"Data types after conversion:\n{df.dtypes}")

# 4. Robust scaling of the complete dataset, done on a COPY so `df` stays raw.
#    This copy is only exported for descriptive use; models use the
#    train-fitted scaler from step 6.
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]
df_normalized = df.copy()
df_normalized[targets] = RobustScaler().fit_transform(df[targets])

# 5. Split the RAW data into training and testing sets
X = df[
    targets
    + [
        "SEX",
        "AGE",
        "plaque_index",
        "gingival_index",
        "pocket_depth",
        "bleeding_on_probing",
        "number_of_missing_teeth",
    ]
]
y = df["GROUP"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)  # stratified by the group label

# Integer codes of the categorical group label
y_train_numeric = y_train.cat.codes
y_test_numeric = y_test.cat.codes

# 6. Fit the scaler on the training split only and apply it to both splits
X_train_scaled = X_train.copy()
X_test_scaled = X_test.copy()

scaler = RobustScaler()
X_train_scaled[targets] = scaler.fit_transform(X_train[targets])
X_test_scaled[targets] = scaler.transform(X_test[targets])  # Apply to test data

# 7. Persist the train-fitted scaler so unseen data can be transformed with
#    exactly the same parameters.
os.makedirs("src", exist_ok=True)
with open("src/scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)

# 8. Save datasets and descriptive statistics
os.makedirs("data/processed", exist_ok=True)
os.makedirs("results/main", exist_ok=True)

# Whole-dataset normalised table (scaler fitted on all rows)
df_normalized.to_csv("data/processed/normalized_mirna_data.csv", index=False)

# Raw and scaled train/test splits
X_train.to_csv("data/processed/X_train_raw.csv", index=False)
X_test.to_csv("data/processed/X_test_raw.csv", index=False)
y_train.to_csv("data/processed/y_train.csv", index=False)
y_test.to_csv("data/processed/y_test.csv", index=False)
X_train_scaled.to_csv("data/processed/X_train_scaled.csv", index=False)
X_test_scaled.to_csv("data/processed/X_test_scaled.csv", index=False)

# Descriptive Statistics
X_train.describe().to_csv("results/main/descriptive_stats_raw_train.csv")
X_test.describe().to_csv("results/main/descriptive_stats_raw_test.csv")
X_train_scaled.describe().to_csv("results/main/descriptive_stats_scaled_train.csv")
X_test_scaled.describe().to_csv("results/main/descriptive_stats_scaled_test.csv")


Missing data per column:
GROUP                      0
SEX                        0
AGE                        0
plaque_index               0
gingival_index             0
pocket_depth               0
bleeding_on_probing        0
number_of_missing_teeth    0
mean_mir146a               0
mean_mir146b               0
mean_mir155                0
mean_mir203                0
mean_mir223                0
mean_mir381p               0
mean_GAPDH                 0
dtype: int64
No missing values found.
Data types after conversion:
GROUP                      category
SEX                        category
AGE                           int64
plaque_index                float64
gingival_index              float64
pocket_depth                float64
bleeding_on_probing         float64
number_of_missing_teeth       int64
mean_mir146a                float64
mean_mir146b                float64
mean_mir155                 float64
mean_mir203                 float64
mean_mir223                 float64
mean_

In [None]:
import os
import pandas as pd


import scipy.stats as stats


from sklearn.preprocessing import RobustScaler
import matplotlib.pyplot as plt


import seaborn as sns


from statsmodels.stats.multicomp import pairwise_tukeyhsd


# GAPDH reference-gene check: tests whether mean GAPDH Ct differs between the
# study groups (one-way ANOVA, Levene, Tukey HSD) and whether it correlates
# with the clinical parameters, then stores the results as supplementary CSVs.

# Load the data from the CSV file into a pandas DataFrame
data_path = r"data/raw/Kasim2024-son-veri.csv"
df = pd.read_csv(data_path)

# Check if 'GROUP' column exists
if "GROUP" not in df.columns:
    raise KeyError("'GROUP' column is missing from the DataFrame")

# All artefacts of this cell go to results/supplementary. Nothing earlier in
# the notebook creates that directory, so create it here; otherwise the first
# to_csv below fails on a fresh run.
os.makedirs("results/supplementary", exist_ok=True)

# 1. ANOVA and post hoc tests on GAPDH
groups = df["GROUP"].unique()
gapdh_data = [df.loc[df["GROUP"] == group, "mean_GAPDH"] for group in groups]

fvalue, pvalue = stats.f_oneway(*gapdh_data)
print(
    f"ANOVA results for GAPDH across groups: F-value = {fvalue:.4f}, P-value = {pvalue:.4f}"
)

# Perform Levene's test for homogeneity of variances
levene_stat, levene_p = stats.levene(*gapdh_data)
print(f"Levene's test results: Statistic = {levene_stat:.4f}, P-value = {levene_p:.4f}")

# Perform post hoc Tukey HSD test if ANOVA is significant
if pvalue < 0.05:
    tukey_result = pairwise_tukeyhsd(df["mean_GAPDH"], df["GROUP"], alpha=0.05)
    print("Tukey HSD post hoc test results for GAPDH:")
    print(tukey_result)
else:
    print("ANOVA not significant; no post hoc test performed.")

# Calculate Pearson correlations between GAPDH Ct and clinical parameters
clinical_parameters = [
    "plaque_index",
    "gingival_index",
    "pocket_depth",
    "bleeding_on_probing",
    "number_of_missing_teeth",
]

correlations = {}
for parameter in clinical_parameters:
    correlation, p_value = stats.pearsonr(df["mean_GAPDH"], df[parameter])
    correlations[parameter] = {"correlation": correlation, "p_value": p_value}
    print(
        f"Pearson correlation between GAPDH and {parameter}: {correlation:.4f}, P-value = {p_value:.4f}"
    )

# Save results
gapdh_results = pd.DataFrame({"ANOVA_F": [fvalue], "ANOVA_p": [pvalue]})
gapdh_results.to_csv("results/supplementary/gapdh_analysis_results.csv", index=False)

# One row per clinical parameter, columns "correlation" and "p_value"
correlations_df = pd.DataFrame(correlations).T
correlations_df.to_csv("results/supplementary/gapdh_correlations.csv", index=True)


# Manuscript text justifying the use of raw Ct values, saved as a plain-text
# supplementary file. Only the ANOVA p-value is interpolated into the text.
# NOTE(review): every other statement (significant Tukey differences, the
# correlation with bleeding on probing, the methods listed) is hard-coded and
# is written even when the statistics computed in this cell do not support it;
# confirm the wording against the printed results before reuse.
justification_text = f"""




Justification for Raw Ct Analysis:





Due to the observed significant variability of GAPDH expression across the study groups (ANOVA p-value: {pvalue:.3f}, Tukey HSD results showing significant intergroup differences and effect sizes, Supplementary Table S2), and its correlations with disease severity metrics (Supplementary Figure S1), GAPDH was deemed unsuitable as a reference gene for normalization. Using an unstable reference gene would introduce bias into the analysis, potentially confounding the observed differences in miRNA expression. Therefore, subsequent analyses were performed using raw Ct values. Raw Ct analysis provides a more transparent and unbiased approach when a reliable reference gene cannot be identified. This decision aligns with previous research highlighting the potential for reference gene instability in similar contexts, especially inflammatory conditions such as periodontitis (Dheda et al., 2004; Schmittgen and Zakrajsek, 2000; Li et al., 2019; Peng et al., 2012; Ye et al., 2018). While raw Ct analysis relies on the assumption of similar starting RNA amounts across samples, this limitation is mitigated by our quality control measures during sample processing and robust statistical analyses using non-parametric methods. The variability of GAPDH, as evidenced by its significant correlation with bleeding on probing (Supplementary Figure S1 and Supplementary Table S2), further supports the decision to not use it for normalization.





Raw Ct analysis offers several advantages in this context. First, it avoids introducing additional bias or assumptions associated with alternative normalization methods. Second, it focuses on identifying miRNAs exhibiting relatively *large* fold-change differences between groups, where smaller variations introduced by a potentially unstable reference gene are less likely to significantly impact the overall conclusions. Third, the use of robust statistical methods, namely ANOVA followed by post hoc tests with Benjamini-Hochberg FDR correction for multiple comparisons and effect size calculations (Cohen's d), ensures rigorous comparisons of raw Ct values between groups. Despite its limitations, the transparency of raw Ct analysis combined with our rigorous statistical approach and the specific context of our study, makes it a suitable and reliable approach for identifying candidate miRNA biomarkers with altered expression in periodontal disease. If suitable alternative reference genes or normalization methods had been identified, these would have been used instead of the raw Ct approach.
"""


# open() in "w" mode does not create parent directories, so
# results/supplementary must already exist when this runs.
with open("results/supplementary/raw_ct_justification.txt", "w") as f:

    f.write(justification_text)


ANOVA results for GAPDH across groups: F-value = 190.1558, P-value = 0.0000
Levene's test results: Statistic = 4.5712, P-value = 0.0125
Tukey HSD post hoc test results for GAPDH:
Multiple Comparison of Means - Tukey HSD, FWER=0.05 
group1 group2 meandiff p-adj   lower   upper  reject
----------------------------------------------------
     G      P  -4.5173    0.0 -5.1983 -3.8363   True
     G      S    0.587 0.1056 -0.0939   1.268  False
     P      S   5.1044    0.0  4.4234  5.7853   True
----------------------------------------------------
Pearson correlation between GAPDH and plaque_index: -0.6614, P-value = 0.0000
Pearson correlation between GAPDH and gingival_index: -0.6050, P-value = 0.0000
Pearson correlation between GAPDH and pocket_depth: -0.8144, P-value = 0.0000
Pearson correlation between GAPDH and bleeding_on_probing: -0.7548, P-value = 0.0000
Pearson correlation between GAPDH and number_of_missing_teeth: -0.2378, P-value = 0.0132


In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns

# Folder holding the preprocessed train/test CSV files
DATA_PATH = "data/processed"
# Folder receiving the PCA/LDA tables and figures
RESULTS_PATH = "results/supplementary"
# Column names of the six miRNA measurements used as PCA/LDA features
TARGET_MIRNAS = [
    f"mean_mir{suffix}" for suffix in ("146a", "146b", "155", "203", "223", "381p")
]


def load_data():
    """
    Read the scaled training features and attach the group labels to them.

    Both files are read from DATA_PATH: "X_train_scaled.csv" provides the
    features and "y_train.csv" provides the 'GROUP' column, which is copied
    onto the feature frame by position.

    Returns:
        pd.DataFrame: Scaled training features plus a 'GROUP' column.

    Raises:
        FileNotFoundError: If either CSV file does not exist.
        KeyError: If y_train.csv has no 'GROUP' column.
    """
    try:
        features = pd.read_csv(os.path.join(DATA_PATH, "X_train_scaled.csv"))
        labels = pd.read_csv(os.path.join(DATA_PATH, "y_train.csv"))

        if "GROUP" in labels.columns:
            # .values drops the index so the labels are assigned row by row
            features["GROUP"] = labels["GROUP"].values
            return features

        raise KeyError("The 'GROUP' column is missing from y_train.csv")
    except FileNotFoundError as err:
        print(f"File not found: {err.filename}")
        raise
    except KeyError as err:
        print(err)
        raise
    except Exception as err:
        # Report anything unexpected, then let it propagate unchanged
        print(f"An unexpected error occurred while loading data: {err}")
        raise


def perform_pca(df, targets):
    """
    Fit a full PCA on the target miRNA columns, export and plot the outcome.

    The explained-variance ratios are written to "pca_explained_variance.csv"
    and every pair of components is drawn as a scatter plot coloured by
    df['GROUP'].

    Args:
        df (pd.DataFrame): The scaled training data with 'GROUP'.
        targets (list): List of target miRNA column names.

    Returns:
        PCA: The fitted PCA object.
        np.ndarray: The PCA-transformed data.
        pd.Series: Explained variance ratio for each principal component.
    """
    model = PCA()
    projected = model.fit_transform(df[targets])
    variance_ratio = pd.Series(model.explained_variance_ratio_)

    # Persist the variance table, then draw all pairwise component plots
    save_explained_variance(variance_ratio, "pca_explained_variance.csv")
    plot_scatter_pca(projected, df["GROUP"], variance_ratio)

    return model, projected, variance_ratio


def plot_scatter_pca(pca_result, groups, explained_variance):
    """
    Draw one 2D scatter plot per pair of principal components and save it.

    Each figure is stored in RESULTS_PATH as
    "pca_scatter_pc<i>_vs_pc<j>.png" and closed afterwards.

    Args:
        pca_result (np.ndarray): PCA-transformed data.
        groups (pd.Series): The 'GROUP' labels used to colour the points.
        explained_variance (pd.Series): Explained variance ratio for PCA components.
    """
    n_components = pca_result.shape[1]
    component_pairs = [
        (first, second)
        for first in range(n_components)
        for second in range(first + 1, n_components)
    ]

    for first, second in component_pairs:
        fig, ax = plt.subplots(figsize=(8, 6))
        sns.scatterplot(
            x=pca_result[:, first],
            y=pca_result[:, second],
            hue=groups,
            palette="viridis",
            edgecolor="k",
            alpha=0.7,
            ax=ax,
        )
        ax.set_title(f"PCA Scatter Plot (PC{first + 1} vs PC{second + 1})")
        ax.set_xlabel(
            f"Principal Component {first + 1} ({explained_variance[first]:.2%})"
        )
        ax.set_ylabel(
            f"Principal Component {second + 1} ({explained_variance[second]:.2%})"
        )
        ax.legend(title="Group")
        fig.tight_layout()

        # save_plot writes the current figure, which is `fig` at this point
        save_plot(
            os.path.join(
                RESULTS_PATH, f"pca_scatter_pc{first + 1}_vs_pc{second + 1}.png"
            )
        )
        plt.close(fig)


def perform_lda(df, targets):
    """
    Fit an LDA on the target miRNA columns against 'GROUP', export and plot it.

    The explained-variance ratios are written to "lda_explained_variance.csv"
    and every pair of discriminants is drawn as a scatter plot coloured by
    df['GROUP'].

    Args:
        df (pd.DataFrame): The scaled training data with 'GROUP'.
        targets (list): List of target miRNA column names.

    Returns:
        LDA: The fitted LDA object.
        np.ndarray: The LDA-transformed data.
        pd.Series: Explained variance ratio for each linear discriminant.
    """
    labels = df["GROUP"]

    model = LDA()
    projected = model.fit_transform(df[targets], labels)
    variance_ratio = pd.Series(model.explained_variance_ratio_)

    # Persist the variance table, then draw all pairwise discriminant plots
    save_explained_variance(variance_ratio, "lda_explained_variance.csv")
    plot_scatter_lda(projected, labels, variance_ratio)

    return model, projected, variance_ratio


def plot_scatter_lda(lda_result, groups, explained_variance):
    """
    Draw one 2D scatter plot per pair of linear discriminants and save it.

    Each figure is stored in RESULTS_PATH as
    "lda_scatter_ld<i>_vs_ld<j>.png" and closed afterwards. When the
    transformed data has a single column no figure is produced.

    Args:
        lda_result (np.ndarray): LDA-transformed data.
        groups (pd.Series): The 'GROUP' labels used to colour the points.
        explained_variance (pd.Series): Explained variance ratio for LDA components.
    """
    n_components = lda_result.shape[1]
    discriminant_pairs = [
        (first, second)
        for first in range(n_components)
        for second in range(first + 1, n_components)
    ]

    for first, second in discriminant_pairs:
        fig, ax = plt.subplots(figsize=(8, 6))
        sns.scatterplot(
            x=lda_result[:, first],
            y=lda_result[:, second],
            hue=groups,
            palette="viridis",
            edgecolor="k",
            alpha=0.7,
            ax=ax,
        )
        ax.set_title(f"LDA Scatter Plot (LD{first + 1} vs LD{second + 1})")
        ax.set_xlabel(
            f"Linear Discriminant {first + 1} ({explained_variance[first]:.2%})"
        )
        ax.set_ylabel(
            f"Linear Discriminant {second + 1} ({explained_variance[second]:.2%})"
        )
        ax.legend(title="Group")
        fig.tight_layout()

        # save_plot writes the current figure, which is `fig` at this point
        save_plot(
            os.path.join(
                RESULTS_PATH, f"lda_scatter_ld{first + 1}_vs_ld{second + 1}.png"
            )
        )
        plt.close(fig)


def save_explained_variance(explained_variance, filename):
    """
    Write explained-variance ratios to a two-column CSV in RESULTS_PATH.

    Components are labelled "PC1", "PC2", ... when the file name contains
    "pca" (case-insensitive) and "LD1", "LD2", ... otherwise.

    Args:
        explained_variance (pd.Series): Explained variance ratios.
        filename (str): The filename for the CSV.
    """
    prefix = "PC" if "pca" in filename.lower() else "LD"
    labels = [f"{prefix}{number}" for number in range(1, len(explained_variance) + 1)]

    table = pd.DataFrame(
        {"Component": labels, "Explained Variance Ratio": explained_variance}
    )
    table.to_csv(os.path.join(RESULTS_PATH, filename), index=False)


def save_plot(filepath):
    """
    Write the current matplotlib figure to disk at 300 dpi.

    A failure is reported on stdout and then re-raised to the caller.

    Args:
        filepath (str): The path where the plot will be saved.
    """
    try:
        plt.savefig(filepath, dpi=300)
    except Exception as err:
        print(f"Failed to save plot {filepath}: {err}")
        raise


def main():
    """
    Run the PCA and the LDA on the scaled training data.

    Creates RESULTS_PATH if needed, loads the data once and hands it to both
    analyses; their tables and figures are written as side effects.
    """
    os.makedirs(RESULTS_PATH, exist_ok=True)

    training_frame = load_data()

    # Fitted models and projections are returned but not needed here
    perform_pca(training_frame, TARGET_MIRNAS)
    perform_lda(training_frame, TARGET_MIRNAS)


if __name__ == "__main__":
    main()


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os

# Pearson correlation between the miRNA measurements and the clinical
# parameters of the training set, for raw and robust-scaled values. Each
# matrix is saved as a CSV and as an annotated heatmap.

# Create the output directories BEFORE anything is written to them
# (previously this happened after the savefig calls that need them).
os.makedirs("results/main", exist_ok=True)
os.makedirs("results/supplementary", exist_ok=True)

# Load preprocessed data (both raw and scaled training sets)
df_raw = pd.read_csv("data/processed/X_train_raw.csv")
df_scaled = pd.read_csv("data/processed/X_train_scaled.csv")

# Define target miRNAs and clinical parameters
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]
clinical_params = [
    "AGE",
    "plaque_index",
    "gingival_index",
    "pocket_depth",
    "bleeding_on_probing",
    "number_of_missing_teeth",
]


def _save_correlation_heatmap(matrix, title, filepath):
    """Draw an annotated heatmap of `matrix`, save it to `filepath`, close it."""
    fig, ax = plt.subplots(figsize=(12, 8))
    sns.heatmap(matrix, annot=True, cmap="coolwarm", fmt=".2f", ax=ax)
    ax.set_title(title)
    fig.savefig(filepath)
    plt.close(fig)


# 1. Correlation analysis (Raw Data) -> supplementary results
correlation_matrix_raw = df_raw[targets + clinical_params].corr(method="pearson")
_save_correlation_heatmap(
    correlation_matrix_raw,
    "Correlation Matrix Heatmap (Raw Ct Values)",
    "results/supplementary/correlation_heatmap_raw.png",
)

# 2. Correlation analysis (Scaled Data) -> main results
correlation_matrix_scaled = df_scaled[targets + clinical_params].corr(method="pearson")
_save_correlation_heatmap(
    correlation_matrix_scaled,
    "Correlation Matrix Heatmap (Robustly Scaled Ct Values)",
    "results/main/correlation_heatmap_scaled.png",
)

# Save correlation matrices
correlation_matrix_raw.to_csv(
    "results/supplementary/correlation_matrix_raw.csv", index=True
)
correlation_matrix_scaled.to_csv(
    "results/main/correlation_matrix_scaled.csv", index=True
)


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import roc_curve, auc, precision_recall_curve, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import matplotlib.pyplot as plt
import os

# Read the train/test splits written by the preprocessing step
X_train_raw = pd.read_csv("data/processed/X_train_raw.csv")
X_train_scaled = pd.read_csv("data/processed/X_train_scaled.csv")
X_test_raw = pd.read_csv("data/processed/X_test_raw.csv")
X_test_scaled = pd.read_csv("data/processed/X_test_scaled.csv")
y_train = pd.read_csv("data/processed/y_train.csv")
y_test = pd.read_csv("data/processed/y_test.csv")

# Row masks selecting only the 'S' and 'P' groups
train_filter = y_train["GROUP"].isin(["S", "P"])
test_filter = y_test["GROUP"].isin(["S", "P"])

# Feature frames restricted to those rows (original row index kept)
X_train_raw_cp = X_train_raw.loc[train_filter]
X_train_scaled_cp = X_train_scaled.loc[train_filter]
X_test_raw_cp = X_test_raw.loc[test_filter]
X_test_scaled_cp = X_test_scaled.loc[test_filter]

# Binary labels for the two groups: 'S' -> 0, 'P' -> 1, re-indexed from 0
y_train_cp = (
    y_train.loc[train_filter, "GROUP"].map({"S": 0, "P": 1}).reset_index(drop=True)
)
y_test_cp = (
    y_test.loc[test_filter, "GROUP"].map({"S": 0, "P": 1}).reset_index(drop=True)
)

# miRNA columns evaluated one at a time in the ROC analysis
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]


# ROC analysis function
def roc_analysis(
    X_train, X_test, y_train, y_test, target_name, comparison_name, data_type
):
    """
    Fit a logistic regression on the training data and evaluate it by ROC.

    The ROC curve of the test-set probabilities is saved to
    results/<data_type>/<comparison_name, lower-cased, spaces as "_">/
    roc_curve_<target_name>.png. The operating point is the threshold that
    maximises Youden's J (tpr - fpr) on the test set.

    Args:
        X_train, X_test: Feature matrices for fitting and evaluation.
        y_train, y_test: Binary labels matching the feature matrices.
        target_name (str): Label used in the plot legend and file name.
        comparison_name (str): Label used in the plot title and folder name.
        data_type (str): Label used in the plot title and folder name.

    Returns:
        dict: "AUC", "Optimal Cutoff", "Sensitivity", "Specificity",
        "Accuracy" (at the optimal cutoff) and "PR AUC".
    """
    classifier = LogisticRegression(solver="liblinear")
    classifier.fit(X_train, y_train)
    positive_scores = classifier.predict_proba(X_test)[:, 1]

    fpr, tpr, thresholds = roc_curve(y_test, positive_scores)
    roc_auc = auc(fpr, tpr)

    # Youden index for optimal cutoff
    best = np.argmax(tpr - fpr)
    cutoff = thresholds[best]

    # Share of test samples classified correctly at that cutoff
    predicted_positive = positive_scores >= cutoff
    accuracy = np.mean(predicted_positive == y_test)

    precision, recall, _ = precision_recall_curve(y_test, positive_scores)
    pr_auc = auc(recall, precision)

    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot(fpr, tpr, label=f"{target_name} (AUC = {roc_auc:.2f})")
    ax.plot([0, 1], [0, 1], "k--")  # chance line
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title(f"ROC Curve: {comparison_name} ({data_type})")
    ax.legend(loc="lower right")

    # One folder per data type and comparison, created on demand
    output_dir = f'results/{data_type}/{comparison_name.lower().replace(" ", "_")}'
    os.makedirs(output_dir, exist_ok=True)
    fig.savefig(f"{output_dir}/roc_curve_{target_name}.png")
    plt.close(fig)

    return {
        "AUC": roc_auc,
        "Optimal Cutoff": cutoff,
        "Sensitivity": tpr[best],
        "Specificity": 1 - fpr[best],
        "Accuracy": accuracy,
        "PR AUC": pr_auc,
    }


# Single-miRNA ROC analysis for the "Control vs. Periodontitis" comparison:
# each miRNA is evaluated on its own, once with raw and once with scaled
# values, and the metrics are collected per miRNA.
roc_results_raw_cp = {}
roc_results_scaled_cp = {}
print("Starting ROC analysis...")
for target in targets:
    # Select the target feature as a one-column frame, re-indexed from 0
    X_train_target_raw = X_train_raw_cp[[target]].reset_index(drop=True)
    X_test_target_raw = X_test_raw_cp[[target]].reset_index(drop=True)

    X_train_target_scaled = X_train_scaled_cp[[target]].reset_index(drop=True)
    X_test_target_scaled = X_test_scaled_cp[[target]].reset_index(drop=True)

    # Raw data
    roc_results_raw_cp[target] = roc_analysis(
        X_train_target_raw,
        X_test_target_raw,
        y_train_cp,
        y_test_cp,
        target,
        "Control vs. Periodontitis",
        "raw",
    )

    # Scaled data
    roc_results_scaled_cp[target] = roc_analysis(
        X_train_target_scaled,
        X_test_target_scaled,
        y_train_cp,
        y_test_cp,
        target,
        "Control vs. Periodontitis",
        "scaled",
    )

# Make sure both output folders exist so this cell does not depend on other
# cells having created them.
os.makedirs("results/supplementary", exist_ok=True)
os.makedirs("results/main", exist_ok=True)

# Save ROC results: one row per miRNA, one column per metric
pd.DataFrame(roc_results_raw_cp).transpose().to_csv(
    "results/supplementary/roc_results_raw_cp.csv", index=True
)
pd.DataFrame(roc_results_scaled_cp).transpose().to_csv(
    "results/main/roc_results_scaled_cp.csv", index=True
)


Starting ROC analysis...


In [None]:
# ... (Previous code for ROC analysis - Part 1) ...

# 2 & 3. ROC with Top miRNAs (All Comparisons) and All miRNAs (All Comparisons)

# ROC analysis function (same as before, no changes needed)

# Single-miRNA ROC analysis for every pair of groups (raw and scaled values).
# Results are nested as {comparison: {miRNA: metrics}}.
# Relies on `groups`, `targets`, the loaded train/test frames and
# `roc_analysis` being defined by earlier cells.
roc_results_raw = {}
roc_results_scaled = {}

group_pairs = [
    (groups[a], groups[b])
    for a in range(len(groups))
    for b in range(a + 1, len(groups))
]

for first_group, second_group in group_pairs:
    comparison_name = f"{first_group} vs {second_group}"

    # Rows belonging to either group of the current comparison
    train_mask = (y_train["GROUP"] == first_group) | (
        y_train["GROUP"] == second_group
    )
    test_mask = y_test["GROUP"].isin([first_group, second_group])

    X_train_raw_subset = X_train_raw.loc[train_mask, targets].values
    X_test_raw_subset = X_test_raw.loc[test_mask, targets].values
    X_train_scaled_subset = X_train_scaled.loc[train_mask, targets].values
    X_test_scaled_subset = X_test_scaled.loc[test_mask, targets].values

    # First group of the pair is coded 0, the second 1
    y_train_subset = (
        y_train.loc[train_mask, "GROUP"].ne(first_group).astype("int64").values
    )
    y_test_subset = (
        y_test.loc[test_mask, "GROUP"].ne(first_group).astype("int64").values
    )

    raw_metrics = {}
    scaled_metrics = {}
    roc_results_raw[comparison_name] = raw_metrics
    roc_results_scaled[comparison_name] = scaled_metrics

    for k, target in enumerate(targets):
        # Column k as an (n_samples, 1) matrix for the single-feature model
        raw_metrics[target] = roc_analysis(
            X_train_raw_subset[:, [k]],
            X_test_raw_subset[:, [k]],
            y_train_subset,
            y_test_subset,
            target,
            comparison_name,
            "raw",
        )
        scaled_metrics[target] = roc_analysis(
            X_train_scaled_subset[:, [k]],
            X_test_scaled_subset[:, [k]],
            y_train_subset,
            y_test_subset,
            target,
            comparison_name,
            "scaled",
        )

# Top miRNAs ROC analysis (all comparisons)
# Same single-miRNA evaluation as above, restricted to the selected miRNAs.
# Results are nested as {comparison: {miRNA: metrics}}.
top_mirnas = ["mean_mir146b", "mean_mir155", "mean_mir203"]
roc_results_top_mirnas_raw = {}
roc_results_top_mirnas_scaled = {}


for i in range(len(groups)):
    for j in range(i + 1, len(groups)):
        comparison_name = f"{groups[i]} vs {groups[j]}"

        # Subset data for current comparison
        group1_indices = y_train["GROUP"] == groups[i]
        group2_indices = y_train["GROUP"] == groups[j]
        train_rows = group1_indices | group2_indices
        test_rows = y_test["GROUP"].isin([groups[i], groups[j]])

        X_train_raw_subset = X_train_raw[train_rows][top_mirnas].values
        X_test_raw_subset = X_test_raw[test_rows][top_mirnas].values
        X_train_scaled_subset = X_train_scaled[train_rows][top_mirnas].values
        X_test_scaled_subset = X_test_scaled[test_rows][top_mirnas].values

        # First group of the pair is coded 0, the second 1
        y_train_subset = (
            y_train[train_rows]["GROUP"]
            .apply(lambda x: 0 if x == groups[i] else 1)
            .values
        )
        y_test_subset = (
            y_test[test_rows]["GROUP"]
            .apply(lambda x: 0 if x == groups[i] else 1)
            .values
        )

        roc_results_top_mirnas_raw[comparison_name] = {}
        roc_results_top_mirnas_scaled[comparison_name] = {}

        for k, target in enumerate(top_mirnas):
            roc_results_top_mirnas_raw[comparison_name][target] = roc_analysis(
                X_train_raw_subset[:, k].reshape(-1, 1),
                X_test_raw_subset[:, k].reshape(-1, 1),
                y_train_subset,
                y_test_subset,
                target,
                comparison_name,
                "raw",
            )
            # Store in the top-miRNA dictionary. This used to be written to
            # `roc_results_scaled`, which left `roc_results_top_mirnas_scaled`
            # with empty entries for every comparison.
            roc_results_top_mirnas_scaled[comparison_name][target] = roc_analysis(
                X_train_scaled_subset[:, k].reshape(-1, 1),
                X_test_scaled_subset[:, k].reshape(-1, 1),
                y_train_subset,
                y_test_subset,
                target,
                comparison_name,
                "scaled",
            )

# Save ROC results
# ... (saving of ROC results will be done after combining miRNAs for ROC is calculated, for organization).

# Save ROC results
# ... (saving of ROC results will be done after combining miRNAs for ROC is calculated, for organization).


In [None]:
# ... (previous ROC analysis code) ...

# Combining Top miRNAs for ROC Analysis

# ROC analysis of a combined marker: the row-wise mean of the top miRNAs,
# evaluated for every pair of groups in four variants (see dictionaries).
roc_results_combined_raw = {}
roc_results_combined_scaled = {}
roc_results_combined_raw_then_scaled = {}  # For raw combined, then scaled
roc_results_combined_scaled_then_combined = {}  # For scaled combined, then combined


for i in range(len(groups)):
    for j in range(i + 1, len(groups)):
        comparison_name = f"{groups[i]} vs {groups[j]}"

        # Subset data for current comparison
        group1_indices = y_train["GROUP"] == groups[i]
        group2_indices = y_train["GROUP"] == groups[j]
        train_rows = group1_indices | group2_indices
        test_rows = y_test["GROUP"].isin([groups[i], groups[j]])

        X_train_raw_subset = X_train_raw[train_rows][top_mirnas]
        X_test_raw_subset = X_test_raw[test_rows][top_mirnas]

        X_train_scaled_subset = X_train_scaled[train_rows][top_mirnas]
        X_test_scaled_subset = X_test_scaled[test_rows][top_mirnas]

        # First group of the pair is coded 0, the second 1
        y_train_subset = (
            y_train[train_rows]["GROUP"]
            .apply(lambda x: 0 if x == groups[i] else 1)
            .values
        )
        y_test_subset = (
            y_test[test_rows]["GROUP"]
            .apply(lambda x: 0 if x == groups[i] else 1)
            .values
        )

        # Combine raw Ct values, then scale
        X_train_raw_combined = X_train_raw_subset.mean(axis=1).values.reshape(-1, 1)
        X_test_raw_combined = X_test_raw_subset.mean(axis=1).values.reshape(-1, 1)

        # Use a fresh scaler for the combined marker instead of refitting the
        # notebook-level `scaler` from the preprocessing cell: that object is
        # the fitted six-miRNA scaler and must not be overwritten here. The
        # scaled values are the same, since fit_transform refits from scratch.
        combined_scaler = RobustScaler()
        X_train_raw_combined_scaled = combined_scaler.fit_transform(
            X_train_raw_combined
        )
        X_test_raw_combined_scaled = combined_scaler.transform(X_test_raw_combined)

        # Combine scaled Ct values
        X_train_scaled_combined = X_train_scaled_subset.mean(axis=1).values.reshape(
            -1, 1
        )
        X_test_scaled_combined = X_test_scaled_subset.mean(axis=1).values.reshape(-1, 1)

        roc_results_combined_raw[comparison_name] = roc_analysis(
            X_train_raw_combined,
            X_test_raw_combined,
            y_train_subset,
            y_test_subset,
            "combined_raw",
            comparison_name,
            "raw",
        )
        roc_results_combined_scaled[comparison_name] = roc_analysis(
            X_train_scaled_combined,
            X_test_scaled_combined,
            y_train_subset,
            y_test_subset,
            "combined_scaled",
            comparison_name,
            "scaled",
        )
        roc_results_combined_raw_then_scaled[comparison_name] = roc_analysis(
            X_train_raw_combined_scaled,
            X_test_raw_combined_scaled,
            y_train_subset,
            y_test_subset,
            "combined_raw_scaled",
            comparison_name,
            "scaled",
        )
        # Same inputs as "combined_scaled"; kept because it produces its own
        # plot file (roc_curve_combined_scaled_avg.png) and result entry.
        roc_results_combined_scaled_then_combined[comparison_name] = roc_analysis(
            X_train_scaled_combined,
            X_test_scaled_combined,
            y_train_subset,
            y_test_subset,
            "combined_scaled_avg",
            comparison_name,
            "scaled",
        )

# Export of all ROC results: one CSV per result set and comparison, plus a
# single JSON file holding the complete nested dictionary.
import json

# Saving ROC Results (all combined in a dictionary)
roc_results_all = {
    "raw": roc_results_raw,
    "scaled": roc_results_scaled,
    "combined_raw": roc_results_combined_raw,
    "combined_scaled": roc_results_combined_scaled,
    "top_mirnas_raw": roc_results_top_mirnas_raw,
    "top_mirnas_scaled": roc_results_top_mirnas_scaled,
    "combined_raw_then_scaled": roc_results_combined_raw_then_scaled,
    "combined_scaled_then_combined": roc_results_combined_scaled_then_combined,
}

# Iterate over each data type and create tables
for data_type, results in roc_results_all.items():
    # Anything scaled or combined goes to the main results, the rest to
    # the supplementary results
    output_subdir = (
        "main"
        if "scaled" in data_type or "combined" in data_type
        else "supplementary"
    )

    for comparison, values in results.items():
        output_dir = f"results/{output_subdir}/roc_{comparison.lower().replace(' ', '_')}_{data_type}"
        os.makedirs(output_dir, exist_ok=True)

        if values and all(isinstance(entry, dict) for entry in values.values()):
            # Per-miRNA results ({miRNA: metrics}): one row per miRNA with the
            # metrics as columns. Wrapping this in a list would instead give a
            # single row whose cells are whole dictionaries.
            temp = (
                pd.DataFrame.from_dict(values, orient="index")
                .rename_axis("Target")
                .reset_index()
            )
        else:
            # Combined-marker results (flat metrics dict): a single row
            temp = pd.DataFrame([values])
        temp["Comparison"] = comparison
        temp["DataType"] = data_type

        # Save the DataFrame to CSV
        temp.to_csv(
            f"{output_dir}/roc_metrics_{data_type}.csv",
            index=False,
        )

# Save the entire dictionary to a JSON file
os.makedirs("results/main", exist_ok=True)
with open("results/main/roc_results_all.json", "w") as fp:
    json.dump(roc_results_all, fp, indent=4)


In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
import matplotlib.pyplot as plt
import seaborn as sns

# Constants for file paths
# DATA_PATH: directory from which loadData() reads X_train_scaled.csv and y_train.csv.
# RESULTS_PATH: directory that receives the explained-variance CSVs and scatter plots.
DATA_PATH = "data/processed"
RESULTS_PATH = "results/supplementary"
# miRNA columns passed to performPCA() and performLDA() as the feature set.
TARGET_MIRNAS = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]


def loadData() -> pd.DataFrame:
    """
    Read the scaled training features and attach the matching group labels.

    Both CSVs are read from DATA_PATH: "X_train_scaled.csv" provides the
    features and "y_train.csv" provides the 'GROUP' column, which is copied
    onto the feature frame by position.

    Returns:
        pd.DataFrame: The scaled training features plus a 'GROUP' column.

    Raises:
        FileNotFoundError: If one of the CSV files is missing.
        KeyError: If y_train.csv has no 'GROUP' column.
        Exception: Any other error is reported and re-raised unchanged.
    """
    try:
        scaledFeatures = pd.read_csv(os.path.join(DATA_PATH, "X_train_scaled.csv"))
        labels = pd.read_csv(os.path.join(DATA_PATH, "y_train.csv"))

        if "GROUP" in labels.columns:
            # .values strips the index so the labels are assigned positionally
            scaledFeatures["GROUP"] = labels["GROUP"].values
            return scaledFeatures

        raise KeyError("The 'GROUP' column is missing from y_train.csv")
    except FileNotFoundError as missingFile:
        print(f"File not found: {missingFile.filename}")
        raise
    except KeyError as missingColumn:
        print(missingColumn)
        raise
    except Exception as unexpected:
        print(f"An unexpected error occurred while loading data: {unexpected}")
        raise


def performPCA(df: pd.DataFrame, targets: list) -> tuple:
    """
    Fit a full PCA on the target miRNA columns, then export and plot it.

    The explained-variance ratios are written to "pca_explained_variance.csv"
    and every pair of components is plotted, coloured by df["GROUP"].

    Args:
        df (pd.DataFrame): Data containing the target columns and 'GROUP'.
        targets (list): Names of the miRNA columns to decompose.

    Returns:
        tuple: (fitted PCA object, transformed data as np.ndarray,
        explained variance ratios as pd.Series).
    """
    model = PCA()
    projected = model.fit_transform(df[targets])
    varianceRatios = pd.Series(model.explained_variance_ratio_)

    # Persist the variance table first, then draw the pairwise scatter plots
    saveExplainedVariance(varianceRatios, "pca_explained_variance.csv")
    plotScatterPCA(projected, df["GROUP"], varianceRatios)

    return model, projected, varianceRatios


def plotScatterPCA(
    pcaResult: np.ndarray, groups: pd.Series, explainedVariance: pd.Series
) -> None:
    """
    Generate and save pairwise 2D scatter plots of PCA components.

    One PNG named "pca_scatter_pc<i>_vs_pc<j>.png" is written to RESULTS_PATH
    for every pair of columns in pcaResult. If pcaResult has fewer than two
    columns, no figure is produced.

    Args:
        pcaResult (np.ndarray): PCA-transformed data, one column per component.
        groups (pd.Series): The 'GROUP' labels used to colour the points.
        explainedVariance (pd.Series): Explained variance ratio per component,
            shown in the axis labels.

    Raises:
        Exception: Whatever savePlot re-raises when a figure cannot be written.
    """
    nComponents = pcaResult.shape[1]
    for i in range(nComponents):
        for j in range(i + 1, nComponents):
            fig = plt.figure(figsize=(8, 6))
            try:
                sns.scatterplot(
                    x=pcaResult[:, i],
                    y=pcaResult[:, j],
                    hue=groups,
                    palette="viridis",
                    edgecolor="k",
                    alpha=0.7,
                )
                plt.title(f"PCA Scatter Plot (PC{i+1} vs PC{j+1})")
                plt.xlabel(f"Principal Component {i+1} ({explainedVariance[i]:.2%})")
                plt.ylabel(f"Principal Component {j+1} ({explainedVariance[j]:.2%})")
                plt.legend(title="Group")
                plt.tight_layout()

                filename = f"pca_scatter_pc{i+1}_vs_pc{j+1}.png"
                filepath = os.path.join(RESULTS_PATH, filename)

                # Save the plot
                savePlot(filepath)
            finally:
                # savePlot re-raises on failure; close the figure either way so
                # a failed save does not leave it open.
                plt.close(fig)


def performLDA(df: pd.DataFrame, targets: list) -> tuple:
    """
    Fit an LDA on the target miRNA columns against 'GROUP', then export and plot it.

    The explained-variance ratios are written to "lda_explained_variance.csv"
    and every pair of discriminants is plotted, coloured by df["GROUP"].

    Args:
        df (pd.DataFrame): Data containing the target columns and 'GROUP'.
        targets (list): Names of the miRNA columns used as features.

    Returns:
        tuple: (fitted LDA object, transformed data as np.ndarray,
        explained variance ratios as pd.Series).
    """
    groupLabels = df["GROUP"]

    model = LDA()
    projected = model.fit_transform(df[targets], groupLabels)
    varianceRatios = pd.Series(model.explained_variance_ratio_)

    # Persist the variance table first, then draw the pairwise scatter plots
    saveExplainedVariance(varianceRatios, "lda_explained_variance.csv")
    plotScatterLDA(projected, df["GROUP"], varianceRatios)

    return model, projected, varianceRatios


def plotScatterLDA(
    ldaResult: np.ndarray, groups: pd.Series, explainedVariance: pd.Series
) -> None:
    """
    Generate and save pairwise 2D scatter plots of LDA components.

    One PNG named "lda_scatter_ld<i>_vs_ld<j>.png" is written to RESULTS_PATH
    for every pair of columns in ldaResult. If ldaResult has fewer than two
    columns, no figure is produced.

    Args:
        ldaResult (np.ndarray): LDA-transformed data, one column per discriminant.
        groups (pd.Series): The 'GROUP' labels used to colour the points.
        explainedVariance (pd.Series): Explained variance ratio per discriminant,
            shown in the axis labels.

    Raises:
        Exception: Whatever savePlot re-raises when a figure cannot be written.
    """
    nComponents = ldaResult.shape[1]
    for i in range(nComponents):
        for j in range(i + 1, nComponents):
            fig = plt.figure(figsize=(8, 6))
            try:
                sns.scatterplot(
                    x=ldaResult[:, i],
                    y=ldaResult[:, j],
                    hue=groups,
                    palette="viridis",
                    edgecolor="k",
                    alpha=0.7,
                )
                plt.title(f"LDA Scatter Plot (LD{i+1} vs LD{j+1})")
                plt.xlabel(f"Linear Discriminant {i+1} ({explainedVariance[i]:.2%})")
                plt.ylabel(f"Linear Discriminant {j+1} ({explainedVariance[j]:.2%})")
                plt.legend(title="Group")
                plt.tight_layout()

                filename = f"lda_scatter_ld{i+1}_vs_ld{j+1}.png"
                filepath = os.path.join(RESULTS_PATH, filename)

                # Save the plot
                savePlot(filepath)
            finally:
                # savePlot re-raises on failure; close the figure either way so
                # a failed save does not leave it open.
                plt.close(fig)


def saveExplainedVariance(explainedVariance: pd.Series, filename: str) -> None:
    """
    Write explained variance ratios to a two-column CSV inside RESULTS_PATH.

    Components are labelled "PC1", "PC2", ... when the filename contains
    "pca" (case-insensitive) and "LD1", "LD2", ... otherwise.

    Args:
        explainedVariance (pd.Series): Explained variance ratios, in component order.
        filename (str): Name of the CSV file to create.
    """
    prefix = "PC" if "pca" in filename.lower() else "LD"
    componentLabels = [
        f"{prefix}{position + 1}" for position in range(len(explainedVariance))
    ]

    varianceTable = pd.DataFrame(
        {
            "Component": componentLabels,
            "Explained Variance Ratio": explainedVariance,
        }
    )
    varianceTable.to_csv(os.path.join(RESULTS_PATH, filename), index=False)


def savePlot(filepath: str) -> None:
    """
    Write the current matplotlib figure to disk at 300 dpi.

    Args:
        filepath (str): Destination path of the image file.

    Raises:
        Exception: Any error from plt.savefig is reported and re-raised.
    """
    try:
        plt.savefig(filepath, dpi=300)
    except Exception as saveError:
        # Report which file failed, then let the caller decide what to do
        print(f"Failed to save plot {filepath}: {saveError}")
        raise


def main() -> None:
    """
    Run the PCA and LDA analyses on the scaled training data.

    Creates RESULTS_PATH if needed, loads the data with loadData(), then calls
    performPCA and performLDA on TARGET_MIRNAS. Their return values are not
    used here; both functions write their outputs to disk.
    """
    # The analysis functions write into RESULTS_PATH, so it must exist first
    os.makedirs(RESULTS_PATH, exist_ok=True)

    trainingData = loadData()

    performPCA(trainingData, TARGET_MIRNAS)
    performLDA(trainingData, TARGET_MIRNAS)


if __name__ == "__main__":
    main()


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    classification_report,
    confusion_matrix,
)
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
from sklearn.exceptions import UndefinedMetricWarning

# Suppress UndefinedMetricWarning
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)

# Create the output directory up front: the confusion-matrix figures are saved
# inside the training loop, before any CSV is written.
os.makedirs("results/main", exist_ok=True)

# Load preprocessed data (scaled training and test sets)
X_train_scaled = pd.read_csv("data/processed/X_train_scaled.csv")
X_test_scaled = pd.read_csv("data/processed/X_test_scaled.csv")

y_train = pd.read_csv("data/processed/y_train.csv")
y_test = pd.read_csv("data/processed/y_test.csv")

# Encode GROUP as integer codes using ONE category list taken from the training
# labels. Encoding each split on its own would give the same group different
# codes whenever the test split is missing one of the groups.
group_labels = sorted(y_train["GROUP"].dropna().unique())
group_dtype = pd.CategoricalDtype(categories=group_labels)
y_train_numeric = y_train["GROUP"].astype(group_dtype).cat.codes
y_test_numeric = y_test["GROUP"].astype(group_dtype).cat.codes
class_codes = list(range(len(group_labels)))

# Define target miRNAs (features)
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]
X_train = X_train_scaled[targets]
X_test = X_test_scaled[targets]

# Initialize classifiers
classifiers = {
    "Logistic Regression": LogisticRegression(
        max_iter=1000, solver="liblinear"
    ),  # Increased max_iter and added solver for better convergence
    "LDA": LDA(),
    "SVM": SVC(
        probability=True
    ),  # Add probability parameter so that roc_auc_score can be calculated
    "Random Forest": RandomForestClassifier(random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(random_state=42),
    "Neural Network": MLPClassifier(
        max_iter=1000, random_state=42, early_stopping=True
    ),  # Increased max_iter and add early stopping to avoid overfitting
}

results = {}
for name, clf in classifiers.items():
    clf.fit(X_train, y_train_numeric)
    y_pred = clf.predict(X_test)
    y_prob = (
        clf.predict_proba(X_test)
        if hasattr(clf, "predict_proba")
        else clf.decision_function(X_test)
    )  # Get predicted probabilities or decision function if probabilities are not available

    results[name] = {
        "accuracy": accuracy_score(y_test_numeric, y_pred),
        "precision": precision_score(
            y_test_numeric, y_pred, average="weighted", zero_division=0
        ),
        "recall": recall_score(
            y_test_numeric, y_pred, average="weighted", zero_division=0
        ),
        "f1_score": f1_score(
            y_test_numeric, y_pred, average="weighted", zero_division=0
        ),
    }

    # ROC AUC. roc_auc_score expects a 1-D score for a binary problem (the
    # probability of the positive class) and the full probability matrix for a
    # multiclass problem, so pick the right form from the shape of y_prob.
    try:
        if y_prob.ndim == 1:
            auc_value = roc_auc_score(y_test_numeric, y_prob)
        elif y_prob.shape[1] == 2:
            auc_value = roc_auc_score(y_test_numeric, y_prob[:, 1])
        else:
            auc_value = roc_auc_score(
                y_test_numeric, y_prob, multi_class="ovr", labels=clf.classes_
            )
        results[name]["auc"] = auc_value
    except ValueError as auc_error:
        # Report the real reason instead of assuming a single-class test set
        print(
            f"ROC AUC score could not be computed for {name}: {auc_error}. Skipping AUC calculation."
        )

    # Add classification report to show precision, recall and f1-score for each class
    results[name]["classification_report"] = classification_report(
        y_test_numeric, y_pred, output_dict=True, zero_division=0
    )

    # Confusion matrix over every known group, so that its rows and columns
    # always line up with the tick labels below.
    cm = confusion_matrix(y_test_numeric, y_pred, labels=class_codes)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        cmap="Blues",
        fmt="g",
        xticklabels=group_labels,
        yticklabels=group_labels,
    )
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title(f"Confusion Matrix: {name}")
    plt.savefig(f"results/main/{name}_cm.png")
    plt.close()

# Feature importance for Random Forest
feature_importance = classifiers["Random Forest"].feature_importances_
feature_importance_df = pd.DataFrame(
    {"Feature": targets, "Importance": feature_importance}
).sort_values(by="Importance", ascending=False)

# Convert the dictionaries to DataFrames and save the results
pd.DataFrame(results).transpose().to_csv(
    "results/main/classification_results.csv", index=True
)

feature_importance_df.to_csv("results/main/feature_importance.csv", index=False)


In [None]:
# Full Corrected Code for miRNA Analysis with ROC Evaluation

import pandas as pd


import numpy as np


from sklearn.preprocessing import RobustScaler


from sklearn.linear_model import LogisticRegression


from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_curve,
    auc,
    precision_recall_curve,
    roc_auc_score,
    classification_report,
    confusion_matrix,
)


from sklearn.ensemble import RandomForestClassifier


from sklearn.decomposition import PCA


from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA


import matplotlib.pyplot as plt


import seaborn as sns
import os


import pickle


import warnings


from sklearn.exceptions import UndefinedMetricWarning


import json

# Suppress UndefinedMetricWarning
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)


def load_data():
    """
    Read the six processed CSV files used by the ROC analysis.

    All files are read from "data/processed". The label files are reduced to
    their 'GROUP' column.

    Returns:
        tuple: (X_train_raw, X_test_raw, X_train_scaled, X_test_scaled,
        y_train, y_test) — four feature DataFrames followed by two label Series.

    Raises:
        FileNotFoundError: If a file is missing.
        pd.errors.EmptyDataError: If a file is empty.
        Exception: Any other error is reported and re-raised unchanged.
    """
    feature_stems = ("X_train_raw", "X_test_raw", "X_train_scaled", "X_test_scaled")
    label_stems = ("y_train", "y_test")
    try:
        feature_frames = [
            pd.read_csv(f"data/processed/{stem}.csv") for stem in feature_stems
        ]
        label_series = [
            pd.read_csv(f"data/processed/{stem}.csv")["GROUP"] for stem in label_stems
        ]
        return (*feature_frames, *label_series)
    except (FileNotFoundError, pd.errors.EmptyDataError) as e:
        print(f"Error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred while loading data: {e}")
        raise


def encode_labels(y_train, y_test):
    """
    Encode categorical labels to numerical codes.

    When y_train has a categorical dtype, both series are converted to integer
    codes using the categories of y_train, so that a given label always maps to
    the same code in both splits (a test label unknown to y_train becomes -1).
    Labels of any other dtype are returned unchanged.

    Args:
        y_train (pd.Series): Training labels.
        y_test (pd.Series): Test labels.

    Returns:
        y_train_numeric (pd.Series): Encoded (or unchanged) training labels.
        y_test_numeric (pd.Series): Encoded (or unchanged) test labels.
    """
    if isinstance(y_train.dtype, pd.CategoricalDtype):
        y_train_numeric = y_train.cat.codes
        # Re-express the test labels with the training categories; using the
        # test series' own categories could assign different codes to the same
        # label, and would fail outright if y_test is not categorical.
        y_test_numeric = y_test.astype(y_train.dtype).cat.codes
    else:
        y_train_numeric = y_train
        y_test_numeric = y_test
    return y_train_numeric, y_test_numeric


def roc_analysis(
    X_train, X_test, y_train, y_test, target_name, comparison_name, data_type
):
    """
    Perform ROC analysis using Logistic Regression and compute metrics.

    A liblinear Logistic Regression is fitted on the training data and scored
    on the test data. The ROC curve is saved as
    results/<data_type>/<comparison_name, lower-cased, spaces as underscores>/roc_curve_<target_name>.png.

    Parameters:
    - X_train (np.ndarray): Training features.
    - X_test (np.ndarray): Testing features.
    - y_train (np.ndarray): Training labels.
    - y_test (np.ndarray): Testing labels.
    - target_name (str): Name of the target feature (curve label and file name).
    - comparison_name (str): Description of the comparison (title and folder).
    - data_type (str): Type of data ('raw' or 'scaled'); used in the title and
      as the results sub-folder.

    Returns:
    - dict: ROC metrics including AUC, optimal cutoff, sensitivity, specificity,
      accuracy, and PR AUC. An empty dict is returned, after printing the error,
      if any step fails.
    """
    try:
        model = LogisticRegression(solver="liblinear")
        model.fit(X_train, y_train)

        # Probability of the second class, used as the ranking score
        y_prob = model.predict_proba(X_test)[:, 1]

        fpr, tpr, thresholds = roc_curve(y_test, y_prob)
        roc_auc = auc(fpr, tpr)

        # Youden's J statistic for optimal cutoff
        j_scores = tpr - fpr
        optimal_idx = np.argmax(j_scores)
        optimal_threshold = thresholds[optimal_idx]
        sensitivity = tpr[optimal_idx]
        specificity = 1 - fpr[optimal_idx]

        # Accuracy of the predictions obtained with the optimal cutoff
        y_pred = (y_prob >= optimal_threshold).astype(int)
        accuracy = accuracy_score(y_test, y_pred)

        # Precision-Recall AUC
        precision, recall, _ = precision_recall_curve(y_test, y_prob)
        pr_auc = auc(recall, precision)

        # Plot and save the ROC curve
        roc_dir = os.path.join(
            "results", data_type, comparison_name.lower().replace(" ", "_")
        )
        roc_path = os.path.join(roc_dir, f"roc_curve_{target_name}.png")

        fig = plt.figure(figsize=(6, 4))
        try:
            plt.plot(fpr, tpr, label=f"{target_name} (AUC = {roc_auc:.2f})")
            plt.plot([0, 1], [0, 1], "k--")
            plt.xlabel("False Positive Rate")
            plt.ylabel("True Positive Rate")
            plt.title(f"ROC Curve: {comparison_name} ({data_type})")
            plt.legend(loc="lower right")

            plt.tight_layout()

            os.makedirs(roc_dir, exist_ok=True)
            plt.savefig(roc_path)
        finally:
            # Close the figure even when creating the folder or saving fails,
            # otherwise it would stay open after the error is swallowed below.
            plt.close(fig)

        metrics = {
            "AUC": roc_auc,
            "Optimal Cutoff": optimal_threshold,
            "Sensitivity": sensitivity,
            "Specificity": specificity,
            "Accuracy": accuracy,
            "PR AUC": pr_auc,
        }

        return metrics
    except Exception as e:
        print(
            f"ROC analysis failed for {target_name} in {comparison_name} ({data_type}): {e}"
        )
        return {}


def save_roc_results(roc_results, data_type):
    """
    Write one CSV of ROC metrics per comparison.

    For each comparison the per-method metrics become one row per method (with
    the method name in a 'method' column) and are saved to
    results/main/roc_<comparison>_<data_type>/roc_results_<data_type>.csv.
    Failures are printed and do not stop the remaining comparisons.

    Args:
        roc_results (dict): Mapping comparison -> {method -> metrics dict}.
        data_type (str): Type of data ('combined_raw', 'combined_scaled', etc.).
    """
    csv_name = f"roc_results_{data_type}.csv"

    for comparison, methods_metrics in roc_results.items():
        try:
            # One row per method; the former index becomes the 'method' column
            metrics_table = pd.DataFrame.from_dict(methods_metrics, orient="index")
            metrics_table = metrics_table.reset_index().rename(
                columns={"index": "method"}
            )

            folder_name = f"roc_{comparison.lower().replace(' ', '_')}_{data_type}"
            target_dir = os.path.join("results", "main", folder_name)
            os.makedirs(target_dir, exist_ok=True)

            metrics_table.to_csv(os.path.join(target_dir, csv_name), index=False)
        except ValueError as ve:
            print(
                f"ValueError while saving ROC results for {comparison}, {data_type}: {ve}"
            )
        except Exception as e:
            print(f"Failed to save ROC results for {comparison}, {data_type}: {e}")


def main():
    """
    Build combined miRNA scores and evaluate them with pairwise ROC analyses.

    Steps:
        1. Load the processed splits and encode the labels.
        2. Add four combined scores of the top miRNAs to copies of the scaled
           splits: simple average, Random-Forest-importance weighted average,
           first principal component and first linear discriminant.
        3. For every pair of groups, run roc_analysis on the scaled average of
           the raw top miRNAs and on each of the four combined scores.
        4. Write one CSV per comparison and data type under results/main, plus
           a JSON file with all metrics.
    """
    X_train_raw, X_test_raw, X_train_scaled, X_test_scaled, y_train, y_test = (
        load_data()
    )
    y_train_numeric, y_test_numeric = encode_labels(y_train, y_test)

    # Target miRNAs, the subset used for the combined scores, and the groups
    targets = [
        "mean_mir146a",
        "mean_mir146b",
        "mean_mir155",
        "mean_mir203",
        "mean_mir223",
        "mean_mir381p",
    ]
    # Based on prior feature selection
    top_targets = ["mean_mir146b", "mean_mir155", "mean_mir203"]
    groups = ["S", "G", "P"]

    # 1. Combined miRNA scores; every transformer is fitted on training data only
    X_train_combined = X_train_scaled.copy()
    X_test_combined = X_test_scaled.copy()

    # Simple average
    X_train_combined["combined_avg"] = X_train_scaled[top_targets].mean(axis=1)
    X_test_combined["combined_avg"] = X_test_scaled[top_targets].mean(axis=1)

    # NOTE(review): the scaler read here is re-fitted by fit_transform in the
    # comparison loop, so its stored statistics are not reused. pickle.load
    # executes code from the file and should only be used on trusted files.
    try:
        with open("src/scaler.pkl", "rb") as f:
            scaler_loaded = pickle.load(f)
    except FileNotFoundError:
        print("Scaler file not found. Proceeding with new RobustScaler.")
        scaler_loaded = RobustScaler()

    # Weighted average (weights = normalised Random Forest feature importances)
    rf = RandomForestClassifier(random_state=42)
    rf.fit(X_train_scaled[targets], y_train_numeric)
    top_positions = [targets.index(mirna) for mirna in top_targets]
    top_importances = rf.feature_importances_[top_positions]
    weights = top_importances / np.sum(top_importances)
    X_train_combined["combined_weighted"] = np.dot(X_train_scaled[top_targets], weights)
    X_test_combined["combined_weighted"] = np.dot(X_test_scaled[top_targets], weights)

    # PCA (first principal component)
    pca = PCA(n_components=1)
    X_train_combined["combined_pca"] = pca.fit_transform(X_train_scaled[top_targets])
    X_test_combined["combined_pca"] = pca.transform(X_test_scaled[top_targets])

    # LDA (first linear discriminant)
    lda = LDA(n_components=1)
    X_train_combined["combined_lda"] = lda.fit_transform(
        X_train_scaled[top_targets], y_train_numeric
    )
    X_test_combined["combined_lda"] = lda.transform(X_test_scaled[top_targets])

    combined_methods = [
        "combined_avg",
        "combined_weighted",
        "combined_pca",
        "combined_lda",
    ]

    # 2. ROC analysis with the combined scores, for every pair of groups
    roc_results_combined_raw = {}
    roc_results_combined_scaled = {}

    for first_pos, first_group in enumerate(groups):
        for second_group in groups[first_pos + 1 :]:
            comparison = f"{first_group} vs {second_group}"
            pair = [first_group, second_group]

            def to_binary(label):
                # The first group of the pair is class 0, the other is class 1
                return 0 if label == first_group else 1

            try:
                # Average the raw top miRNAs, then scale that single score
                raw_train = X_train_raw[targets].copy()
                raw_test = X_test_raw[targets].copy()
                raw_train["combined_top_raw"] = raw_train[top_targets].mean(axis=1)
                raw_test["combined_top_raw"] = raw_test[top_targets].mean(axis=1)

                # Keep only the samples that belong to the two compared groups
                train_mask = y_train.isin(pair)
                test_mask = y_test.isin(pair)

                combined_X_train_raw = raw_train[train_mask][
                    "combined_top_raw"
                ].values.reshape(-1, 1)
                combined_X_test_raw = raw_test[test_mask][
                    "combined_top_raw"
                ].values.reshape(-1, 1)

                combined_X_train_raw_scaled = scaler_loaded.fit_transform(
                    combined_X_train_raw
                )
                combined_X_test_raw_scaled = scaler_loaded.transform(
                    combined_X_test_raw
                )

                y_train_subset = y_train[train_mask].apply(to_binary).values
                y_test_subset = y_test[test_mask].apply(to_binary).values

                # Result containers for the current comparison
                roc_results_combined_raw[comparison] = {}
                roc_results_combined_scaled[comparison] = {}

                # ROC for the scaled average of the raw values
                raw_metrics = roc_analysis(
                    combined_X_train_raw_scaled,
                    combined_X_test_raw_scaled,
                    y_train_subset,
                    y_test_subset,
                    "combined_top_raw_scaled",
                    comparison,
                    "scaled",
                )
                roc_results_combined_raw[comparison][
                    "combined_top_raw_scaled"
                ] = raw_metrics

                # ROC for each combined score built from the scaled data
                for method in combined_methods:
                    current_X_train = X_train_combined[train_mask][
                        method
                    ].values.reshape(-1, 1)
                    current_X_test = X_test_combined[test_mask][
                        method
                    ].values.reshape(-1, 1)

                    roc_results_combined_scaled[comparison][method] = roc_analysis(
                        current_X_train,
                        current_X_test,
                        y_train_subset,
                        y_test_subset,
                        method,
                        comparison,
                        "scaled",
                    )
            except Exception as e:
                print(f"Error during ROC analysis for comparison {comparison}: {e}")

    # 3. Save the ROC results: one CSV per comparison and data type
    roc_results_combined_all = {
        "combined_raw": roc_results_combined_raw,
        "combined_scaled": roc_results_combined_scaled,
    }

    for data_type, results in roc_results_combined_all.items():
        for comparison, values in results.items():
            try:
                output_dir = os.path.join(
                    "results",
                    "main",
                    f"roc_{comparison.lower().replace(' ', '_')}_{data_type}",
                )
                os.makedirs(output_dir, exist_ok=True)

                # One row per method, with the method name in its own column
                metrics_table = pd.DataFrame.from_dict(values, orient="index")
                metrics_table = metrics_table.reset_index().rename(
                    columns={"index": "method"}
                )

                csv_path = os.path.join(output_dir, f"roc_results_{data_type}.csv")
                metrics_table.to_csv(csv_path, index=False)
            except ValueError as ve:
                print(
                    f"ValueError while saving ROC results for {comparison} ({data_type}): {ve}"
                )
            except Exception as e:
                print(f"Failed to save ROC results for {comparison} ({data_type}): {e}")

    # Save the entire dictionary to JSON
    try:
        json_path = os.path.join("results", "main", "roc_results_combined_all.json")
        with open(json_path, "w") as fp:
            json.dump(roc_results_combined_all, fp, indent=4)
    except Exception as e:
        print(f"Failed to save ROC results to JSON file: {e}")


if __name__ == "__main__":
    main()


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns
import os

# Load preprocessed data (scaled training and testing sets)
X_train_scaled = pd.read_csv("data/processed/X_train_scaled.csv")
X_test_scaled = pd.read_csv("data/processed/X_test_scaled.csv")

y_train = pd.read_csv("data/processed/y_train.csv")
y_test = pd.read_csv("data/processed/y_test.csv")

# Create the output directory before the first figure is saved
os.makedirs("results/supplementary", exist_ok=True)

# Convert y_train to numeric codes if it is categorical, otherwise use it as is
if isinstance(y_train["GROUP"].dtype, pd.CategoricalDtype):
    y_train_numeric = y_train["GROUP"].cat.codes
else:
    y_train_numeric = y_train["GROUP"]

# Define target miRNAs (features)
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]
X_train = X_train_scaled[targets]
X_test = X_test_scaled[targets]

# 1. PCA (All Components)
pca = PCA()  # Keep all components
X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)
explained_variance_pca = pca.explained_variance_ratio_

# Pairwise 2D scatter plots of PCA components (on training data)
n_components = X_train_pca.shape[1]
for i in range(n_components):
    for j in range(i + 1, n_components):
        plt.figure(figsize=(8, 6))
        sns.scatterplot(
            x=X_train_pca[:, i],
            y=X_train_pca[:, j],
            hue=y_train["GROUP"],
            palette="viridis",
        )
        plt.title(f"PCA Scatter Plot (PC{i+1} vs PC{j+1})")
        plt.xlabel(f"Principal Component {i+1}")
        plt.ylabel(f"Principal Component {j+1}")
        plt.savefig(f"results/supplementary/pca_scatter_pc{i+1}_vs_pc{j+1}.png")
        plt.close()

# 3D scatter plot of the first three components (on training data).
# Drawn once, and only when three components are available.
if n_components >= 3:
    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection="3d")
    ax.scatter(
        X_train_pca[:, 0],
        X_train_pca[:, 1],
        X_train_pca[:, 2],
        c=y_train["GROUP"].astype("category").cat.codes,
        cmap="viridis",
    )
    ax.set_title("3D PCA Scatter Plot (PC1 vs PC2 vs PC3)")
    ax.set_xlabel("Principal Component 1")
    ax.set_ylabel("Principal Component 2")
    ax.set_zlabel("Principal Component 3")
    plt.savefig("results/supplementary/pca_scatter_3d.png")
    plt.close(fig)

# Save PCA explained variance ratios
explained_variance_df = pd.DataFrame(
    {
        "PC": [f"PC{k+1}" for k in range(len(explained_variance_pca))],
        "Explained Variance Ratio": explained_variance_pca,
    }
)
explained_variance_df.to_csv(
    "results/supplementary/pca_explained_variance.csv", index=False
)

# 2. LDA (All Components)
lda = LDA()  # Retaining all components
X_train_lda = lda.fit_transform(
    X_train, y_train_numeric
)  # Group labels are the supervision target for LDA
X_test_lda = lda.transform(X_test)
explained_variance_lda = lda.explained_variance_ratio_

# Pairwise 2-D scatter plots for LDA. This runs after the PCA loops rather than
# inside them, so its loop indices cannot overwrite the PCA ones.
n_components_lda = X_train_lda.shape[1]
for i in range(n_components_lda):
    for j in range(i + 1, n_components_lda):
        plt.figure(figsize=(8, 6))
        sns.scatterplot(
            x=X_train_lda[:, i],
            y=X_train_lda[:, j],
            hue=y_train["GROUP"],
            palette="viridis",
        )
        plt.title(f"LDA Scatter Plot (LD{i+1} vs LD{j+1})")
        plt.xlabel(f"Linear Discriminant {i+1}")
        plt.ylabel(f"Linear Discriminant {j+1}")
        plt.savefig(f"results/supplementary/lda_scatter_ld{i+1}_vs_ld{j+1}.png")
        plt.close()

# Save LDA explained variance ratios (written even if only one discriminant exists)
explained_variance_lda_df = pd.DataFrame(
    {
        "LD": [f"LD{k+1}" for k in range(len(explained_variance_lda))],
        "Explained Variance Ratio": explained_variance_lda,
    }
)
explained_variance_lda_df.to_csv(
    "results/supplementary/lda_explained_variance.csv", index=False
)


  if pd.api.types.is_categorical_dtype(y_train["GROUP"]):


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    classification_report,
    confusion_matrix,
)
import matplotlib.pyplot as plt
import seaborn as sns
import os
import pickle

# Train six classifiers on the scaled miRNA features, evaluate each on the
# held-out test set, and write metrics, confusion matrices and Random Forest
# feature importances to results/main.

# Load preprocessed data (scaled training and test sets)
X_train = pd.read_csv("data/processed/X_train_scaled.csv")
X_test = pd.read_csv("data/processed/X_test_scaled.csv")
y_train = pd.read_csv("data/processed/y_train.csv")
y_test = pd.read_csv("data/processed/y_test.csv")

# Use the GROUP labels exactly as read from CSV. read_csv never produces a
# categorical column unless a dtype is requested, so the former
# pd.api.types.is_categorical_dtype branch (a deprecated pandas call) could
# never run and has been removed. The variable names are kept unchanged.
y_train_numeric = y_train["GROUP"]
y_test_numeric = y_test["GROUP"]

# Define target miRNAs (features)
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]

X_train = X_train[targets]
X_test = X_test[targets]

# Initialize classifiers
classifiers = {
    "Logistic Regression": LogisticRegression(max_iter=1000, solver="liblinear"),
    "LDA": LDA(),
    "SVM": SVC(probability=True),
    "Random Forest": RandomForestClassifier(random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(random_state=42),
    "Neural Network": MLPClassifier(
        max_iter=1000, random_state=42, early_stopping=True
    ),
}

# The output directory must exist before the first figure is saved into it.
os.makedirs("results/main", exist_ok=True)

results = {}
for name, clf in classifiers.items():
    clf.fit(X_train, y_train_numeric)
    y_pred = clf.predict(X_test)
    # Class order of the fitted model: it fixes the column order of
    # predict_proba and is reused for the confusion-matrix axes so that the
    # tick labels always line up with the matrix rows/columns.
    class_labels = clf.classes_

    results[name] = {
        "accuracy": accuracy_score(y_test_numeric, y_pred),
        "precision": precision_score(y_test_numeric, y_pred, average="weighted"),
        "recall": recall_score(y_test_numeric, y_pred, average="weighted"),
        "f1_score": f1_score(y_test_numeric, y_pred, average="weighted"),
    }

    y_prob = (
        clf.predict_proba(X_test)
        if hasattr(clf, "predict_proba")
        else clf.decision_function(X_test)
    )

    # AUC: binary problems need a 1-D score for the positive class,
    # multiclass problems use one-vs-rest on the full probability matrix.
    try:
        if len(class_labels) == 2:
            positive_score = y_prob[:, 1] if y_prob.ndim == 2 else y_prob
            results[name]["auc"] = roc_auc_score(y_test_numeric, positive_score)
        else:
            results[name]["auc"] = roc_auc_score(
                y_test_numeric, y_prob, multi_class="ovr", labels=class_labels
            )
    except ValueError as err:
        # e.g. a class is absent from the test set; record the gap explicitly
        print(f"ROC AUC score is not defined for {name}: {err}. Skipping AUC calculation.")
        results[name]["auc"] = np.nan

    # Add classification report and confusion matrix
    results[name]["classification_report"] = classification_report(
        y_test_numeric, y_pred, output_dict=True, zero_division=0
    )
    cm = confusion_matrix(y_test_numeric, y_pred, labels=class_labels)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        cmap="Blues",
        fmt="g",
        xticklabels=list(class_labels),
        yticklabels=list(class_labels),
    )
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title(f"Confusion Matrix: {name}")
    plt.savefig(f"results/main/{name}_cm.png")
    plt.close()

# Feature importance for Random Forest (read only after every model is fitted)
feature_importance = classifiers["Random Forest"].feature_importances_
feature_importance_df = pd.DataFrame(
    {"Feature": targets, "Importance": feature_importance}
).sort_values(by="Importance", ascending=False)

# Convert the dictionaries to DataFrames and save the results once, after the loop
pd.DataFrame(results).transpose().to_csv(
    "results/main/classification_results.csv", index=True
)
feature_importance_df.to_csv("results/main/feature_importance.csv", index=False)


  if pd.api.types.is_categorical_dtype(y_train["GROUP"]):


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, auc, precision_recall_curve, roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
import matplotlib.pyplot as plt
import seaborn as sns
import os
import json

# Build combined miRNA scores (average, importance-weighted, PCA, LDA) from the
# top miRNAs and run a pairwise ROC analysis (S vs G, S vs P, G vs P) on them.

# Load preprocessed data (raw and scaled training/test sets)
X_train_raw = pd.read_csv("data/processed/X_train_raw.csv")
X_test_raw = pd.read_csv("data/processed/X_test_raw.csv")
y_train = pd.read_csv("data/processed/y_train.csv")
y_test = pd.read_csv("data/processed/y_test.csv")

X_train_scaled = pd.read_csv("data/processed/X_train_scaled.csv")
X_test_scaled = pd.read_csv("data/processed/X_test_scaled.csv")

# Use the GROUP labels exactly as read from CSV. read_csv never produces a
# categorical column unless a dtype is requested, so the former
# pd.api.types.is_categorical_dtype branch (a deprecated pandas call) could
# never run and has been removed. The variable names are kept unchanged.
y_train_numeric = y_train["GROUP"]
y_test_numeric = y_test["GROUP"]

# Define target miRNAs and groups
targets = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]
top_targets = ["mean_mir146b", "mean_mir155", "mean_mir203"]
groups = ["S", "G", "P"]


def roc_analysis(
    X_train, X_test, y_train, y_test, target_name, comparison_name, data_type
):
    """
    Fit a logistic regression on the training data and summarise its ROC
    performance on the test data for one binary comparison.

    Args:
        X_train: Training features (2-D, one row per sample).
        X_test: Test features with the same columns as X_train.
        y_train: Binary training labels (0/1).
        y_test: Binary test labels (0/1).
        target_name: Name of the score being evaluated; used only in the
            error message.
        comparison_name: Label of the group comparison; used only in the
            error message.
        data_type: Label of the data variant; used only in the error message.

    Returns:
        dict with keys "AUC", "Optimal Cutoff", "Sensitivity", "Specificity",
        "Accuracy" and "PR AUC". The cutoff is the ROC threshold maximising
        tpr - fpr (Youden's J).
    """
    model = LogisticRegression(solver="liblinear")
    model.fit(X_train, y_train)
    y_prob = model.predict_proba(X_test)[:, 1]

    fpr, tpr, thresholds = roc_curve(y_test, y_prob)
    roc_auc = auc(fpr, tpr)

    optimal_idx = np.argmax(tpr - fpr)
    optimal_threshold = thresholds[optimal_idx]
    sensitivity = tpr[optimal_idx]
    specificity = 1 - fpr[optimal_idx]
    try:
        # roc_curve counts a sample as positive when score >= threshold, so the
        # same rule is used here to stay consistent with sensitivity/specificity.
        accuracy = sum(y_test == (y_prob >= optimal_threshold)) / len(y_test)
    except Exception as e:
        print(
            f"Error calculating accuracy for {target_name} ({comparison_name}, {data_type}): {e}"
        )
        accuracy = np.nan

    precision, recall, _ = precision_recall_curve(y_test, y_prob)
    pr_auc = auc(recall, precision)

    return {
        "AUC": roc_auc,
        "Optimal Cutoff": optimal_threshold,
        "Sensitivity": sensitivity,
        "Specificity": specificity,
        "Accuracy": accuracy,
        "PR AUC": pr_auc,
    }


# 1. Combined miRNA Scores (fitted on training data, applied to test data)
combined_methods = [
    "combined_avg",
    "combined_weighted",
    "combined_pca",
    "combined_lda",
]

X_train_combined = X_train_scaled.copy()
X_test_combined = X_test_scaled.copy()

# Simple Average
X_train_combined["combined_avg"] = X_train_scaled[top_targets].mean(axis=1)
X_test_combined["combined_avg"] = X_test_scaled[top_targets].mean(axis=1)

# Weighted Average (using feature importances from Random Forest)
rf = RandomForestClassifier(random_state=42)
rf.fit(X_train_scaled[targets], y_train_numeric)
feature_importances = rf.feature_importances_
top_mirna_indices = [targets.index(mirna) for mirna in top_targets]
top_mirna_importances = feature_importances[top_mirna_indices]
# Normalise so the weights of the top miRNAs sum to 1
top_mirna_weights = top_mirna_importances / np.sum(top_mirna_importances)

X_train_combined["combined_weighted"] = np.dot(
    X_train_scaled[top_targets], top_mirna_weights
)
X_test_combined["combined_weighted"] = np.dot(
    X_test_scaled[top_targets], top_mirna_weights
)

# PCA (First Principal Component); ravel() flattens the (n, 1) output to a column
pca = PCA(n_components=1)
X_train_combined["combined_pca"] = pca.fit_transform(
    X_train_scaled[top_targets]
).ravel()
X_test_combined["combined_pca"] = pca.transform(X_test_scaled[top_targets]).ravel()

# LDA (First Linear Discriminant)
lda = LDA(n_components=1)
X_train_combined["combined_lda"] = lda.fit_transform(
    X_train_scaled[top_targets], y_train_numeric
).ravel()
X_test_combined["combined_lda"] = lda.transform(X_test_scaled[top_targets]).ravel()

# 2. ROC Analysis with Combined Scores (Raw and Scaled, All Comparisons)
roc_results_combined_raw = {}
roc_results_combined_scaled = {}

# Iterate through group comparisons
for i in range(len(groups)):
    for j in range(i + 1, len(groups)):
        comparison = f"{groups[i]} vs {groups[j]}"
        roc_results_combined_raw[comparison] = {}
        roc_results_combined_scaled[comparison] = {}

        # Boolean row masks selecting only the two groups being compared
        train_subset = y_train["GROUP"].isin([groups[i], groups[j]])
        test_subset = y_test["GROUP"].isin([groups[i], groups[j]])

        X_train_raw_subset = X_train_raw[train_subset]
        X_test_raw_subset = X_test_raw[test_subset]

        # Binary labels: first group of the pair -> 0, second group -> 1
        y_train_subset = (
            y_train[train_subset]["GROUP"]
            .apply(lambda x: 0 if x == groups[i] else 1)
            .values
        )
        y_test_subset = (
            y_test[test_subset]["GROUP"]
            .apply(lambda x: 0 if x == groups[i] else 1)
            .values
        )

        # Combine raw values, scale and calculate ROC
        combined_X_train_raw = (
            X_train_raw_subset[top_targets].mean(axis=1).values.reshape(-1, 1)
        )
        combined_X_test_raw = (
            X_test_raw_subset[top_targets].mean(axis=1).values.reshape(-1, 1)
        )

        # A fresh scaler per comparison: fit on the training subset only and
        # never refit a scaler object that another cell may still rely on.
        combined_scaler = RobustScaler()
        combined_X_train_raw_scaled = combined_scaler.fit_transform(
            combined_X_train_raw
        )
        combined_X_test_raw_scaled = combined_scaler.transform(combined_X_test_raw)

        roc_results_combined_raw[comparison]["combined_top_raw_scaled"] = (
            roc_analysis(
                combined_X_train_raw_scaled,
                combined_X_test_raw_scaled,
                y_train_subset,
                y_test_subset,
                "combined_top_raw_scaled",
                comparison,
                "scaled",
            )
        )

        for method in combined_methods:
            # One single-column feature matrix per combined score
            X_train_subset = X_train_combined[train_subset][method].values.reshape(
                -1, 1
            )
            X_test_subset = X_test_combined[test_subset][method].values.reshape(
                -1, 1
            )
            roc_results_combined_scaled[comparison][method] = roc_analysis(
                X_train_subset,
                X_test_subset,
                y_train_subset,
                y_test_subset,
                method,
                comparison,
                "scaled",
            )

# NOTE: roc_results_combined_raw and roc_results_combined_scaled are only held
# in memory here; nothing in this cell writes them to disk.


  if pd.api.types.is_categorical_dtype(y_train["GROUP"]):


In [None]:
import os
import pandas as pd
import numpy as np
import json
from typing import Dict, Any, Tuple, List
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, accuracy_score, recall_score, precision_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import matplotlib.pyplot as plt
import seaborn as sns

# Constants for file paths
# Directory the processed train/test CSV files are read from (see loadData).
DATA_PATH: str = "data/processed"
# Output directory for the ROC result files and the scaled-data correlation
# heatmap/matrix.
RESULTS_MAIN_PATH: str = "results/main"
# Output directory for PCA/LDA scatter plots, explained-variance CSVs and the
# raw-data correlation heatmap/matrix.
RESULTS_SUPPLEMENTARY_PATH: str = "results/supplementary"
# miRNA column names passed to performPCA/performLDA; main() also reuses this
# list as its placeholder "top targets".
TARGET_MIRNAS: List[str] = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]


def loadData() -> Tuple[
    pd.DataFrame,
    pd.DataFrame,
    pd.DataFrame,
    pd.DataFrame,
    pd.DataFrame,
    pd.Series,
    pd.Series,
]:
    """
    Load the processed train/test CSV files from DATA_PATH.

    Returns:
        A 7-tuple, in this order:
            - dfScaled: Copy of the scaled training data with a 'GROUP' column added.
            - X_train_raw: Raw training data.
            - X_test_raw: Raw test data.
            - X_train_scaled: Scaled training data.
            - X_test_scaled: Scaled test data.
            - yTrain: Training labels (the 'GROUP' column of y_train.csv).
            - yTest: Test labels (the 'GROUP' column of y_test.csv).

    Raises:
        FileNotFoundError: If one of the CSV files does not exist.
        KeyError: If y_train.csv or y_test.csv has no 'GROUP' column.
        Exception: Any other error is reported and re-raised unchanged.
    """
    try:
        # Load scaled data
        X_train_scaled: pd.DataFrame = pd.read_csv(
            os.path.join(DATA_PATH, "X_train_scaled.csv")
        )
        X_test_scaled: pd.DataFrame = pd.read_csv(
            os.path.join(DATA_PATH, "X_test_scaled.csv")
        )
        yTrain: pd.Series = pd.read_csv(os.path.join(DATA_PATH, "y_train.csv"))["GROUP"]
        yTest: pd.Series = pd.read_csv(os.path.join(DATA_PATH, "y_test.csv"))["GROUP"]

        # Load raw data
        X_train_raw: pd.DataFrame = pd.read_csv(
            os.path.join(DATA_PATH, "X_train_raw.csv")
        )
        X_test_raw: pd.DataFrame = pd.read_csv(
            os.path.join(DATA_PATH, "X_test_raw.csv")
        )

        # Add 'GROUP' to a copy so X_train_scaled itself stays label-free;
        # .values assigns by position rather than by index alignment.
        dfScaled: pd.DataFrame = X_train_scaled.copy()
        dfScaled["GROUP"] = yTrain.values
        return (
            dfScaled,
            X_train_raw,
            X_test_raw,
            X_train_scaled,
            X_test_scaled,
            yTrain,
            yTest,
        )
    except FileNotFoundError as e:
        print(f"File not found: {e.filename}")
        raise
    except KeyError as e:
        print(f"Missing expected column: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred while loading data: {e}")
        raise


def performPCA(
    df: pd.DataFrame, targets: List[str]
) -> Tuple[PCA, np.ndarray, pd.Series]:
    """
    Fit a full PCA on the target miRNA columns, then persist and plot the outcome.

    The explained-variance ratios are written to "pca_explained_variance.csv"
    and pairwise component scatter plots are produced, coloured by 'GROUP'.

    Args:
        df (pd.DataFrame): Data holding the target columns and a 'GROUP' column.
        targets (List[str]): Names of the miRNA columns fed into the PCA.

    Returns:
        Tuple of (fitted PCA object, transformed data, explained variance ratios).
    """
    model: PCA = PCA()
    scores: np.ndarray = model.fit_transform(df[targets])
    varianceRatios: pd.Series = pd.Series(model.explained_variance_ratio_)

    # Side effects, in this order: CSV of the ratios first, then the figures.
    saveExplainedVariance(
        explainedVariance=varianceRatios,
        filename="pca_explained_variance.csv",
        analysisType="pca",
    )
    plotScatterPCA(
        pcaResult=scores, groups=df["GROUP"], explainedVariance=varianceRatios
    )

    return model, scores, varianceRatios


def plotScatterPCA(
    pcaResult: np.ndarray, groups: pd.Series, explainedVariance: pd.Series
) -> None:
    """
    Draw one scatter plot for every pair of principal components and save each
    one as a PNG in RESULTS_SUPPLEMENTARY_PATH.

    Args:
        pcaResult (np.ndarray): PCA-transformed data, one column per component.
        groups (pd.Series): 'GROUP' labels used to colour the points.
        explainedVariance (pd.Series): Explained variance ratio per component,
            shown as a percentage in the axis labels.
    """
    componentCount: int = pcaResult.shape[1]
    # All pairs with first < second, in the order two nested loops would give.
    componentPairs = [
        (first, second)
        for first in range(componentCount)
        for second in range(first + 1, componentCount)
    ]

    for first, second in componentPairs:
        # 1-based component numbers for titles, labels and file names
        xNumber, yNumber = first + 1, second + 1

        plt.figure(figsize=(8, 6))
        sns.scatterplot(
            x=pcaResult[:, first],
            y=pcaResult[:, second],
            hue=groups,
            palette="viridis",
            edgecolor="k",
            alpha=0.7,
        )
        plt.title(f"PCA Scatter Plot (PC{xNumber} vs PC{yNumber})")
        plt.xlabel(f"Principal Component {xNumber} ({explainedVariance[first]:.2%})")
        plt.ylabel(f"Principal Component {yNumber} ({explainedVariance[second]:.2%})")
        plt.legend(title="Group")
        plt.tight_layout()

        savePlot(
            os.path.join(
                RESULTS_SUPPLEMENTARY_PATH,
                f"pca_scatter_pc{xNumber}_vs_pc{yNumber}.png",
            )
        )
        plt.close()


def performLDA(
    df: pd.DataFrame, targets: List[str]
) -> Tuple[LDA, np.ndarray, pd.Series]:
    """
    Fit an LDA on the target miRNA columns against 'GROUP', then persist and
    plot the outcome.

    The explained-variance ratios are written to "lda_explained_variance.csv"
    and pairwise discriminant scatter plots are produced, coloured by 'GROUP'.

    Args:
        df (pd.DataFrame): Data holding the target columns and a 'GROUP' column.
        targets (List[str]): Names of the miRNA columns fed into the LDA.

    Returns:
        Tuple of (fitted LDA object, transformed data, explained variance ratios).
    """
    groupLabels = df["GROUP"]

    model: LDA = LDA()
    scores: np.ndarray = model.fit_transform(df[targets], groupLabels)
    varianceRatios: pd.Series = pd.Series(model.explained_variance_ratio_)

    # Side effects, in this order: CSV of the ratios first, then the figures.
    saveExplainedVariance(
        explainedVariance=varianceRatios,
        filename="lda_explained_variance.csv",
        analysisType="lda",
    )
    plotScatterLDA(
        ldaResult=scores, groups=df["GROUP"], explainedVariance=varianceRatios
    )

    return model, scores, varianceRatios


def plotScatterLDA(
    ldaResult: np.ndarray, groups: pd.Series, explainedVariance: pd.Series
) -> None:
    """
    Draw one scatter plot for every pair of linear discriminants and save each
    one as a PNG in RESULTS_SUPPLEMENTARY_PATH.

    No figure is produced when ldaResult has fewer than two columns, because
    there is no pair to plot.

    Args:
        ldaResult (np.ndarray): LDA-transformed data, one column per discriminant.
        groups (pd.Series): 'GROUP' labels used to colour the points.
        explainedVariance (pd.Series): Explained variance ratio per discriminant,
            shown as a percentage in the axis labels.
    """
    discriminantCount: int = ldaResult.shape[1]
    # All pairs with first < second, in the order two nested loops would give.
    discriminantPairs = [
        (first, second)
        for first in range(discriminantCount)
        for second in range(first + 1, discriminantCount)
    ]

    for first, second in discriminantPairs:
        # 1-based discriminant numbers for titles, labels and file names
        xNumber, yNumber = first + 1, second + 1

        plt.figure(figsize=(8, 6))
        sns.scatterplot(
            x=ldaResult[:, first],
            y=ldaResult[:, second],
            hue=groups,
            palette="viridis",
            edgecolor="k",
            alpha=0.7,
        )
        plt.title(f"LDA Scatter Plot (LD{xNumber} vs LD{yNumber})")
        plt.xlabel(f"Linear Discriminant {xNumber} ({explainedVariance[first]:.2%})")
        plt.ylabel(f"Linear Discriminant {yNumber} ({explainedVariance[second]:.2%})")
        plt.legend(title="Group")
        plt.tight_layout()

        savePlot(
            os.path.join(
                RESULTS_SUPPLEMENTARY_PATH,
                f"lda_scatter_ld{xNumber}_vs_ld{yNumber}.png",
            )
        )
        plt.close()


def saveExplainedVariance(
    explainedVariance: pd.Series, filename: str, analysisType: str
) -> None:
    """
    Write explained variance ratios to a CSV file in RESULTS_SUPPLEMENTARY_PATH.

    The CSV has two columns: "Component" (PC1, PC2, ... or LD1, LD2, ...) and
    "Explained Variance Ratio".

    Args:
        explainedVariance (pd.Series): Explained variance ratios, one per component.
        filename (str): Name of the CSV file to create.
        analysisType (str): 'pca' (case-insensitive) labels components "PC";
            any other value labels them "LD".
    """
    if analysisType.lower() == "pca":
        prefix: str = "PC"
    else:
        prefix = "LD"

    labels: List[str] = [
        f"{prefix}{number}" for number in range(1, len(explainedVariance) + 1)
    ]
    table: pd.DataFrame = pd.DataFrame(
        {"Component": labels, "Explained Variance Ratio": explainedVariance}
    )

    table.to_csv(os.path.join(RESULTS_SUPPLEMENTARY_PATH, filename), index=False)


def savePlot(filepath: str) -> None:
    """
    Write the current matplotlib figure to disk at 300 dpi.

    Args:
        filepath (str): Destination path of the image file.

    Raises:
        Exception: Whatever plt.savefig raised, after a message naming the
            path has been printed.
    """
    try:
        plt.savefig(filepath, dpi=300)
    except Exception as saveError:
        # Report which file failed, then let the caller see the original error.
        failureMessage = f"Failed to save plot {filepath}: {saveError}"
        print(failureMessage)
        raise


def saveRocResults(
    rocResults: Dict[str, Dict[str, Dict[str, Any]]], outputBasePath: str
) -> None:
    """
    Persist ROC metrics: one CSV per (data type, comparison) pair plus a single
    JSON file holding the complete nested dictionary.

    Each CSV goes to "<outputBasePath>/roc_<comparison>_<dataType>/", where the
    comparison is lower-cased and its spaces become underscores. The directory
    is created even when the entry is skipped for having an unexpected format.

    Args:
        rocResults (Dict[str, Dict[str, Dict[str, Any]]]): Metrics nested as
            data type -> comparison -> method -> metric name -> value.
        outputBasePath (str): Base directory that receives all output.

    Raises:
        Exception: Whatever writing the JSON file raised, after a message has
            been printed.
    """
    for dataType, comparisons in rocResults.items():
        for comparison, methodsMetrics in comparisons.items():
            comparisonSlug: str = comparison.lower().replace(" ", "_")
            outputDir: str = os.path.join(
                outputBasePath, f"roc_{comparisonSlug}_{dataType}"
            )
            os.makedirs(outputDir, exist_ok=True)

            # Guard clause: only a dict of per-method dicts can become a table.
            wellFormed: bool = isinstance(methodsMetrics, dict) and all(
                isinstance(metrics, dict) for metrics in methodsMetrics.values()
            )
            if not wellFormed:
                print(
                    f"Unexpected data format for comparison '{comparison}' and data type '{dataType}'. Skipping..."
                )
                continue

            # One row per method; the method name becomes the first column.
            table: pd.DataFrame = pd.DataFrame.from_dict(
                methodsMetrics, orient="index"
            )
            table.index.name = "method"
            table = table.reset_index()
            table.to_csv(
                os.path.join(outputDir, f"roc_results_{dataType}.csv"), index=False
            )

    jsonPath: str = os.path.join(outputBasePath, "roc_results_combined_all.json")
    try:
        with open(jsonPath, "w") as jsonFile:
            json.dump(rocResults, jsonFile, indent=4)
    except Exception as writeError:
        print(f"Failed to save ROC results to JSON file: {writeError}")
        raise


def performCorrelationAnalysis(
    X_train_raw: pd.DataFrame,
    X_test_raw: pd.DataFrame,
    X_train_scaled: pd.DataFrame,
    X_test_scaled: pd.DataFrame,
    topTargets: List[str],
    clinicalParams: List[str],
) -> None:
    """
    Correlate a combined miRNA score with clinical parameters on the training
    data and save the Pearson correlation matrices as heatmaps and CSV files.

    The combined score is the row-wise mean of the topTargets columns. It is
    computed on copies, so none of the DataFrames passed in is modified.

    Args:
        X_train_raw (pd.DataFrame): Raw training data.
        X_test_raw (pd.DataFrame): Raw test data. Not used in the correlation;
            kept so existing callers do not need to change.
        X_train_scaled (pd.DataFrame): Scaled training data.
        X_test_scaled (pd.DataFrame): Scaled test data. Not used in the
            correlation; kept so existing callers do not need to change.
        topTargets (List[str]): miRNA columns averaged into the combined score.
        clinicalParams (List[str]): Clinical parameter columns to correlate with.

    Raises:
        KeyError: If a training frame lacks one of the requested columns.
        Exception: Any other error is reported and re-raised unchanged.
    """
    try:
        # assign() returns a new frame, leaving the caller's data untouched.
        rawWithScore: pd.DataFrame = X_train_raw.assign(
            combined_raw=X_train_raw[topTargets].mean(axis=1)
        )
        scaledWithScore: pd.DataFrame = X_train_scaled.assign(
            combined_scaled_top=X_train_scaled[topTargets].mean(axis=1)
        )

        correlationMatrixCombinedRaw: pd.DataFrame = rawWithScore[
            ["combined_raw"] + clinicalParams
        ].corr(method="pearson")
        correlationMatrixCombinedScaled: pd.DataFrame = scaledWithScore[
            ["combined_scaled_top"] + clinicalParams
        ].corr(method="pearson")

        generateAndSaveHeatmap(
            correlationMatrixCombinedRaw,
            "Correlation Heatmap (Combined Raw Ct Values)",
            os.path.join(
                RESULTS_SUPPLEMENTARY_PATH, "correlation_heatmap_combined_raw.png"
            ),
        )

        generateAndSaveHeatmap(
            correlationMatrixCombinedScaled,
            "Correlation Heatmap (Combined Scaled Ct Values)",
            os.path.join(RESULTS_MAIN_PATH, "correlation_heatmap_combined_scaled.png"),
        )

        correlationMatrixCombinedRaw.to_csv(
            os.path.join(
                RESULTS_SUPPLEMENTARY_PATH, "correlation_matrix_combined_raw.csv"
            ),
            index=True,
        )
        correlationMatrixCombinedScaled.to_csv(
            os.path.join(RESULTS_MAIN_PATH, "correlation_matrix_combined_scaled.csv"),
            index=True,
        )
    except KeyError as e:
        print(f"Missing expected column: {e}")
        raise
    except Exception as e:
        print(f"An error occurred during correlation analysis: {e}")
        raise


def generateAndSaveHeatmap(corrMatrix: pd.DataFrame, title: str, filepath: str) -> None:
    """
    Generate and save an annotated heatmap for the given correlation matrix.

    The figure is always closed, including when plotting or saving fails, so a
    failed call does not leave an open figure behind.

    Args:
        corrMatrix (pd.DataFrame): Correlation matrix to visualize.
        title (str): Title of the heatmap.
        filepath (str): Path to save the heatmap image.

    Raises:
        Exception: Whatever plotting or saving raised, after a message naming
            the heatmap has been printed.
    """
    fig = None
    try:
        fig = plt.figure(figsize=(10, 6))
        sns.heatmap(corrMatrix, annot=True, cmap="coolwarm", fmt=".2f")
        plt.title(title)
        plt.tight_layout()
        plt.savefig(filepath)
    except Exception as e:
        print(f"Failed to generate or save heatmap '{title}': {e}")
        raise
    finally:
        # fig stays None only if plt.figure itself failed
        if fig is not None:
            plt.close(fig)


def performROCAnalysisRaw(
    X_train: pd.DataFrame, y_train: pd.Series, comparison: str, method: str
) -> Dict[str, Any]:
    """
    PLACEHOLDER for the ROC analysis on raw data.

    No model is fitted and none of the arguments is read: the returned metrics
    are pseudo-random numbers drawn from a generator seeded with 0, so every
    call returns the same four values. They must not be reported as results.
    A UserWarning is emitted on each call to make that visible.

    The numbers come from a local generator, so the global NumPy random state
    is left untouched.

    Args:
        X_train (pd.DataFrame): Training features (currently unused).
        y_train (pd.Series): Training labels (currently unused).
        comparison (str): Comparison groups, e.g. 'S vs G' (currently unused).
        method (str): Machine learning method to use (currently unused).

    Returns:
        Dict[str, Any]: Keys "AUC", "Accuracy", "Sensitivity", "Specificity",
        each mapped to a float in [0, 1).
    """
    import warnings

    warnings.warn(
        "performROCAnalysisRaw is a placeholder: the returned metrics are "
        "random numbers, not results computed from the data.",
        UserWarning,
        stacklevel=2,
    )

    # TODO: implement the real ROC analysis here.
    # RandomState(0) yields the same stream as np.random.seed(0) followed by
    # np.random.rand(), without reseeding the global generator.
    placeholderRng = np.random.RandomState(0)
    metrics = {
        "AUC": placeholderRng.rand(),
        "Accuracy": placeholderRng.rand(),
        "Sensitivity": placeholderRng.rand(),
        "Specificity": placeholderRng.rand(),
    }
    return metrics


def performROCAnalysisScaled(
    X_train: pd.DataFrame, y_train: pd.Series, comparison: str, method: str
) -> Dict[str, Any]:
    """
    PLACEHOLDER for the ROC analysis on scaled data.

    No model is fitted and none of the arguments is read: the returned metrics
    are pseudo-random numbers drawn from a generator seeded with 0, so every
    call returns the same four values. They must not be reported as results.
    A UserWarning is emitted on each call to make that visible.

    The numbers come from a local generator, so the global NumPy random state
    is left untouched.

    Args:
        X_train (pd.DataFrame): Training features (currently unused).
        y_train (pd.Series): Training labels (currently unused).
        comparison (str): Comparison groups, e.g. 'S vs G' (currently unused).
        method (str): Machine learning method to use (currently unused).

    Returns:
        Dict[str, Any]: Keys "AUC", "Accuracy", "Sensitivity", "Specificity",
        each mapped to a float in [0, 1).
    """
    import warnings

    warnings.warn(
        "performROCAnalysisScaled is a placeholder: the returned metrics are "
        "random numbers, not results computed from the data.",
        UserWarning,
        stacklevel=2,
    )

    # TODO: implement the real ROC analysis here.
    # RandomState(0) yields the same stream as np.random.seed(0) followed by
    # np.random.rand(), without reseeding the global generator.
    placeholderRng = np.random.RandomState(0)
    metrics = {
        "AUC": placeholderRng.rand(),
        "Accuracy": placeholderRng.rand(),
        "Sensitivity": placeholderRng.rand(),
        "Specificity": placeholderRng.rand(),
    }
    return metrics


def main() -> None:
    """
    Run the full pipeline of this cell: create the output directories, load the
    data, run PCA and LDA, collect ROC metrics for every comparison/method
    pair, save them, and finish with the correlation analysis.

    The ROC metrics come from performROCAnalysisRaw/performROCAnalysisScaled,
    which currently return placeholder random numbers rather than computed
    results.
    """
    for outputDir in (RESULTS_MAIN_PATH, RESULTS_SUPPLEMENTARY_PATH):
        os.makedirs(outputDir, exist_ok=True)

    # Load data (the test labels are returned but not needed here)
    (
        dfScaled,
        X_train_raw,
        X_test_raw,
        X_train_scaled,
        X_test_scaled,
        yTrain,
        _yTest,
    ) = loadData()

    # PCA and LDA are run for their saved plots/CSVs; return values are unused
    performPCA(dfScaled, TARGET_MIRNAS)
    performLDA(dfScaled, TARGET_MIRNAS)

    # Placeholder for top targets and clinical parameters
    topTargets: List[str] = TARGET_MIRNAS  # Update with actual top targets if different
    clinicalParams: List[str] = [
        "AGE",
        "SEX",
        "plaque_index",
        "gingival_index",
    ]  # Replace with actual parameters

    comparisons = ["S vs G", "S vs P", "G vs P"]
    methods = [
        "LogisticRegression",
        "RandomForest",
        "SVM",
    ]  # Update with your actual methods

    # comparison -> method -> metrics; all raw-data calls happen before any
    # scaled-data call.
    rocResultsCombinedRaw: Dict[str, Dict[str, Dict[str, Any]]] = {
        comparison: {
            method: performROCAnalysisRaw(X_train_raw, yTrain, comparison, method)
            for method in methods
        }
        for comparison in comparisons
    }
    rocResultsCombinedScaled: Dict[str, Dict[str, Dict[str, Any]]] = {
        comparison: {
            method: performROCAnalysisScaled(
                X_train_scaled, yTrain, comparison, method
            )
            for method in methods
        }
        for comparison in comparisons
    }

    # Save both result sets under their data-type keys
    saveRocResults(
        {
            "combined_raw": rocResultsCombinedRaw,
            "combined_scaled": rocResultsCombinedScaled,
        },
        RESULTS_MAIN_PATH,
    )

    # Perform correlation analysis
    performCorrelationAnalysis(
        X_train_raw,
        X_test_raw,
        X_train_scaled,
        X_test_scaled,
        topTargets,
        clinicalParams,
    )


if __name__ == "__main__":
    main()


In [None]:
import os
import pandas as pd
import numpy as np
import json
from typing import Dict, Any, Tuple
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
import matplotlib.pyplot as plt
import seaborn as sns

# Constants for file paths
# Directory the processed CSV files are read from (see load_data).
DATA_PATH: str = "data/processed"
# NOTE(review): presumably the output directories for main and supplementary
# results; their use is not visible in this part of the cell, so confirm.
RESULTS_MAIN_PATH: str = "results/main"
RESULTS_SUPPLEMENTARY_PATH: str = "results/supplementary"
# miRNA column names; presumably the `targets` argument handed to perform_pca,
# to be confirmed against the caller.
TARGET_MIRNAS: list[str] = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]


def load_data() -> pd.DataFrame:
    """
    Read the scaled training features and attach their group labels.

    ``X_train_scaled.csv`` and ``y_train.csv`` are read from ``DATA_PATH``; the
    'GROUP' column of the label file is copied onto the feature frame by row
    position (its values are assigned without index alignment).

    Returns:
        pd.DataFrame: The scaled training features with an added 'GROUP' column.

    Raises:
        FileNotFoundError: If either CSV file does not exist.
        KeyError: If y_train.csv has no 'GROUP' column.
        Exception: Any other error is reported on stdout and re-raised unchanged.
    """
    try:
        features: pd.DataFrame = pd.read_csv(
            os.path.join(DATA_PATH, "X_train_scaled.csv")
        )
        labels: pd.DataFrame = pd.read_csv(os.path.join(DATA_PATH, "y_train.csv"))

        if "GROUP" in labels.columns:
            features["GROUP"] = labels["GROUP"].values
            return features

        raise KeyError("The 'GROUP' column is missing from y_train.csv")
    except FileNotFoundError as missing_file:
        print(f"File not found: {missing_file.filename}")
        raise
    except KeyError as missing_column:
        print(missing_column)
        raise
    except Exception as unexpected:
        print(f"An unexpected error occurred while loading data: {unexpected}")
        raise


def perform_pca(
    df: pd.DataFrame, targets: list[str]
) -> Tuple[PCA, np.ndarray, pd.Series]:
    """
    Run PCA (default settings) on the target columns, then persist and plot it.

    The explained-variance ratios are written via ``save_explained_variance``
    and the pairwise component scatter plots via ``plot_scatter_pca`` before the
    fitted objects are handed back.

    Args:
        df (pd.DataFrame): Frame holding the target columns and a 'GROUP' column.
        targets (list[str]): Names of the columns to decompose.

    Returns:
        Tuple[PCA, np.ndarray, pd.Series]: The fitted PCA estimator, the
        transformed data, and the explained variance ratio per component.
    """
    feature_matrix = df[targets]

    model: PCA = PCA()
    scores: np.ndarray = model.fit_transform(feature_matrix)
    variance_ratio: pd.Series = pd.Series(model.explained_variance_ratio_)

    # Persist the table first, then the figures (same order as the outputs are
    # expected to appear on disk).
    save_explained_variance(variance_ratio, "pca_explained_variance.csv", "pca")
    plot_scatter_pca(scores, df["GROUP"], variance_ratio)

    return model, scores, variance_ratio


def plot_scatter_pca(
    pca_result: np.ndarray, groups: pd.Series, explained_variance: pd.Series
) -> None:
    """
    Generate and save pairwise 2D scatter plots of PCA components.

    One figure is drawn for every pair (i, j), i < j, of columns in
    ``pca_result`` and saved through ``save_plot`` under
    ``RESULTS_SUPPLEMENTARY_PATH`` as ``pca_scatter_pc<i+1>_vs_pc<j+1>.png``.
    With fewer than two components no figure is produced.

    Args:
        pca_result (np.ndarray): PCA-transformed data, one column per component.
        groups (pd.Series): The 'GROUP' labels used to colour the points.
        explained_variance (pd.Series): Explained variance ratio for PCA
            components, looked up with the 0-based component number.

    Raises:
        Exception: Whatever the plotting calls or ``save_plot`` raise. The
            figure is closed before the error propagates.
    """
    n_components: int = pca_result.shape[1]
    for i in range(n_components):
        for j in range(i + 1, n_components):
            # A freshly created figure is also pyplot's current figure, which is
            # what save_plot() writes to disk.
            fig, ax = plt.subplots(figsize=(8, 6))
            try:
                sns.scatterplot(
                    x=pca_result[:, i],
                    y=pca_result[:, j],
                    hue=groups,
                    palette="viridis",
                    edgecolor="k",
                    alpha=0.7,
                    ax=ax,
                )
                ax.set_title(f"PCA Scatter Plot (PC{i+1} vs PC{j+1})")
                ax.set_xlabel(
                    f"Principal Component {i+1} ({explained_variance[i]:.2%})"
                )
                ax.set_ylabel(
                    f"Principal Component {j+1} ({explained_variance[j]:.2%})"
                )
                ax.legend(title="Group")
                fig.tight_layout()

                filename: str = f"pca_scatter_pc{i+1}_vs_pc{j+1}.png"
                filepath: str = os.path.join(RESULTS_SUPPLEMENTARY_PATH, filename)

                save_plot(filepath)
            finally:
                # Release the figure even when plotting or saving fails, so a
                # failure inside the loop does not leave open figures behind.
                plt.close(fig)


def perform_lda(
    df: pd.DataFrame, targets: list[str]
) -> Tuple[LDA, np.ndarray, pd.Series]:
    """
    Run LDA (default settings) on the target columns, then persist and plot it.

    The 'GROUP' column of ``df`` supplies the class labels. The
    explained-variance ratios are written via ``save_explained_variance`` and the
    pairwise discriminant scatter plots via ``plot_scatter_lda``.

    Args:
        df (pd.DataFrame): Frame holding the target columns and a 'GROUP' column.
        targets (list[str]): Names of the feature columns.

    Returns:
        Tuple[LDA, np.ndarray, pd.Series]: The fitted LDA estimator, the
        transformed data, and the explained variance ratio per discriminant.
    """
    class_labels = df["GROUP"]

    model: LDA = LDA()
    projections: np.ndarray = model.fit_transform(df[targets], class_labels)
    variance_ratio: pd.Series = pd.Series(model.explained_variance_ratio_)

    # Table first, figures second.
    save_explained_variance(variance_ratio, "lda_explained_variance.csv", "lda")
    plot_scatter_lda(projections, class_labels, variance_ratio)

    return model, projections, variance_ratio


def plot_scatter_lda(
    lda_result: np.ndarray, groups: pd.Series, explained_variance: pd.Series
) -> None:
    """
    Generate and save pairwise 2D scatter plots of LDA components.

    One figure is drawn for every pair (i, j), i < j, of columns in
    ``lda_result`` and saved through ``save_plot`` under
    ``RESULTS_SUPPLEMENTARY_PATH`` as ``lda_scatter_ld<i+1>_vs_ld<j+1>.png``.
    With fewer than two discriminants no figure is produced.

    Args:
        lda_result (np.ndarray): LDA-transformed data, one column per discriminant.
        groups (pd.Series): The 'GROUP' labels used to colour the points.
        explained_variance (pd.Series): Explained variance ratio for LDA
            components, looked up with the 0-based component number.

    Raises:
        Exception: Whatever the plotting calls or ``save_plot`` raise. The
            figure is closed before the error propagates.
    """
    n_components: int = lda_result.shape[1]
    for i in range(n_components):
        for j in range(i + 1, n_components):
            # A freshly created figure is also pyplot's current figure, which is
            # what save_plot() writes to disk.
            fig, ax = plt.subplots(figsize=(8, 6))
            try:
                sns.scatterplot(
                    x=lda_result[:, i],
                    y=lda_result[:, j],
                    hue=groups,
                    palette="viridis",
                    edgecolor="k",
                    alpha=0.7,
                    ax=ax,
                )
                ax.set_title(f"LDA Scatter Plot (LD{i+1} vs LD{j+1})")
                ax.set_xlabel(
                    f"Linear Discriminant {i+1} ({explained_variance[i]:.2%})"
                )
                ax.set_ylabel(
                    f"Linear Discriminant {j+1} ({explained_variance[j]:.2%})"
                )
                ax.legend(title="Group")
                fig.tight_layout()

                filename: str = f"lda_scatter_ld{i+1}_vs_ld{j+1}.png"
                filepath: str = os.path.join(RESULTS_SUPPLEMENTARY_PATH, filename)

                save_plot(filepath)
            finally:
                # Release the figure even when plotting or saving fails, so a
                # failure inside the loop does not leave open figures behind.
                plt.close(fig)


def save_explained_variance(
    explained_variance: pd.Series, filename: str, analysis_type: str
) -> None:
    """
    Write a two-column table of component labels and variance ratios to CSV.

    The file is created inside ``RESULTS_SUPPLEMENTARY_PATH`` without an index
    column. Components are numbered from 1 and prefixed with "PC" when
    ``analysis_type`` equals 'pca' (case-insensitive); every other value yields
    the "LD" prefix.

    Args:
        explained_variance (pd.Series): Explained variance ratios, one per component.
        filename (str): Name of the CSV file to create.
        analysis_type (str): 'pca' or 'lda'; selects the component prefix.
    """
    prefix: str = "LD"
    if analysis_type.lower() == "pca":
        prefix = "PC"

    labels = [
        f"{prefix}{rank}" for rank in range(1, len(explained_variance) + 1)
    ]
    table: pd.DataFrame = pd.DataFrame(
        {"Component": labels, "Explained Variance Ratio": explained_variance}
    )

    table.to_csv(os.path.join(RESULTS_SUPPLEMENTARY_PATH, filename), index=False)


def save_plot(filepath: str) -> None:
    """
    Write pyplot's current figure to ``filepath`` at 300 dpi.

    Args:
        filepath (str): Destination path of the image file.

    Raises:
        Exception: Any error from ``plt.savefig`` is reported on stdout and
            re-raised unchanged.
    """
    try:
        plt.savefig(filepath, dpi=300)
    except Exception as save_error:
        # Report which file failed, then let the caller decide how to proceed.
        print(f"Failed to save plot {filepath}: {save_error}")
        raise


def _roc_json_default(value: Any) -> Any:
    """Convert NumPy arrays and scalars, which ``json`` cannot encode, to plain Python."""
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.generic):
        return value.item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def save_roc_results(
    roc_results: Dict[str, Dict[str, Dict[str, Any]]], output_base_path: str
) -> None:
    """
    Save ROC analysis results to CSV and JSON files.

    For every (data type, comparison) pair whose value is a dict of per-method
    metric dicts, a directory ``roc_<comparison>_<data_type>`` is created under
    ``output_base_path`` and a ``roc_results_<data_type>.csv`` with one row per
    method is written into it. Pairs with any other shape are reported on stdout
    and skipped without creating a directory. The complete ``roc_results``
    structure is then written to ``roc_results_combined_all.json`` in
    ``output_base_path``.

    Args:
        roc_results (Dict[str, Dict[str, Dict[str, Any]]]): Nested dictionary
            containing ROC metrics, keyed by data type, comparison, then method.
        output_base_path (str): Base directory to save the ROC results. It is
            created if it does not exist.

    Raises:
        Exception: Any error while serialising or writing the JSON file is
            reported on stdout and re-raised.
    """
    # The JSON summary is written even when no comparison produces a CSV, so the
    # base directory must exist regardless of what the loop below does.
    os.makedirs(output_base_path, exist_ok=True)

    for data_type, comparisons in roc_results.items():
        for comparison, methods_metrics in comparisons.items():
            is_well_formed: bool = isinstance(methods_metrics, dict) and all(
                isinstance(metrics, dict) for metrics in methods_metrics.values()
            )
            if not is_well_formed:
                print(
                    f"Unexpected data format for comparison '{comparison}' and data type '{data_type}'. Skipping..."
                )
                continue

            # Create the directory only once we know something will be written
            # into it, so skipped comparisons do not leave empty folders behind.
            comparison_dir: str = (
                f"roc_{comparison.lower().replace(' ', '_')}_{data_type}"
            )
            output_dir: str = os.path.join(output_base_path, comparison_dir)
            os.makedirs(output_dir, exist_ok=True)

            metrics_table: pd.DataFrame = pd.DataFrame.from_dict(
                methods_metrics, orient="index"
            )
            metrics_table.index.name = "method"
            metrics_table = metrics_table.reset_index()

            csv_path: str = os.path.join(output_dir, f"roc_results_{data_type}.csv")
            metrics_table.to_csv(csv_path, index=False)

    json_path: str = os.path.join(output_base_path, "roc_results_combined_all.json")
    try:
        # Serialise fully before opening the file: a value that cannot be
        # encoded then fails without truncating an existing results file.
        payload: str = json.dumps(roc_results, indent=4, default=_roc_json_default)
        with open(json_path, "w") as fp:
            fp.write(payload)
    except Exception as e:
        print(f"Failed to save ROC results to JSON file: {e}")
        raise


def perform_correlation_analysis(
    X_train_raw: pd.DataFrame,
    X_test_raw: pd.DataFrame,
    X_train_scaled: pd.DataFrame,
    X_test_scaled: pd.DataFrame,
    top_targets: list[str],
    clinical_params: list[str],
) -> None:
    """
    Correlate a combined miRNA score with clinical parameters and save the results.

    The combined score is the row-wise mean of ``top_targets``. It is computed
    separately for the raw and the scaled training data, and its Pearson
    correlation matrix with ``clinical_params`` is saved as a heatmap and a CSV:
    raw outputs go to ``RESULTS_SUPPLEMENTARY_PATH``, scaled outputs to
    ``RESULTS_MAIN_PATH``.

    The input frames are not modified: the combined column is added to private
    copies, so the caller's feature matrices keep their original columns.

    Args:
        X_train_raw (pd.DataFrame): Raw (unscaled) training data.
        X_test_raw (pd.DataFrame): Raw (unscaled) test data. Accepted for
            interface compatibility; not read.
        X_train_scaled (pd.DataFrame): Scaled training data.
        X_test_scaled (pd.DataFrame): Scaled test data. Accepted for interface
            compatibility; not read.
        top_targets (list[str]): List of top target miRNA column names.
        clinical_params (list[str]): List of clinical parameter column names.

    Raises:
        KeyError: If a training frame lacks one of the requested columns.
        Exception: Any other error is reported on stdout and re-raised.
    """
    try:
        # Work on copies: adding the derived column to the caller's frames would
        # leak it into any later use of those frames as model inputs.
        raw_frame: pd.DataFrame = X_train_raw.copy()
        raw_frame["combined_raw"] = raw_frame[top_targets].mean(axis=1)

        scaled_frame: pd.DataFrame = X_train_scaled.copy()
        scaled_frame["combined_scaled_top"] = scaled_frame[top_targets].mean(axis=1)

        correlation_matrix_combined_raw: pd.DataFrame = raw_frame[
            ["combined_raw"] + clinical_params
        ].corr(method="pearson")
        correlation_matrix_combined_scaled: pd.DataFrame = scaled_frame[
            ["combined_scaled_top"] + clinical_params
        ].corr(method="pearson")

        generate_and_save_heatmap(
            correlation_matrix_combined_raw,
            "Correlation Heatmap (Combined Raw Ct Values)",
            os.path.join(
                RESULTS_SUPPLEMENTARY_PATH, "correlation_heatmap_combined_raw.png"
            ),
        )

        generate_and_save_heatmap(
            correlation_matrix_combined_scaled,
            "Correlation Heatmap (Combined Scaled Ct Values)",
            os.path.join(RESULTS_MAIN_PATH, "correlation_heatmap_combined_scaled.png"),
        )

        correlation_matrix_combined_raw.to_csv(
            os.path.join(
                RESULTS_SUPPLEMENTARY_PATH, "correlation_matrix_combined_raw.csv"
            ),
            index=True,
        )
        correlation_matrix_combined_scaled.to_csv(
            os.path.join(RESULTS_MAIN_PATH, "correlation_matrix_combined_scaled.csv"),
            index=True,
        )
    except KeyError as e:
        print(f"Missing expected column: {e}")
        raise
    except Exception as e:
        print(f"An error occurred during correlation analysis: {e}")
        raise


def generate_and_save_heatmap(
    corr_matrix: pd.DataFrame, title: str, filepath: str
) -> None:
    """
    Generate and save a heatmap for the given correlation matrix.

    The matrix is drawn on a 10x6 inch figure with the "coolwarm" colour map and
    each cell annotated to two decimals, then written to ``filepath`` at
    matplotlib's default resolution. The figure is always closed afterwards.

    Args:
        corr_matrix (pd.DataFrame): Correlation matrix to visualize.
        title (str): Title of the heatmap.
        filepath (str): Path to save the heatmap image.

    Raises:
        Exception: Any plotting or saving error is reported on stdout and
            re-raised.
    """
    try:
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", fmt=".2f", ax=ax)
            ax.set_title(title)
            fig.tight_layout()
            fig.savefig(filepath)
        finally:
            # Close on every path so a failed heatmap does not leave an open
            # figure behind in the kernel.
            plt.close(fig)
    except Exception as e:
        print(f"Failed to generate or save heatmap '{title}': {e}")
        raise


def main() -> None:
    """
    Main function to perform PCA, LDA, ROC analysis, and correlation analysis.

    Creates the two results directories, loads the scaled training data, runs
    PCA and LDA on ``TARGET_MIRNAS``, then saves ROC results and runs the
    correlation analysis.

    FIXME: this function is not self-contained. ``roc_results_combined_raw``,
    ``roc_results_combined_scaled``, ``X_train_raw``, ``X_test_raw``,
    ``X_train_scaled``, ``X_test_scaled``, ``top_targets`` and
    ``clinical_params`` are never assigned in this function or anywhere else in
    this cell, so they are resolved as globals. Unless an earlier cell has left
    them in the kernel namespace, the first lookup raises NameError (after the
    PCA/LDA outputs have already been written).
    """
    # Make sure both output directories exist before anything is written.
    os.makedirs(RESULTS_MAIN_PATH, exist_ok=True)
    os.makedirs(RESULTS_SUPPLEMENTARY_PATH, exist_ok=True)

    df_scaled: pd.DataFrame = load_data()

    # Both calls also write their CSV and scatter plots as a side effect; the
    # returned objects are not used further in this function.
    pca, pca_result, explained_variance_pca = perform_pca(df_scaled, TARGET_MIRNAS)
    lda, lda_result, explained_variance_lda = perform_lda(df_scaled, TARGET_MIRNAS)

    # ... (previous code for combining miRNAs and ROC analysis) ...
    # FIXME: the step that should build the two ROC dictionaries (and the frames
    # and column lists used below) was elided here and is missing from this cell.

    # NOTE(review): the output recorded for this cell shows save_roc_results
    # skipping every comparison as "Unexpected data format"; presumably the
    # dictionaries picked up from kernel state did not hold per-method metric
    # dicts. Confirm against the cell that produced them.
    roc_results_combined_all: Dict[str, Dict[str, Dict[str, Any]]] = {
        "combined_raw": roc_results_combined_raw,
        "combined_scaled": roc_results_combined_scaled,
    }

    save_roc_results(roc_results_combined_all, RESULTS_MAIN_PATH)

    # NOTE: as written, perform_correlation_analysis adds a combined-score column
    # to each of the four frames passed in.
    perform_correlation_analysis(
        X_train_raw,
        X_test_raw,
        X_train_scaled,
        X_test_scaled,
        top_targets,
        clinical_params,
    )


# Script-style entry point. In a Jupyter kernel __name__ is "__main__", so
# executing this cell runs main() immediately.
if __name__ == "__main__":
    main()


Unexpected data format for comparison 'S vs G' and data type 'combined_raw'. Skipping...
Unexpected data format for comparison 'S vs P' and data type 'combined_raw'. Skipping...
Unexpected data format for comparison 'G vs P' and data type 'combined_raw'. Skipping...
Unexpected data format for comparison 'S vs G' and data type 'combined_scaled'. Skipping...
Unexpected data format for comparison 'S vs P' and data type 'combined_scaled'. Skipping...
Unexpected data format for comparison 'G vs P' and data type 'combined_scaled'. Skipping...


In [None]:
import os
import pandas as pd
import numpy as np
from typing import Dict, Any, Tuple, List
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import roc_curve, auc
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import matplotlib.pyplot as plt
import seaborn as sns
import json
import pickle

# Constants for file paths (all relative to the notebook's working directory).
DATA_PATH: str = "data/processed"  # load_data() reads the train/test feature and label CSVs from here
RESULTS_MAIN_PATH: str = "results/main"  # main() writes roc_results_combined_clinical.json here
RESULTS_SUPPLEMENTARY_PATH: str = "results/supplementary"  # created by main(); nothing in this cell writes to it

# miRNA columns that main() averages into the "combinedTopMiRNA" feature.
TARGET_MIRNAS: List[str] = [
    "mean_mir146a",
    "mean_mir146b",
    "mean_mir155",
    "mean_mir203",
    "mean_mir223",
    "mean_mir381p",
]


def load_data() -> Tuple[
    pd.DataFrame,
    pd.DataFrame,
    pd.DataFrame,
    pd.DataFrame,
    pd.DataFrame,
    pd.Series,
    pd.Series,
]:
    """
    Read the processed train/test CSVs from ``DATA_PATH``.

    Six files are read: the scaled and raw feature matrices for the training and
    test splits, plus ``y_train.csv`` / ``y_test.csv``, of which only the
    'GROUP' column is kept.

    Returns:
        Tuple containing, in this order:
            - df_scaled: Copy of the scaled training data with a 'GROUP' column
              (labels assigned by row position).
            - X_train_raw: Raw training data.
            - X_test_raw: Raw test data.
            - X_train_scaled: Scaled training data.
            - X_test_scaled: Scaled test data.
            - y_train: Training labels.
            - y_test: Test labels.

    Raises:
        FileNotFoundError: If one of the CSV files does not exist.
        KeyError: If a label file has no 'GROUP' column.
        Exception: Any other error is reported on stdout and re-raised unchanged.
    """

    def read_processed(filename: str) -> pd.DataFrame:
        # Every input lives directly under DATA_PATH.
        return pd.read_csv(os.path.join(DATA_PATH, filename))

    try:
        # Scaled features and labels first, raw features afterwards.
        scaled_train: pd.DataFrame = read_processed("X_train_scaled.csv")
        scaled_test: pd.DataFrame = read_processed("X_test_scaled.csv")
        train_labels: pd.Series = read_processed("y_train.csv")["GROUP"]
        test_labels: pd.Series = read_processed("y_test.csv")["GROUP"]

        raw_train: pd.DataFrame = read_processed("X_train_raw.csv")
        raw_test: pd.DataFrame = read_processed("X_test_raw.csv")

        # Labelled copy, so the plain feature matrix is returned untouched.
        labelled_train: pd.DataFrame = scaled_train.copy()
        labelled_train["GROUP"] = train_labels.values

        return (
            labelled_train,
            raw_train,
            raw_test,
            scaled_train,
            scaled_test,
            train_labels,
            test_labels,
        )
    except FileNotFoundError as missing_file:
        print(f"File not found: {missing_file.filename}")
        raise
    except KeyError as missing_column:
        print(f"Missing expected column: {missing_column}")
        raise
    except Exception as unexpected:
        print(f"An unexpected error occurred while loading data: {unexpected}")
        raise


def perform_roc_analysis(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.Series,
    y_test: pd.Series,
    model_name: str,
    random_state: int = 42,
) -> Dict[str, Any]:
    """
    Perform ROC analysis for a given model.

    The named classifier is fitted on the training data, scored on the test data
    using the predicted probability of the second class, and summarised as a ROC
    curve with its AUC.

    Args:
        X_train (pd.DataFrame): Training features.
        X_test (pd.DataFrame): Testing features.
        y_train (pd.Series): Training labels.
        y_test (pd.Series): Testing labels.
        model_name (str): Name of the model to use: "LogisticRegression",
            "RandomForest" or "SVM".
        random_state (int): Seed handed to the classifier so repeated runs
            produce the same ROC curve. Defaults to 42.

    Returns:
        Dict[str, Any]: Dictionary with "FPR" and "TPR" as lists and "AUC" as a
        float, all JSON-serialisable.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
        Exception: Any fitting or scoring error is reported on stdout and
            re-raised.
    """
    try:
        # Initialize the model based on the model name. Every estimator gets the
        # same seed: the random forest and the probability calibration inside
        # SVC(probability=True) are stochastic, so unseeded runs report
        # different AUCs from one execution to the next.
        if model_name == "LogisticRegression":
            model = LogisticRegression(solver="liblinear", random_state=random_state)
        elif model_name == "RandomForest":
            model = RandomForestClassifier(
                n_estimators=100, random_state=random_state
            )
        elif model_name == "SVM":
            model = SVC(probability=True, random_state=random_state)
        else:
            raise ValueError(f"Unsupported model: {model_name}")

        # Train the model
        model.fit(X_train, y_train)

        # Predict probabilities; column 1 is the second class in model.classes_
        y_scores = model.predict_proba(X_test)[:, 1]

        # Calculate ROC curve and AUC
        fpr, tpr, _ = roc_curve(y_test, y_scores)
        roc_auc = auc(fpr, tpr)

        # Convert NumPy arrays/scalars to plain Python for JSON serialization
        return {"FPR": fpr.tolist(), "TPR": tpr.tolist(), "AUC": float(roc_auc)}
    except Exception as e:
        print(f"Error in perform_roc_analysis for model {model_name}: {e}")
        raise


def roc_analysis(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.Series,
    y_test: pd.Series,
    target_name: str,
    comparison_name: str,
    data_type: str,
) -> Dict[str, Any]:
    """
    Run ``perform_roc_analysis`` for each supported classifier and log progress.

    The three classifiers are evaluated in the order LogisticRegression,
    RandomForest, SVM; a progress line with the AUC is printed after each one.

    Args:
        X_train (pd.DataFrame): Training features.
        X_test (pd.DataFrame): Testing features.
        y_train (pd.Series): Training labels.
        y_test (pd.Series): Testing labels.
        target_name (str): Name of the feature being evaluated (log output only).
        comparison_name (str): Description of the comparison (log output only).
        data_type (str): Type of data, 'raw' or 'scaled' (log output only).

    Returns:
        Dict[str, Any]: ROC metrics keyed by classifier name.

    Raises:
        Exception: The first failure is reported on stdout and re-raised.
    """
    try:
        print(
            f"Performing ROC analysis for {comparison_name} using {target_name} ({data_type} data)..."
        )
        per_model: Dict[str, Any] = {}
        for classifier_name in ("LogisticRegression", "RandomForest", "SVM"):
            outcome = perform_roc_analysis(
                X_train, X_test, y_train, y_test, classifier_name
            )
            per_model[classifier_name] = outcome
            print(
                f"Completed {classifier_name} for {comparison_name} using {target_name}. AUC: {outcome['AUC']:.4f}"
            )
        return per_model
    except Exception as failure:
        print(
            f"Error in roc_analysis for {comparison_name} with target {target_name}: {failure}"
        )
        raise


def _pairwise_comparisons(groups: Any) -> List[Tuple[Any, Any, str]]:
    """Return every unordered pair of distinct groups as (group1, group2, "group1 vs group2")."""
    return [
        (groups[i], groups[j], f"{groups[i]} vs {groups[j]}")
        for i in range(len(groups))
        for j in range(i + 1, len(groups))
    ]


def _binary_subset(
    features: pd.DataFrame, labels: pd.Series, group1: Any, group2: Any
) -> Tuple[pd.DataFrame, pd.Series]:
    """Keep the rows labelled group1 or group2 and encode the labels as 0 (group1) / 1 (group2)."""
    mask = (labels == group1) | (labels == group2)
    encoded = labels[mask].apply(lambda label: 0 if label == group1 else 1)
    return features[mask], encoded


def _combined_mirna_frame(features: pd.DataFrame, mirnas: List[str]) -> pd.DataFrame:
    """Average the given miRNA columns row-wise into a single 'combinedTopMiRNA' column."""
    combined = features[mirnas].mean(axis=1).values.reshape(-1, 1)
    return pd.DataFrame(combined, columns=["combinedTopMiRNA"])


def main() -> None:
    """
    Main function to perform data loading, preprocessing, and ROC analysis.

    For every pair of groups found in the training labels, the scaled train and
    test data are restricted to those two groups and a ROC analysis is run for
    each clinical parameter on its own and for the mean of ``TARGET_MIRNAS``.
    A comparison is skipped when either split contains fewer than two classes.
    A failing individual analysis is reported and does not stop the others.

    All collected results are written once, after the last comparison, to
    ``roc_results_combined_clinical.json`` in ``RESULTS_MAIN_PATH``. The file is
    written even if every comparison was skipped, so it always reflects the
    latest run.

    Raises:
        Exception: Any error outside the individually guarded analyses is
            reported on stdout and re-raised.
    """
    try:
        # Create directories if they don't exist
        os.makedirs(RESULTS_MAIN_PATH, exist_ok=True)
        os.makedirs(RESULTS_SUPPLEMENTARY_PATH, exist_ok=True)

        # Load data; only the scaled features and the labels are used below.
        (
            _df_scaled,
            _X_train_raw,
            _X_test_raw,
            X_train_scaled,
            X_test_scaled,
            y_train,
            y_test,
        ) = load_data()

        # Define unique groups
        unique_groups = y_train.unique()
        print(f"Unique groups in training data: {unique_groups}")

        # Define top miRNAs and clinical parameters
        top_mirnas = TARGET_MIRNAS  # Update if different
        top_clinical_params = [
            "pocket_depth",
            "bleeding_on_probing",
            # Add other clinical parameters as needed
        ]

        # Initialize ROC results dictionary
        roc_results_combined_clinical: Dict[str, Dict[str, Any]] = {}

        # Define valid comparisons (exclude comparisons within the same group)
        valid_comparisons = _pairwise_comparisons(unique_groups)
        print(f"Valid comparisons: {[name for _, _, name in valid_comparisons]}")

        for group1, group2, comparison_name in valid_comparisons:
            print(f"Starting ROC analysis for comparison: {comparison_name}")

            # Restrict both splits to the two groups of the current comparison
            X_train_subset_scaled, y_train_subset = _binary_subset(
                X_train_scaled, y_train, group1, group2
            )
            X_test_subset_scaled, y_test_subset = _binary_subset(
                X_test_scaled, y_test, group1, group2
            )

            # Validate consistent sample sizes and presence of both classes
            unique_train_labels = y_train_subset.unique()
            unique_test_labels = y_test_subset.unique()
            print(
                f"Training samples: {len(X_train_subset_scaled)}, Testing samples: {len(X_test_subset_scaled)}"
            )
            print(
                f"Training labels: {unique_train_labels}, Testing labels: {unique_test_labels}"
            )

            if len(unique_train_labels) < 2 or len(unique_test_labels) < 2:
                print(
                    f"Skipping ROC analysis for {comparison_name} due to insufficient classes.\n"
                )
                continue

            comparison_results: Dict[str, Any] = {}
            roc_results_combined_clinical[comparison_name] = comparison_results

            # ROC Analysis using only Clinical Parameters
            for param in top_clinical_params:
                if param not in X_train_subset_scaled.columns:
                    print(
                        f"Parameter '{param}' not found in X_train_subset_scaled. Skipping..."
                    )
                    continue
                try:
                    comparison_results[param] = roc_analysis(
                        X_train_subset_scaled[[param]],
                        X_test_subset_scaled[[param]],
                        y_train_subset,
                        y_test_subset,
                        param,
                        comparison_name,
                        "scaled",
                    )
                except Exception as e:
                    # Best effort: one failing parameter must not stop the rest.
                    print(
                        f"Failed ROC analysis for parameter '{param}' in {comparison_name}: {e}"
                    )

            # Combine Scaled Top miRNAs for the Current Comparison
            try:
                comparison_results["combinedTopMiRNA"] = roc_analysis(
                    _combined_mirna_frame(X_train_subset_scaled, top_mirnas),
                    _combined_mirna_frame(X_test_subset_scaled, top_mirnas),
                    y_train_subset,
                    y_test_subset,
                    "combinedTopMiRNA",
                    comparison_name,
                    "scaled",
                )
            except Exception as e:
                print(
                    f"Failed ROC analysis for combinedTopMiRNA in {comparison_name}: {e}"
                )

            print(f"Completed ROC analysis for {comparison_name}\n")

        # Save ROC Results once, after every comparison has been processed.
        # Saving inside the loop rewrote the file on each iteration and never
        # wrote it when all comparisons were skipped.
        roc_results_path = os.path.join(
            RESULTS_MAIN_PATH, "roc_results_combined_clinical.json"
        )
        try:
            with open(roc_results_path, "w") as f:
                json.dump(roc_results_combined_clinical, f, indent=4)
            print(f"ROC analysis results saved to {roc_results_path}")
        except Exception as e:
            print(f"Failed to save ROC results to {roc_results_path}: {e}")
            raise

    except Exception as e:
        print(f"An error occurred in the main function: {e}")
        raise


# Script-style entry point. In a Jupyter kernel __name__ is "__main__", so
# executing this cell runs main() immediately.
if __name__ == "__main__":
    main()


Unique groups in training data: ['G' 'S' 'P']
Valid comparisons: ['G vs S', 'G vs P', 'S vs P']
Starting ROC analysis for comparison: G vs S
Training samples: 57, Testing samples: 15
Training labels: [0 1], Testing labels: [1 0]
Performing ROC analysis for G vs S using pocket_depth (scaled data)...
Completed LogisticRegression for G vs S using pocket_depth. AUC: 0.5893
Completed RandomForest for G vs S using pocket_depth. AUC: 0.5000
Completed SVM for G vs S using pocket_depth. AUC: 0.4821
Performing ROC analysis for G vs S using bleeding_on_probing (scaled data)...
Completed LogisticRegression for G vs S using bleeding_on_probing. AUC: 1.0000
Completed RandomForest for G vs S using bleeding_on_probing. AUC: 1.0000
Completed SVM for G vs S using bleeding_on_probing. AUC: 1.0000
Performing ROC analysis for G vs S using combinedTopMiRNA (scaled data)...
Completed LogisticRegression for G vs S using combinedTopMiRNA. AUC: 0.4464
Completed RandomForest for G vs S using combinedTopMiRNA. AU