<a href="https://colab.research.google.com/github/gilsonauerswald/Bioinformatic_Projects/blob/main/Lesson_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# rf_pipeline_from_class_row.py
# Usage: python rf_pipeline_from_class_row.py
# Inputs (TSV, genes in rows, samples in columns): Final_12535train_matrix.txt, Final_12535test_matrix.txt
# Each file has a special row named "class" giving the label (e.g., StageI/II/III/IV) for every sample.

import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import VarianceThresholda
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
from sklearn.inspection import permutation_importance
import matplotlib.pyplot as plt
from joblib import dump

# -------------------- Config --------------------
TRAIN_PATH = "https://raw.githubusercontent.com/Omicslogic-git/Project_3_data/refs/heads/main/Final_12535train_matrix.txt"
TEST_PATH  = "https://raw.githubusercontent.com/Omicslogic-git/Project_3_data/refs/heads/main/Final_12535test_matrix.txt"
OUTDIR = "rf_outputs"
RANDOM_SEED = 42
N_ESTIMATORS = 500
VAR_THRESHOLD = 1e-3          # remove near-constant genes
TOP_N_IMPORTANCE = 40         # keep top-N genes by importance (after variance filter)
os.makedirs(OUTDIR, exist_ok=True)

# -------------------- Load ----------------------
train_raw = pd.read_csv(TRAIN_PATH, sep="\t", index_col=0)
test_raw  = pd.read_csv(TEST_PATH,  sep="\t", index_col=0)

# ---- Labels from the `class` row ----
assert "class" in train_raw.index, "Row 'class' not found in TRAIN matrix."
y_train_tokens = train_raw.loc["class"].astype(str).tolist()

test_has_labels = "class" in test_raw.index
y_test_tokens  = (test_raw.loc["class"].astype(str).tolist() if test_has_labels else None)

# ---- Drop label row from features ----
train_df = train_raw.drop(index="class")
test_df  = (test_raw.drop(index="class") if test_has_labels else test_raw)

# ---- Align genes between train and test ----
common_genes = train_df.index.intersection(test_df.index)
train_df = train_df.loc[common_genes]
test_df  = test_df.loc[common_genes]

# ---- Samples as rows, genes as columns ----
X_train = train_df.T.copy()
X_test  = test_df.T.copy()

# Encode labels
le = LabelEncoder()
y_train = le.fit_transform(y_train_tokens)
if test_has_labels:
    try:
        y_test = le.transform(y_test_tokens)
    except ValueError:
        # If test contains unseen labels, skip accuracy/report computation
        test_has_labels = False
        y_test = None

print(f"[Info] Train: {X_train.shape[0]} samples x {X_train.shape[1]} genes")
print(f"[Info] Test :  {X_test.shape[0]} samples x {X_test.shape[1]} genes")
print(f"[Info] Classes: {list(le.classes_)}")

# ---------------- Feature Selection ----------------
# (1) Variance filter
vt = VarianceThreshold(threshold=VAR_THRESHOLD)
X_train_v = vt.fit_transform(X_train)
X_test_v  = vt.transform(X_test)
genes_v = X_train.columns[vt.get_support()]
print(f"[FS] After variance filter: {X_train_v.shape[1]} genes")

[Info] Train: 50 samples x 48 genes
[Info] Test :  21 samples x 48 genes
[Info] Classes: [np.str_('No_Primary_factor'), np.str_('Not_available'), np.str_('alcohol_consumption'), np.str_('hemochromatosis'), np.str_('hepatitis_b'), np.str_('hepatitis_c'), np.str_('non-alcoholic_fatty_liver_disease'), np.str_('other')]
[FS] After variance filter: 48 genes


In [None]:
# (2) Random Forest on variance-filtered features
rf_fs = RandomForestClassifier(
    n_estimators=N_ESTIMATORS, max_features="sqrt",
    class_weight="balanced_subsample", random_state=RANDOM_SEED, n_jobs=-1
)
rf_fs.fit(X_train_v, y_train)

# --- Mean Decrease Gini (built-in) ---
gini_importances = pd.Series(rf_fs.feature_importances_, index=genes_v)

# --- Mean Decrease Accuracy (permutation importance on training set) ---
perm = permutation_importance(
    rf_fs, X_train_v, y_train, n_repeats=3, random_state=RANDOM_SEED, n_jobs=-1
)

# --- Single, combined importance table (one file only) ---
fi_combined = pd.DataFrame({
    "Gene": genes_v,
    "MeanDecreaseGini": gini_importances.values,
    "MeanDecreaseAccuracy": perm.importances_mean,
    "MDA_STD": perm.importances_std
}).sort_values("MeanDecreaseGini", ascending=False).reset_index(drop=True)

# Save the one combined file
imp_path = os.path.join(OUTDIR, "feature_importances.csv")
fi_combined.to_csv(imp_path, index=False)

# Select top-N genes by Gini (from the combined table)
topN = min(TOP_N_IMPORTANCE, fi_combined.shape[0])
sel_genes = fi_combined.loc[:topN-1, "Gene"]
pd.Series(sel_genes, name="Gene").to_csv(os.path.join(OUTDIR, "selected_genes_topN.txt"), index=False)

# Optional small preview table of the selected topN
fi_combined.loc[:topN-1, ["Gene", "MeanDecreaseGini", "MeanDecreaseAccuracy", "MDA_STD"]] \
    .to_csv(os.path.join(OUTDIR, "feature_importances_topN.csv"), index=False)

# Build design matrices with the selected genes
X_train_sel = X_train[sel_genes].values
X_test_sel  = X_test[sel_genes].values
print(f"[FS] Selected top {topN} genes")

# ---------------- Final RF Model -------------------
rf = RandomForestClassifier(
    n_estimators=N_ESTIMATORS, max_features="sqrt",
    class_weight="balanced_subsample", random_state=RANDOM_SEED, n_jobs=-1
)
rf.fit(X_train_sel, y_train)
dump(rf, os.path.join(OUTDIR, "rf_model.joblib"))

# ---------------- Training Results -----------------
y_pred_train = rf.predict(X_train_sel)
cm = confusion_matrix(y_train, y_pred_train, labels=np.arange(len(le.classes_)))
cm_df = pd.DataFrame(cm, index=[f"True_{c}" for c in le.classes_],
                        columns=[f"Pred_{c}" for c in le.classes_])
cm_df.to_csv(os.path.join(OUTDIR, "training_confusion_matrix.csv"))

# Plot (no custom colors)
fig, ax = plt.subplots(figsize=(6, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)
disp.plot(ax=ax, values_format='d', cmap=None)
plt.title("Random Forest — Training Confusion Matrix")
plt.tight_layout()
plt.savefig(os.path.join(OUTDIR, "training_confusion_matrix.png"), dpi=300)
plt.close(fig)

# Classification report (train)
report_train = classification_report(y_train, y_pred_train, target_names=le.classes_, output_dict=True)
pd.DataFrame(report_train).T.to_csv(os.path.join(OUTDIR, "training_classification_report.csv"))



[FS] Selected top 40 genes


In [None]:
# ---------------- Test Predictions -----------------
y_pred_test = rf.predict(X_test_sel)
pred_labels = le.inverse_transform(y_pred_test)
test_out = pd.DataFrame({"Sample": X_test.index, "Predicted_Class": pred_labels})

if test_has_labels:
    # Save test accuracy and test classification report
    test_out["True_Class"] = y_test_tokens
    acc = (test_out["Predicted_Class"] == test_out["True_Class"]).mean()
    with open(os.path.join(OUTDIR, "test_accuracy.txt"), "w") as f:
        f.write(f"Test Accuracy: {acc:.4f}\n")

    report_test = classification_report(y_test, y_pred_test, target_names=le.classes_, output_dict=True)
    pd.DataFrame(report_test).T.to_csv(os.path.join(OUTDIR, "test_classification_report.csv"))

test_out.to_csv(os.path.join(OUTDIR, "test_predictions.csv"), index=False)

print(
    f"[Done] Wrote outputs to `{OUTDIR}`:\n"
    "  - training_confusion_matrix.csv / .png\n"
    "  - training_classification_report.csv\n"
    "  - test_predictions.csv (+ test_accuracy.txt & test_classification_report.csv if labels present)\n"
    "  - feature_importances.csv  (MeanDecreaseGini, MeanDecreaseAccuracy, MDA_STD)\n"
    "  - feature_importances_topN.csv\n"
    "  - selected_genes_topN.txt\n"
    "  - rf_model.joblib"
)

[Done] Wrote outputs to `rf_outputs`:
  - training_confusion_matrix.csv / .png
  - training_classification_report.csv
  - test_predictions.csv (+ test_accuracy.txt & test_classification_report.csv if labels present)
  - feature_importances.csv  (MeanDecreaseGini, MeanDecreaseAccuracy, MDA_STD)
  - feature_importances_topN.csv
  - selected_genes_topN.txt
  - rf_model.joblib


In [None]:
# rf_pipeline_improved.py
# Usage: python rf_pipeline_improved.py
# Adds scaling, ANOVA-based feature selection, and hyperparameter tuning.

import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import VarianceThreshold, SelectKBest, f_classif
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV
import matplotlib.pyplot as plt
from joblib import dump

# -------------------- Config --------------------
TRAIN_PATH = "https://raw.githubusercontent.com/Omicslogic-git/Project_3_data/refs/heads/main/Final_12535train_matrix.txt"
TEST_PATH  = "https://raw.githubusercontent.com/Omicslogic-git/Project_3_data/refs/heads/main/Final_12535test_matrix.txt"
OUTDIR = "rf_outputs_improved"
RANDOM_SEED = 42
N_ESTIMATORS = 500
VAR_THRESHOLD = 1e-3
TOP_N_GENES = 200     # increased from 40
os.makedirs(OUTDIR, exist_ok=True)

# -------------------- Load ----------------------
train_raw = pd.read_csv(TRAIN_PATH, sep="\t", index_col=0)
test_raw  = pd.read_csv(TEST_PATH,  sep="\t", index_col=0)

# ---- Labels ----
y_train_tokens = train_raw.loc["class"].astype(str).tolist()
test_has_labels = "class" in test_raw.index
y_test_tokens = (test_raw.loc["class"].astype(str).tolist() if test_has_labels else None)

# ---- Drop label row ----
train_df = train_raw.drop(index="class")
test_df  = (test_raw.drop(index="class") if test_has_labels else test_raw)

# ---- Align genes ----
common_genes = train_df.index.intersection(test_df.index)
train_df = train_df.loc[common_genes]
test_df  = test_df.loc[common_genes]

X_train = train_df.T.copy()
X_test  = test_df.T.copy()

# Encode labels
le = LabelEncoder()
y_train = le.fit_transform(y_train_tokens)
if test_has_labels:
    try:
        y_test = le.transform(y_test_tokens)
    except ValueError:
        test_has_labels = False
        y_test = None

print(f"[Info] Train: {X_train.shape}, Test: {X_test.shape}, Classes: {list(le.classes_)}")

# ---------------- Feature Selection ----------------
# (1) Variance filter
vt = VarianceThreshold(threshold=VAR_THRESHOLD)
X_train_v = vt.fit_transform(X_train)
X_test_v  = vt.transform(X_test)
genes_v = X_train.columns[vt.get_support()]

print(f"[FS] After variance filter: {X_train_v.shape[1]} genes")

# (2) ANOVA F-test: keep top-N
selector = SelectKBest(score_func=f_classif, k=min(TOP_N_GENES, len(genes_v)))
X_train_sel = selector.fit_transform(X_train_v, y_train)
X_test_sel  = selector.transform(X_test_v)
sel_genes = genes_v[selector.get_support()]
pd.Series(sel_genes, name="Gene").to_csv(os.path.join(OUTDIR, "selected_genes_topN.txt"), index=False)

print(f"[FS] Selected top {len(sel_genes)} genes by ANOVA F-test")

# ---------------- Scaling ----------------
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_sel)
X_test_scaled  = scaler.transform(X_test_sel)

# ---------------- Hyperparameter Tuning ----------------
param_dist = {
    "n_estimators": [500, 1000, 1500],
    "max_depth": [5, 10, 20, None],
    "max_features": ["sqrt", "log2", None],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 2, 4],
    "class_weight": ["balanced", "balanced_subsample"]
}

rf = RandomForestClassifier(random_state=RANDOM_SEED, n_jobs=-1)

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)
search = RandomizedSearchCV(
    rf, param_distributions=param_dist,
    n_iter=20, scoring="accuracy", n_jobs=-1,
    cv=cv, random_state=RANDOM_SEED, verbose=2
)

search.fit(X_train_scaled, y_train)
rf_best = search.best_estimator_
dump(rf_best, os.path.join(OUTDIR, "rf_model_tuned.joblib"))

print("[Tuning] Best Params:", search.best_params_)
print("[Tuning] CV Best Score:", search.best_score_)

# ---------------- Training Results -----------------
y_pred_train = rf_best.predict(X_train_scaled)
cm = confusion_matrix(y_train, y_pred_train, labels=np.arange(len(le.classes_)))
pd.DataFrame(cm, index=[f"True_{c}" for c in le.classes_],
                columns=[f"Pred_{c}" for c in le.classes_]) \
  .to_csv(os.path.join(OUTDIR, "training_confusion_matrix.csv"))

fig, ax = plt.subplots(figsize=(6, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)
disp.plot(ax=ax, values_format='d')
plt.title("RF (Tuned) — Training Confusion Matrix")
plt.tight_layout()
plt.savefig(os.path.join(OUTDIR, "training_confusion_matrix.png"), dpi=300)
plt.close(fig)

pd.DataFrame(classification_report(
    y_train, y_pred_train, target_names=le.classes_, output_dict=True)).T \
  .to_csv(os.path.join(OUTDIR, "training_classification_report.csv"))

# ---------------- Test Predictions -----------------
y_pred_test = rf_best.predict(X_test_scaled)
pred_labels = le.inverse_transform(y_pred_test)
test_out = pd.DataFrame({"Sample": X_test.index, "Predicted_Class": pred_labels})

if test_has_labels:
    test_out["True_Class"] = y_test_tokens
    acc = (test_out["Predicted_Class"] == test_out["True_Class"]).mean()
    with open(os.path.join(OUTDIR, "test_accuracy.txt"), "w") as f:
        f.write(f"Test Accuracy: {acc:.4f}\n")
    pd.DataFrame(classification_report(
        y_test, y_pred_test, target_names=le.classes_, output_dict=True)).T \
      .to_csv(os.path.join(OUTDIR, "test_classification_report.csv"))

test_out.to_csv(os.path.join(OUTDIR, "test_predictions.csv"), index=False)

print("[Done] Improved pipeline finished. Outputs in:", OUTDIR)


[Info] Train: (50, 12535), Test: (21, 12535), Classes: [np.str_('StageI'), np.str_('StageII'), np.str_('StageIII'), np.str_('StageIV')]
[FS] After variance filter: 12535 genes
[FS] Selected top 200 genes by ANOVA F-test
Fitting 5 folds for each of 20 candidates, totalling 100 fits




[Tuning] Best Params: {'n_estimators': 1000, 'min_samples_split': 10, 'min_samples_leaf': 1, 'max_features': 'log2', 'max_depth': None, 'class_weight': 'balanced'}
[Tuning] CV Best Score: 0.86
[Done] Improved pipeline finished. Outputs in: rf_outputs_improved
