<div style="text-align: justify"> Celem naszego projektu było wykonanie modelu ML, który służy do rozpoznawania czy hostem danej sekwencji koronawirusa jest człowiek czy nie. Wykorzystaliśmy do tego model ProtBert do wygenerowania embeddingów sekwencji białkowych koronawirusów, a konkretnie spike protein, ponieważ wpływają one najsilniej na powinowactwo danego wirusa. Sekwencje te pobrano z bazy danych NCBI z wyłączeniem danych pochodzących z pandemii Covid-19, ponieważ wiele z nich było nieprawidłowych lub mocno nieuzupełnionych. Następnie tak wygenerowane embeddingi użyto do wytrenowania modelów RandomForest i XGBoost. Wykonano również uczenie kontrastowe modelu ProtBert w celu sprawdzenia czy dla tak specyficznego problemu z małą liczbą próbek poprawi to wyniki. </div>

Na samym początku dane pobrane w formacie FASTA sparsowano i zapisano w postaci dwóch list: jednej z samą sekwencją oraz drugą z metadanymi: nazwą sekwencji, nazwą wirusa od jakiego pochodzi dana sekwencja oraz informację czy hostem dla tego wirusa jest człowiek.

In [None]:
human_raw_path = "data/raw/human_98.fasta"
nonhuman_raw_path = "data/raw/nonhuman_98.fasta"

embeddings_path = "data/processed/protbert.pkl"

In [2]:
from Bio import SeqIO
from typing import List, Tuple
import os

def parse_fasta_with_groups(file_path: str, label: int) -> Tuple[List[str], List[dict]]:
    """Load FASTA file, extract sequences and metadata (virus group, host) from the header and save them as two separated lists"""
    if not os.path.exists(file_path):
        print(f"Error: File not found: {file_path}")
        return [], []

    sequences = []
    metadata = []

    for record in SeqIO.parse(file_path, "fasta"):
        sequence = str(record.seq)
        sequence = " ".join(list(sequence))

        header = record.description
        parts = header.split("|")

        if len(parts) >= 2:
            virus_name = parts[-1].strip().strip(".")
        else:
            virus_name = "Unknown"

        sequences.append(sequence)
        metadata.append(
            {
                "header": header,
                "virus_group": virus_name,
                "label": label,
            }
        )

    print(f"Loaded {len(sequences)} sequences from {file_path}.'")
    return sequences, metadata

In [3]:
pre_emb_human, metadata_human = parse_fasta_with_groups(human_raw_path, 1)
pre_emb_nonhuman, metadata_nonhuman = parse_fasta_with_groups(nonhuman_raw_path, 0)

Loaded 134 sequences from data/raw/human_98.fasta.'
Loaded 4507 sequences from data/raw/nonhuman_98.fasta.'


Następnie na podstawie wcześniej uzyskanych sekwencji wygenerowano embeddingi przy pomocy ProtBerta bez żadnego fine-tuningu, po wcześniejszym sprawdzeniu czy te embeddingi nie znajdują się już na dysku. Wygenerowane embeddingi zapisywane są jako tablica numpy, a następnie dodawana są do nich wcześniej wyciągnięte metadane: informacja odnośnie hosta, jako target oraz informacja odnośnie organizmu od jakiego pochodzi dana sekwencja w celu prawidłowego podziału danych na treningowe i testowe bez wycieku informacji. Na samym końcu dane są łączone w jednego DataFrame i zapisane w formacie pickle.

In [None]:
import torch
import numpy as np
from typing import List
from transformers import BertModel, BertTokenizer
 
def get_protbert_embeddings(
    sequences: List[str],
    max_seq_len: int,
    batch_size: int,
) -> np.ndarray:
    model_name = "Rostlab/prot_bert"
    tokenizer = BertTokenizer.from_pretrained(model_name, do_lower_case=False)
    base_model = BertModel.from_pretrained(model_name)
 
    model = base_model
 
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
 
    embeddings_list = []
 
    for i in range(0, len(sequences), batch_size):
        batch = sequences[i : i + batch_size]
        
        encoded_input = tokenizer(
            batch,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=max_seq_len,
            add_special_tokens=True
        )
        
        input_ids = encoded_input["input_ids"].to(device)
        attention_mask = encoded_input["attention_mask"].to(device)
 
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        
        cls_embeddings = outputs.last_hidden_state[:, 0, :]
        embeddings_list.append(cls_embeddings.cpu().numpy())
 
    return np.vstack(embeddings_list)

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
import pandas as pd

def add_metadata(embedding: np.ndarray, metadata: List[dict]) -> pd.DataFrame:
    """Open embeddings array and add to them target value (1 for human host or 0 for nonhuman host) and virus group."""

    virus_names = [item["virus_group"] for item in metadata]
    labels = [item["label"] for item in metadata]

    data = pd.DataFrame(
        data={
            "embedding": list(embedding),
            "label": labels,
            "virus_group": virus_names,
        }
    )
    return data

In [6]:
def concat_data(*embeddings: np.ndarray, out_path: str) -> None:
    """Concat DataFrames (human and nonhuman embeddings) into one with reseted index and save them as *.pkl file"""
    concat_data = pd.concat(embeddings, ignore_index=True)
    folder_dir = os.path.dirname(out_path)
    if folder_dir and not os.path.exists(folder_dir):
        os.makedirs(folder_dir, exist_ok=True)
    concat_data.to_pickle(out_path)

In [None]:
NORMAL_MAX_SEQ_LEN = 1024
NORMAL_BATCH_SIZE = 4

if os.path.exists(embeddings_path):
    print("Embedding were generated previously, skipping generating them.")
else:
    protbert_emb_human = get_protbert_embeddings(pre_emb_human, NORMAL_MAX_SEQ_LEN, NORMAL_BATCH_SIZE)
    protbert_emb_nonhuman = get_protbert_embeddings(
        pre_emb_nonhuman, NORMAL_MAX_SEQ_LEN, NORMAL_BATCH_SIZE
    )

    human_labeled = add_metadata(protbert_emb_human, metadata_human)
    nonhuman_labeled = add_metadata(protbert_emb_nonhuman, metadata_nonhuman)

    concat_data(human_labeled, nonhuman_labeled, out_path=embeddings_path)

Embedding were generated previously, skipping generating them.


Tak przygotowane embeddingi podzielono na zbiory treningowy i testowy. Zbiory te podzielono w taki sposób, aby w zbiorze treningowym nie znalazły się sekwencje pochodzące od wirusów znajdujących się w zbiorze testowym i na odwrót. Wykonano również 5-krotną walidację krzyżową w celu lepszej weryfikacji poprawności działania modelów. Następnie wykonano uczenie i weryfikację modelów Random Forest i XGBoost na embeddingach pochodzących z normalnego ProtBerta. Wyniki wyświetlane są w formie macierzy niepewności oraz raportu klasyfikacyjnego. Zapisywane są również do pliku tekstowego. 

In [None]:
from sklearn.model_selection import GroupKFold
from typing import Generator, Tuple

def split_train_test_virus_group(path: str, n_splits: int = 5) -> Generator[Tuple[np.ndarray, pd.Series, np.ndarray, pd.Series], None, None]:
    """Divides the data into training and test sets, ensuring that sequences originating from a single virus are only included in one group. The generator is designed to perform cross-validation."""
    data = pd.read_pickle(path)
    groups = data["virus_group"]
    splitter = GroupKFold(n_splits=n_splits)

    for train_idx, val_idx in splitter.split(data, groups=groups):
        train_df = data.iloc[train_idx]
        test_df = data.iloc[val_idx]

        X_train = np.stack(train_df["embedding"].values)
        y_train = train_df["label"]

        X_test = np.stack(test_df["embedding"].values)
        y_test = test_df["label"]

        yield X_train, y_train, X_test, y_test

In [9]:
from sklearn.metrics import classification_report, confusion_matrix

def generate_report(y_true, y_predicted, result_path : str) -> Tuple[np.ndarray,  str | dict]:
    cf = confusion_matrix(y_true, y_predicted)
    report = classification_report(y_true, y_predicted)

    with open(result_path, "w", encoding="utf-8") as f:
        f.write("=== RESULTS ===\n")
        f.write("Confusion Matrix:\n")
        f.write(str(cf))
        f.write("\n-------------------------------\n")
        f.write(report)

    return cf, report


In [None]:
from sklearn.ensemble import RandomForestClassifier

def run_rf_cv_evaluation(data_path: str) -> Tuple[list, list]:
    all_y_true = []
    all_y_pred = []

    cv_generator = split_train_test_virus_group(data_path, n_splits=5)

    for (X_train, y_train, X_test, y_test) in cv_generator:

        model = RandomForestClassifier(
            n_estimators=200,
            min_samples_leaf=2,
            max_features=10,
            max_depth=10,
            class_weight="balanced",
            n_jobs=-1,
            random_state=42
        )

        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)

        all_y_true.append(y_test)
        all_y_pred.append(y_pred)
    
    return np.concatenate(all_y_true), np.concatenate(all_y_pred)

In [None]:
from xgboost import XGBClassifier

def run_xgb_cv_evaluation(data_path: str) -> Tuple[list, list]:
    all_y_true = []
    all_y_pred = []

    cv_generator = split_train_test_virus_group(data_path, n_splits=5)

    for (X_train, y_train, X_val, y_val) in cv_generator:

        model = XGBClassifier(
            subsample=1.0,
            min_child_weight=1,
            max_depth=7,
            learning_rate=0.1,
            colsample_bytree=0.6,
            n_estimators=100,
            class_weight="balanced",
            n_jobs=-1,
            random_state=42
        )

        model.fit(X_train, y_train)

        y_pred = model.predict(X_val)

        all_y_true.append(y_val)
        all_y_pred.append(y_pred)
    
    return np.concatenate(all_y_true), np.concatenate(all_y_pred)

In [None]:
from sklearn.svm import SVC

def run_svm_cv_evaluation(data_path: str) -> Tuple[list, list]:
    all_y_true = []
    all_y_pred = []

    cv_generator = split_train_test_virus_group(data_path, n_splits=5)

    for (X_train, y_train, X_val, y_val) in cv_generator:

        model = SVC(kernel="rbf", gamma=1, C=1000, class_weight="balanced", random_state=42)

        model.fit(X_train, y_train)

        y_pred = model.predict(X_val)
        all_y_true.append(y_val)
        all_y_pred.append(y_pred)
    
    return np.concatenate(all_y_true), np.concatenate(all_y_pred)


Wyniki Random forest

In [None]:
normal_rf_true, normal_rf_predicted = run_rf_cv_evaluation(embeddings_path)

normal_rf_path = "normal_rf_results.txt"
normal_rf_cm, normal_rf_report = generate_report(normal_rf_true, normal_rf_predicted, normal_rf_path)

print("=== RANDOM FOREST RESULTS ===")
print(normal_rf_cm)
print(normal_rf_report)

=== NORMAL PROTBERT -> RANDOM FOREST RESULTS ===
[[4504    3]
 [ 122   12]]
              precision    recall  f1-score   support

           0       0.97      1.00      0.99      4507
           1       0.80      0.09      0.16       134

    accuracy                           0.97      4641
   macro avg       0.89      0.54      0.57      4641
weighted avg       0.97      0.97      0.96      4641



Wyniki XGBoost

In [None]:
normal_xgb_true, normal_xgb_predicted = run_xgb_cv_evaluation(embeddings_path)

normal_xgb_path = "normal_xgb_results.txt"
normal_xgb_cm, normal_xgb_report = generate_report(normal_xgb_true, normal_xgb_predicted, normal_xgb_path)

print("=== XGBOOST RESULTS ===")
print(normal_xgb_cm)
print(normal_xgb_report)

Parameters: { "class_weight" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "class_weight" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "class_weight" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "class_weight" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "class_weight" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


=== NORMAL PROTBERT -> XGBOOST RESULTS ===
[[4497   10]
 [ 110   24]]
              precision    recall  f1-score   support

           0       0.98      1.00      0.99      4507
           1       0.71      0.18      0.29       134

    accuracy                           0.97      4641
   macro avg       0.84      0.59      0.64      4641
weighted avg       0.97      0.97      0.97      4641



Wyniki SVM

In [None]:
normal_svm_true, normal_svm_predicted = run_svm_cv_evaluation(embeddings_path)

normal_svm_path = "normal_svm_results.txt"
normal_svm_cm, normal_svm_report = generate_report(normal_svm_true, normal_svm_predicted, normal_svm_path)

print("=== SVM RESULTS ===")
print(normal_svm_cm)
print(normal_svm_report)