In [None]:
import pandas as pd
import numpy as np
from collections import Counter
from typing import List
import os
import json
import time
import random
import torch
from torch import Tensor
import re
from tqdm import tqdm
import csv
import nltk
import gensim
import gensim.downloader as gensim_api

# 1. Evaluation for ROUGE score and Semantic Similarity

### 1-1. Process output of GeneAgent and GPT-4

In [None]:
# Load the MsigDB table: the "ID" column supplies the gene-set identifiers and
# the "Name" column supplies the names that are used as references below.
# (Alternative input kept from the original notebook:)
# data = pd.read_table("NeST_table.tsv", header=0, index_col=0)
data = pd.read_csv("Datasets/MsigDB/MsigDB.csv", header=0, index_col=None)

genes = list(data["ID"])

# Normalise each reference name: '/', ',' and '-' become spaces, double quotes
# are removed, and surrounding whitespace is stripped.
reference = [
    name.replace('/', ' ').replace(",", " ").replace("\"", "").replace("-", " ").strip()
    for name in data["Name"]
]

print(len(genes))
print(len(reference))

In [None]:
def process_text(text: str) -> list:
    """Split a response dump on '//' and normalise every piece.

    For each piece, parenthesised spans are deleted, the characters '/', ','
    and '-' are replaced by a space, double quotes are removed, and leading /
    trailing whitespace is stripped. Pieces that are empty after cleaning are
    dropped.

    :param text: full text whose records are separated by '//'.
    :return: list of cleaned, non-empty pieces in their original order.
    """
    parenthesised = re.compile(r'\([^)]*\)')
    substitutions = (('/', ' '), (",", " "), ("\"", ""), ("-", " "))

    def _clean(piece):
        # Strip "(...)" spans first, then apply the punctuation substitutions.
        piece = parenthesised.sub('', piece)
        for old, new in substitutions:
            piece = piece.replace(old, new)
        return piece.strip()

    return [cleaned for cleaned in map(_clean, text.split('//')) if cleaned]

def _leading_term(block: str) -> str:
    """Return the name found on the first line of a cleaned response block.

    The first line is split on ': ' and the field right after the first
    separator is returned. The placeholder string "None" is returned when the
    block has only a single line, or when its first line contains no ': '
    separator (the latter used to raise IndexError).

    :param block: one cleaned segment as produced by ``process_text``.
    :return: the extracted name, or the string "None".
    """
    lines = block.split("\n")
    if len(lines) > 1:
        fields = lines[0].split(": ")
        if len(fields) > 1:
            return fields[1]
    return "None"


## read results of GeneAgent
with open("Outputs/GeneAgent/Cascade/MsigDB_Final_Response_GeneAgent.txt", "r") as agentfile:
    agent = agentfile.read()
agent_text = process_text(agent)
agent_term = [_leading_term(text) for text in agent_text]
print("gpt agent file: %d" %(len(agent_term)))

## read results of GPT4
with open("Outputs/GPT-4/MsigDB_Response_GPT4.txt", "r") as gptfile:
    gpt = gptfile.read()
gpt_text = process_text(gpt)
gpt_term = [_leading_term(text) for text in gpt_text]
print("gpt file:%d" %(len(gpt_term)))

### 1-2. Calculate ROUGE scores

In [None]:
import json
from rouge_score import rouge_scorer
import sys
import pandas as pd

if __name__ == "__main__":
    # ROUGE-1 / ROUGE-2 / ROUGE-L F-measures, averaged over all gene sets.
    metrics = ["rouge1", "rouge2", "rougeL"]
    metric2results = {metric: [] for metric in metrics}
    scorer = rouge_scorer.RougeScorer(metrics, use_stemmer=True)

    # Score the GeneAgent names against the references. To evaluate GPT-4
    # instead, iterate over `gpt_term` here and adjust the header written below.
    for ref, hypagent in zip(reference, agent_term):
        scores_agent = scorer.score(ref, hypagent)

        for metric in metrics:
            # Was `scores_gpt[metric]`, a name that is never assigned in this
            # cell (its assignment is commented out), so the loop raised NameError.
            metric2results[metric].append(scores_agent[metric].fmeasure)

    # Append to the report; the context manager closes the file even on error.
    with open("MsigDB.Rouge.txt", "a") as f:
        f.write("\n====GeneAgent (Cascade)====\n")
        for metric in metrics:
            results = metric2results[metric]
            f.write(metric + ":" + str(sum(results) / len(results)) + "\n")

### 1-3. Calculate semantic similarity using MedCPT

In [None]:
import numpy as np
from transformers import AutoTokenizer, AutoModel
# Load the tokenizer and encoder weights from the "ncbi/MedCPT-Query-Encoder"
# checkpoint; both names are used as globals by the scoring code below.
tokenizer = AutoTokenizer.from_pretrained("ncbi/MedCPT-Query-Encoder")
model = AutoModel.from_pretrained("ncbi/MedCPT-Query-Encoder")

def cos_sim(a: Tensor, b: Tensor):
    """
    Pairwise cosine similarity between the rows of ``a`` and the rows of ``b``.

    Inputs that are not tensors are converted with ``torch.tensor``; 1-D inputs
    are treated as a single row.
    :return: Matrix with res[i][j]  = cos_sim(a[i], b[j])
    """
    def _as_rows(value):
        # Convert to a tensor if needed and promote a vector to a 1-row matrix.
        tensor = value if isinstance(value, torch.Tensor) else torch.tensor(value)
        return tensor.unsqueeze(0) if len(tensor.shape) == 1 else tensor

    left = torch.nn.functional.normalize(_as_rows(a), p=2, dim=1)
    right = torch.nn.functional.normalize(_as_rows(b), p=2, dim=1)
    # Dot products of unit-length rows are the cosine similarities.
    return torch.mm(left, torch.transpose(right, 0, 1))

agent_scores = []
gpt_scores = []
summary_scores = []


def _medcpt_pair_score(first: str, second: str):
    """Cosine similarity between the MedCPT embeddings of two strings.

    Uses the global ``tokenizer`` / ``model`` and ``cos_sim``. The return value
    is a one-element list (the single row of the 1x1 similarity matrix), which
    is the shape the score lists in this cell have always stored.
    """
    # tokenize the queries
    encoded = tokenizer(
        [first, second],
        truncation=True,
        padding=True,
        return_tensors='pt',
        max_length=64,
    )
    # encode the queries (use the [CLS] last hidden states as the representations)
    embeds = model(**encoded).last_hidden_state[:, 0, :]
    return cos_sim(embeds[0], embeds[1]).tolist()[0]


# Inference only: no gradients are needed for any of the forward passes.
with torch.no_grad():
    for ref, hypagent, hypgpt in zip(reference, agent_term, gpt_term):
        agent_scores.append(_medcpt_pair_score(ref, hypagent))
        gpt_scores.append(_medcpt_pair_score(ref, hypgpt))


print(np.average(agent_scores),np.average(gpt_scores))
print(np.max(agent_scores),np.max(gpt_scores))

# One similarity value per line, in the same order as `reference`.
np.savetxt("MsigDB.GeneAgent.Cascade.Semantic.csv", np.asarray(agent_scores), fmt="%s", delimiter="\t", newline="\n")
np.savetxt("MsigDB.GPT4.Semantic.csv", np.asarray(gpt_scores), fmt="%s", delimiter="\t", newline="\n")
    

# 2. Evaluation for background semantic similarity distribution

### 2-1. Collect the background gene sets

In [None]:
# GSS accumulates one row per background gene set:
# [running index, ID, space-separated genes, gene count, name].
# The running index is simply the row's position in GSS.
GSS = []

# Source 1: BP_terms_All.csv
bp = pd.read_csv("BP_terms_All.csv", header=0, index_col=0)
for set_id, gene_list, gene_count, label in zip(bp["ID"], bp["Genes"], bp["Gene_Count"], bp["Truth Label"]):
    GSS.append([len(GSS), set_id, gene_list, gene_count, label])

print(len(GSS))

# Source 2: NeST table, parsed by hand (tab-separated; columns 0/1/2 are read
# as ID / name / comma-separated genes). The first line is a header.
with open("Datasets/NeST/NeST_table.tsv", "r") as nestfile:
    nest_rows = nestfile.readlines()[1:]
for row in nest_rows:
    fields = row.split("\t")
    gene_list = fields[2].replace(",", " ").replace("\n", "").replace("\"", "")
    GSS.append([len(GSS), fields[0], gene_list, len(gene_list.split()), fields[1]])

print(len(GSS))


# Source 3: MsigDB
zen = pd.read_csv("Datasets/MsigDB/MsigDB.csv", header=0, index_col=None)
for set_id, gene_list, gene_count, label in zip(zen["ID"], zen["Genes"], zen["Count"], zen["Name"]):
    GSS.append([len(GSS), set_id, gene_list, gene_count, label])

index = len(GSS)
print(len(GSS))
print(index)


# Persist the combined background with every field quoted.
with open("background.csv", mode='w', newline='\n', encoding='utf-8') as file:
    writer = csv.writer(file, quoting=csv.QUOTE_ALL)
    writer.writerow(['Index', 'ID', 'Genes', 'Count', 'Term'])
    writer.writerows(GSS)

### 2-2. Calculate relative similarity using MedCPT

In [None]:
# Reload the background table. Its first column is used as the frame index and
# then copied into a regular "Index" column so it is still available after the merge.
back = pd.read_csv("background.csv", header=0, index_col=0)
back["Index"] = back.index

# Every background term name, in file order.
all_Ref = list(back["Term"])
print(len(all_Ref))

# Attach the background row of each MsigDB gene set by matching on "ID".
data = pd.read_csv("Datasets/MsigDB/MsigDB.csv", header=0, index_col=None)
AllDATA = data.merge(back, on='ID', how='inner')
AllDATA.head()

In [None]:
import numpy as np
from transformers import AutoTokenizer, AutoModel
# (Re)load the "ncbi/MedCPT-Query-Encoder" tokenizer and encoder; the embedding
# helper defined below reads both names as globals.
tokenizer = AutoTokenizer.from_pretrained("ncbi/MedCPT-Query-Encoder")
model = AutoModel.from_pretrained("ncbi/MedCPT-Query-Encoder")

def get_medcpt_embeddings(queries):
    """Embed ``queries`` in a single batch with the global tokenizer and model.

    Each query is truncated to at most 64 tokens and the batch is padded; the
    last hidden state at position 0 of every sequence is used as its embedding.
    Runs under ``torch.no_grad()``.

    :param queries: the texts passed to the tokenizer.
    :return: tuple ``(embeddings, embeddings.size())``.
    """
    tokenizer_options = dict(
        truncation=True,
        padding=True,
        return_tensors='pt',
        max_length=64,
    )
    with torch.no_grad():
        batch = tokenizer(queries, **tokenizer_options)
        hidden = model(**batch).last_hidden_state
        cls_vectors = hidden[:, 0, :]
    return cls_vectors, cls_vectors.size()

# Embed the background terms and both sets of generated names; each call
# returns the embedding tensor together with its size.
ref_embeds, ref_embeds_size = get_medcpt_embeddings(queries=all_Ref)
agent_embeds, agent_embeds_size = get_medcpt_embeddings(queries=agent_term)
gpt_embeds, gpt_embeds_size = get_medcpt_embeddings(queries=gpt_term)

In [None]:
cos = torch.nn.CosineSimilarity(dim=0, eps=1e-6)

# relative[i][j] is the cosine similarity between the i-th GPT-4 name embedding
# and the j-th background term embedding.
relative = [
    [cos(gpt_tensor, ref_tensor).tolist() for ref_tensor in ref_embeds]
    for gpt_tensor in tqdm(gpt_embeds)
]

scores = np.asarray(relative)
print (scores.shape)

# For row i, `ind` is the background position of that gene set's own term.
# Its 1-based rank is one plus the number of background terms that score
# strictly higher than it does.
rank = []
for row, ind in enumerate(tqdm(AllDATA["Index"])):
    own_score = scores[row][ind]
    rank.append(1 + int(np.count_nonzero(scores[row] > own_score)))

np.savetxt("MsigDB.Relative.Rank.GPT.Background.txt", np.asarray(rank), fmt="%s", newline="\n")

# 3. Evaluation for multiple enrichment terms test

### 3-1. Process output of GPT-4 in summarizing multiple enrichment terms

In [None]:
# Read the whole summary file; records are separated by '//'.
with open("Outputs/EnrichedTermTest/gpt.geneagent.msigdb.summary.result.verification.txt") as summary:
    text = summary.read()
segments = text.split('//')
print(len(segments))

enrich_terms = []
for segment in segments:
    # The second-to-last blank-line-separated paragraph is taken as the
    # "Enriched Terms: a; b; c" line; periods and newlines are stripped from it.
    paragraph = segment.split("\n\n")[-2]
    enrich = paragraph.replace(".", "").replace("\n", "")
    term_list = enrich.split("Enriched Terms: ")[1]
    enrich_terms.append(term_list.split("; "))

print(len(enrich_terms))

### 3-2. Exact match with all significant enrichment terms

In [None]:
import json
with open("GSEATerms/MsigDB.EnrichTerms.Allsignificant.json","r") as file:
    enrich = json.load(file)

# Lower-cased term name -> every "native" id recorded under that name.
name2id = {}
for names in enrich:
    for name in names:
        name2id.setdefault(name["name"].lower(), []).append(name["native"])

# One dict per gene set: summarized term -> de-duplicated ids, or the string
# "None" when the lower-cased term is not a known name.
results = [
    {
        term: list(set(name2id[term.lower()])) if term.lower() in name2id else "None"
        for term in terms
    }
    for terms in enrich_terms
]

with open("Term2Enrich_Exact.Verification.Allsignificant.json", "w") as file:
    json.dump(results, file, indent=4)

In [None]:
with open("Term2Enrich_Exact.Verification.Allsignificant.json","r") as enrichfile:
    data = json.load(enrichfile)
print(len(data))

# One flag per summarized term across all gene sets: True when it was matched,
# i.e. when its stored value is not the placeholder string "None".
outcomes = [hit != "None" for terms in data for hit in terms.values()]
total = len(outcomes)
success = sum(outcomes)
fail = total - success

print(f"the total number of summarized terms: {total}")
print(f"the successful number of summarized terms: {success}")
print(f"the failed number of summarized terms: {fail}")
print(f"the match rate of summarized terms: {float(success/total)}")

### 3-3. Exact match with top-k (k=1,3,5) significant enrichment terms

In [None]:
### Exact MATCH with ENRICHED TERMS in TOP 1
with open("GSEATerms/MsigDB.EnrichTerms.top1.json","r") as file:
    enrich = json.load(file)


def _normalize_term(term: str) -> str:
    """Lower-case a term and replace hyphens with spaces for comparison."""
    return term.lower().replace("-", " ")


# Compare against a normalized COPY of the summarized terms. The original code
# rewrote `enrich_terms` in place, so any cell executed afterwards saw
# lower-cased, de-hyphenated terms and its results depended on execution order.
normalized_terms = [[_normalize_term(term) for term in terms] for terms in enrich_terms]

# One single-entry dict per gene set: top-1 enriched name -> its "native" id
# when that name is among the summarized terms, otherwise the string "None".
pairs = []
for terms, top_hit in zip(normalized_terms, enrich):
    hit_name = top_hit["name"]
    if _normalize_term(hit_name) in terms:
        pairs.append({hit_name: top_hit["native"]})
    else:
        pairs.append({hit_name: "None"})

print(len(pairs))
with open("Term2Enrich_Exact.Verification.top1.json", "w") as file:
    json.dump(pairs, file, indent=4)

In [None]:
### Exact MATCH with ENRICHED TERMS in TOP 3 and TOP 5
with open("GSEATerms/MsigDB.EnrichTerms.top5.json","r") as file:
    enrich = json.load(file)

# Lower-cased term name -> every "native" id recorded under that name.
name2id = {}
for names in enrich:
    for name in names:
        name2id.setdefault(name["name"].lower(), []).append(name["native"])

# One dict per gene set: summarized term -> de-duplicated ids, or the string
# "None" when the lower-cased term is not a known name.
results = [
    {
        term: list(set(name2id[term.lower()])) if term.lower() in name2id else "None"
        for term in terms
    }
    for terms in enrich_terms
]

with open("Term2Enrich_Exact.Verification.Top5.json", "w") as file:
    json.dump(results, file, indent=4)

In [None]:
with open("Term2Enrich_Exact.Verification.Top5.json","r") as gofile:
    data = json.load(gofile)
print(len(data))

# Here the unit is the gene set, not the individual term: a gene set counts as
# a success as soon as one of its summarized terms has a value other than "None".
total = len(data)
success = sum(
    1 for terms in data
    if any(hit != "None" for hit in terms.values())
)
fail = 0  # initialised as in the original cell; failures are reported as total - success

print(f"the total number of summarized terms: {total}")
print(f"the successful number of summarized terms: {success}")
print(f"the failed number of summarized terms: {total - success}")
print(f"the match rate of summarized terms: {float(success/total)}")

# 4. Other BERT-based model for the evaluation of semantic similarity

## SENTENCE BERT

In [None]:
# Select the compute device. `torch.cuda.set_device` only accepts CUDA devices,
# so it must not be called on a CPU-only machine (the original call with "cpu"
# raised an error there).
if torch.cuda.is_available():
    # GPU 3 is the original choice; fall back to GPU 0 when it does not exist.
    gpu_index = 3 if torch.cuda.device_count() > 3 else 0
    dev = "cuda:%d" % gpu_index
    torch.cuda.set_device(gpu_index)
else:
    dev = "cpu"

from text2vec import Similarity
sim_model = Similarity()

# Similarity between every reference name and the corresponding GeneAgent name.
scores = [sim_model.get_score(ref, hyp) for ref, hyp in zip(reference, agent_term)]

print(np.average(scores),np.max(scores))

## SapBERT

In [None]:
import numpy as np
import torch
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModel  

def cos_sim(a: Tensor, b: Tensor):
    """
    Pairwise cosine similarity between the rows of ``a`` and the rows of ``b``.

    Inputs that are not tensors are converted with ``torch.tensor``; 1-D inputs
    are treated as a single row.
    :return: Matrix with res[i][j]  = cos_sim(a[i], b[j])
    """
    def _as_rows(value):
        # Convert to a tensor if needed and promote a vector to a 1-row matrix.
        tensor = value if isinstance(value, torch.Tensor) else torch.tensor(value)
        return tensor.unsqueeze(0) if len(tensor.shape) == 1 else tensor

    left = torch.nn.functional.normalize(_as_rows(a), p=2, dim=1)
    right = torch.nn.functional.normalize(_as_rows(b), p=2, dim=1)
    # Dot products of unit-length rows are the cosine similarities.
    return torch.mm(left, torch.transpose(right, 0, 1))

# Run SapBERT on the current CUDA device when one is available, otherwise on
# the CPU (the original unconditional `.cuda()` calls crashed without a GPU).
sap_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE: this rebinds the global `tokenizer` / `model` names to SapBERT, so any
# earlier cell that relies on those globals must reload its own model before
# being re-run.
tokenizer = AutoTokenizer.from_pretrained("cambridgeltl/SapBERT-from-PubMedBERT-fulltext")
model = AutoModel.from_pretrained("cambridgeltl/SapBERT-from-PubMedBERT-fulltext").to(sap_device)

scores = []
# Inference only: skip autograd bookkeeping for the forward passes.
with torch.no_grad():
    for ref, hyp in zip(reference, gpt_term):
        toks = tokenizer.batch_encode_plus([ref, hyp],
                                           padding="max_length",
                                           max_length=30,
                                           truncation=True,
                                           return_tensors="pt")
        toks_on_device = {k: v.to(sap_device) for k, v in toks.items()}
        cls_rep = model(**toks_on_device)[0][:,0,:] # use CLS representation as the embedding
        score = cos_sim(cls_rep[0], cls_rep[1])
        scores.append(score.tolist()[0])

print(np.average(scores),np.max(scores))