In [None]:
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import LabelBinarizer, StandardScaler, MinMaxScaler
from typing import Literal
import pandas as pd
import numpy as np
from modelling_utils import normalise_counts
import joblib

  import pynvml  # type: ignore[import]


In [2]:
class FeatureScaler(BaseEstimator, TransformerMixin):
    def __init__(self, scale=True, scale_type:Literal['ss', 'mm']='ss'):
        self.scale = scale
        self.scale_type = scale_type
        self.scaler_ = None
    
    def fit(self, X):
        if self.scale:
            self.scaler_ = StandardScaler() if self.scale_type == 'ss' else MinMaxScaler()
            self.scaler_.fit(X)
        return self
    def transform(self, X):
        if self.scaler_:
            X = self.scaler_.transform(X)
        return X
    def fit_transform(self, X):
        return self.fit(X).transform(X)

In [None]:
class PLSLatentTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, n_components=2):
        self.n_components = n_components
        self._pls = PLSRegression(n_components=self.n_components, scale=False)

    def fit(self, X, y):
        y_cat = LabelBinarizer().fit_transform(y)
        self._pls.fit(X, y_cat)
        return self

    def transform(self, X):
        return self._pls.transform(X)  # Return latent features
    
    def fit_transform(self, X, y = None):
        return self.fit(X, y).transform(X)

In [4]:
# load (files in kmer x sampleIDs)
train = pd.read_parquet('data/train_8kmer.parquet').T
test = pd.read_parquet('data/test_8kmer.parquet').T
train_labels = pd.read_csv('data/Train.csv')

In [5]:
train_labels = train_labels.assign(ID = train_labels.filename.str.replace('.mgb', '').str.strip())

# rename and select ID and target
train_labels = train_labels.rename(columns={'SampleType': 'target'})[['ID', 'target']]

# set train labels to match with train columns arrangement
train_labels = train_labels.set_index('ID').reindex(train.index)
train_labels.shape

(2901, 1)

In [6]:
def select_features(X):
    """
    Selects features with variance equal to or greater than the elbow point 
    X: 2D array input data. Pandas DataFrame or Numpy Array
    Returns: variance threshold at elbow
    """
    def compute_elbow(vals):
        """
        Find elbow point in variance CDF using a simple curvature method.
        vals: Unscaled variance
        Returns: elbow point variance (cutoff)
        """
        sorted_val = np.sort(vals)
        cdf = np.arange(1, len(sorted_val) + 1) / len(sorted_val)
        # Normalise both axes to [0,1]
        x_norm = (sorted_val - sorted_val.min()) / (sorted_val.max() - sorted_val.min())
        y_norm = (cdf - cdf.min()) / (cdf.max() - cdf.min())

        distances = y_norm - x_norm
        idx_elbow = np.argmax(distances)
        elbow_var = sorted_val[idx_elbow] # get elbow point
        return elbow_var
    
    unscaled_var = np.var(X, axis=0)
    elbow_var = compute_elbow(unscaled_var)

    if isinstance(X, np.ndarray):
        passed = unscaled_var >= elbow_var
        selected_cols = [i for (i, j) in enumerate(passed) if j]
    elif isinstance(X, pd.DataFrame):
        selected_cols = X.loc[unscaled_var >= elbow_var].columns
    return selected_cols

In [7]:
def reverse_complement(dna:str):
    dna_map = {'A':'T', 'G':'C', 'C':'G', 'T':'A'}
    return ''.join([dna_map.get(i) for i in reversed(dna)])

def canonical(kmer):
    rev = reverse_complement(kmer)
    return min(kmer, rev)

def get_canonical_kmers(kmers:list):
    canonical_kmers = list(set(canonical(k) for k in kmers))
    return canonical_kmers

In [8]:
scaler = FeatureScaler()

In [9]:
train.shape, test.shape

((2901, 65536), (1068, 65536))

In [10]:
norm_train = normalise_counts(train)

In [11]:
norm_test = normalise_counts(test)

__Selecting kmers__

Here, we will select features. First, canonical kmers and another canonical kmers within elbow point of kmer variance scores.

Canonical kmers are kmers whose reverse complements are considered as the same. To select using the elbow point of the variance scores, we will find kmers whose variance (unscaled) meets the variance cutoff point (elbow). We will use the training data

In [12]:
canonical_kmers = get_canonical_kmers(train.columns)
selected_canonical_kmers = get_canonical_kmers(train.columns[select_features(norm_train)])

canonical_kmers_idx = train.columns.get_indexer(canonical_kmers)
selected_canonical_kmers_idx = train.columns.get_indexer(selected_canonical_kmers)

In [13]:
len(selected_canonical_kmers), len(canonical_kmers)

(7746, 32896)

In [14]:
class_map = dict(zip(
    np.sort(train_labels['target'].unique()), 
    range(train_labels['target'].nunique())
))

train_labels['class_int'] = train_labels['target'].map(class_map)

target = train_labels.class_int

In [15]:
scaler.fit(norm_train)

In [None]:
joblib.dump(scaler, '../data/models/pls_scaler.pkl')

['data/models/pls_scaler.pkl']

In [17]:
Xtrain, ytrain = scaler.transform(norm_train), target
Xtest = scaler.transform(norm_test)

In [18]:
pls = PLSLatentTransformer(n_components=15)

In [None]:
for name, cols in zip(['canon', 'canon_elbow'], [canonical_kmers_idx, selected_canonical_kmers_idx]):
    pls.fit(Xtrain[:, cols], ytrain)
    joblib.dump(pls, f'../data/models/{name}_pls_model.pkl')

In [None]:
pd.Series(canonical_kmers).to_csv('../data/canonical_kmers.csv', index=False)
pd.Series(selected_canonical_kmers).to_csv('../data/selected_canonical_kmers.csv', index=False)