In [1]:
import pandas as pd
from scipy.fft import fft
import numpy as np
import math

In [3]:
class PhysicochemicalEncoder:
    """Encode amino-acid sequences as numeric vectors using one physicochemical property.

    Every residue of a sequence is replaced by its value in the
    ``property_encoder`` column of ``dataset_encoder``. Characters that are
    not one of the 20 residues in ``possible_residues`` are skipped, and each
    encoded sequence is right-padded with zeros up to the length of the
    longest sequence in the dataset.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Input data; must contain the column named by ``name_column_seq``.
    sep_dataset : str
        Stored on the instance; not used by the encoding itself.
    property_encoder : str
        Column of ``dataset_encoder`` holding the property values.
    dataset_encoder : pandas.DataFrame
        Table with a ``residue`` column and one column per property.
    name_column_seq : str
        Name of the column of ``dataset`` that holds the sequences.
    columns_to_ignore : list of str, optional
        Columns of ``dataset`` that are carried over unchanged next to the
        encoded values. ``None`` (the default) means "no columns".

    Attributes
    ----------
    df_data_encoded : pandas.DataFrame or None
        Result of the last successful ``run_process`` call: columns
        ``p_0 .. p_{n-1}`` followed by the ignored columns.
    status : bool
        ``True`` once the inputs passed validation.
    message : str
        ``"ENCODING OK"`` on success, otherwise an ``"ERROR: ..."`` text.
    zero_padding : int
        Length of the longest sequence; set by ``run_process``.
    """

    def __init__(self,
                 dataset=None,
                 sep_dataset=",",
                 property_encoder="Group_0",
                 dataset_encoder=None,
                 name_column_seq="sequence",
                 columns_to_ignore=None):

        self.dataset = dataset
        self.sep_dataset = sep_dataset

        self.property_encoder = property_encoder
        self.dataset_encoder = dataset_encoder
        self.name_column_seq = name_column_seq
        # A fresh list per instance instead of a shared mutable default.
        self.columns_to_ignore = [] if columns_to_ignore is None else columns_to_ignore

        self.possible_residues = list("ACDEFGHINKLMPQRSTVWY")

        self.df_data_encoded = None

        self.status = False
        self.message = ""

    def run_process(self):
        """Validate the inputs and, if they are valid, build ``df_data_encoded``.

        Failures are reported through ``status`` / ``message`` rather than
        by raising.
        """
        self.__make_validations()

        if self.status:
            self.zero_padding = self.__check_max_size()
            self.__encoding_dataset()
            self.message = "ENCODING OK"

    def __check_columns_in_df(
            self,
            check_columns=None,
            columns_in_df=None):
        """Return True when every name in ``check_columns`` is in ``columns_in_df``."""
        return all(column in columns_in_df for column in check_columns)

    def __make_validations(self):
        """Check the inputs; set ``status`` to True only when all checks pass."""

        # Start clean so a second run cannot inherit the previous outcome.
        self.status = False
        self.message = ""

        if self.dataset is None or self.dataset_encoder is None:
            self.message = "ERROR: dataset AND dataset_encoder MUST BE PROVIDED"
            return

        encoder_columns = self.dataset_encoder.columns
        if 'residue' not in encoder_columns or self.property_encoder not in encoder_columns:
            self.message = "ERROR: ENCODER DATASET MUST CONTAIN THE residue AND PROPERTY COLUMNS"
            return

        # Index the encoder table by residue so values can be looked up by letter.
        self.dataset_encoder.index = self.dataset_encoder['residue']

        if self.name_column_seq not in self.dataset.columns:
            self.message = "ERROR: COLUMN TO USE AS SEQUENCE IS NOT IN DATASET COLUMNS"
            return

        if not isinstance(self.columns_to_ignore, list):
            self.message = "ERROR: THE ATTRIBUTE columns_to_ignore MUST BE A LIST"
            return

        columns_ok = self.__check_columns_in_df(
            columns_in_df=self.dataset.columns,
            check_columns=self.columns_to_ignore
        )
        if not columns_ok:
            self.message = "ERROR: IGNORE COLUMNS NOT IN DATASET COLUMNS"
            return

        # Reached for an empty ignore list as well; previously that case left
        # status at False and the encoding never ran.
        self.status = True

    def __check_residues(self, residue):
        """Return True when ``residue`` is one of the supported residue letters."""
        return residue in self.possible_residues

    def __encoding_residue(self, residue):
        """Return the property value of ``residue``, or None for unsupported letters."""
        if self.__check_residues(residue):
            return self.dataset_encoder[self.property_encoder][residue]
        return None

    def __check_max_size(self):
        """Return the length of the longest sequence (0 for an empty dataset)."""
        return max((len(seq) for seq in self.dataset[self.name_column_seq]), default=0)

    def __encoding_sequence(self, sequence):
        """Encode one sequence and zero-pad it to ``self.zero_padding`` values."""

        sequence_encoding = []

        for residue in sequence.upper():
            value = self.__encoding_residue(residue)
            # Compare against None, not False: a legitimate property value of
            # 0 equals False and must not be discarded.
            if value is not None:
                sequence_encoding.append(value)

        # complete zero padding
        missing = self.zero_padding - len(sequence_encoding)
        sequence_encoding.extend([0] * missing)

        return sequence_encoding

    def __encoding_dataset(self):
        """Encode every sequence and store the result in ``self.df_data_encoded``."""

        print("Encoding and Processing results")

        matrix_data = [
            self.__encoding_sequence(sequence=sequence)
            for sequence in self.dataset[self.name_column_seq]
        ]

        print("Creating dataset")
        # Every encoded row is padded to zero_padding values, so the header
        # does not depend on a first row existing.
        header = ['p_{}'.format(i) for i in range(self.zero_padding)]
        print("Export dataset")

        self.df_data_encoded = pd.DataFrame(matrix_data, columns=header)

        if len(self.columns_to_ignore) > 0:
            # The encoded frame has a fresh 0..n-1 index; align the carried
            # columns by position so a non-default input index cannot
            # misalign rows or introduce NaN rows.
            df_columns_ignore = self.dataset[self.columns_to_ignore].reset_index(drop=True)
            self.df_data_encoded = pd.concat([self.df_data_encoded, df_columns_ignore], axis=1)

In [4]:
class FFTTransform:
    """Turn numerically encoded sequences into FFT magnitude spectra.

    On construction the ignored columns are set aside, the signal length is
    rounded up to a power of two (``stop_value``) and the remaining columns
    are zero-padded to that length. ``encoding_dataset`` then applies
    ``scipy.fft.fft`` to every row and keeps the magnitudes of the first
    ``stop_value // 2`` coefficients.

    Parameters
    ----------
    dataset : pandas.DataFrame
        One row per sequence; every column not listed in
        ``columns_to_ignore`` is treated as a signal sample.
    size_data : int, optional
        Number of signal samples per row. When ``None`` it is taken from the
        number of columns left after removing ``columns_to_ignore``.
    columns_to_ignore : list of str, optional
        Columns carried over unchanged to the output. ``None`` (the default)
        means "no columns".
    """

    def __init__(
            self,
            dataset=None,
            size_data=None,
            columns_to_ignore=None):

        self.size_data = size_data
        self.dataset = dataset
        # A fresh list per instance instead of a shared mutable default.
        self.columns_to_ignore = [] if columns_to_ignore is None else columns_to_ignore
        self.data_ignored = None

        self.init_process()

    def __processing_data_to_fft(self):
        """Split the ignored columns off the signal columns."""

        print("Removing columns data")

        if len(self.columns_to_ignore) > 0:
            self.data_ignored = self.dataset[self.columns_to_ignore]
            self.dataset = self.dataset.drop(columns=self.columns_to_ignore)

        if self.size_data is None:
            self.size_data = self.dataset.shape[1]

    def __get_near_pow(self):
        """Set ``self.stop_value`` to the smallest power of two (>= 2) that fits the signal."""

        print("Get near pow 2 value")
        # Cover the real number of signal columns as well, so an
        # under-reported size_data cannot yield a length shorter than the row.
        signal_length = max(math.ceil(self.size_data), self.dataset.shape[1], 2)
        # Computed directly rather than searched in a fixed table, which
        # silently fell back to 2 for sizes above 2**19.
        self.stop_value = 1 << (signal_length - 1).bit_length()

    def __complete_zero_padding(self):
        """Append zero-valued columns until each row has ``stop_value`` samples."""

        print("Apply zero padding")
        first_new = self.dataset.shape[1]
        pad_names = ["p_{}".format(i) for i in range(first_new, self.stop_value)]

        if pad_names:
            # Built on the dataset's own index so the concat aligns row by row.
            padding = pd.DataFrame(0, index=self.dataset.index, columns=pad_names)
            self.dataset = pd.concat([self.dataset, padding], axis=1)

    def init_process(self):
        """Run the preparation steps: drop ignored columns, size and pad the signal."""
        self.__processing_data_to_fft()
        self.__get_near_pow()
        self.__complete_zero_padding()

    def __create_row(self, index):
        """Return the row at integer position ``index`` as a list."""
        return self.dataset.iloc[index].tolist()

    def __apply_FFT(self, index):
        """Return the magnitudes of the first ``stop_value // 2`` FFT coefficients of one row."""
        spectrum = fft(self.__create_row(index))
        return np.abs(spectrum[0:self.stop_value // 2]).tolist()

    def encoding_dataset(self):
        """Apply the FFT to every row and return the spectra as a DataFrame.

        Returns
        -------
        pandas.DataFrame
            Columns ``p_0 .. p_{stop_value // 2 - 1}`` followed by the
            ignored columns, with a fresh 0..n-1 index.
        """

        # Iterate over positions, not index labels: rows are fetched with
        # iloc, so labels of a non-default index would pick the wrong row
        # or raise.
        matrix_response = [
            self.__apply_FFT(position) for position in range(len(self.dataset))
        ]

        print("Creating dataset")
        header = ['p_{}'.format(i) for i in range(self.stop_value // 2)]
        print("Export dataset")
        df_fft = pd.DataFrame(matrix_response, columns=header)

        if len(self.columns_to_ignore) > 0:
            # Align the carried columns by position with the new 0..n-1 index.
            df_fft = pd.concat(
                [df_fft, self.data_ignored.reset_index(drop=True)], axis=1
            )

        return df_fft

# Data Load

In [2]:
# Read the raw dataset from disk and preview its first rows.
human_dataset_path = "data/human_dataset.csv"
df = pd.read_csv(human_dataset_path)
df.head()

Unnamed: 0,sequence,response
0,AAFDRKSDAK,1
1,AAHARFVAA,1
2,AARDRFPGL,1
3,AARQRLQDI,1
4,AEIEDLIFLA,1


# Data Encoding

## Physicochemical Encoding

In [None]:
# Longest sequence (in characters) kept for encoding.
MAX_SEQUENCE_LENGTH = 50

# Add the sequence length, keep only the short-enough sequences and renumber
# the rows. Built as one chain that returns a new frame, so nothing is
# modified in place on a filtered slice.
df = (
    df.assign(length=df["sequence"].str.len())
    .loc[lambda frame: frame["length"] <= MAX_SEQUENCE_LENGTH]
    .reset_index(drop=True)
)
df

In [None]:
# Load the property table and index it by residue letter; drop=False keeps
# the "residue" column available as a regular column too.
aaindex = pd.read_csv("aaindex_encoders.csv").set_index("residue", drop=False)
aaindex.head()

In [None]:
# Encode every sequence with the ANDN920101 property; "length" and "response"
# are carried over unchanged next to the encoded columns.
physicochemical_instance = PhysicochemicalEncoder(
    dataset=df,
    sep_dataset=",",
    property_encoder="ANDN920101",
    dataset_encoder=aaindex,
    name_column_seq="sequence",
    columns_to_ignore=["length", "response"]
)
physicochemical_instance.run_process()

# run_process() reports validation problems only through .status/.message and
# leaves df_data_encoded as None; fail here with that message instead of with
# an AttributeError on the next line.
assert physicochemical_instance.status, physicochemical_instance.message
physicochemical_instance.df_data_encoded.head(5)

## FFT Encoding

In [None]:
# Columns that are not part of the signal and must not be transformed.
fft_columns_to_ignore = ["length", "response"]

fft_instance = FFTTransform(
    dataset=physicochemical_instance.df_data_encoded,
    # Signal length = all columns minus the ignored ones. Subtracting a
    # hard-coded 1 while two columns are ignored over-counted the signal by
    # one sample.
    size_data=len(physicochemical_instance.df_data_encoded.columns) - len(fft_columns_to_ignore),
    columns_to_ignore=fft_columns_to_ignore
)
df_fft = fft_instance.encoding_dataset()
df_fft.head()

# Save data

In [None]:
# Drop the helper "length" column before saving. errors="ignore" keeps the
# cell re-runnable: the in-place drop raised KeyError on a second run because
# the column was already gone.
df_fft = df_fft.drop(columns=["length"], errors="ignore")
df_fft.to_csv("data/human_dataset_fft.csv", index=False)

## Embedding

In [3]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Device: {device}")

Device: cuda


In [5]:
# Tokenizer and model are loaded from the same checkpoint.
ankh_checkpoint = "ElnaggarLab/ankh-base"
tokenizer = AutoTokenizer.from_pretrained(ankh_checkpoint)
model = AutoModelForSeq2SeqLM.from_pretrained(ankh_checkpoint)
model = model.to(device)

# Token ids of "<s>", passed as the decoder input on every forward call.
start_tokens = tokenizer("<s>", return_tensors="pt")
decoder_input_ids = start_tokens.input_ids.to(device)

2024-11-29 13:58:15.701730: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-11-29 13:58:15.714871: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1732899495.731672  886377 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1732899495.737124  886377 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-29 13:58:15.753163: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [6]:
encoded_sequences = []

# Inference only: without no_grad() every forward pass records an autograd
# graph that is never used and only costs memory.
with torch.no_grad():
    for _, row in df[:5].iterrows():
        sequence = row["sequence"]
        inputs = tokenizer(sequence, return_tensors="pt", add_special_tokens=True).to(device)
        outputs = model(input_ids=inputs["input_ids"], decoder_input_ids=decoder_input_ids)

        # One vector per sequence: average the encoder's last hidden state of
        # the first batch item over its leading axis.
        # NOTE(review): presumably that axis is the token axis, special tokens
        # included - confirm this is the intended pooling.
        embedding = outputs.encoder_last_hidden_state[0].mean(dim=0).detach().cpu().numpy()

        encoded_sequences.append(embedding)
encoded_sequences

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


[array([-1.60834827e-02, -2.91826180e-03, -1.59589748e-03,  2.27789283e-02,
        -5.51022310e-03, -1.36222824e-01,  3.45436521e-02,  6.30496582e-03,
         5.75959031e-03, -1.68413501e-02,  3.29832621e-02,  2.18720417e-02,
         2.17842113e-04,  1.07164616e-02,  2.84836511e-03,  6.43049320e-03,
         1.10918563e-03,  3.63822468e-02, -4.11857758e-03,  1.35856560e-02,
        -1.55388401e-03, -1.25793353e-01,  2.03167945e-02, -2.96678878e-02,
         3.04508545e-02,  2.23635472e-02,  8.13983940e-03, -1.31360376e-02,
        -6.28252607e-03,  3.42471264e-02,  1.84882805e-03, -1.11945523e-02,
         1.60407871e-02,  2.61382759e-03, -5.37221436e-04,  8.88970867e-02,
         4.07679938e-03, -3.78238881e-04,  1.03300335e-02, -1.56556368e-02,
         3.26706888e-03, -9.51756071e-03, -3.66611257e-02, -3.52922939e-02,
        -3.50079611e-02,  1.50877880e-02, -7.13056000e-03,  1.92323190e-04,
         8.03111494e-03, -5.42824110e-03,  2.93949414e-02,  8.15332681e-03,
         2.4

In [7]:
# One column per embedding dimension, numbered from 1, plus the class label
# of the same first five rows.
embedding_dim = encoded_sequences[0].shape[0]
header = [f"p_{position}" for position in range(1, embedding_dim + 1)]
df_embedding = pd.DataFrame(encoded_sequences, columns=header)
df_embedding["response"] = df["response"].iloc[:5].to_numpy()
df_embedding.head()

Unnamed: 0,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,...,p_760,p_761,p_762,p_763,p_764,p_765,p_766,p_767,p_768,response
0,-0.016083,-0.002918,-0.001596,0.022779,-0.00551,-0.136223,0.034544,0.006305,0.00576,-0.016841,...,0.005945,-0.034145,0.003568,-0.001957,-0.039163,0.000452,0.008176,-0.007832,-0.015098,1
1,-0.022358,-0.009296,0.025405,0.042871,0.01709,-0.090132,-0.019531,0.009517,0.00504,-0.011114,...,-0.001268,-0.038742,-0.017684,0.014782,-0.039524,-0.01113,0.025558,-0.038895,-0.002697,1
2,-0.01051,0.004492,0.004946,-0.001562,-0.01747,-0.093698,0.028258,0.02464,0.009362,0.000358,...,-0.012877,-0.045813,-0.003428,0.007065,-0.022434,0.012172,0.004529,-0.023664,0.005539,1
3,-0.023736,0.002394,0.000162,0.021105,0.015291,-0.0945,0.026909,0.016938,0.011371,-0.030738,...,-0.003539,-0.016653,-0.019593,0.028424,0.013728,0.007353,0.001882,-0.01405,-0.01238,1
4,-0.013481,0.000393,-0.009651,0.01533,-0.021118,-0.127067,0.055641,0.037137,0.00313,-0.030074,...,0.004838,-0.031514,-0.010816,0.013311,-0.016952,0.005332,0.006449,-0.029489,0.003227,1


### View the embedding

In [8]:
from sklearn.utils import shuffle
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import seaborn as sns

In [9]:
# Balance the classes: keep every positive row and an equally sized, shuffled
# sample of the negative rows.
is_positive = df_embedding["response"] == 1
is_negative = df_embedding["response"] == 0

df_pos = df_embedding[is_positive]
df_neg = df_embedding[is_negative]
df_neg = shuffle(df_neg, random_state=42).head(len(df_pos))
df_balanced = pd.concat([df_pos, df_neg])
df_balanced.shape

(5, 769)

In [10]:
# Project the embeddings (label column removed) onto two principal components.
df_values = df_balanced.drop(columns=["response"]).to_numpy()
pca = PCA(n_components=2, random_state=42).fit(df_values)
pca_transform = pca.transform(df_values)

# Put the label back next to the projected coordinates.
df_pca = pd.DataFrame(pca_transform, columns=["pca_1", "pca_2"]).assign(
    response=df_balanced["response"].to_numpy()
)
df_pca.head()

Unnamed: 0,pca_1,pca_2,response
0,-0.184458,-0.265472,1
1,-0.170951,-0.032378,1
2,-0.192838,-0.090379,1
3,-0.094232,0.436211,1
4,0.64248,-0.047981,1


In [None]:
# Scatter of the two PCA coordinates, coloured by the "response" label.
sns.scatterplot(x="pca_1", y="pca_2", hue="response", data=df_pca)

In [None]:
# scikit-learn requires perplexity < n_samples. The balanced frame built from
# the five-row preview has only five rows, so a fixed perplexity of 5 is
# rejected; cap it at n_samples - 1 (unchanged for six or more samples).
tsne_perplexity = min(5, len(df_values) - 1)

tsne = TSNE(n_components=2, random_state=42, perplexity=tsne_perplexity).fit_transform(df_values)
df_tsne = pd.DataFrame(tsne, columns=["tsne_1", "tsne_2"])
df_tsne["response"] = df_balanced["response"].values
df_tsne.head()

In [None]:
# Scatter of the two t-SNE coordinates, coloured by the "response" label.
sns.scatterplot(x="tsne_1", y="tsne_2", hue="response", data=df_tsne)

## Centroid analysis

In [20]:
from scipy.spatial.distance import pdist, squareform

In [21]:
# Small hand-written example: two sources with two points each.
# NOTE: this rebinds `df`, which earlier in the notebook held the loaded dataset.
data = dict(
    source=['fuente1', 'fuente1', 'fuente2', 'fuente2'],
    pca_1=[0.4, 0.1, 0.8, 0.2],
    pca_2=[0.2, 0.1, 0.3, 0.2],
    response=[1, 0, 1, 0],
)
df = pd.DataFrame.from_dict(data)

In [22]:
def get_distance_by_source(group):
    """Return the pairwise distances between the points of one group.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows with the columns ``source``, ``pca_1`` and ``pca_2``.

    Returns
    -------
    pandas.DataFrame
        One row per pair of points, in the order produced by
        ``scipy.spatial.distance.pdist``: ``source`` repeats the ``source``
        value of the group's first row and ``distance`` holds the distance
        between the two points.
    """
    source_label = group['source'].iloc[0]
    pairwise = pdist(group[['pca_1', 'pca_2']].to_numpy())
    return pd.DataFrame({'source': source_label, "distance": pairwise})

In [25]:
# Select the columns the helper reads, grouping column included, explicitly:
# apply() on the bare groupby emits a pandas deprecation warning about
# operating on the grouping columns.
result = (
    df.groupby("source")[["source", "pca_1", "pca_2"]]
    .apply(get_distance_by_source)
    .reset_index(drop=True)
)
result

  result = df.groupby("source").apply(get_distance_by_source).reset_index(drop=True)


Unnamed: 0,source,distance
0,fuente1,0.316228
1,fuente2,0.608276
