In [None]:
# Enable IPython's autoreload extension in mode 2 so edits to imported
# project modules (e.g. src.*) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import gc
import copy
import random
from pathlib import Path

import torch
import torch.nn as nn
import pandas as pd
import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt

import evaluate

from transformers import (
    T5Tokenizer,
    DataCollatorForTokenClassification,
    TrainingArguments,
    Trainer,
)

import peft
from peft import (
    LoraConfig,
    PeftModel
)

from tqdm import tqdm

from src.model_new import (
    T5EncoderModelForTokenClassification,
    T5EncoderModelForSequenceClassification,
    create_datasets,
)
import src.config
import src.data
import src.model_new
import src.utils

tqdm.pandas()


In [None]:
# --- Notebook-wide configuration -------------------------------------------
# Project root; later cells concatenate it with '/data/raw/...' and '/models/...'.
ROOT = src.utils.get_project_root_path()

# Prefer CUDA, then Apple MPS, and fall back to CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')

# Passed as `use_crf` when the expert token-classification model is constructed.
USE_CRF = False

# Key used to pick one entry out of the per-type dataset splits.
EXPERT = 'ALL'
MODEL_VERSION = src.config.model_version
# Misspelled name kept as an alias because later cells still reference it.
MODEL_VERRSION = MODEL_VERSION

# Single source of truth for every RNG seed: previously SEED was defined but
# the literal 42 was repeated below, so changing SEED had no effect.
SEED = 42
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

print("Base Model:\t", src.config.base_model_name)
print("MPS:\t\t", torch.backends.mps.is_available())
print("Path:\t\t", ROOT)
print(f"Using device:\t {device}")

# torch.set_printoptions(threshold=10_000)

In [None]:
# Tokenizer matching the base checkpoint configured in src.config.
# NOTE(review): `use_fast=True` is passed to the concrete T5Tokenizer class;
# confirm it has the intended effect here (it is normally an AutoTokenizer switch).
t5_tokenizer = T5Tokenizer.from_pretrained(
    pretrained_model_name_or_path=src.config.base_model_name,
    legacy=False,
    use_fast=True,
    do_lower_case=False,
)

In [None]:
# Parse the raw FASTA file once and build one dataset per key of
# src.config.select_encoding_type.
FASTA_FILENAME = '5_SignalP_5.0_Training_set.fasta'
# FASTA_FILENAME = '5_SignalP_5.0_Training_set_testing.fasta'
annotations_name = ['Label'] + ['Type'] # Choose Type or Label

df_data = src.data.process(
    src.data.parse_file(ROOT + '/data/raw/' + FASTA_FILENAME)
)

dataset_signalp_type_splits = {}
for sequence_type in src.config.select_encoding_type:
    dataset_signalp_type_splits[sequence_type] = src.model_new.create_datasets(
        splits=src.config.splits,
        tokenizer=t5_tokenizer,
        data=df_data,
        annotations_name=annotations_name,
        dataset_size=src.config.dataset_size,
        sequence_type=sequence_type,
    )

# The parsed frame is no longer needed once every dataset has been built.
del df_data

# Working dataset for the currently selected EXPERT key.
dataset_signalp = dataset_signalp_type_splits[EXPERT]
display(dataset_signalp_type_splits)

In [None]:
# Gate model: sequence-level classifier with one output per entry of
# src.config.type_encoding.
t5_base_model_gate = T5EncoderModelForSequenceClassification.from_pretrained(
    pretrained_model_name_or_path=src.config.base_model_name,
    custom_num_labels=len(src.config.type_encoding),
    custom_dropout_rate=0.1,
    load_in_8bit=False,
    device_map='auto',
)

In [None]:
# Expert model: token-level classifier with one output per entry of
# src.config.label_decoding; the CRF switch follows the USE_CRF constant.
t5_base_model_expert = T5EncoderModelForTokenClassification.from_pretrained(
    pretrained_model_name_or_path=src.config.base_model_name,
    custom_num_labels=len(src.config.label_decoding),
    custom_dropout_rate=0.1,
    use_crf=USE_CRF,
    load_in_8bit=False,
    device_map='auto',
)

In [None]:
# Common prefix (relative to ROOT) of all adapter directories for this model
# version; later cells append 'gate' or 'expert_<EXPERT>' to it.
adapter_location = f'/models/moe_v{MODEL_VERRSION}_'

In [None]:
# Attach the trained gate adapter to the gate model and make it the active one.
gate_adapter_location = adapter_location + 'gate'
_gate_adapter_name = "gate"
t5_base_model_gate.load_adapter(
    ROOT + gate_adapter_location,
    adapter_name=_gate_adapter_name,
)
t5_base_model_gate.set_adapter(_gate_adapter_name)

---
**TODO**

X gate\
O lin all\
O lin expert\
O crf all\
O crf expert

In [None]:
# Re-parse the raw file (the earlier frame was deleted) and keep partition 4 only.
FASTA_FILENAME = '5_SignalP_5.0_Training_set.fasta'
df_data = src.data.process(src.data.parse_file(ROOT + '/data/raw/' + FASTA_FILENAME))
df_data = df_data.loc[df_data['Partition_No'] == 4].reset_index(drop=True)

# Sequence with the separating spaces removed.
df_data['Sequence_Raw'] = df_data['Sequence'].apply(lambda seq: seq.replace(' ', ''))

# Attach the tokenised test split column by column.
# NOTE(review): this assumes the 'test' split holds exactly the partition-4 rows
# in the same order as df_data — confirm against create_datasets.
_test_split = dataset_signalp_type_splits['ALL']['test']
# df_data['Mask'] = [x[1:] for x in dataset_signalp_type_splits['ALL']['test']['attention_mask']]
df_data['input_ids'] = _test_split['input_ids']
df_data['ds_attention_mask'] = _test_split['attention_mask']
df_data['ds_labels'] = _test_split['labels']
df_data['ds_type'] = _test_split['type']

In [None]:
# print(dataset_signalp_type_splits['ALL']['test'])
# Quick sanity check: first rows and (rows, columns) of the assembled test frame.
display(df_data.head(), df_data.shape)

In [None]:
# test_df = df_data.head().copy(deep=True)

def _predict_row_type(row):
    """Run the gate model on one row and return the first translated prediction.

    Reads the row's 'Sequence' and 'ds_attention_mask' fields, calls
    src.model_new.predict_model with the gate model, and translates the
    resulting logits with src.config.type_decoding (no Viterbi decoding).
    """
    # NOTE(review): torch.Tensor(...) builds a float tensor, whereas other cells
    # build masks with torch.tensor(...) — confirm which dtype predict_model expects.
    mask = torch.Tensor([row['ds_attention_mask']]).to(device)
    gate_output = src.model_new.predict_model(
        sequence=row['Sequence'],
        tokenizer=t5_tokenizer,
        model=t5_base_model_gate,
        attention_mask=mask,
        device=device,
    )
    translated = src.model_new.translate_logits(
        gate_output['logits'].unsqueeze(0),
        src.config.type_decoding,
        viterbi_decoding=False,
    )
    return translated[0]


# Row-wise inference with a tqdm progress bar.
df_data['predicted_type'] = df_data.progress_apply(_predict_row_type, axis=1)

In [None]:
# Inspect the first rows, now including the 'predicted_type' column.
display(df_data.head())

In [None]:
# df_data[df_data['predicted_type'] != df_data['Type']]
# t5_tokenizer.decode(t5_tokenizer.encode('M T E T L P P V T E S A V A L Q A E V T Q R E L F E F V L N D P L L A S S L Y I N I A L A G L S I L L F V F M T R G L D D P R A K L I A V S'))
# dataset_signalp_type_splits['ALL']['test']['attention_mask']
# decoded_pairs = [(x, ''.join([t5_tokenizer.decode(z) for z in y])) for x, y in zip(df_data['Sequence'], dataset_signalp_type_splits['ALL']['test']['input_ids'])]
# print(*decoded_pairs, sep='\n')
# len(decoded_pairs)
# len(df_data['Sequence'].at[0]), len(df_data['Mask'].at[0])# print(*[(x, y) for x, y in zip(df_data['Sequence'], df_data['Mask'])], sep='\n')
#  df_singalp_split = df_singalp6_preds[
#     df_singalp6_preds['Type'] == 'SP'
# ]
# test_df = df_data.head().copy(deep=True)
# test_df
# test_df['asd'] = test_df.apply(lambda x: (x['Type'], x['Mask']), axis=1)

In [None]:
# Persist the frame with the gate predictions.
# Create the output directory first: to_parquet does not create missing parent
# directories, and failing here would discard the inference results above.
_results_dir = Path('./results')
_results_dir.mkdir(parents=True, exist_ok=True)
df_data.to_parquet(_results_dir / 'df_data.parquet.gzip', compression='gzip')

---

In [None]:
# expert_adapter_location = adapter_location + f'expert_{EXPERT}'
# t5_base_model_expert.load_adapter(ROOT+expert_adapter_location)

# FASTA_FILENAME = '5_SignalP_5.0_Training_set_testing.fasta'
# df_data = src.data.process(src.data.parse_file(ROOT + '/data/raw/' + FASTA_FILENAME))
# df_data = df_data[df_data['Partition_No'] == 4].reset_index(drop=True)
# df_data['Sequence'] = df_data['Sequence'].apply(lambda x: x.replace(' ', ''))

In [None]:
# # df_data['Type_Prediction'] = 
# df_data['Label'].iloc[17:18].apply(lambda x: src.model_new.moe_inference(
#     sequence=x,
#     tokenizer=t5_tokenizer,
#     model_gate=t5_base_model_gate,
#     model_expert=t5_base_model_expert,
#     device=device,
#     # result_type='SP',
#     )[0])

In [None]:
# Switch the selected expert and derive the directory of its adapter.
# NOTE(review): `dataset_signalp` was picked earlier while EXPERT was 'ALL' and
# is not re-selected here — confirm that is intended.
EXPERT = 'LIPO'
expert_adapter_location = ROOT + adapter_location + f'expert_{EXPERT}'
print(expert_adapter_location)

In [None]:
# Load the expert adapter from disk and register it under the name "<EXPERT>_1".
t5_base_model_expert.load_adapter(expert_adapter_location, adapter_name=f"{EXPERT}_1")

In [None]:
# NOTE(review): the adapter was registered as f"{EXPERT}_1" when it was loaded,
# but the bare EXPERT value is passed here; `unload` is not defined in this
# notebook — confirm its signature and which adapter name it expects.
t5_base_model_expert.unload(EXPERT)

In [None]:
# Run the gate + expert pipeline on a single example of the selected split.
_ds_index = 4
# Defined here so the cell works on a fresh kernel: the name was previously
# only assigned in a later cell, which made this cell raise NameError when the
# notebook is run top to bottom.
_ds_type = 'test'

_sample = dataset_signalp[_ds_type][_ds_index]
# _input_ids_test = df_data['Sequence'].iloc[_ds_index]
# _labels_test = df_data['Label'].iloc[_ds_index]
# _type_test = df_data['Type'].iloc[_ds_index]
# Decode all token ids except the last one back to text
# (presumably the trailing id is the EOS token — confirm).
_input_ids_test = t5_tokenizer.decode(_sample['input_ids'][:-1])
# Append -100 to the label ids and add a batch dimension.
_labels_test = torch.tensor([_sample['labels'] + [-100]]).to(device)
_attention_mask_test = torch.tensor([_sample['attention_mask']]).to(device)
# Previously printed without ever being assigned (its only assignment was
# commented out); read it from the dataset row's 'type' field instead.
_type_test = _sample['type']


print('Iput IDs:\t', _input_ids_test)
print('Labels:\t\t', _labels_test)
print('Type:\t\t', _type_test)

# NOTE(review): the keyword below is spelled `attentino_mask`; it is left
# unchanged because moe_inference's signature is not visible here — confirm
# and fix both sides together if it is a typo.
# NOTE(review): use_crf=True is passed although the expert model was built
# with the USE_CRF constant — confirm the two are meant to differ.
result = src.model_new.moe_inference(
    sequence=_input_ids_test,
    attentino_mask=_attention_mask_test,
    tokenizer=t5_tokenizer,
    model_gate=t5_base_model_gate,
    model_expert=t5_base_model_expert,
    device=device,
    result_type='LIPO',
    use_crf=True,
)

print(result)

In [None]:
# NOTE(review): `unload` is not defined in this notebook; presumably it detaches
# the loaded adapter(s) from the gate model — confirm before relying on the
# gate model in later cells.
t5_base_model_gate.unload()

---

In [None]:
# _ds_index = 220
# _ds_type = 'test'
# USE_CRF = True

# _input_ids_test = t5_tokenizer.decode(dataset_signalp[_ds_type][_ds_index]['input_ids'][:-1])
# _labels_test = torch.tensor([dataset_signalp[_ds_type][_ds_index]['labels'] + [-100]]).to(device)
# _attention_mask_test = torch.tensor([dataset_signalp[_ds_type][_ds_index]['attention_mask']]).to(device)

# _labels_test_decoded = [src.config.label_decoding[x] for x in _labels_test.tolist()[0][:-1]]

# print('Iput IDs:\t', _input_ids_test)
# print('Labels:\t\t', *_labels_test.tolist()[0])
# print('Labels Decoded:\t', *_labels_test_decoded)
# print('Attention Mask:\t', *_attention_mask_test.tolist()[0])
# print('----')

# _ds_index = 3250
# Single-example check of the expert model on one row of the chosen split.
_ds_index = 3250
_ds_type = 'test'
# NOTE(review): this rebinds the notebook-level USE_CRF after the expert model
# was already constructed with its earlier value — confirm this is intended.
USE_CRF = True

_example = dataset_signalp[_ds_type][_ds_index]

# Text of all token ids except the last one; labels get a trailing -100 and a
# batch dimension; the attention mask only gets the batch dimension.
_input_ids_test = t5_tokenizer.decode(_example['input_ids'][:-1])
_labels_test = torch.tensor([_example['labels'] + [-100]]).to(device)
_attention_mask_test = torch.tensor([_example['attention_mask']]).to(device)

_label_ids = _labels_test.tolist()[0]
_mask_values = _attention_mask_test.tolist()[0]
# Decode every label id except the appended trailing one.
_labels_test_decoded = [
    src.config.label_decoding[label_id] for label_id in _label_ids[:-1]
]

print('Iput IDs:\t', _input_ids_test)
print('Labels:\t\t', *_label_ids)
print('Labels Decoded:\t', *_labels_test_decoded)
print('Attention Mask:\t', *_mask_values)
print('----')

preds = src.model_new.predict_model(
    sequence=_input_ids_test,
    tokenizer=t5_tokenizer,
    model=t5_base_model_expert,
    labels=_labels_test,
    attention_mask=_attention_mask_test,
    device=device,
    viterbi_decoding=USE_CRF,
)

_result = src.model_new.translate_logits(
    logits=preds.logits,
    decoding=src.config.label_decoding,
    viterbi_decoding=USE_CRF,
)

print('Result: \t', *_result)