In [None]:
import getpass
from glob import glob
import os

import numpy as np
import pandas as pd

from sklearn.linear_model import RidgeClassifier

This notebook assumes that you have done at least one of the following:
- copied `bio_share/stegg_results/peptide_datasets/` to `root_dir`, with the peptide subdirectories decompressed; or
- copied `bio_share/stegg_results/STEGG_complex_DB_tcr_datasets.zip` to `root_dir` and decompressed it into `tcr_datasets/`.

In [None]:
# Build (or load from cache) the per-case feature table `df` for one dataset.
#
# Names produced for later cells:
#   df               - one row per case: averaged energy terms + "label" + "split"
#   energy_term_cols - the feature columns of `df` (everything except label/split)

# Directory expected to contain "peptide_datasets/" and/or "tcr_datasets/".
root_dir = os.path.join("/home/", getpass.getuser())

peptides = ["AVFDRKSDAK", "ELAGIGILTV", "FLCMKALLL", "GILGFVFTL", "LLWNGPMAV"]
tcrs = ["A11Vc", "A3V", "TCR1"]

dataset = tcrs[0]

# One seed drives both the row shuffle and the train/val/test draw, so that
# rebuilding the cache reproduces the same split.
SPLIT_SEED = 1047

cache_fp = f"{dataset}.csv"

if not os.path.exists(cache_fp):

    if dataset in peptides:
        case_pattern = os.path.join(root_dir, "peptide_datasets", dataset, "*")
    elif dataset in tcrs:
        case_pattern = os.path.join(root_dir, "tcr_datasets", f"{dataset}_*")
    else:
        raise ValueError(
            f"Unknown dataset {dataset!r}; expected one of {peptides + tcrs}"
        )
    # Sorted so the row order of the cached table does not depend on the filesystem.
    case_dirs = sorted(glob(case_pattern))

    # case name -> {energy term: column mean over the rows of energy_terms.csv}
    data = dict()
    for case_dir in case_dirs:
        case = os.path.basename(case_dir)
        energy_csv_fp = os.path.join(case_dir, "energy_terms.csv")
        if not os.path.exists(energy_csv_fp):
            continue
        case_energy_df = pd.read_csv(energy_csv_fp, index_col=0)
        if len(case_energy_df) == 0:
            continue
        avg_term_vector = case_energy_df.to_numpy().mean(axis=0)
        data[case] = dict(zip(case_energy_df.columns, avg_term_vector))

    if not data:
        # Fail here instead of writing an empty cache file that later runs would load.
        raise FileNotFoundError(
            f"No non-empty energy_terms.csv found under {case_pattern!r}; "
            "check root_dir and that the dataset archives are decompressed."
        )

    energy_df = pd.DataFrame.from_dict(data, orient="index")

    ba_csv_fp = os.path.join("use_case_demo/", "peptide_datasets/" if dataset in peptides else "TCR_datasets/", f"{dataset}.csv")
    ba_df = pd.read_csv(ba_csv_fp, index_col=0)
    if dataset in peptides:
        # Re-key rows so they line up with the case directory names.
        # NOTE(review): peptide CSVs are assumed to already carry a "label" column - confirm.
        ba_df = ba_df.set_index(ba_df.CDR3a + "_" + ba_df.CDR3b + "_" + ba_df.peptide)
    elif dataset in tcrs:
        ba_df.index = [f"{dataset}_row{i + 1}" for i in range(len(ba_df))]
        ba_df["label"] = ba_df.activation

    if "split" not in ba_df.columns:
        ba_df = ba_df.sample(frac=1, random_state=SPLIT_SEED)
        # Seeded generator: the original unseeded draw gave a different split on every rebuild.
        rand = np.random.default_rng(SPLIT_SEED).random(len(ba_df))
        # Roughly 75% train, 12.5% val, 12.5% test.
        ba_df["split"] = np.where(rand < 0.75, "train", np.where(rand < 0.875, "val", "test"))

    # Keep only cases that have both energy terms and a label/split row.
    df = energy_df.merge(ba_df[["label", "split"]], how="inner", left_index=True, right_index=True)
    if df.empty:
        raise ValueError(
            f"No case under {case_pattern!r} matches a row of {ba_csv_fp!r}; nothing to cache."
        )
    df.to_csv(cache_fp)

else:

    df = pd.read_csv(cache_fp, index_col=0)

# Derived from `df` in both branches so a fresh build and a cached load agree.
energy_term_cols = df.drop(["label", "split"], axis=1).columns

df

In [None]:
# Fit on every row of `df` and score on those same rows, so the printed
# value is accuracy on the training data itself, not a held-out estimate.
X = df.loc[:, energy_term_cols].to_numpy()
y = df["label"].to_numpy()

model = RidgeClassifier().fit(X, y)

acc = model.score(X, y)
print(acc)

In [None]:
# Show how many rows fall into each split.
print(df["split"].value_counts())

# Row masks: fit on "train", evaluate on "val" and "test" pooled together.
is_train = df["split"] == "train"
is_heldout = df["split"].isin(["test", "val"])

train_df = df.loc[is_train]
heldout_df = df.loc[is_heldout]

X_train = train_df[energy_term_cols].to_numpy()
y_train = train_df["label"].to_numpy()

model = RidgeClassifier().fit(X_train, y_train)

# Note: despite the name, X_test / y_test hold the pooled val + test rows.
X_test = heldout_df[energy_term_cols].to_numpy()
y_test = heldout_df["label"].to_numpy()

acc = model.score(X_test, y_test)
print(acc)