In [1]:
# taxo_name = "food"
# taxo_name = "equipment"
taxo_name = "chemical"
# taxo_name = "science"
# taxo = "taxos/wn_food.taxo.txt"
taxo = f"taxos/{taxo_name}.taxo"
# taxo = "taxos/equipment.taxo.txt"
is_parent = {}
for line in open(taxo):
    #split by tab
    line = line.strip().split("\t")
    is_parent[line[1]] = line[2]

In [2]:
def get_ancestors(taxo, term):
    """
    Get all ancestors of a term in the taxonomy
    """
    ancestors = []
    while term in is_parent:
        term = is_parent[term]
        ancestors.append(term)
    return ancestors

def get_children(taxo, term):
    """
    Get all children of a term in the taxonomy
    """
    children = []
    for line in open(taxo):
        line = line.strip().split("\t")
        if line[2] == term:
            children.append(line[1])
    return children

#get all tree except the term and its children
#Format:
# root child 1 child 2
# child 1 child 3 child 4
#iterative version
def get_tree_it(taxo,root, term):
    """
    Get all terms in the taxonomy except the term and its children
    """
    tree = ""
    stack = [(root, 0)]
    while stack:
        node, depth = stack.pop()
        if depth > 4:
            continue
        tree += "\nParent: " + node 
        root_children = get_children(taxo, node)
        if len(root_children) != 0:
            tree = tree + "\nChildren: "
        for child in root_children:
            if child == term:
                continue
            tree += child + ", "
            stack.append((child, depth + 1))
    return tree

def get_leafs(taxo,root, term):
    """
    Get all terms in the taxonomy except the term and its children
    """
    parents_leaves_dict = {}
    stack = [(root, 0)]
    while stack:
        node, depth = stack.pop()
        if depth > 5:
            continue
        root_children = get_children(taxo, node)
        if len(root_children) == 0:
            parent = is_parent[node]
            if parent not in parents_leaves_dict:
                parents_leaves_dict[parent] = [node]
            else:
                parents_leaves_dict[parent].append(node)
        # if term in root_children:
        #     #get parent
        #     parent = is_parent[child]
        #     grand_parent = is_parent[parent]
        #     if grand_parent not in parents_leaves_dict:
        #         parents_leaves_dict[grand_parent] = [parent]
        #     else:
        #         parents_leaves_dict[grand_parent].append(parent)
        #     continue
        for child in root_children:
            if child == term:
                continue
            stack.append((child, depth + 1))
    return parents_leaves_dict

def get_tree_leaves(parents_leaves_dict):
    tree =""
    for parent in parents_leaves_dict:
        if parent == "food":
            continue
        granparent = is_parent[parent]
        tree += "\nGranparent: " + granparent
        tree += "\nParent: " + parent
        tree += "\nChildren: "
        for child in parents_leaves_dict[parent]:
            tree += child + ", "
        tree += "\n"
    return tree

In [3]:
#choose 20% of the leaf terms
import random
random.seed(42)
all_terms = list(is_parent.keys())

#if chemical, test needs to be less(too big)
if taxo_name == "chemical":
    k=20
else:
    k=3
from sklearn.model_selection import KFold
kf = KFold(n_splits=k, shuffle=True, random_state=42)
terms = [term for term in all_terms if len(get_children(taxo, term)) == 0]
# indexes = [i for i in range(len(terms))]
# random.shuffle(indexes)
# #train 80% and test 20%
# train_index = indexes[:int(len(indexes)*0.8)]
# test_index = indexes[int(len(indexes)*0.8):]
kf.get_n_splits(terms)

for i, (train_index, test_index) in enumerate(kf.split(terms)):
    print(f"Fold {i}:")
    print(f"  Train: index={train_index}")
    print(f"  Test:  index={test_index}")
    #create directory
    import os
    if not os.path.exists(f"folds/{taxo_name}"):
        os.makedirs(f"folds/{taxo_name}")
    #save folds
    with open(f"folds/{taxo_name}/train_{i}.txt", "w") as f:
        for index in train_index:
            f.write(terms[index] + "\n")
    with open(f"folds/{taxo_name}/test_{i}.txt", "w") as f:
        for index in test_index:
            f.write(terms[index] + "\n")

Fold 0:
  Train: index=[    0     1     2 ... 14231 14232 14233]
  Test:  index=[   14    27    31    99   101   102   168   169   173   177   181   218
   280   284   286   290   304   311   327   339   353   357   360   399
   410   450   455   468   476   487   608   624   708   735   736   772
   794   836   884   902   903   907  1006  1010  1022  1053  1061  1086
  1121  1213  1217  1225  1253  1320  1323  1350  1358  1368  1372  1373
  1385  1393  1398  1433  1453  1483  1489  1516  1533  1547  1562  1575
  1606  1611  1623  1650  1655  1683  1684  1697  1701  1714  1722  1747
  1768  1769  1795  1880  1911  1923  1953  1962  1983  1989  2020  2034
  2054  2085  2099  2112  2124  2140  2150  2157  2167  2236  2308  2310
  2312  2342  2344  2366  2439  2453  2455  2456  2487  2512  2519  2525
  2534  2585  2589  2626  2629  2631  2676  2715  2754  2856  2864  2900
  2913  2928  2955  2980  2991  2993  2998  3000  3011  3024  3031  3063
  3065  3115  3120  3251  3295  3297  3305  

In [4]:
#load fold
fold = 0
train_terms = []
with open(f"folds/{taxo_name}/train_{fold}.txt", "r") as f:
    for line in f:
        train_terms.append(line.strip())
test_terms = []
with open(f"folds/{taxo_name}/test_{fold}.txt", "r") as f:
    for line in f:
        test_terms.append(line.strip())

In [5]:
len(train_terms), len(test_terms), len(all_terms)

(13522, 712, 17583)

In [6]:
#extract word embeddings of terms
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

In [None]:
#extract word embeddings of terms
import numpy as np

# embeddings = []
# for term in all_terms:
#     embedding = model.encode(term)
#     embeddings.append(embedding)
# embeddings = np.array(embeddings)
# embeddings.shape

(17583, 384)

In [8]:
#import cosine similarity
from sklearn.metrics.pairwise import cosine_similarity

In [9]:
import tqdm
train_data = []
train_labels = []

patterns = ["is a", "is a kind of", "is a type of", "is a subtype of", "is a subcategory of", "is a subclass of", "is a member of", "is a part of", "is a component of", "is a constituent of"]
for i in tqdm.tqdm(range(10000)):
    r = random.randint(0, len(train_terms)-1)
    term = train_terms[r]
    #get parent
    parent = is_parent[term]
    #positive
    pattern = random.choice(patterns)
    if random.random() < 0.1:        
        #get random pattern
        train_data.append(f"{term} {pattern} {parent}")
        train_labels.append(1)
    #negative
    else:
        #get any term that is not the parent
        #get random term
        # if random.random() < 0.5:
        r = random.randint(0, len(train_terms)-1)
        term2 = train_terms[r]
        train_data.append(f"{term} {pattern} {term2}")
        train_labels.append(0)
#         else:
#             #get most similar term that is not the parent
#             #get embedding of term
#             term_embedding = model.encode(term)
#             #cosine similarity
#             cos_sim = cosine_similarity(term_embedding.reshape(1, -1), embeddings)
#             #now get the most similar term that is not the parent
#             #order
#             similarity_index = np.argsort(cos_sim[0])[::-1]
#             #get most similar term that is not the parent
#             for j in similarity_index:
#                 term2 = all_terms[j]
#                 #if term2 is not the parent
#                 if term2 != parent and term2 != term:
#                     break
#             #get random pattern
#             pattern = random.choice(patterns)
#             train_data.append(f"{term} {pattern} {term2}")
#             train_labels.append(0)
# #get test
test_data = []
for i in range(len(test_terms)):
    term = test_terms[i]
    #get parent
    parent = is_parent[term]
    #get random pattern
    pattern = random.choice(patterns)
    test_data.append(f"{term} {pattern} {parent}")
    

100%|██████████| 10000/10000 [00:00<00:00, 911983.65it/s]


In [10]:
r = random.randint(0, len(train_data)-1)
train_data[r], train_labels[r]

('navenone a is a neurolenin d', 0)

In [11]:
r = random.randint(0, len(test_data)-1)
test_data[r]

'dioxohydrazine is a subtype of nitrogen oxide'

In [12]:
#now lets train using bert
from transformers import BertTokenizer, BertForSequenceClassification
import torch
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score, f1_score

tokenizer=BertTokenizer.from_pretrained('bert-base-uncased')
model=BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e

In [13]:
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data[idx]
        label = self.labels[idx]
        #to 512
        inputs = tokenizer(text, padding='max_length', truncation=True, max_length=32, return_tensors="pt")
        inputs['labels'] = torch.tensor(label, dtype=torch.long)

        return {
            'input_ids': inputs['input_ids'].squeeze(),
            'attention_mask': inputs['attention_mask'].squeeze(),
            'labels': inputs['labels']
        }

In [14]:
train_dataset = CustomDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

#test dataset
for t in train_loader:
    print(t['input_ids'].shape)
    print(t['attention_mask'].shape)
    print(t['labels'].shape)
    break

torch.Size([16, 32])
torch.Size([16, 32])
torch.Size([16])


In [15]:
t['labels']

tensor([0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0])

In [16]:
#decode sample 1
sample = train_loader.dataset[0]
print(tokenizer.decode(sample['input_ids']))

[CLS] tetrachlorobiphenyl is a kind of polychlorobiphenyl [SEP] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD]


In [17]:
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

In [18]:
import tqdm
num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for batch in  tqdm.tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs} Loss: {epoch_loss/len(train_loader)}")

Epoch 1/3: 100%|██████████| 625/625 [00:29<00:00, 21.06it/s]


Epoch 1/3 Loss: 0.1870060645133257


Epoch 2/3: 100%|██████████| 625/625 [00:29<00:00, 21.45it/s]


Epoch 2/3 Loss: 0.07782335833273828


Epoch 3/3: 100%|██████████| 625/625 [00:28<00:00, 22.13it/s]

Epoch 3/3 Loss: 0.052988943858630955





In [19]:
#lets create negatives for test set
test_labels=[1]*len(test_data)
for i in range(len(test_data)*2):
    r = random.randint(0, len(test_terms)-1)
    term = test_terms[r]
    #get parent
    parent = is_parent[term]
    #negative
    #get any term that is not the parent
    #get random term
    # if random.random() < 0.5:
    r = random.randint(0, len(test_terms)-1)
    term2 = test_terms[r]
    fake_parent = is_parent[term2]
    #check if fake parent is not the true parent
    while fake_parent == parent:
        r = random.randint(0, len(test_terms)-1)
        term2 = test_terms[r]
        fake_parent = is_parent[term2]
    test_data.append(f"{term} {pattern} {fake_parent}")
    test_labels.append(0)
    

In [20]:
#eval
import numpy as np
model.eval()
test_dataset = CustomDataset(test_data, test_labels)  # Assuming all test labels are 1 (positive)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
predictions = []
true_labels = []
for batch in test_loader:
    inputs_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['labels'].to(device)
    with torch.no_grad():
        outputs = model(input_ids=inputs_ids, attention_mask=attention_mask)
        logits = outputs.logits
    preds = torch.argmax(logits, dim=1).cpu().numpy()
    predictions.extend(preds)
    true_labels.extend(labels.cpu().numpy())
accuracy = accuracy_score(true_labels, predictions)
f1 = f1_score(true_labels, predictions, average='weighted')
print(f"Accuracy: {accuracy}")
print(f"F1 Score: {f1}")

Accuracy: 0.50187265917603
F1 Score: 0.4846927006096557


In [21]:
#classification report
from sklearn.metrics import classification_report
# print(classification_report(true_labels, predictions, target_names=["negative", "positive"]))
#save classification report in folder results
import os
os.makedirs(f"results/{taxo_name}", exist_ok=True)
with open(f"results/{taxo_name}/musubu_classification_report.txt", "w") as f:
    f.write(classification_report(true_labels, predictions, target_names=["negative", "positive"]))