In [5]:
from transformers import AutoTokenizer, BertForTokenClassification
import torch
import json

def load_snippets(file_path='litvar_snippets.json'):
    with open(file_path, 'r') as f:
        return json.load(f)

def predict_variants(text, model, tokenizer):
    # Używamy "Sequence Variant" jako klasy, ponieważ jest to jedna z natywnych klas modelu z BIORED
    encodings = tokenizer("Sequence Variant", text, 
                         is_split_into_words=False,
                         padding=True, 
                         truncation=True,
                         max_length=512,
                         return_tensors='pt')
    
    with torch.no_grad():
        outputs = model(**encodings)
        predictions = outputs.logits.argmax(-1)[0]  # Bierzemy pierwszą (i jedyną) sekwencję
    
    # Konwertujemy tokeny na tekst tylko dla tych z predykcją 1
    tokens = tokenizer.convert_ids_to_tokens(encodings['input_ids'][0])
    variant_tokens = []
    current_variant = []
    
    for token, pred in zip(tokens, predictions):
        if pred == 1:
            if token.startswith('##'):
                if current_variant:
                    current_variant.append(token[2:])
            else:
                if current_variant:
                    variant_tokens.append(''.join(current_variant))
                current_variant = [token]
        else:
            if current_variant:
                variant_tokens.append(''.join(current_variant))
                current_variant = []
    
    if current_variant:
        variant_tokens.append(''.join(current_variant))
    
    return variant_tokens

def test_snippets():
    # Inicjalizacja modelu i tokenizera
    modelname = 'MilosKosRad/BioNER'
    tokenizer = AutoTokenizer.from_pretrained(modelname)
    model = BertForTokenClassification.from_pretrained(modelname, num_labels=2)
    
    # Wczytanie snippetów
    snippets = load_snippets()
    
    results = []
    
    snippets_found = [snippet for snippet in snippets if snippet['found']]
    
    for i, snippet in enumerate(snippets_found):  # Testujemy pierwsze 10 snippetów
        text = snippet['text']
        expected_variant = snippet['variant_name'].lower()  # Konwertujemy na małe litery
        
        if not text:  # Pomijamy puste teksty
            continue
            
        # Predykcja wariantów
        predicted_variants = [variant.lower() for variant in predict_variants(text, model, tokenizer)]  # Konwertujemy na małe litery
        
        result = {
            'snippet_id': i,
            'expected_variant': expected_variant,
            'predicted_variants': predicted_variants,
            'found_expected': any(expected_variant in pred for pred in predicted_variants),
            'text': text[:200] + '...' if len(text) > 200 else text  # Skracamy tekst dla czytelności
        }
        results.append(result)
        
        # Wyświetlanie wyników
        print(f"\nSnippet {i}:")
        print(f"Expected variant: {expected_variant}")
        print(f"Predicted variants: {predicted_variants}")
        print(f"Found expected variant: {result['found_expected']}")
        print("-" * 80)
    
    # Statystyki
    total = len(results)
    correct = sum(1 for r in results if r['found_expected'])
    print("\nStatystyki:")
    print(f"Całkowita liczba snippetów: {total}")
    print(f"Poprawnie znalezione warianty: {correct}")
    print(f"Dokładność: {correct/total:.2%}")
    
    return results

if __name__ == "__main__":
    results = test_snippets()


Snippet 0:
Expected variant: v600e
Predicted variants: ['sequence', 'variant', 'v600e', 'v600e', 'v600e', 'v600e', 'v600e', 'v600e', 'v600e', 'v637e', 'v600e', 'v600e', 'v600e']
Found expected variant: True
--------------------------------------------------------------------------------

Snippet 1:
Expected variant: v600e
Predicted variants: ['sequence', 'variant']
Found expected variant: False
--------------------------------------------------------------------------------

Snippet 2:
Expected variant: v600e
Predicted variants: ['[cls]', 'sequence', 'variant', 'v600e']
Found expected variant: True
--------------------------------------------------------------------------------

Snippet 3:
Expected variant: v600e
Predicted variants: ['[cls]', 'sequence', 'variant', 'v600e']
Found expected variant: True
--------------------------------------------------------------------------------

Snippet 4:
Expected variant: v600e
Predicted variants: ['sequence', 'variant', 'v600e', 'v600e', 'v600e

In [None]:
from transformers import AutoModelForCausalLM, TrainingArguments
from trl import DPOTrainer
import json

def load_snippets(file_path='litvar_snippets.jsonl'):
    # Wczytanie danych z pliku JSONL
    snippets = []
    with open(file_path, 'r') as f:
        for line in f:
            snippets.append(json.loads(line))
    return snippets

def prepare_dpo_dataset(snippets):
    # Przygotowanie danych w formacie odpowiednim dla DPO
    dataset = []
    
    for snippet in snippets:
        if not snippet['text']:  # Pomijamy puste teksty
            continue
            
        prompt = f"Znajdź warianty genomiczne w następującym tekście:\n\n{snippet['text']}"
        
        # Tworzymy "dobrą" i "złą" odpowiedź na podstawie danych z LitVar
        if snippet['found']:
            # Jeśli wariant został znaleziony w tekście
            chosen = f"{snippet['variant_name']}"
            rejected = "Nie znaleziono wariantów."
        else:
            # Jeśli wariant nie został znaleziony w tekście
            chosen = "Nie znaleziono wariantów."
            rejected = f"{snippet['variant_name']}"
            
        dataset.append({
            "prompt": prompt,
            "chosen": chosen,
            "rejected": rejected,
            "metadata": {
                "variant_name": snippet['variant_name'],
                "found": snippet['found'],
                "rsid": snippet['rsid'],
                "identifier": snippet['identifier']
            }
        })
    
    return dataset

def custom_dpo_loss(policy_chosen_logps, policy_rejected_logps, reference_chosen_logps, reference_rejected_logps, beta, metadata):
    """
    Zmodyfikowana funkcja straty DPO uwzględniająca reward_function
    """
    def reward_function(generated_text, expected_variant, found):
        variants = [line.strip() for line in generated_text.splitlines() if line.strip()]
        if (found and expected_variant in variants) or (not found and expected_variant not in variants):
            return 1.0
        else:
            return -1.0
    
    # Obliczamy reward dla każdej pary odpowiedzi
    chosen_rewards = torch.tensor([reward_function(c, m['variant_name'], m['found']) 
                                 for c, m in zip(policy_chosen_logps, metadata)])
    rejected_rewards = torch.tensor([reward_function(r, m['variant_name'], m['found']) 
                                   for r, m in zip(policy_rejected_logps, metadata)])
    
    # Standardowa implementacja DPO z uwzględnieniem reward
    policy_logratios = policy_chosen_logps - policy_rejected_logps
    ref_logratios = reference_chosen_logps - reference_rejected_logps
    
    logits = policy_logratios - ref_logratios
    
    # Modyfikujemy loss o reward
    losses = -torch.nn.functional.logsigmoid(beta * logits) * (chosen_rewards - rejected_rewards)
    
    return losses.mean()

def train_model():
    # Wczytanie modelu i tokenizera
    model_name = "MilosKosRad/BioNER"  # lub inny odpowiedni model bazowy
    model = AutoModelForCausalLM.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # Wczytanie i przygotowanie danych z JSONL
    snippets = load_snippets()
    dataset = prepare_dpo_dataset(snippets)
    
    # Konfiguracja treningu
    training_args = TrainingArguments(
        output_dir="./dpo_bioner_variants",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,
        learning_rate=5e-5,
        save_strategy="epoch",
        logging_steps=10,
        evaluation_strategy="epoch",
    )
    
    # Inicjalizacja DPO Trainera
    trainer = DPOTrainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        tokenizer=tokenizer,
        beta=0.1,  # Parametr beta dla DPO
        loss_fn=custom_dpo_loss
    )
    
    # Rozpoczęcie treningu
    trainer.train()
    
    # Zapisanie wytrenowanego modelu
    trainer.save_model("./dpo_bioner_variants_final")
    
    return trainer

if __name__ == "__main__":
    trainer = train_model()

# Inferencja blaze999/Medical-NER

In [6]:
import torch
from transformers import AutoModelForTokenClassification

def predict_medical_entities(text, model, tokenizer):
    # Przygotowanie wejścia
    encodings = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding=True
    )
    input_ids = encodings["input_ids"][0]
    
    # Inference
    with torch.no_grad():
        outputs = model(**encodings)
        predictions = outputs.logits.argmax(dim=-1)[0]
    
    # Zamiana indeksów na etykiety
    tokens = tokenizer.convert_ids_to_tokens(input_ids)
    labels = [model.config.id2label[p.item()] for p in predictions]
    
    # Przykładowe grupowanie tokenów w jednostki (B-xxx, I-xxx)
    entities = []
    current_entity = []
    current_label = None
    
    for token, label in zip(tokens, labels):
        if label.startswith("B-"):
            # Zaczynamy nowy byt (encję)
            # Zapisujemy poprzedni, jeśli istniał
            if current_entity:
                entities.append((" ".join(current_entity), current_label))
            current_entity = [token]
            current_label = label[2:]  # usuwamy 'B-'
            
        elif label.startswith("I-") and current_label == label[2:]:
            # Kontynuacja poprzedniego bytu
            if token.startswith("##"):
                current_entity[-1] += token[2:]
            else:
                current_entity.append(token)
        else:
            # Etykieta poza bytem (O) lub początek innego bytu
            if current_entity:
                entities.append((" ".join(current_entity), current_label))
                current_entity = []
                current_label = None
            # W tym miejscu pomijamy tokeny oznaczone jako O
    
    # Dodajemy ostatni byt, jeśli istniał
    if current_entity:
        entities.append((" ".join(current_entity), current_label))
    
    return entities

if __name__ == "__main__":
    # Inicjalizacja modelu
    model_name = "blaze999/Medical-NER"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)

    # Przykładowy tekst do przetworzenia
    sample_text = (
        "Pacjentka lat 45 z rozpoznaniem choroby wieńcowej (CAD) "
        "oraz nadciśnieniem tętniczym, przyjmująca metforminę."
    )
    
    # Wywołanie funkcji przewidującej encje
    recognized_entities = predict_medical_entities(sample_text, model, tokenizer)

    # Wyświetlenie wyników
    print("Rozpoznane encje medyczne:")
    for entity_text, entity_label in recognized_entities:
        print(f"- {entity_text} ({entity_label})")

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Rozpoznane encje medyczne:
- [CLS] (SEVERITY)
- ▁metformin (MEDICATION)
- [SEP] (SEVERITY)


In [6]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch
import json

def load_snippets(file_path='litvar_snippets.json'):
    with open(file_path, 'r') as f:
        return json.load(f)

def predict_medical_entities(text, model, tokenizer):
    # Przygotowanie wejścia
    encodings = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding=True,
        max_length=512
    )
    
    # Inference
    with torch.no_grad():
        outputs = model(**encodings)
        predictions = outputs.logits.argmax(dim=-1)[0]
    
    # Konwersja indeksów na etykiety
    tokens = tokenizer.convert_ids_to_tokens(encodings['input_ids'][0])
    labels = [model.config.id2label[p.item()] for p in predictions]
    
    # Grupowanie tokenów w encje
    entities = []
    current_entity = []
    current_label = None
    
    for token, label in zip(tokens, labels):
        if label.startswith("B-"):
            if current_entity:
                entities.append((" ".join(current_entity), current_label))
            current_entity = [token]
            current_label = label[2:]
        elif label.startswith("I-") and current_label == label[2:]:
            if token.startswith("##"):
                current_entity[-1] += token[2:]
            else:
                current_entity.append(token)
        else:
            if current_entity:
                entities.append((" ".join(current_entity), current_label))
                current_entity = []
                current_label = None
    
    if current_entity:
        entities.append((" ".join(current_entity), current_label))
    
    return entities

def test_medical_ner():
    # Inicjalizacja modelu
    model_name = "blaze999/Medical-NER"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)
    
    # Wczytanie snippetów
    snippets = load_snippets()
    
    results = []
    snippets_found = [snippet for snippet in snippets if snippet['found']]
    
    for i, snippet in enumerate(snippets_found[:100]):
        text = snippet['text']
        variant_name = snippet['variant_name']
        
        if not text:
            continue
            
        # Predykcja encji medycznych
        entities = predict_medical_entities(text, model, tokenizer)
        
        # Sprawdzenie czy variant_name został wykryty
        variant_detected = False
        detected_entities = []
        
        for entity_text, entity_label in entities:
            detected_entities.append(f"{entity_text} ({entity_label})")
            if variant_name.lower() in entity_text.lower():
                variant_detected = True
        
        result = {
            'snippet_id': i,
            'variant_name': variant_name,
            'variant_detected': variant_detected,
            'detected_entities': detected_entities,
            'text': text[:200] + '...' if len(text) > 200 else text
        }
        results.append(result)
        
        # Wyświetlanie wyników
        print(f"\nSnippet {i}:")
        print(f"Wariant: {variant_name}")
        print(f"Wykryty: {variant_detected}")
        print(f"Wykryte encje: {detected_entities}")
        print(f"Znaleziono słowo wariant: {'variant' in ' '.join(detected_entities).lower()}")
        print(f"Znaleziono nazwę wariantu: {variant_name.lower() in ' '.join(detected_entities).lower()}")
        print("-" * 80)
    
    # Statystyki
    total = len(results)
    correct = sum(1 for r in results if r['variant_detected'])
    print("\nStatystyki:")
    print(f"Całkowita liczba snippetów: {total}")
    print(f"Poprawnie wykryte warianty: {correct}")
    print(f"Dokładność: {correct/total:.2%}")
    
    return results

if __name__ == "__main__":
    results = test_medical_ner()


Snippet 0:
Wariant: V600E
Wykryty: False
Wykryte encje: ['[CLS] (SEVERITY)', '▁mTOR (DIAGNOSTIC_PROCEDURE)', '▁gang li oglio ma (SIGN_SYMPTOM)', '▁tumors (SIGN_SYMPTOM)', '▁PLN TY (SIGN_SYMPTOM)', '▁neoplasm s (SIGN_SYMPTOM)', '▁Akt (DIAGNOSTIC_PROCEDURE)', '▁seizure (SIGN_SYMPTOM)', '▁BRAF (SIGN_SYMPTOM)', '▁neural ▁precursors (BIOLOGICAL_STRUCTURE)', '▁oligo id ▁tumors (SIGN_SYMPTOM)', '▁Akt (DIAGNOSTIC_PROCEDURE)', '▁gang li oglio mas (SIGN_SYMPTOM)', '▁V 600 E ▁T rp 53 (SIGN_SYMPTOM)', '▁an a plastic (SIGN_SYMPTOM)', '▁Activating (DETAILED_DESCRIPTION)', '▁BRAF ▁somatic ▁mutations (SIGN_SYMPTOM)', '▁developmental (DETAILED_DESCRIPTION)', '▁low - grade (DETAILED_DESCRIPTION)', '▁glial (BIOLOGICAL_STRUCTURE)', '▁ glio neuron al (BIOLOGICAL_STRUCTURE)', '▁brain (BIOLOGICAL_STRUCTURE)', '▁dys morphic (DETAILED_DESCRIPTION)', '▁neuronal (BIOLOGICAL_STRUCTURE)', '▁neoplastic (DETAILED_DESCRIPTION)', '▁astro gli al ▁tumor (BIOLOGICAL_STRUCTURE)', '▁epilepsy ▁surgery (THERAPEUTIC_PROCEDUR

# Microsoft BioGPT inference test 

In [9]:
!pip install sacremoses

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
1148.84s - pydevd: Sending message related to process being replaced timed-out after 5 seconds




In [22]:
from transformers import BioGptTokenizer, BioGptForCausalLM
import torch
import json

def load_snippets(file_path='litvar_snippets.json'):
    with open(file_path, 'r') as f:
        return json.load(f)

def predict_variants_biogpt(text, model, tokenizer):
    # Przygotowanie promptu
    prompt = f"List all genomic variants mentioned in this text. Return only variant names, one per line:\n\n{text}\n\nVariants:"
    
    inputs = tokenizer(prompt, return_tensors="pt")
    
    # Generowanie odpowiedzi
    with torch.no_grad():
        generated_tokens = model.generate(
            inputs.input_ids,  # Ograniczamy liczbę tokenów w wejściu do 1024
            max_new_tokens=8192,  # Zmniejszamy maksymalną liczbę nowych tokenów, aby uniknąć błędu
            num_beams=3,
            early_stopping=True,
            pad_token_id=tokenizer.eos_token_id,
            no_repeat_ngram_size=2
        )
        # Usuwamy prompt z wyników
        outputs = generated_tokens[:, inputs.input_ids.shape[-1]:]
    
    # Dekodowanie i przetwarzanie odpowiedzi
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    variants = [line.strip() for line in generated_text.split('\n') if line.strip()]
    
    return variants

def test_biogpt():
    # Inicjalizacja modelu
    model_name = "microsoft/biogpt"
    tokenizer = BioGptTokenizer.from_pretrained(model_name)
    model = BioGptForCausalLM.from_pretrained(model_name)
    
    # Wczytanie snippetów
    snippets = load_snippets()
    
    results = []
    snippets_found = [snippet for snippet in snippets if snippet['found']]
    
    for i, snippet in enumerate(snippets_found[:1]):  # Testujemy pierwsze 10 snippetów
        text = snippet['text']
        variant_name = snippet['variant_name'].lower()
        
        if not text:
            continue
            
        # Predykcja wariantów
        predicted_variants = [v.lower() for v in predict_variants_biogpt(text[:512], model, tokenizer)]
        
        result = {
            'snippet_id': i,
            'variant_name': variant_name,
            'predicted_variants': predicted_variants,
            'variant_detected': variant_name in predicted_variants,
            'text': text[:200] + '...' if len(text) > 200 else text
        }
        results.append(result)
        
        # Wyświetlanie wyników
        print(f"\nSnippet {i}:")
        print(f"Oczekiwany wariant: {variant_name}")
        print(f"Wykryte warianty: {predicted_variants}")
        print(f"Znaleziono oczekiwany wariant: {result['variant_detected']}")
        print("-" * 80)
    
    # Statystyki
    total = len(results)
    correct = sum(1 for r in results if r['variant_detected'])
    print("\nStatystyki:")
    print(f"Całkowita liczba snippetów: {total}")
    print(f"Poprawnie wykryte warianty: {correct}")
    print(f"Dokładność: {correct/total:.2%}")
    
    return results

if __name__ == "__main__":
    results = test_biogpt()


Snippet 0:
Oczekiwany wariant: v600e
Wykryte warianty: ['in this study, we show that in addition to braf v600e, other braf variants, such as v599e, are also involved in the development of gg.']
Znaleziono oczekiwany wariant: False
--------------------------------------------------------------------------------

Statystyki:
Całkowita liczba snippetów: 1
Poprawnie wykryte warianty: 0
Dokładność: 0.00%
