In [None]:
Data split: per-cluster 90/10 train/test split of the AM-I, AM-II, and AM-III clustering results

In [1]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split

# Set file paths
data_dir = "./clustering"  # Directory containing clustering results
output_dir = "./train_test_split"  # Directory to save train-test split results
os.makedirs(output_dir, exist_ok=True)

# File list
files = [
    "AM-I-filtered/AM-I-filtered_with_labels_k4.csv",
    'AM-II-filtered/AM-II-filtered_with_labels_k3.csv',
    'AM-III-filtered/AM-III-filtered_with_labels_k4.csv'
]

# Random seed
random_seed = 42


def split_by_cluster(df, cluster_col="UMAP_Cluster", test_size=0.1, seed=random_seed):
    """Split ``df`` into train and test sets, one cluster at a time.

    Every group of rows sharing a value in ``cluster_col`` is split
    independently with ``train_test_split`` using the same ``test_size`` and
    ``seed``; the per-cluster pieces are then concatenated once.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to split. Must contain ``cluster_col``.
    cluster_col : str
        Name of the column holding the cluster label.
    test_size : float
        Passed through to ``train_test_split``.
    seed : int
        Passed through to ``train_test_split`` as ``random_state``.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        ``(train_data, test_data)`` with a fresh 0..n-1 index each. Either may
        be empty (but keeps the columns of ``df``) if no rows were assigned.
    """
    train_parts, test_parts = [], []

    # sort=False keeps the clusters in order of first appearance.
    for cluster, cluster_df in df.groupby(cluster_col, sort=False):
        if len(cluster_df) < 2:
            # train_test_split raises ValueError when one side would be empty,
            # which would abort the whole run; keep the lone row for training.
            print(f"Cluster {cluster} has only {len(cluster_df)} sample; keeping it in the train set")
            train_parts.append(cluster_df)
            continue

        train_df, test_df = train_test_split(cluster_df, test_size=test_size, random_state=seed)
        train_parts.append(train_df)
        test_parts.append(test_df)

    # Concatenate once instead of growing a DataFrame inside the loop.
    empty = df.iloc[0:0]
    train_data = pd.concat(train_parts, ignore_index=True) if train_parts else empty.copy()
    test_data = pd.concat(test_parts, ignore_index=True) if test_parts else empty.copy()
    return train_data, test_data


# Main process
for file in files:
    file_path = os.path.join(data_dir, file)
    raw_df = pd.read_csv(file_path)  # Read clustering result file

    # Remove rows with missing values (the raw frame is kept untouched)
    df = raw_df.dropna()
    if df.empty:
        # Nothing to split; merging two column-less frames below would fail.
        print(f"No complete rows left after dropping missing values, skipping: {file}")
        continue

    # Split every UMAP cluster separately and collect the results
    train_data, test_data = split_by_cluster(df, cluster_col='UMAP_Cluster', test_size=0.1, seed=random_seed)

    # Every complete row must end up in exactly one of the two sets
    assert len(train_data) + len(test_data) == len(df), "rows lost or duplicated during the split"

    # Check for duplicate samples between train and test sets
    # (inner merge on all shared columns: any hit is a row present in both sets)
    duplicate_rows = train_data.merge(test_data, how='inner')
    if not duplicate_rows.empty:
        print(f"‚ö†Ô∏è Potential data leakage detected (duplicate samples in train and test sets): {file}")
        print(f"Number of duplicate samples = {len(duplicate_rows)}")
    else:
        print(f"‚úÖ No duplicate samples between train and test sets: {file}")

    # Create output filenames (remove directory path, keep only filename)
    base_filename = os.path.basename(file_path)  # Get filename, e.g., "AM-I-filtered_with_labels_k4.csv"
    base_name_without_ext = os.path.splitext(base_filename)[0]  # Remove extension

    # Save train and test sets for the current file
    train_output_path = os.path.join(output_dir, f"{base_name_without_ext}_train.csv")
    test_output_path = os.path.join(output_dir, f"{base_name_without_ext}_test.csv")

    train_data.to_csv(train_output_path, index=False)
    test_data.to_csv(test_output_path, index=False)

    print(f"Saved train data for {file} to {train_output_path}")
    print(f"Saved test data for {file} to {test_output_path}")

‚úÖ No duplicate samples between train and test sets: AM-I-filtered/AM-I-filtered_with_labels_k4.csv
Saved train data for AM-I-filtered/AM-I-filtered_with_labels_k4.csv to ./train_test_split/AM-I-filtered_with_labels_k4_train.csv
Saved test data for AM-I-filtered/AM-I-filtered_with_labels_k4.csv to ./train_test_split/AM-I-filtered_with_labels_k4_test.csv
‚úÖ No duplicate samples between train and test sets: AM-II-filtered/AM-II-filtered_with_labels_k3.csv
Saved train data for AM-II-filtered/AM-II-filtered_with_labels_k3.csv to ./train_test_split/AM-II-filtered_with_labels_k3_train.csv
Saved test data for AM-II-filtered/AM-II-filtered_with_labels_k3.csv to ./train_test_split/AM-II-filtered_with_labels_k3_test.csv
‚úÖ No duplicate samples between train and test sets: AM-III-filtered/AM-III-filtered_with_labels_k4.csv
Saved train data for AM-III-filtered/AM-III-filtered_with_labels_k4.csv to ./train_test_split/AM-III-filtered_with_labels_k4_train.csv
Saved test data for AM-III-filtered/AM

Comparison of the chemical space and the retention-time (RT) distribution between the train and test sets of the AM-I, AM-II, and AM-III datasets

In [2]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Retention-time distribution comparison and PCA analysis between Train/Test splits.
"""

from __future__ import annotations

import os
import glob
import logging
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import ks_2samp
from sklearn.decomposition import PCA

# ---------- Global Constants ----------
# Input folder holding the *_train.csv / *_test.csv pairs; each analysis
# writes into its own sub-folder of it.
DATA_FOLDER = Path("./train_test_split")
RT_RESULT_FOLDER = DATA_FOLDER / "rt-comparison"
PCA_RESULT_FOLDER = DATA_FOLDER / "pca-structure-distribution"

# Create result folders
for _result_folder in (RT_RESULT_FOLDER, PCA_RESULT_FOLDER):
    _result_folder.mkdir(parents=True, exist_ok=True)

# Column compared between the train and test sets
TARGET_COL = "UV_RT-s"

# Feature columns for null value filtering:
# 5 named descriptors, then col0..col822, then fp_0..fp_1023
ALL_FEATURES = [
    "MolWt",
    "logP",
    "TPSA",
    "H_bond_donors",
    "H_bond_acceptors",
    *(f"col{i}" for i in range(823)),
    *(f"fp_{i}" for i in range(1024)),
]

# Statistical significance threshold
ALPHA = 0.05

# ---------- Logging ----------
# INFO and above, prefixed with a HH:MM:SS timestamp and the level name.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# ---------- Plot Style Configuration ----------
def _configure_plot_style() -> None:
    """Apply the shared seaborn/matplotlib look and seed NumPy's global RNG.

    Sets the seaborn "white" theme and a "paper" context with enlarged fonts
    and thicker lines, switches the font family to DejaVu Sans, and finally
    calls ``np.random.seed(42)``.
    """
    # Overrides layered on top of seaborn's "paper" context.
    context_overrides = {
        "font.size": 15,
        "axes.labelsize": 16,
        "axes.titlesize": 16,
        "xtick.labelsize": 14,
        "ytick.labelsize": 14,
        "legend.fontsize": 14,
        "lines.linewidth": 2.5,
        "axes.linewidth": 1.5,
    }

    sns.set_theme(style="white")
    sns.set_context("paper", rc=context_overrides)
    plt.rcParams["font.family"] = "DejaVu Sans"

    # Not a styling step: fixes the global NumPy random state for later code.
    np.random.seed(42)


# Applied once, as soon as the cell is executed.
_configure_plot_style()

# ---------- Data I/O ----------
def _load_data(train_path: Path, test_path: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the train and test CSV files and drop incomplete rows.

    A row is dropped when it has a missing value in ``TARGET_COL`` or in any
    column listed in ``ALL_FEATURES``. Any exception raised while reading or
    filtering is logged and then re-raised to the caller.

    Returns the filtered ``(train_df, test_df)`` pair.
    """
    required_cols = [TARGET_COL, *ALL_FEATURES]
    loaded = []
    try:
        # Train first, then test.
        for csv_path in (train_path, test_path):
            loaded.append(pd.read_csv(csv_path).dropna(subset=required_cols))
    except Exception as err:
        logging.error("Failed to read files: %s", err)
        raise
    train_df, test_df = loaded
    return train_df, test_df


def load_fingerprints_from_csv(file_path: Path) -> Optional[np.ndarray]:
    """Return the ``fp_0`` .. ``fp_1023`` columns of a CSV file as an array.

    The result has one row per CSV row and 1024 columns, in ``fp_0`` to
    ``fp_1023`` order. ``None`` is returned instead (after logging) when any
    of those columns is absent or when reading the file fails for any reason.
    """
    try:
        table = pd.read_csv(file_path)
        wanted = [f'fp_{i}' for i in range(1024)]

        # Guard clause: every fingerprint column has to be present.
        if not set(wanted).issubset(table.columns):
            logging.warning(f"Complete fingerprint columns not found in {file_path.name}")
            return None

        return table[wanted].values
    except Exception as err:
        logging.error(f"Failed to load file {file_path}: {err}")
        return None

# ---------- Retention Time Analysis ----------
def analyze_retention_time_pair(train_path: Path, test_path: Path) -> Dict:
    """Compare the ``TARGET_COL`` distribution of one train/test file pair.

    Loads both files through ``_load_data``, computes descriptive statistics
    and a two-sample Kolmogorov-Smirnov test on ``TARGET_COL``, and saves an
    overlaid KDE plot as ``<dataset>_kde.png`` in ``RT_RESULT_FOLDER``.

    Parameters
    ----------
    train_path : Path
        Training CSV; its stem (minus a trailing ``_train``) names the dataset.
    test_path : Path
        Matching test CSV.

    Returns
    -------
    dict
        Keys: ``dataset``, ``train_samples``, ``test_samples``,
        ``train_mean``, ``train_std``, ``test_mean``, ``test_std``,
        ``ks_stat``, ``ks_p_value`` and ``ks_reject`` (True when the p-value
        is below ``ALPHA``).

    Raises
    ------
    Exception
        Whatever ``_load_data``, the statistics, or the plotting calls raise;
        nothing is swallowed here.
    """
    # Strip only the trailing "_train" marker. str.replace would also delete
    # an earlier "_train" inside the dataset name (e.g. "x_training_train").
    stem = train_path.stem
    base_name = stem[: -len("_train")] if stem.endswith("_train") else stem

    logging.info("üîç Analyzing retention time distribution: %s vs %s", train_path.name, test_path.name)

    train_df, test_df = _load_data(train_path, test_path)
    n_train, n_test = len(train_df), len(test_df)

    # Descriptive statistics
    train_desc = train_df[TARGET_COL].describe()
    test_desc = test_df[TARGET_COL].describe()

    # Kolmogorov-Smirnov test
    ks_stat, p_val = ks_2samp(train_df[TARGET_COL], test_df[TARGET_COL])
    ks_reject = bool(p_val < ALPHA)

    # Plot retention time distribution on an explicit figure/axes pair so the
    # figure can be closed even if plotting or saving fails.
    fig, ax = plt.subplots(figsize=(6, 4))
    try:
        sns.kdeplot(
            train_df[TARGET_COL],
            label="Train",
            fill=True,
            alpha=0.25,
            color="tab:blue",
            ax=ax,
        )
        sns.kdeplot(
            test_df[TARGET_COL],
            label="Test",
            fill=True,
            alpha=0.25,
            color="tab:orange",
            ax=ax,
        )

        ax.set_xlabel("Retention Time (s)")
        ax.set_ylabel("Density")
        ax.legend()
        fig.tight_layout()

        fig_path = RT_RESULT_FOLDER / f"{base_name}_kde.png"
        fig.savefig(fig_path, dpi=600, bbox_inches="tight")
    finally:
        plt.close(fig)

    logging.info("‚úÖ Retention time analysis completed: %s", base_name)

    return {
        "dataset": base_name,
        "train_samples": n_train,
        "test_samples": n_test,
        "train_mean": train_desc["mean"],
        "train_std": train_desc["std"],
        "test_mean": test_desc["mean"],
        "test_std": test_desc["std"],
        "ks_stat": ks_stat,
        "ks_p_value": p_val,
        "ks_reject": ks_reject,
    }

# ---------- PCA Analysis ----------
def perform_pca_analysis(
    fps1: np.ndarray,
    fps2: np.ndarray,
    label1: str,
    label2: str,
    n_components: int = 2,
    random_state: Optional[int] = 42,
) -> Optional[pd.DataFrame]:
    """Project two fingerprint sets into a shared PCA space and save the result.

    Both arrays are stacked, PCA is fitted on the stack, and the reduced
    coordinates are written to ``pca_reduced_data_<dataset>.csv`` in
    ``PCA_RESULT_FOLDER``.

    Parameters
    ----------
    fps1, fps2 : np.ndarray
        2-D arrays with 1024 columns each (one row per sample).
    label1, label2 : str
        Value written to the ``Label`` column for rows of ``fps1`` / ``fps2``.
        ``label1`` minus a trailing ``_train.csv`` names the output file.
    n_components : int
        Number of principal components to keep.
    random_state : int or None
        Forwarded to ``PCA`` so the projection does not depend on the global
        NumPy random state when scikit-learn selects a randomized solver.

    Returns
    -------
    pandas.DataFrame or None
        Columns ``PC1`` .. ``PC<n_components>`` plus ``Label``; ``None`` (after
        logging an error) when an input is missing or has the wrong format.
    """
    # Check if data is loaded correctly
    if fps1 is None or fps2 is None:
        logging.error(f"Data {label1} or {label2} not loaded correctly")
        return None

    # The ndim test comes first: shape[1] on a 1-D array would raise
    # IndexError instead of taking the logged "format incorrect" path.
    for fps, label in ((fps1, label1), (fps2, label2)):
        if not isinstance(fps, np.ndarray) or fps.ndim != 2 or fps.shape[1] != 1024:
            logging.error(f"Fingerprint data format incorrect for {label}")
            return None

    # Combine both datasets so they share one set of principal axes
    all_fps = np.vstack([fps1, fps2])
    pca = PCA(n_components=n_components, random_state=random_state)
    reduced = pca.fit_transform(all_fps)

    # Create DataFrame; rows of fps1 come first, then rows of fps2
    df = pd.DataFrame(reduced, columns=[f'PC{i+1}' for i in range(n_components)])
    df['Label'] = [label1] * len(fps1) + [label2] * len(fps2)

    # Save PCA-reduced data. Only a trailing "_train.csv" is stripped, so a
    # dataset name that itself contains that text is left intact.
    suffix = '_train.csv'
    base_name = label1[: -len(suffix)] if label1.endswith(suffix) else label1
    output_csv_path = PCA_RESULT_FOLDER / f"pca_reduced_data_{base_name}.csv"
    df.to_csv(output_csv_path, index=False)
    logging.info("üìä PCA reduced data saved: %s", output_csv_path)

    return df


def plot_pca(
    df: pd.DataFrame,
    label1: str,
    label2: str
) -> None:
    """Scatter-plot PC1 against PC2, coloured by ``Label``, and save it as PNG.

    ``label1`` and ``label2`` are the two values of the ``Label`` column that
    receive a colour. The figure is written to ``pca_plot_<dataset>.png`` in
    ``PCA_RESULT_FOLDER``, where ``<dataset>`` is ``label1`` with
    ``_train.csv`` removed. In the legend, a label ending in ``_train.csv``
    is shown as "train" and any other label as "test".
    """
    dataset_name = label1.replace('_train.csv', '')
    figure_path = PCA_RESULT_FOLDER / f"pca_plot_{dataset_name}.png"

    # First label is drawn in blue, second in yellow.
    palette = dict(zip((label1, label2), ("#007AFF", "#FFCC00")))

    plt.figure(figsize=(10, 8))
    sns.scatterplot(
        data=df,
        x='PC1',
        y='PC2',
        hue='Label',
        palette=palette,
        s=110,
        alpha=0.7,
        linewidth=0.9
    )

    # Axis titles
    plt.xlabel("PCA 1", fontsize=24)
    plt.ylabel("PCA 2", fontsize=24)

    # Heavier frame and ticks
    axes = plt.gca()
    for side in axes.spines:
        axes.spines[side].set_linewidth(2)
    axes.tick_params(width=2, length=10)
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)

    # Swap the file-name legend entries for short "train" / "test" captions
    legend_handles, raw_labels = axes.get_legend_handles_labels()
    short_labels = []
    for raw in raw_labels:
        short_labels.append('train' if raw.endswith('_train.csv') else 'test')
    axes.legend(
        handles=legend_handles,
        labels=short_labels,
        title=None,
        fontsize=20,
        markerscale=1
    )

    plt.tight_layout()
    plt.savefig(figure_path, dpi=600)
    plt.close()
    logging.info("üìà PCA plot saved: %s", figure_path)


def analyze_structure_pair(train_path: Path, test_path: Path) -> None:
    """Run the fingerprint PCA comparison for one train/test file pair.

    Loads the fingerprints of both files, and when both loads succeed passes
    them to ``perform_pca_analysis`` and, if that returns a result, to
    ``plot_pca``. A warning is logged and nothing else happens when either
    fingerprint set could not be loaded.
    """
    dataset_name = train_path.stem.replace("_train", "")

    logging.info("üß¨ Analyzing structural distribution: %s vs %s", train_path.name, test_path.name)

    # Both files are always read, even if the first one fails.
    train_fps = load_fingerprints_from_csv(train_path)
    test_fps = load_fingerprints_from_csv(test_path)

    if train_fps is None or test_fps is None:
        logging.warning("‚ö†Ô∏è Skipping structural analysis for %s (fingerprint data missing)", dataset_name)
        return

    # The file names double as the labels of the two point clouds.
    pca_frame = perform_pca_analysis(
        train_fps, test_fps,
        label1=train_path.name,
        label2=test_path.name
    )
    if pca_frame is None:
        return

    plot_pca(pca_frame, train_path.name, test_path.name)
    logging.info("‚úÖ Structural analysis completed: %s", dataset_name)

# ---------- Main Program ----------
def main() -> None:
    """Run both analyses for every train/test pair found in ``DATA_FOLDER``.

    Pairs each ``*_train.csv`` file with the ``*_test.csv`` file of the same
    name, then (1) runs the retention-time comparison on every pair and saves
    the collected summaries to ``train_test_rt_summary.csv``, and (2) runs the
    PCA structure comparison on every pair. A failure in one pair is logged
    and does not stop the remaining pairs. Returns early, after logging, when
    the folder, the training files, or the matching test files are missing.
    """
    separator = "=" * 50

    # --- Locate the input files -------------------------------------------
    if not DATA_FOLDER.exists():
        logging.error(f"‚ùå Data folder does not exist: {DATA_FOLDER}")
        logging.info(f"Please ensure the {DATA_FOLDER} folder exists and contains *_train.csv and *_test.csv files")
        return

    train_files = sorted(DATA_FOLDER.glob("*_train.csv"))
    if len(train_files) == 0:
        logging.error("‚ùå No training files found (*_train.csv)")
        logging.info(f"Please check if the {DATA_FOLDER} folder contains *_train.csv files")
        return

    logging.info("üìÅ Found %d training files", len(train_files))

    # --- Match each training file with its test file ----------------------
    pairs: List[Tuple[Path, Path]] = []
    for train_file in train_files:
        test_file = train_file.with_name(train_file.name.replace("_train.csv", "_test.csv"))
        if not test_file.exists():
            logging.warning("‚ö†Ô∏è Corresponding test file not found: %s", train_file.name)
            continue
        pairs.append((train_file, test_file))

    if len(pairs) == 0:
        logging.error("‚ùå No valid file pairs found")
        logging.info("Please ensure each *_train.csv file has a corresponding *_test.csv file")
        return

    logging.info("üîÑ Starting analysis of %d datasets...", len(pairs))

    # --- Part 1: retention time -------------------------------------------
    logging.info(separator)
    logging.info("üìä Starting retention time distribution analysis")
    logging.info(separator)

    rt_summary = []
    for train_file, test_file in pairs:
        try:
            rt_summary.append(analyze_retention_time_pair(train_file, test_file))
        except Exception as err:
            logging.error("‚ùå Retention time analysis failed for %s: %s", train_file.name, err)

    # One row per successfully analysed dataset; skipped when all of them failed.
    if len(rt_summary) > 0:
        rt_summary_path = RT_RESULT_FOLDER / "train_test_rt_summary.csv"
        pd.DataFrame(rt_summary).to_csv(rt_summary_path, index=False, float_format="%.4f")
        logging.info("‚úÖ Retention time summary saved to %s", rt_summary_path)

    # --- Part 2: structure (PCA) ------------------------------------------
    logging.info(separator)
    logging.info("üî¨ Starting structural distribution analysis (PCA)")
    logging.info(separator)

    for train_file, test_file in pairs:
        try:
            analyze_structure_pair(train_file, test_file)
        except Exception as err:
            logging.error("‚ùå Structural analysis failed for %s: %s", train_file.name, err)

    logging.info("üéâ All analyses completed!")
    logging.info("üìÅ Retention time results saved in: %s", RT_RESULT_FOLDER.absolute())
    logging.info("üìÅ Structural analysis results saved in: %s", PCA_RESULT_FOLDER.absolute())


if __name__ == "__main__":
    main()

15:48:36 [INFO] üìÅ Found 3 training files
15:48:36 [INFO] üîÑ Starting analysis of 3 datasets...
15:48:36 [INFO] üìä Starting retention time distribution analysis
15:48:36 [INFO] üîç Analyzing retention time distribution: AM-I-filtered_with_labels_k4_train.csv vs AM-I-filtered_with_labels_k4_test.csv
15:48:38 [INFO] ‚úÖ Retention time analysis completed: AM-I-filtered_with_labels_k4
15:48:38 [INFO] üîç Analyzing retention time distribution: AM-II-filtered_with_labels_k3_train.csv vs AM-II-filtered_with_labels_k3_test.csv
15:48:39 [INFO] ‚úÖ Retention time analysis completed: AM-II-filtered_with_labels_k3
15:48:39 [INFO] üîç Analyzing retention time distribution: AM-III-filtered_with_labels_k4_train.csv vs AM-III-filtered_with_labels_k4_test.csv
15:48:39 [INFO] ‚úÖ Retention time analysis completed: AM-III-filtered_with_labels_k4
15:48:39 [INFO] ‚úÖ Retention time summary saved to train_test_split/rt-comparison/train_test_rt_summary.csv
15:48:39 [INFO] üî¨ Starting structural di