In [None]:
# __      __  _____    ___________________________  
#/  \    /  \/  _  \  /   _____/\______   \_____  \ 
#\   \/\/   /  /_\  \ \_____  \  |     ___/ _(__  < 
# \        /    |    \/        \ |    |    /       \
#  \__/\  /\____|__  /_______  / |____|   /______  /
#       \/         \/        \/                  \/ 

# LFQ DIA Peptide-Level Analysis

In [None]:
# Params
_PATH_ = ""
INPUT_PATH = f"{_PATH_}lfq.dia.peptides.csv"                   
METADATA_PATH = f"{_PATH_}sample_metadata.csv"                 # Must contain 'sample_id', 'group', 'label'
OUTPUT_FILTERED = f"{_PATH_}filtered_peptide_matrix.csv"
OUTPUT_AGGREGATED = f"{_PATH_}aggregated_protein_matrix.csv"
OUTPUT_RLE = f"{_PATH_}RLE.png"
OUTPUT_IMPUTE = f"{_PATH_}Post-imputation_histogram.png"
OUTPUT_PCA = f"{_PATH_}PCA.svg"
OUTPUT_UMAP2D = f"{_PATH_}UMAP-2D.svg"
OUTPUT_UMAP3D = f"{_PATH_}UMAP-3D.svg"


MIN_COMPLETENESS = 0.5  # Intragroup completeness threshold
CV_THRESHOLD = 3.5      # Intragroup CV upper limit
TOP_N = 3               # Top-N peptides per protein for aggregation
MIN_INTENSITY_Q = 0.10  # Minimum acceptable median intensity (percentile)
PSEUDOCOUNT = 1e-5      # For log-transform and imputation
N_GROUP = 2             # Number of groups
QUANTILE = 0.10         # Quantile to sample skewnormal imputation
SHIFT = 2.00            # Number of std deviations to shift the gaussian center to the left
WIDTH = 0.33            # Std deviation multiplier (more of less variance)
MIN_ALLOWED = None      # ABS minimum for imputed values
SEED = 42               # Raondom seed for reproducibility
LEGEND_LAB = ''
LEGEND_LAB1 = ''
LEGEND_LAB2 = ''

In [None]:
# Load Packages
import pandas as pd
import numpy as np

from umap import UMAP
import matplotlib.pyplot as plt
import plotly.express as px
import seaborn as sns

import warnings
warnings.filterwarnings("ignore", message="n_jobs value 1 overridden to 1", category=UserWarning)

## Normalization Method

In [None]:
# ---------------------- Method 1: RobNorm-like (robust median centering) -------------------
def robnorm_normalize(df):
    df_norm = df.copy()
    for col in df.columns:
        center = np.median(df[col].dropna())
        df_norm[col] = df[col] - center
    return df_norm

# ---------------------- Method 2: TMM-like Scaling ----------------------------------
def tmm_normalize(df):
    df = df.copy()
    ref_sample = df.median(axis=1)  # pseudo-reference sample

    scaling_factors = []
    for col in df.columns:
        ratios = df[col] - ref_sample
        trimmed = ratios[(ratios > np.percentile(ratios, 5)) & (ratios < np.percentile(ratios, 95))]
        scaling_factors.append(trimmed.median())

    scale_series = pd.Series(scaling_factors, index=df.columns)
    return df - scale_series  # subtract log-scale factor per sample


#---------------------- Method 3: Variance Stabilizing Normalization -------------------
def vsn_normalize(df):
    df_trans = df.copy().apply(lambda x: 2**x)
    offset = np.median(df.values[np.isfinite(df.values)])

    # Apply transformation: log2(a + sqrt(x + offset))
    vsn_func = lambda x: np.log2(1.0 + np.sqrt(x + offset))
    df_vsn = df_trans.applymap(vsn_func)

    df_vsn = (df_vsn - df_vsn.mean()) / df_vsn.std()

    return df_vsn

#---------------------- Method 4: RobNorm-Like with MAD Scaling -----------------------
# Function for robust normalization
def robMAD_normalize(df_log2, preserve_index=True):
    normalized_df = df_log2.copy()
    for col in df_log2.columns:
        values = df_log2[col]
        center = np.median(values.dropna())
        scale = median_abs_deviation(values.dropna())
        # Avoid division by zero
        scale = scale if scale != 0 else 1.0
        normalized = (values - center) / scale

        normalized_df[col] = normalized

    if preserve_index:
        normalized_df.index = df_log2.index

    return normalized_d
    

## Imputation Method

In [None]:
# ---------------------- Method 1: Left-Tail Skewnorm -------------------
def left_tail_skewnorm_impute(df, quantile=QUANTILE, seed=SEED):
    np.random.seed(seed)
    imputed_df = df.copy()

    for col in imputed_df.columns:
        observed = imputed_df[col].dropna()
        
        # Fit skew-normal: returns shape (a), location (loc), scale (scale)
        s, loc, scale = skewnorm.fit(observed)

        # Sample missing values from the left tail
        missing_mask = imputed_df[col].isna()
        n_missing = missing_mask.sum()
        if n_missing == 0:
            continue
        
        # Define upper bound in left tail: e.g., 10th percentile in the fitted skewnorm
        left_tail_max = skewnorm.ppf(quantile, s, loc, scale)
        # Impute from skewnorm, truncated to only accept samples <= left_tail_max
        imputed_vals = []
        while len(imputed_vals) < n_missing:
            sample = skewnorm.rvs(s, loc, scale, size=(n_missing - len(imputed_vals)))
            tail_samples = sample[sample <= left_tail_max]
            imputed_vals.extend(tail_samples)
        imputed_vals = np.array(imputed_vals)[:n_missing]
        imputed_df.loc[missing_mask, col] = imputed_vals

    return imputed_df

# ---------------------- Method 2: Left-Tail Gaussian -------------------
def left_shifted_gaussian_impute(df, shift=SHIFT, width=WIDTH, min_allowed=MIN_ALLOWED, seed=SEED):
    np.random.seed(seed)
    imputed_df = df.copy()

    for col in imputed_df.columns:
        col_data = imputed_df[col]
        missing_mask = col_data.isna()
        if missing_mask.sum() == 0:
            continue
        
        observed_vals = col_data[~missing_mask]
        mean_obs = observed_vals.mean()
        std_obs = observed_vals.std()

        low_gaussian = np.random.normal(loc=mean_obs - shift * std_obs,
                                        scale=width * std_obs,
                                        size=missing_mask.sum())

        if min_allowed is not None:
            low_gaussian = np.clip(low_gaussian, min_allowed, None)

        # Impute
        imputed_df.loc[missing_mask, col] = low_gaussian

    return imputed_df


## Aggregation Method

In [None]:
# ---------------------- Top-N Mean ------------------------
def aggregate_top_n(df, topn=TOP_N):
    def topn_agg(protein_df):
        peptide_medians = protein_df.median(axis=1)
        top_peptides = peptide_medians.nlargest(topn).index
        return protein_df.loc[top_peptides].mean(axis=0)
    aggdf = (
        df
        .groupby(level='Accession', group_keys=False)
        .apply(topn_agg)
    )
    return aggdf

# START

In [None]:
# 0. Load data
df = pd.read_csv(INPUT_PATH)
meta_df = pd.read_csv(METADATA_PATH)
sample_ids = meta_df['sample_id'].values
sample_labels = meta_df['label'].values

In [None]:
# 1. Wrangle dfs and check for outliers
peptide_df = df.copy()
prefixes = ['Accession', 'Peptide', 'Area']
selected_columns2 = df.columns[[any(prefix in col for prefix in prefixes) for col in df.columns]] # select sample id columns and accession, peptide
peptide_df = peptide_df.loc[:, selected_columns2]
peptide_df = peptide_df.dropna(subset=['Accession']) # remove NaN
peptide_df = peptide_df[~peptide_df.Accession.str.contains('#CONTAM#')] # Remove contaminants
peptide_df['Accession'] = peptide_df.Accession.str.split('|').str[0] # Split Accession string to aquire Uniprot Accession ID
peptide_df.set_index(['Accession', 'Peptide'], inplace=True)
peptide_df = peptide_df.iloc[:, :-(N_GROUP+1)]
peptide_df = peptide_df[sample_ids]  # Ensure correct column order

group_map = meta_df.set_index('sample_id')['group'].to_dict()
grouped_samples = meta_df.groupby('group')['sample_id'].apply(list).to_dict() # Group metadata

log_data = np.log(peptide_df + 0.00001) # RLE 
peptide_df = peptide_df.replace(0, np.nan)
dfboxen = log_data.T.apply(lambda x: x-np.nanmedian(x))

# Plot
fig = sns.boxenplot(
    data=dfboxen.iloc[:,1:].T,
    orient='h',
    width_method="linear")
plt.xlabel('$Log10$ Relative Abundance')
plt.ylabel('Sample')
fig.set_yticks(sample_ids)
fig.set_yticklabels(sample_labels)
plt.title('RLE $Pre$-Normalization')
plt.savefig(OUTPUT_RLE)
plt.show(fig)

In [None]:
# 2. Filter for completeness
def intragroup_completeness(row):
    for group, samples in grouped_samples.items():
        completeness = row[samples].count() / len(samples)
        if completeness > MIN_COMPLETENESS:
            return True
    return False

mask_complete = peptide_df.apply(intragroup_completeness, axis=1)
peptide_df = peptide_df[mask_complete]

# 3. Log2 transform
peptide_df_log = np.log2(peptide_df + PSEUDOCOUNT)

# Step 4: QRILC-style impute
log2_imputed = left_tail_skewnorm_impute(peptide_df_log, quantile=QUANTILE, seed=SEED) # Adjust as neccessary

plt.figure(figsize=(8, 5))
plt.hist(log2_imputed.values.flatten(), bins=80, color='slateblue')
plt.xlabel("Log2 Intensity")
plt.ylabel("Frequency")
plt.title("Protein Intensities After $Skewnorm$ Left-Tail Imputation")
plt.grid(True)
plt.tight_layout()
plt.savefig(OUTPUT_IMPUTE)
plt.show()

In [None]:
# Dimension Reduction of Prenormalized data with PCA
X0 = log2_imputed.T
pca = PCA(n_components=min(X0.shape))
components = pca.fit_transform(X0)

fig = px.scatter(pd.DataFrame(components).rename(columns={0: 'PC 1', 1: 'PC 2'}), x='PC 1', y='PC 2',
                 color=meta_df['group'],
                 template='plotly_white',
                 labels={
                 'PC 1': f'PC 1: {round(pca.explained_variance_ratio_[0] * 100, 1)}%',
                 'PC 2': f'PC 2: {round(pca.explained_variance_ratio_[1] * 100, 1)}%',
                 'color': LEGEND_LAB},
                 text=sample_labels)

fig.data[0].name = LEGEND_LAB1
fig.data[1].name = LEGEND_LAB2

fig.update_layout(font=dict(family='Arial', size=14), legend=dict(
    orientation='h',
    yanchor='bottom',
    y=1.01,
    xanchor='center',
    x=0.5,
    font=dict(family='Arial', size=14)
))

fig.update_traces(textposition='top center')
fig.update_traces(marker_size=10)
fig.write_image(OUTPUT_PCA)
fig.show()

In [None]:
# 5. Trimmed Means M-values (Normalization)
peptide_df_norm = tmm_normalize(log2_imputed)

# 6. Filter by peptide intensity
medians = peptide_df_norm.median(axis=1)
intensity_threshold = medians.quantile(MIN_INTENSITY_Q)
mask_intensity = medians >= intensity_threshold
peptide_df_norm = peptide_df_norm[mask_intensity]

# 7. Filter by intragroup CV
def intragroup_cv_ok(row):
    for group, samples in grouped_samples.items():
        values = row[samples]
        if values.mean() == 0:
            return False
        cv = values.std() / values.mean()
        if cv > CV_THRESHOLD or np.isnan(cv):
            return False
    return True

mask_cv = peptide_df_norm.apply(intragroup_cv_ok, axis=1)
peptide_df_filtered = peptide_df_norm[mask_cv]
peptide_df_filtered.to_csv(OUTPUT_FILTERED)

# 8. Aggregate by Top-N most intense peptides
protein_df = aggregate_top_n(peptide_df_filtered, topn=TOP_N)

# Export for limma and report proteins aggregated/peptides retained
protein_df.to_csv(OUTPUT_AGGREGATED)
print(f"Retained peptides: {peptide_df_filtered.shape[0]}")
print(f"Aggregated proteins: {protein_df.shape[0]}")

In [None]:
# Dimension Reduction
X = protein_df.T
y = meta_df.group

umap_2d = UMAP(n_components=2, n_neighbors=X.shape[0]//N_GROUP, random_state=42)
umap_3d = UMAP(n_components=3, n_neighbors=X.shape[0]//N_GROUP, random_state=42)

proj_2d = umap_2d.fit_transform(X)
proj_3d = umap_3d.fit_transform(X)

fig_2d = px.scatter(
    pd.DataFrame(proj_2d).rename(columns={0: 'UMAP1', 1: 'UMAP2'}),
    x='UMAP1',
    y='UMAP2',
    color=y,
    template='plotly_white',
    labels={'color': LEGEND_LAB},
    text=sample_labels
)
fig_2d.data[0].name = LEGEND_LAB1
fig_2d.data[1].name = LEGEND_LAB2

fig_2d.update_layout(font=dict(family='Arial', size=14), legend=dict(
    orientation='h',
    yanchor='bottom',
    y=1.01,
    xanchor='center',
    x=0.5,
    font=dict(family='Arial', size=14)
))

fig_2d.update_traces(textposition='top center')
fig_2d.update_traces(marker_size=10)

fig_3d = px.scatter_3d(
    pd.DataFrame(proj_3d).rename(columns={0: 'UMAP1', 1: 'UMAP2', 2: 'UMAP3'}),
    x='UMAP1', y='UMAP2', z='UMAP3',
    color=y,
    labels={'color': LEGEND_LAB},
    #text=X.index
)

fig_3d.update_traces(textposition='top center')
fig_3d.update_traces(marker_size=6)

fig_2d.write_image(OUTPUT_UMAP2D)
fig_2d.show()
fig_3d.write_image(OUTPUT_UMAP3D)
fig_3d.show()