# Processing embedding CSV files:

When creating the percentile data, you need to choose the desired percentile values (results 1-3 below) and the similarity metric used in FAISS (cosine or Euclidean).

In [None]:
# Folder the embedding CSV files are read from.
input_folder = "./embeddings/cvcs/"

# Similarity metric used by FAISS: "cosine" or "euclidean" (= L2).
METRIC = "cosine"

# JSON result files and the percentile threshold list behind each of them:
#   results 1 (the file is named just "results"): [25, 50, 75, 90, 95, 99]
#   results 2: [10, 25, 40, 50, 60, 75, 90, 95]
#   results 3: [5, 15, 25, 35, 50, 70, 80, 90, 95]
# NOTE: this value does not matter for the function that creates the netx
# (presumably networkx) graphs.
PERCENTILES = [
    5, 15, 25, 35, 50, 70, 80, 90, 95,
]

The function below runs on the embedding CSV file of each patient and creates the percentile vectors for different k values, each of which leads to a different adjacency matrix.

In [None]:
def creating_percentiles(file_path):
    """Compute k-NN neighbour-value percentiles for one embedding CSV file.

    The first CSV column is treated as the TCR sequence and every remaining
    column as one embedding dimension. The embeddings are searched against
    themselves with FAISS for several neighbourhood sizes ``k``; for each
    ``k`` the percentiles listed in the module-level ``PERCENTILES`` are taken
    over the strictly positive neighbour values, i.e. the non-zero edge
    weights of the k-NN adjacency graph. The module-level ``METRIC`` selects
    the FAISS index type.

    The neighbour values are read straight from the FAISS result instead of
    being scattered into a dense N x N matrix first: the percentiles are the
    same, but memory stays at O(N * k) rather than O(N^2).

    Args:
        file_path: Path of the embedding CSV file to read.

    Returns:
        dict mapping each ``k`` to a NumPy array with one value per entry of
        ``PERCENTILES``.

    Raises:
        ValueError: If ``METRIC`` is not supported, the file holds fewer than
            two embeddings, or some ``k`` leaves no positive neighbour value
            (for example because the file has too few rows for that ``k``).
    """
    df = pd.read_csv(file_path)
    # Column 0 is the TCR sequence; everything after it is the embedding.
    embeddings = df.iloc[:, 1:].values.astype('float32')

    N = embeddings.shape[0]
    if N < 2:
        raise ValueError(
            f"{file_path!r} holds {N} embedding(s); at least 2 are needed "
            "to compute nearest-neighbour percentiles"
        )

    def build_faiss_index(vectors, distance_metric='cosine'):
        """Return ``(index, vectors)`` where ``vectors`` is what was indexed."""
        if distance_metric == 'cosine':
            # On unit-length vectors the inner product equals cosine similarity.
            norms = np.linalg.norm(vectors, axis=1, keepdims=True)
            # Leave all-zero rows as zeros instead of dividing by zero (NaN).
            norms[norms == 0] = 1.0
            vectors = vectors / norms
            index = faiss.IndexFlatIP(vectors.shape[1])
        elif distance_metric in ('euclidean', 'l2'):
            # No normalisation here: it would distort the original magnitudes.
            # Per the FAISS documentation IndexFlatL2 reports *squared* L2.
            index = faiss.IndexFlatL2(vectors.shape[1])
        else:
            raise ValueError(
                f"Unsupported METRIC {distance_metric!r}; "
                "expected 'cosine' or 'euclidean'"
            )
        vectors = np.ascontiguousarray(vectors, dtype='float32')
        index.add(vectors)
        return index, vectors

    # Neighbourhood sizes to explore.
    log_k = int(np.log(N))
    if log_k == 0:
        log_k += 1
    k_values = [5, 10, 15, 20, int(np.sqrt(N)), int((np.sqrt(N)) / 2)]
    # Nudge log_k away from a value that is already in the list.
    if log_k in k_values:
        log_k += 1
    k_values.append(log_k)

    # The index does not depend on k, so build (and normalise) it only once.
    index, queries = build_faiss_index(embeddings, distance_metric=METRIC)

    percentiles_dict = {}
    # dict.fromkeys drops repeated k values while keeping first-seen order, so
    # the result has the same keys in the same order without duplicate searches.
    for k in dict.fromkeys(k_values):
        # Asking for more neighbours than there are vectors would only add
        # placeholder entries to the result, so cap the search size at N.
        search_k = min(k, N)
        if search_k >= 2:
            distances, _ = index.search(queries, search_k)
            # Column 0 is the best match (normally the query itself); the rest
            # are the neighbour values. float64 matches the precision the
            # values had when they were stored in a dense float64 matrix.
            neighbour_values = distances[:, 1:].astype(np.float64).ravel()
            # Keep strictly positive values only, mirroring the non-zero
            # entries of the adjacency matrix. For cosine this also discards
            # non-positive similarities.
            neighbour_values = neighbour_values[neighbour_values > 0]
        else:
            neighbour_values = np.empty(0)

        if neighbour_values.size == 0:
            raise ValueError(
                f"k={k} yields no positive neighbour values for "
                f"{file_path!r} ({N} embeddings)"
            )
        percentiles_dict[k] = np.percentile(neighbour_values, PERCENTILES)

    return percentiles_dict

# Running ML on the data

The prepare_data function is responsible for loading the percentiles data (from a JSON file: each patient has a vector of k-nearest-neighbor values, and each k value has a percentiles vector).
After loading, the data is converted to NumPy format and split into train and test samples.

In [None]:
from sklearn.model_selection import train_test_split
import numpy as np
import sklearn.preprocessing

def prepare_data(percentiles_data, labels_dict, vector_indices=None, average_vectors=False,
                 test_size=0.2, random_state=None):
    """Turn per-sample percentile vectors into a scaled train/test split.

    Each sample contributes one feature row:

    * ``average_vectors=True``: the element-wise mean of all its percentile
      vectors (``vector_indices`` is not consulted in this mode).
    * otherwise: its percentile vectors concatenated in dict order, optionally
      restricted to the positions in ``vector_indices``, then zero-padded on
      the right to the longest *unrestricted* concatenation of any sample.

    The rows are split with ``train_test_split`` and standardised with a
    ``StandardScaler`` fitted on the training part only.

    Args:
        percentiles_data: Mapping ``sample_name -> {k: percentile vector}``.
        labels_dict: Mapping ``sample_name -> label``.
        vector_indices: Optional positions (into the per-sample list of
            vectors) to keep; positions past the end of a sample's list are
            skipped.
        average_vectors: Average the vectors instead of concatenating them.
        test_size: Passed to ``train_test_split``; defaults to 0.2.
        random_state: Passed to ``train_test_split``; give an int for a
            reproducible split. ``None`` keeps the split random.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as NumPy arrays.

    Raises:
        ValueError: If ``percentiles_data`` is empty.
        KeyError: If a sample name is missing from ``labels_dict``.
    """
    if not percentiles_data:
        raise ValueError("percentiles_data is empty; there is nothing to split")

    data = []
    labels = []

    if average_vectors:
        for sample_name, percentiles_dict in percentiles_data.items():
            vectors = np.array(list(percentiles_dict.values()))
            data.append(np.mean(vectors, axis=0))
            labels.append(labels_dict[sample_name])
    else:
        # Longest concatenation over all samples; summing the vector lengths
        # gives the same number without building the concatenated arrays.
        max_length = max(
            sum(len(vector) for vector in percentiles_dict.values())
            for percentiles_dict in percentiles_data.values()
        )
        for sample_name, percentiles_dict in percentiles_data.items():
            vectors = list(percentiles_dict.values())
            if vector_indices is not None:
                vectors = [vectors[i] for i in vector_indices if i < len(vectors)]
            flattened_percentiles = np.concatenate(vectors)
            # Right-pad with zeros so every sample has the same feature count.
            pad_width = max_length - len(flattened_percentiles)
            data.append(np.pad(flattened_percentiles, (0, pad_width), 'constant'))
            labels.append(labels_dict[sample_name])

    data = np.array(data)
    labels = np.array(labels)

    X_train, X_test, y_train, y_test = train_test_split(
        data, labels, test_size=test_size, random_state=random_state
    )

    # Fit the scaler on the training rows only so no test statistics leak in.
    scaler = sklearn.preprocessing.StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    return X_train, X_test, y_train, y_test


Machine learning code:

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

def train_and_evaluate_knn(X_train, X_test, y_train, y_test, neighbors=5):
    """Fit a k-nearest-neighbours classifier and print its test-set metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.
        neighbors: Value passed as ``n_neighbors`` to the classifier.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    classifier = KNeighborsClassifier(n_neighbors=neighbors)
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    summary = {
        'model': 'KNN',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "KNN Results",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return classifier, summary

from sklearn.linear_model import LogisticRegression

def train_and_evaluate_logistic_regression(X_train, X_test, y_train, y_test):
    """Fit a default ``LogisticRegression`` and print its test-set metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    classifier = LogisticRegression()
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    summary = {
        'model': 'Logistic Regression',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "Logistic Regression Results",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return classifier, summary

from sklearn.svm import SVC

def train_and_evaluate_svm(X_train, X_test, y_train, y_test):
    """Fit a default ``SVC`` and print its test-set metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    classifier = SVC()
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    summary = {
        'model': 'SVM',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "SVM Results",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return classifier, summary

from sklearn.tree import DecisionTreeClassifier

def train_and_evaluate_decision_tree(X_train, X_test, y_train, y_test):
    """Fit a default ``DecisionTreeClassifier`` and print its test-set metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    classifier = DecisionTreeClassifier()
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    summary = {
        'model': 'Decision Tree',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "Decision Tree Results",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return classifier, summary

from sklearn.ensemble import RandomForestClassifier

def train_and_evaluate_random_forest(X_train, X_test, y_train, y_test):
    """Fit a default ``RandomForestClassifier`` and print its test-set metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    classifier = RandomForestClassifier()
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    summary = {
        'model': 'Random Forest',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "Random Forest Results",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return classifier, summary

from sklearn.neural_network import MLPClassifier

def train_and_evaluate_mlp(X_train, X_test, y_train, y_test):
    """Fit an ``MLPClassifier`` (one hidden layer of 100 units) and print its metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    classifier = MLPClassifier(hidden_layer_sizes=(100,), max_iter=300)
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    summary = {
        'model': 'MLP',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "MLP Results",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return classifier, summary

import xgboost as xgb

def train_and_evaluate_xgboost(X_train, X_test, y_train, y_test):
    """Fit an ``XGBClassifier`` (log-loss eval metric) and print its test-set metrics.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(fitted_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    booster = xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss')
    booster.fit(X_train, y_train)
    predictions = booster.predict(X_test)

    summary = {
        'model': 'XGBoost',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "XGBoost Results:",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return booster, summary

from tensorflow import keras

def train_and_evaluate_nn(X_train, X_test, y_train, y_test):
    """Train a small Keras binary classifier and print its test-set metrics.

    The network is Dense(32, relu) -> Dense(16, relu) -> Dense(1, sigmoid),
    trained with Adam on binary cross-entropy for 50 epochs (batch size 16,
    20% of the training rows held out for validation). Test predictions are
    the sigmoid outputs thresholded at 0.5.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.

    Returns:
        ``(trained_model, summary)`` where ``summary`` is a dict with the keys
        ``'model'``, ``'accuracy'`` and ``'classification_report'``.
    """
    layers = [
        keras.layers.Dense(32, activation='relu', input_shape=(X_train.shape[1],)),
        keras.layers.Dense(16, activation='relu'),
        keras.layers.Dense(1, activation='sigmoid'),
    ]
    network = keras.Sequential(layers)
    network.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    network.fit(X_train, y_train, epochs=50, batch_size=16, validation_split=0.2, verbose=0)

    # Sigmoid outputs above 0.5 count as the positive class.
    probabilities = network.predict(X_test)
    predictions = (probabilities > 0.5).astype("int32")

    summary = {
        'model': 'Neural Network',
        'accuracy': accuracy_score(y_test, predictions),
        'classification_report': classification_report(y_test, predictions),
    }

    # One print with a newline separator emits the same lines as one print per item.
    print(
        "Neural Network Results:",
        f"Accuracy: {summary['accuracy']}",
        "Confusion Matrix:",
        confusion_matrix(y_test, predictions),
        "Classification Report:",
        summary['classification_report'],
        sep="\n",
    )

    return network, summary

def run_evaluation(X_train, X_test, y_train, y_test, k_value, results, models, save_results=True):
    """Train and evaluate every model on one train/test split.

    Each ``train_and_evaluate_*`` function is called in a fixed order with the
    same split. When ``save_results`` is true, the caller's lists are extended
    in place.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.
        k_value: Tag stored with every record under the ``'k_value'`` key.
        results: List that receives one summary dict per model.
        models: List that receives one ``{'k_value', 'model_name', 'model'}``
            dict per model.
        save_results: When false, models are still trained and their metrics
            printed, but nothing is appended to ``results`` or ``models``.

    Returns:
        None.
    """
    # Insertion order of the dict is the order in which the models run.
    trainers = {
        'KNN': train_and_evaluate_knn,
        'Logistic Regression': train_and_evaluate_logistic_regression,
        'SVM': train_and_evaluate_svm,
        'Decision Tree': train_and_evaluate_decision_tree,
        'Random Forest': train_and_evaluate_random_forest,
        'MLP': train_and_evaluate_mlp,
        'XGBoost': train_and_evaluate_xgboost,
        'Neural Network': train_and_evaluate_nn,
    }

    for model_name, trainer in trainers.items():
        fitted_model, summary = trainer(X_train, X_test, y_train, y_test)
        if not save_results:
            continue
        results.append({'k_value': k_value, **summary})
        models.append({'k_value': k_value, 'model_name': model_name, 'model': fitted_model})


Running the model (after loading and processing the JSON file).
The example below uses all the k-value vectors, hence k_value='all_k'.

In [None]:
# Build the scaled train/test split. No vector_indices are given, so every
# percentile vector of every sample is concatenated into its feature row.
# NOTE(review): all_results, labels_dict, results and models are not defined in
# this part of the notebook; they must exist before this cell runs.
X_train, X_test, y_train, y_test = prepare_data(all_results, labels_dict)

# Train and evaluate every model; records are tagged with k_value="all_k" and
# appended to `results` (summaries) and `models` (fitted estimators).
run_evaluation(X_train, X_test, y_train, y_test, k_value="all_k", results=results, models=models)
