### Imports and Requirements

In [1]:
!git clone https://github.com/TattaBio/DGEB.git
!sed -i '/gradio-client==1.0.2/d' DGEB/leaderboard/requirements.txt
!pip install -r DGEB/leaderboard/requirements.txt
!pip install -q fsspec huggingface_hub

fatal: destination path 'DGEB' already exists and is not an empty directory.


In [2]:
from huggingface_hub import hf_hub_download
import pandas as pd
from xgboost import XGBClassifier
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from collections import Counter
from sklearn.metrics import classification_report, accuracy_score
import random
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score
import random, numpy as np
from collections import Counter
from itertools import product
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.utils.class_weight import compute_sample_weight
from xgboost import XGBClassifier, callback

### Loading Dataset and Basic Data Exploration

In [3]:
# Link to dataset: https://huggingface.co/datasets/tattabio/ec_classification_dna
splits = {'train': 'data/train-00000-of-00001.parquet', 'test': 'data/test-00000-of-00001.parquet'}

# Load train, test data
df_train = pd.read_parquet("hf://datasets/tattabio/ec_classification_dna/" + splits["train"])
df_test = pd.read_parquet("hf://datasets/tattabio/ec_classification_dna/" + splits["test"])

orig_seqs   = df_train['Sequence'].tolist()
orig_labels = df_train['Label'].tolist()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [4]:
# Truncate to some fixed length
df_train['Sequence'] = df_train['Sequence'].apply(lambda x: x[:2000] if len(x) > 2000 else x)
df_test['Sequence'] = df_test['Sequence'].apply(lambda x: x[:2000] if len(x) > 2000 else x)

In [5]:
df_train.head()

Unnamed: 0,Entry,Label,Sequence
0,Q9LQC0,1.14.14.18,ATGGCTACATCAAGACTTAATGCCTCTTGCCGCTTTCCTGCAAGCA...
1,A0AFT8,1.14.14.18,ATGATTATTGTAACTAATACGATTAAGGTAGAAAAAGGCGCAGCAG...
2,P74133,1.14.14.18,CTATTCACCTACCATTAGGGTGATGGGATGACCAGCGGTACTGCGA...
3,O31534,1.14.14.18,TTATTGATTGTATGTCCCTTGTTTGACAGCACGTACATGGTACATG...
4,P34697,1.15.1.1,ATGTTTATGAATCTTCTCACTCAGGTCTCCAACGCGATTTTTCCGC...


In [6]:
# Print number of training and testing points
print("Shape of training dataframe: ", df_train.shape)
print("Shape of testing dataframe: ", df_test.shape)

Shape of training dataframe:  (512, 3)
Shape of testing dataframe:  (128, 3)


In [7]:
# List of Unique Classes
unique_classes = df_train['Label'].unique()
print(f"Number of unique classes: {len(unique_classes)}")
print("Classes:", unique_classes)

Number of unique classes: 128
Classes: ['1.14.14.18' '1.15.1.1' '1.16.3.1' '1.17.4.1' '1.5.98.3' '1.8.3.2'
 '2.1.1.354' '2.1.1.360' '2.1.1.37' '2.1.1.72' '2.1.1.77' '2.1.1.86'
 '2.1.3.15' '2.3.1.225' '2.3.1.269' '2.3.1.48' '2.3.1.51' '2.3.2.23'
 '2.3.2.26' '2.3.2.27' '2.3.2.31' '2.4.1.109' '2.4.1.198' '2.4.2.31'
 '2.5.1.18' '2.7.1.107' '2.7.1.21' '2.7.1.33' '2.7.1.67' '2.7.10.1'
 '2.7.10.2' '2.7.11.1' '2.7.11.25' '2.7.13.3' '2.7.4.3' '2.7.6.1'
 '2.7.7.108' '2.7.7.19' '2.7.7.48' '2.7.7.49' '2.7.7.6' '2.7.7.65'
 '2.7.7.7' '2.7.8.7' '2.8.1.13' '3.1.1.1' '3.1.1.3' '3.1.1.32' '3.1.1.4'
 '3.1.1.96' '3.1.11.2' '3.1.11.6' '3.1.12.1' '3.1.13.4' '3.1.21.10'
 '3.1.21.4' '3.1.26.3' '3.1.26.4' '3.1.26.5' '3.1.3.16' '3.1.3.18'
 '3.1.3.2' '3.1.3.3' '3.1.3.48' '3.1.3.5' '3.1.4.12' '3.1.4.35' '3.1.4.4'
 '3.1.4.52' '3.2.1.1' '3.2.1.14' '3.2.1.17' '3.2.1.18' '3.2.1.22'
 '3.2.1.23' '3.2.1.35' '3.2.1.39' '3.2.1.4' '3.2.1.52' '3.2.1.55'
 '3.2.1.78' '3.2.1.8' '3.2.1.96' '3.2.2.22' '3.2.2.6' '3.4.19.12'
 '3.4

In [8]:
# Number of datapoints per class
class_counts = df_train['Label'].value_counts()
print("Number of samples per class in the training set:")
print(class_counts)

Number of samples per class in the training set:
Label
1.14.14.18    4
1.15.1.1      4
1.16.3.1      4
1.17.4.1      4
1.5.98.3      4
             ..
7.1.1.2       4
7.1.1.8       4
7.1.1.9       4
7.1.2.2       4
7.2.1.1       4
Name: count, Length: 128, dtype: int64


In [9]:
# Sequence length statistics
df_train['Sequence_Length'] = df_train['Sequence'].apply(len)
print(df_train['Sequence_Length'].describe())

count     512.000000
mean     1243.017578
std       571.600744
min       123.000000
25%       741.000000
50%      1179.000000
75%      1905.750000
max      2000.000000
Name: Sequence_Length, dtype: float64


In [10]:
# Function to print datapoint-related stats after each pre-processing step
def show_stats(df, step_name):
    print(f"\n=== {step_name} ===")
    print(" Total samples:", len(df))
    print(" Per-label counts:\n", df['label'].value_counts())

### Pre-processing

In [11]:
# Random mutation
# generates n_mut variants per original by flipping each base at rate 2%.
def random_mutation(seq: str, rate: float = 0.02) -> str:
    nts = ['A','C','G','T']
    return ''.join(
        (random.choice(nts) if random.random() < rate else c)
        for c in seq
    )

# Apply n_mut augmented mutants per original
n_mut = 7
aug_mut_seqs   = []
aug_mut_labels = []
for seq, lab in zip(df_train['Sequence'], df_train['Label']):
    for _ in range(n_mut):
        aug_mut_seqs.append(random_mutation(seq, rate=0.02))
        aug_mut_labels.append(lab)

# After this we will have 16 points per label

In [12]:
labels_step1 = orig_labels + aug_mut_labels
print("\n=== After Random Mutation ===")
print(" Total samples:", len(labels_step1))
print(" Per-label counts:", Counter(labels_step1))


=== After Random Mutation ===
 Total samples: 4096
 Per-label counts: Counter({'1.14.14.18': 32, '1.15.1.1': 32, '1.16.3.1': 32, '1.17.4.1': 32, '1.5.98.3': 32, '1.8.3.2': 32, '2.1.1.354': 32, '2.1.1.360': 32, '2.1.1.37': 32, '2.1.1.72': 32, '2.1.1.77': 32, '2.1.1.86': 32, '2.1.3.15': 32, '2.3.1.225': 32, '2.3.1.269': 32, '2.3.1.48': 32, '2.3.1.51': 32, '2.3.2.23': 32, '2.3.2.26': 32, '2.3.2.27': 32, '2.3.2.31': 32, '2.4.1.109': 32, '2.4.1.198': 32, '2.4.2.31': 32, '2.5.1.18': 32, '2.7.1.107': 32, '2.7.1.21': 32, '2.7.1.33': 32, '2.7.1.67': 32, '2.7.10.1': 32, '2.7.10.2': 32, '2.7.11.1': 32, '2.7.11.25': 32, '2.7.13.3': 32, '2.7.4.3': 32, '2.7.6.1': 32, '2.7.7.108': 32, '2.7.7.19': 32, '2.7.7.48': 32, '2.7.7.49': 32, '2.7.7.6': 32, '2.7.7.65': 32, '2.7.7.7': 32, '2.7.8.7': 32, '2.8.1.13': 32, '3.1.1.1': 32, '3.1.1.3': 32, '3.1.1.32': 32, '3.1.1.4': 32, '3.1.1.96': 32, '3.1.11.2': 32, '3.1.11.6': 32, '3.1.12.1': 32, '3.1.13.4': 32, '3.1.21.10': 32, '3.1.21.4': 32, '3.1.26.3': 32, '3.1.

In [13]:
# Reverse‐complement: produces one reverse‐complement per sequence.
# Not used for final prediction
comp_table = str.maketrans('ACGT', 'TGCA')
def reverse_complement(seq: str) -> str:
    return seq.translate(comp_table)[::-1]


aug_rc_seqs   = df_train['Sequence'].apply(reverse_complement).tolist()
aug_rc_labels = df_train['Label'].tolist()
# After this we will have 20 pts per label

In [14]:
labels_step2 = labels_step1 + aug_rc_labels
print("\n=== After Reverse-Complement ===")
print(" Total samples:", len(labels_step2))
print(" Per-label counts:", Counter(labels_step2))


=== After Reverse-Complement ===
 Total samples: 4608
 Per-label counts: Counter({'1.14.14.18': 36, '1.15.1.1': 36, '1.16.3.1': 36, '1.17.4.1': 36, '1.5.98.3': 36, '1.8.3.2': 36, '2.1.1.354': 36, '2.1.1.360': 36, '2.1.1.37': 36, '2.1.1.72': 36, '2.1.1.77': 36, '2.1.1.86': 36, '2.1.3.15': 36, '2.3.1.225': 36, '2.3.1.269': 36, '2.3.1.48': 36, '2.3.1.51': 36, '2.3.2.23': 36, '2.3.2.26': 36, '2.3.2.27': 36, '2.3.2.31': 36, '2.4.1.109': 36, '2.4.1.198': 36, '2.4.2.31': 36, '2.5.1.18': 36, '2.7.1.107': 36, '2.7.1.21': 36, '2.7.1.33': 36, '2.7.1.67': 36, '2.7.10.1': 36, '2.7.10.2': 36, '2.7.11.1': 36, '2.7.11.25': 36, '2.7.13.3': 36, '2.7.4.3': 36, '2.7.6.1': 36, '2.7.7.108': 36, '2.7.7.19': 36, '2.7.7.48': 36, '2.7.7.49': 36, '2.7.7.6': 36, '2.7.7.65': 36, '2.7.7.7': 36, '2.7.8.7': 36, '2.8.1.13': 36, '3.1.1.1': 36, '3.1.1.3': 36, '3.1.1.32': 36, '3.1.1.4': 36, '3.1.1.96': 36, '3.1.11.2': 36, '3.1.11.6': 36, '3.1.12.1': 36, '3.1.13.4': 36, '3.1.21.10': 36, '3.1.21.4': 36, '3.1.26.3': 36, '3

In [15]:
# K-mer shuffle
# For n_kmer iterations, shuffles overlapping 3-mers and reassembles.
# Not used for final prediction
def kmer_shuffle(seq: str, k: int = 3) -> str:
    # break into overlapping kmers
    kmers = [seq[i:i+k] for i in range(len(seq)-k+1)]
    random.shuffle(kmers)
    # reassemble by stitching
    out = kmers[0]
    for km in kmers[1:]:
        out += km[-1]
    return out

n_kmer = 2
aug_kmer_seqs   = []
aug_kmer_labels = []
for seq, lab in zip(df_train['Sequence'], df_train['Label']):
    for _ in range(n_kmer):
        aug_kmer_seqs.append(kmer_shuffle(seq, k=3))
        aug_kmer_labels.append(lab)

# After this we have 28 points per label

In [16]:


labels_step3 = labels_step2 + aug_kmer_labels
print("\n=== After K-mer Shuffle ===")
print(" Total samples:", len(labels_step3))
print(" Per-label counts:", Counter(labels_step3))


=== After K-mer Shuffle ===
 Total samples: 5632
 Per-label counts: Counter({'1.14.14.18': 44, '1.15.1.1': 44, '1.16.3.1': 44, '1.17.4.1': 44, '1.5.98.3': 44, '1.8.3.2': 44, '2.1.1.354': 44, '2.1.1.360': 44, '2.1.1.37': 44, '2.1.1.72': 44, '2.1.1.77': 44, '2.1.1.86': 44, '2.1.3.15': 44, '2.3.1.225': 44, '2.3.1.269': 44, '2.3.1.48': 44, '2.3.1.51': 44, '2.3.2.23': 44, '2.3.2.26': 44, '2.3.2.27': 44, '2.3.2.31': 44, '2.4.1.109': 44, '2.4.1.198': 44, '2.4.2.31': 44, '2.5.1.18': 44, '2.7.1.107': 44, '2.7.1.21': 44, '2.7.1.33': 44, '2.7.1.67': 44, '2.7.10.1': 44, '2.7.10.2': 44, '2.7.11.1': 44, '2.7.11.25': 44, '2.7.13.3': 44, '2.7.4.3': 44, '2.7.6.1': 44, '2.7.7.108': 44, '2.7.7.19': 44, '2.7.7.48': 44, '2.7.7.49': 44, '2.7.7.6': 44, '2.7.7.65': 44, '2.7.7.7': 44, '2.7.8.7': 44, '2.8.1.13': 44, '3.1.1.1': 44, '3.1.1.3': 44, '3.1.1.32': 44, '3.1.1.4': 44, '3.1.1.96': 44, '3.1.11.2': 44, '3.1.11.6': 44, '3.1.12.1': 44, '3.1.13.4': 44, '3.1.21.10': 44, '3.1.21.4': 44, '3.1.26.3': 44, '3.1.26

### One-hot Encoding

In [17]:
# Combinging synthetic data (after mutations) with actual training data
orig_seqs   = df_train['Sequence'].tolist()
orig_labels = df_train['Label'].tolist()

all_seqs   = orig_seqs + aug_mut_seqs #+  aug_rc_seqs + aug_kmer_seqs
all_labels = orig_labels + aug_mut_labels # +  aug_rc_labels + aug_kmer_labels

In [18]:
# Making all sequences to the same length before one-hot encoding

# Padding to max length
max_len = max(len(s) for s in all_seqs)
def pad_or_truncate(seq: str, length: int, pad_char: str = 'N') -> list[str]:
    lst = list(seq)
    if len(lst) > length:
        return lst[:length]
    return lst + [pad_char] * (length - len(lst))

padded = [pad_or_truncate(s, max_len) for s in all_seqs]
pos_cols = [f'pos_{i}' for i in range(max_len)]
df_pos = pd.DataFrame(padded, columns=pos_cols)

# OneHotEncode
ohe = OneHotEncoder(sparse_output=True, handle_unknown='ignore')
X_enc = ohe.fit_transform(df_pos)
y_enc = pd.Series(all_labels)

In [19]:
# SMOTE - not used for final prediction since all classes have equal number of datapoints.
"""
sm = SMOTE(k_neighbors=3,  random_state=42)
X_dense = X_enc.toarray()
X_res, y_res = sm.fit_resample(X_dense, y_enc)
"""

'\nsm = SMOTE(k_neighbors=3,  random_state=42)\nX_dense = X_enc.toarray()\nX_res, y_res = sm.fit_resample(X_dense, y_enc)\n'

### Random Forest

In [20]:
clf = RandomForestClassifier(
    n_estimators=500,
    max_depth=None,
    min_samples_split=10,
    max_features="sqrt",
    random_state=42,
    n_jobs=-1,
    class_weight='balanced_subsample'
)

clf.fit(X_enc, y_enc)
train_acc = clf.score(X_enc, y_enc)
print(f"Train Accuracy: {train_acc:.4f}")
y_train_pred = clf.predict(X_enc)
print("Train Accuracy:", accuracy_score(y_enc, y_train_pred))

Train Accuracy: 1.0000
Train Accuracy: 1.0


In [21]:
# Pad/truncate test sequences
padded_test = [pad_or_truncate(s, max_len) for s in df_test['Sequence']]
df_test_pos = pd.DataFrame(padded_test, columns=pos_cols)
X_test_enc  = ohe.transform(df_test_pos)
y_test      = df_test['Label']

y_pred = clf.predict(X_test_enc)
print("Test Accuracy:", accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred))

f1_micro = f1_score(y_test, y_pred, average='micro')
f1_macro = f1_score(y_test, y_pred, average='macro')

print(f"Micro F1-score: {f1_micro:.4f}")
print(f"Macro F1-score: {f1_macro:.4f}")

Test Accuracy: 0.046875
              precision    recall  f1-score   support

  1.14.14.18       0.00      0.00      0.00         1
    1.15.1.1       0.00      0.00      0.00         1
    1.16.3.1       0.00      0.00      0.00         1
    1.17.4.1       0.00      0.00      0.00         1
    1.5.98.3       0.00      0.00      0.00         1
     1.8.3.2       0.00      0.00      0.00         1
   2.1.1.354       0.00      0.00      0.00         1
   2.1.1.360       0.00      0.00      0.00         1
    2.1.1.37       0.00      0.00      0.00         1
    2.1.1.72       0.00      0.00      0.00         1
    2.1.1.77       0.25      1.00      0.40         1
    2.1.1.86       0.00      0.00      0.00         1
    2.1.3.15       0.00      0.00      0.00         1
   2.3.1.225       0.00      0.00      0.00         1
   2.3.1.269       0.00      0.00      0.00         1
    2.3.1.48       0.00      0.00      0.00         1
    2.3.1.51       0.00      0.00      0.00         1
   

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


### K-mer Encodings

In [17]:
orig_seqs   = df_train['Sequence'].tolist()
orig_labels = df_train['Label'].tolist()

all_seqs   = orig_seqs + aug_mut_seqs #+ aug_rc_seqs + aug_kmer_seqs
all_labels = orig_labels + aug_mut_labels #+ aug_rc_labels + aug_kmer_labels

train_seqs   = all_seqs
train_labels = all_labels
test_seqs   = df_test['Sequence'].tolist()
test_labels = df_test['Label'].tolist()

In [18]:
def kmer_analyzer(seq, k):
    return [seq[i:i+k] for i in range(len(seq) - k + 1)]

k = 2
bow_vec = CountVectorizer(analyzer=lambda s: kmer_analyzer(s, k=k))
X_train_bow = bow_vec.fit_transform(train_seqs)
X_test_bow  = bow_vec.transform(test_seqs)

rf_bow = RandomForestClassifier(
    n_estimators=500,
    max_features='sqrt',
    class_weight='balanced_subsample',
    n_jobs=-1,
    random_state=42
)
rf_bow.fit(X_train_bow, train_labels)
print(f"[BoW k={k}] RF accuracy: {rf_bow.score(X_test_bow, test_labels):.3f}")

[BoW k=2] RF accuracy: 0.047


### XG Boost Classifier

In [21]:
def kmer_analyzer(seq, k):
    return [seq[i:i+k] for i in range(len(seq) - k + 1)]

k = 4
bow_vec = CountVectorizer(analyzer=lambda s: kmer_analyzer(s, k=k))

X_train_bow = bow_vec.fit_transform(train_seqs)  # sparse CSR matrix
X_test_bow  = bow_vec.transform(test_seqs)

lbl = LabelEncoder() # Encoding labels as integers for XG Boost
y_full = lbl.fit_transform(train_labels)
y_test = lbl.transform(test_labels)
n_classes = len(lbl.classes_)

sss = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=42)
train_idx, val_idx = next(sss.split(X_train_bow, y_full))

X_tr, y_tr = X_train_bow[train_idx], y_full[train_idx]
X_va, y_va = X_train_bow[val_idx],  y_full[val_idx]

w_tr = compute_sample_weight('balanced', y_tr)
w_va = compute_sample_weight('balanced', y_va)

# XGBoost with early stopping for regularization
xgb = XGBClassifier(
        objective       = 'multi:softprob',
        num_class       = n_classes,
        eval_metric     = 'mlogloss',
        n_estimators    = 500,
        learning_rate   = 0.05,
        max_depth       = 6,
        subsample       = 0.8,
        colsample_bytree= 0.8,
        tree_method     = 'hist',
        n_jobs          = -1,
        random_state    = 42,
)

early_stop = callback.EarlyStopping(rounds=60, save_best=True, metric_name='mlogloss')

xgb.fit(
    X_tr, y_tr,
    sample_weight            = w_tr,
    eval_set                 = [(X_va, y_va)],
    sample_weight_eval_set   = [w_va],
    # callbacks                = [early_stop],
    verbose                  = 200
)

test_acc = xgb.score(X_test_bow, y_test)
print(f"XGBoost BoW test accuracy: {test_acc:.3f}")

[0]	validation_0-mlogloss:4.60384
[200]	validation_0-mlogloss:0.18311
[400]	validation_0-mlogloss:0.16290
[499]	validation_0-mlogloss:0.16215
XGBoost BoW test accuracy: 0.016
