# Preprocess

In [None]:

import numpy as np
import pandas as pd
from pathlib import Path


meta_path = Path(r"F:\Data\micromass\micromass_un_anonymized\micromass\pure_spectra_metadata.csv")  
out_dir = meta_path.parent
y_out = out_dir / "y_gram.npy"
map_full_csv = out_dir / "species_to_gram_full.csv"      
map_manual_csv = out_dir / "species_to_gram_manual.csv"  




meta = pd.read_csv(meta_path, sep=";")


species_codes = meta["Species"].astype(str).str.strip()
unique_codes = sorted(species_codes.unique())


species_to_gram_positive_only = {
    'QWP.DRH',
    'QWP.LRO',
    'VVJ.KSF',
    'VVJ.KWJ',
    'QBG.CRP',
    'QBG.KGI',
    'RTO.TQH',
    'RTO.JFR',
}
pos_set = set(species_to_gram_positive_only)


unknown_in_dict = sorted(pos_set.difference(unique_codes))



y_gram = species_codes.isin(pos_set).astype(np.int64).to_numpy()
np.save(y_out, y_gram)



full_map_df = pd.DataFrame({"Species": unique_codes})
full_map_df["GramBinary"] = full_map_df["Species"].isin(pos_set).astype(int)
full_map_df.to_csv(map_full_csv, index=False, encoding="utf-8")



pd.DataFrame({"Species": sorted(pos_set), "GramBinary": 1}).to_csv(map_manual_csv, index=False, encoding="utf-8")



In [None]:

import numpy as np
import pandas as pd
from pathlib import Path


X_csv = Path(r"F:\Data\micromass\micromass_un_anonymized\micromass\pure_spectra_matrix.csv")
out_dir = X_csv.parent
X_out = out_dir / "X_std.npy"             
scaler_out = out_dir / "standardizer.npz"  


X = pd.read_csv(X_csv, sep=";", header=None, dtype=np.float32).to_numpy()


mean = X.mean(axis=0, dtype=np.float64)
std = X.std(axis=0, dtype=np.float64)

std_safe = np.where(std == 0.0, 1.0, std)
X_std = ((X - mean) / std_safe).astype(np.float32, copy=False)


np.save(X_out, X_std)
np.savez_compressed(scaler_out, mean=mean, std=std_safe)

print(f"[OK] X_std -> {X_out}  shape={X_std.shape}  dtype={X_std.dtype}")
print(f"[OK] standardizer -> {scaler_out} (keys: mean, std)")


In [None]:
import numpy as np
x_npy_path = "./Micro-mass/X_std.npy"
y_npy_path = "./Micro-mass/y_gram.npy"

y = np.load(y_npy_path)

X_full = np.load(x_npy_path); y = np.load(y_npy_path)
print(X_full.shape)
print(y.shape)
D = X_full.shape[1]




from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
seed =42
n_jobs =42
skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=seed)

fold_acc = []
for k, (tr_idx, te_idx) in enumerate(skf.split(X_full, y), start=1):
    X_tr, X_te = X_full[tr_idx], X_full[te_idx]
    y_tr, y_te = y[tr_idx], y[te_idx]

    clf = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        min_samples_leaf=3,
        random_state=seed,
        n_jobs=n_jobs
    )
    clf.fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)
    acc = accuracy_score(y_te, y_pred)
    fold_acc.append(acc)
    print(f"Fold {k} accuracy: {acc:.4f}")

print(f"Mean accuracy (2-fold): {np.mean(fold_acc):.4f} ± {np.std(fold_acc, ddof=1):.4f}")


# LOCO

In [None]:
from Inference.estimators_cls_cross import LOCOEstimator_cls
from scipy.stats import norm
from sklearn.ensemble import RandomForestClassifier


estimator1 = LOCOEstimator_cls(



        regressor =  RandomForestClassifier(
                n_estimators=300,
                max_depth=None,
                min_samples_leaf=3,
                random_state=seed,
                n_jobs=n_jobs
                )

)


phi_0_loco, se_0_loco = estimator1.importance(X_full, y)

phi_0_loco_test = phi_0_loco 

se_0_loco_test = se_0_loco 

z_score_0_loco = phi_0_loco_test / se_0_loco_test

p_value_0_loco = 1 - norm.cdf(z_score_0_loco)
rounded_p_value_0_loco = np.round(p_value_0_loco, 3)

print(rounded_p_value_0_loco)


alpha = 0.05 / 1300  


mask = (p_value_0_loco <= alpha).astype(int)


import pandas as pd
import os


results_df = pd.DataFrame({
    "Feature": np.arange(len(phi_0_loco)),        
    "phi_0_loco": phi_0_loco,                     
    "std_0_loco": se_0_loco,                     
    "z_score_0_loco": z_score_0_loco,             
    "p_value_0_loco": p_value_0_loco,             
    "mask": mask                                  
})


out_path = "./Micro-mass/summary/loco/loco_results.csv"


os.makedirs(os.path.dirname(out_path), exist_ok=True)


results_df.to_csv(out_path, index=False, encoding="utf-8-sig")

print(f"Results saved to {out_path}")


In [None]:
mask = (p_value_0_loco <= alpha).astype(int)
vals, cnts = np.unique(mask, return_counts=True)
print(dict(zip(vals, cnts)))  

mask1 = (p_value_0_loco <= 0.05).astype(int)
vals1, cnts1 = np.unique(mask1, return_counts=True)
print(dict(zip(vals1, cnts1)))

# CPI

In [None]:
from Inference.estimators_cls_cross import CPIEstimator_cls_normal
from scipy.stats import norm
from sklearn.ensemble import RandomForestClassifier

estimator1 = CPIEstimator_cls_normal(



        regressor =  RandomForestClassifier(
                n_estimators=300,
                max_depth=None,
                min_samples_leaf=3,
                random_state=seed,
                n_jobs=n_jobs
                )

)


phi_0_cpi, se_0_cpi = estimator1.importance(X_full, y)

phi_0_cpi_test = phi_0_cpi 

se_0_cpi_test = se_0_cpi 

z_score_0_cpi = phi_0_cpi_test / se_0_cpi_test

p_value_0_cpi = 1 - norm.cdf(z_score_0_cpi)
rounded_p_value_0_cpi = np.round(p_value_0_cpi, 3)

print(rounded_p_value_0_cpi)

alpha = 0.05 / 1300  


mask = (p_value_0_cpi <= alpha).astype(int)

import pandas as pd
import os


results_df = pd.DataFrame({
    "Feature": np.arange(len(phi_0_cpi)),        
    "phi_0_cpi": phi_0_cpi,                     
    "std_0_cpi": se_0_cpi,                     
    "z_score_0_cpi": z_score_0_cpi,             
    "p_value_0_cpi": p_value_0_cpi,             
    "mask": mask                                  
})


out_path = "./Micro-mass/summary/cpi/cpi_results.csv"


os.makedirs(os.path.dirname(out_path), exist_ok=True)


results_df.to_csv(out_path, index=False, encoding="utf-8-sig")

print(f"Results saved to {out_path}")


In [None]:
mask = (p_value_0_cpi <= alpha).astype(int)
vals, cnts = np.unique(mask, return_counts=True)
print(dict(zip(vals, cnts)))  

mask1 = (p_value_0_cpi <= 0.05).astype(int)
vals1, cnts1 = np.unique(mask1, return_counts=True)
print(dict(zip(vals1, cnts1)))

# FDFI

In [None]:
import sys
sys.path.append('ICLR_Flow_Disentangle')

import numpy as np
import torch


device = torch.device("cuda:6" if torch.cuda.is_available() else "cpu")

from Flow_Matching.flow_matching import FlowMatchingModel


model = FlowMatchingModel(
    X=X_full,
    dim=D,
    device=device,
    hidden_dim=128,        
    time_embed_dim=64,     
    num_blocks=1,
    use_bn=False
)
model.fit(num_steps=5000, batch_size=256, lr=1e-3, show_plot=True)

In [None]:
from Inference.estimators_cls_cross import CPI_Flow_Model_Estimator_cls_normal
from scipy.stats import norm
from sklearn.ensemble import RandomForestClassifier

estimator8 = CPI_Flow_Model_Estimator_cls_normal(
    regressor =  RandomForestClassifier(
                n_estimators=300,
                max_depth=None,
                min_samples_leaf=3,
                random_state=seed,
                n_jobs=n_jobs
                ),


    flow_model=model

)


phi_x_cpi, se_x_cpi = estimator8.importance(X_full, y)

phi_x_cpi_test = phi_x_cpi 

se_x_cpi_test = se_x_cpi 

z_score_x_cpi = phi_x_cpi_test / se_x_cpi_test


p_value_x_cpi = 1 - norm.cdf(z_score_x_cpi)
rounded_p_value_x_cpi = np.round(p_value_x_cpi, 3)

print(rounded_p_value_x_cpi)

alpha = 0.05 / 1300  


mask = (p_value_x_cpi <= alpha).astype(int)

import pandas as pd
import os


results_df = pd.DataFrame({
    "Feature": np.arange(len(phi_x_cpi)),        
    "phi_x_cpi": phi_x_cpi,                     
    "std_x_cpi": se_x_cpi,                     
    "z_score_x_cpi": z_score_x_cpi,             
    "p_value_x_cpi": p_value_x_cpi,             
    "mask": mask                                  
})


out_path = "./Micro-mass/summary/fdfi/fdfi_results.csv"


os.makedirs(os.path.dirname(out_path), exist_ok=True)


results_df.to_csv(out_path, index=False, encoding="utf-8-sig")

print(f"Results saved to {out_path}")




In [None]:
mask = (p_value_x_cpi <= alpha).astype(int)
vals, cnts = np.unique(mask, return_counts=True)
print(dict(zip(vals, cnts)))  

mask1 = (p_value_x_cpi <= 0.05).astype(int)
vals1, cnts1 = np.unique(mask1, return_counts=True)
print(dict(zip(vals1, cnts1))) 

In [None]:
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier


top_k = 50
D = X_full.shape[1]


phi_safe = np.nan_to_num(phi_x_cpi, nan=-np.inf, posinf=-np.inf, neginf=-np.inf)


top_k = min(top_k, D)
selected_feature_idx = np.argsort(phi_safe)[-top_k:][::-1].tolist()

print(f"Top-{top_k} feature indices by FDFI:")
for rank, j in enumerate(selected_feature_idx, start=1):
    print(f"  #{rank}: idx={j:4d}, FDFI={phi_x_cpi[j]:.6f}, SE={se_x_cpi[j]:.6f}")


X_subset = X_full[:, selected_feature_idx]


seed = 42

n_jobs = 42
skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=seed)

fold_acc = []
for k, (tr_idx, te_idx) in enumerate(skf.split(X_subset, y), start=1):
    X_tr, X_te = X_subset[tr_idx], X_subset[te_idx]
    y_tr, y_te = y[tr_idx], y[te_idx]

    clf = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        min_samples_leaf=3,
        random_state=seed,
        n_jobs=n_jobs
    )
    clf.fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)
    acc = accuracy_score(y_te, y_pred)
    fold_acc.append(acc)
    print(f"Fold {k} accuracy (selected features): {acc:.4f}")

print(f"Mean accuracy (2-fold, selected features): {np.mean(fold_acc):.4f} ± {np.std(fold_acc, ddof=1):.4f}")


print("Selected indices:", selected_feature_idx)


# DFI

In [None]:
from Inference.estimators_cls_cross import  DFIEstimator_cls
from scipy.stats import norm
from sklearn.ensemble import RandomForestClassifier


estimator4 = DFIEstimator_cls(
    regressor =  RandomForestClassifier(
                n_estimators=300,
                max_depth=None,
                min_samples_leaf=3,
                random_state=seed,
                n_jobs=n_jobs
                )

)


phi_x_dfi, se_x_dfi = estimator4.importance(X_full, y)



phi_x_dfi_test = phi_x_dfi 

se_x_dfi_test = se_x_dfi 

z_score_x_dfi = phi_x_dfi_test / se_x_dfi_test

p_value_x_dfi = 1 - norm.cdf(z_score_x_dfi)
rounded_p_value_x_dfi = np.round(p_value_x_dfi, 3)

print(rounded_p_value_x_dfi)

alpha = 0.05 / 1300  


mask = (p_value_x_dfi <= alpha).astype(int)

import pandas as pd
import os


results_df = pd.DataFrame({
    "Feature": np.arange(len(phi_x_dfi)),        
    "phi_x_dfi": phi_x_dfi,                     
    "std_x_dfi": se_x_dfi,                     
    "z_score_x_dfi": z_score_x_dfi,             
    "p_value_x_dfi": p_value_x_dfi,             
    "mask": mask                                  
})

out_path = "./Micro-mass/summary/dfi/dfi_results.csv"


os.makedirs(os.path.dirname(out_path), exist_ok=True)


results_df.to_csv(out_path, index=False, encoding="utf-8-sig")

print(f"Results saved to {out_path}")

In [None]:
mask = (p_value_x_dfi <= alpha).astype(int)
vals, cnts = np.unique(mask, return_counts=True)
print(dict(zip(vals, cnts)))  

mask1 = (p_value_x_dfi <= 0.05).astype(int)
vals1, cnts1 = np.unique(mask1, return_counts=True)
print(dict(zip(vals1, cnts1))) 

In [None]:
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier


top_k = 50
D = X_full.shape[1]


phi_safe = np.nan_to_num(phi_x_dfi, nan=-np.inf, posinf=-np.inf, neginf=-np.inf)


top_k = min(top_k, D)
selected_feature_idx = np.argsort(phi_safe)[-top_k:][::-1].tolist()

print(f"Top-{top_k} feature indices by DFI:")
for rank, j in enumerate(selected_feature_idx, start=1):
    print(f"  #{rank}: idx={j:4d}, DFI={phi_x_dfi[j]:.6f}, SE={se_x_dfi[j]:.6f}")


X_subset = X_full[:, selected_feature_idx]


seed = 42
n_jobs = 42
skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=seed)

fold_acc = []
for k, (tr_idx, te_idx) in enumerate(skf.split(X_subset, y), start=1):
    X_tr, X_te = X_subset[tr_idx], X_subset[te_idx]
    y_tr, y_te = y[tr_idx], y[te_idx]

    clf = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        min_samples_leaf=3,
        random_state=seed,
        n_jobs=n_jobs
    )
    clf.fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)
    acc = accuracy_score(y_te, y_pred)
    fold_acc.append(acc)
    print(f"Fold {k} accuracy (selected features): {acc:.4f}")

print(f"Mean accuracy (2-fold, selected features): {np.mean(fold_acc):.4f} ± {np.std(fold_acc, ddof=1):.4f}")


print("Selected indices:", selected_feature_idx)


# ad hoc cluster+CPI

In [None]:
import numpy as np
from collections import defaultdict
from scipy.cluster import hierarchy
from scipy.spatial.distance import squareform
from scipy.stats import spearmanr, rankdata
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.feature_selection import mutual_info_classif
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler


x_npy_path = "./Micro-mass/X_std.npy"
y_npy_path = "./Micro-mass/y_gram.npy"


X_full = np.load(x_npy_path)   
y = np.load(y_npy_path).ravel()

print("X shape:", X_full.shape)
print("y shape:", y.shape)

n, D = X_full.shape
feature_names = [f"f{i}" for i in range(D)] 


def spearman_corr_matrix(X: np.ndarray) -> np.ndarray:

    res = spearmanr(X, axis=0, nan_policy="omit")
    C = np.asarray(res.correlation, dtype=float)
    if C.ndim == 0 or C.shape == ():  
        R = np.apply_along_axis(rankdata, 0, X)  
        C = np.corrcoef(R, rowvar=False)
    C = (C + C.T) / 2.0
    np.fill_diagonal(C, 1.0)
    C = np.nan_to_num(C, nan=0.0, posinf=0.0, neginf=0.0)
    return C

corr = spearman_corr_matrix(X_full)

distance_matrix = 1.0 - np.abs(corr)
distance_matrix = (distance_matrix + distance_matrix.T) / 2.0
np.fill_diagonal(distance_matrix, 0.0)

condensed = squareform(distance_matrix)  
link = hierarchy.linkage(condensed, method="ward")


t = 1.285  
cluster_ids = hierarchy.fcluster(link, t, criterion="distance")

cluster_to_ids = defaultdict(list)
for idx, cid in enumerate(cluster_ids):
    cluster_to_ids[cid].append(idx)

print(f"#clusters: {len(cluster_to_ids)}")

def select_representatives(
    X: np.ndarray,
    y: np.ndarray,
    cluster_to_ids: dict,
    corr_abs: np.ndarray,   
    dist: np.ndarray,       
    strategy: str = "medoid",
    random_state: int = 0
):

    rng = np.random.RandomState(random_state)
    chosen_list = []

    for cid, ids in cluster_to_ids.items():
        ids = list(ids)
        if len(ids) == 1:
            chosen_list.append(ids[0])
            continue

        if strategy == "first":
            chosen = ids[0]

        elif strategy == "medoid":
            subD = dist[np.ix_(ids, ids)]
            sums = np.sum(subD, axis=1)
            chosen = ids[int(np.argmin(sums))]
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

        chosen_list.append(chosen)

    return chosen_list


strategy = "medoid"
selected_idx = select_representatives(
    X_full, y, cluster_to_ids,
    corr_abs=np.abs(corr),
    dist=distance_matrix,
    strategy=strategy,
    random_state=42
)
selected_names = [feature_names[i] for i in selected_idx]
print(f"[{strategy}] selected {len(selected_idx)} features")



do_cv = True
if do_cv:
    X_subset = X_full[:, selected_idx]
    seed = 42
    n_jobs = 42
    skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=seed)

    fold_acc = []
    for k, (tr, te) in enumerate(skf.split(X_subset, y), start=1):
        clf = RandomForestClassifier(
            n_estimators=300, max_depth=None, min_samples_leaf=3,
            random_state=seed, n_jobs=n_jobs
        )
        clf.fit(X_subset[tr], y[tr])
        pred = clf.predict(X_subset[te])
        acc = accuracy_score(y[te], pred)
        fold_acc.append(acc)
        print(f"Fold {k} accuracy (selected features): {acc:.4f}")
    print(f"Mean accuracy (2-fold): {np.mean(fold_acc):.4f} ± {np.std(fold_acc, ddof=1):.4f}")


In [None]:
from Inference.estimators_cls_cross import CPIEstimator_cls_normal
from scipy.stats import norm
from sklearn.ensemble import RandomForestClassifier
import numpy as np
import pandas as pd
from pathlib import Path


estimator1 = CPIEstimator_cls_normal(



        regressor =  RandomForestClassifier(
                n_estimators=300,
                max_depth=None,
                min_samples_leaf=3,
                random_state=seed,
                n_jobs=n_jobs
                )

)


phi_0_cpi, se_0_cpi = estimator1.importance(X_subset, y)


phi_0_cpi_test = phi_0_cpi 

se_0_cpi_test = se_0_cpi 

z_score_0_cpi = phi_0_cpi_test / se_0_cpi_test

p_value_0_cpi = 1 - norm.cdf(z_score_0_cpi)
rounded_p_value_0_cpi = np.round(p_value_0_cpi, 3)

print(rounded_p_value_0_cpi)

alpha = 0.05 / 100  


mask = (p_value_0_cpi <= alpha).astype(int)

print(mask)


In [None]:
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier


top_k = 50
D = X_full.shape[1]


phi_safe = np.nan_to_num(phi_0_cpi, nan=-np.inf, posinf=-np.inf, neginf=-np.inf)


top_k = min(top_k, D)
selected_feature_idx = np.argsort(phi_safe)[-top_k:][::-1].tolist()

print(f"Top-{top_k} feature indices by CPI:")
for rank, j in enumerate(selected_feature_idx, start=1):
    print(f"  #{rank}: idx={j:4d}, CPI={phi_0_cpi[j]:.6f}, SE={se_0_cpi[j]:.6f}")


X_subset = X_full[:, selected_feature_idx]


seed = 42

n_jobs = 42
skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=seed)

fold_acc = []
for k, (tr_idx, te_idx) in enumerate(skf.split(X_subset, y), start=1):
    X_tr, X_te = X_subset[tr_idx], X_subset[te_idx]
    y_tr, y_te = y[tr_idx], y[te_idx]

    clf = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        min_samples_leaf=3,
        random_state=seed,
        n_jobs=n_jobs
    )
    clf.fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)
    acc = accuracy_score(y_te, y_pred)
    fold_acc.append(acc)
    print(f"Fold {k} accuracy (selected features): {acc:.4f}")

print(f"Mean accuracy (2-fold, selected features): {np.mean(fold_acc):.4f} ± {np.std(fold_acc, ddof=1):.4f}")

print("Selected indices:", selected_feature_idx)
