
Starting from the original code by Loureiro et al. (2021), I adapted it extensively to meet our project's specific requirements.

Link to the original code:  https://github.com/danlou/bert-disambiguation

# Environment setup and access to the data in Google Drive

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the data and output paths used by the
# later cells all live under /content/drive/MyDrive.
drive.mount('/content/drive')

In [None]:
!pip install transformers
!pip install torch torchvision torchaudio
!pip install scikit-learn
!pip install nltk
!pip install numpy
!pip install mkl-service
!pip install transliterate

# NLM_encoder
for RuBERT & multilingual BERT

In [None]:
!pip install pymorphy2==0.8

In [None]:
import mkl
# Configure MKL threading before torch/numpy are imported below.
# NOTE(review): set_dynamic(0) presumably turns off MKL's dynamic adjustment of
# the thread count so that the fixed value set on the next line is honoured -
# confirm against the mkl-service documentation.
mkl.set_dynamic(0)
mkl.set_num_threads(6)

import torch as th
import numpy as np
import pymorphy2
from transformers import AutoModel, AutoTokenizer

class TransformerEncoder():
    """Extract per-token contextual embeddings from a Hugging Face transformer.

    ``nlm_config`` is a dict from which the following keys are read:

    * ``model_name_or_path`` - identifier passed to ``AutoModel`` / ``AutoTokenizer``.
    * ``layers`` - indices into the model's ``hidden_states`` that are combined.
    * ``layer_op`` - how several layers are combined: ``'sum'`` or ``'mean'``
      (ignored when a single layer is selected).
    * ``subword_op`` - how subword vectors are merged per token: ``'mean'`` or ``'first'``.
    * ``max_seq_len`` - limit checked by :meth:`is_valid`.
    * ``device`` (optional) - torch device string; when absent, ``'cuda'`` is
      used if available and ``'cpu'`` otherwise.
    """

    def __init__(self, nlm_config):
        self.nlm_config = nlm_config
        self.nlm_model = None
        self.nlm_tokenizer = None
        # Fall back to the CPU instead of failing when no GPU is present.
        self.device = nlm_config.get('device') or ('cuda' if th.cuda.is_available() else 'cpu')

        self.load_nlm(nlm_config['model_name_or_path'])
        # Analyzer used to lemmatize the tokens returned next to their vectors.
        self.morph = pymorphy2.MorphAnalyzer()

    def load_nlm(self, model_name_or_path):
        """Load model and tokenizer, cache the special-token ids, move the model to the device."""
        self.nlm_model = AutoModel.from_pretrained(model_name_or_path,
                                                   output_hidden_states=True)
        self.nlm_tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
        self.cls_encoding = self.nlm_tokenizer.encode(self.nlm_tokenizer.cls_token,
                                                      add_special_tokens=False)[0]
        self.sep_encoding = self.nlm_tokenizer.encode(self.nlm_tokenizer.sep_token,
                                                      add_special_tokens=False)[0]
        self.pad_encoding = self.nlm_tokenizer.encode(self.nlm_tokenizer.pad_token,
                                                      add_special_tokens=False)[0]

        self.nlm_model.eval()
        self.nlm_model.to(self.device)

    def encode_token(self, token):
        """Return the list of subword ids the tokenizer produces for one token."""
        return self.nlm_tokenizer.encode(token, add_special_tokens=False)

    def get_encodings(self, tokens):
        """Return one list of subword ids per token (nested, not flattened)."""
        return [self.encode_token(t) for t in tokens]

    def flatten_encodings(self, encodings):
        """Concatenate the per-token id lists into a single flat list."""
        return [enc for tok_encodings in encodings for enc in tok_encodings]

    def add_special_encodings(self, encodings):
        """Return a new list with the CLS id prepended and the SEP id appended."""
        return [self.cls_encoding] + encodings + [self.sep_encoding]

    def add_padding_encodings(self, encodings, max_len):
        """Pad ``encodings`` in place with the PAD id up to ``max_len`` and return it."""
        encodings.extend([self.pad_encoding] * (max_len - len(encodings)))
        return encodings

    def get_attention_mask(self, encodings):
        """Return 0 for every id equal to the PAD id and 1 for every other id."""
        return [0 if enc == self.pad_encoding else 1 for enc in encodings]

    def merge_subword_embeddings(self, tokens, encodings, embeddings, return_tokens=True):
        """Align subword embeddings with tokens and merge them per token.

        Args:
            tokens: the sentence tokens.
            encodings: per-token lists of subword ids (as from ``get_encodings``);
                only their lengths are used, to walk through ``embeddings``.
            embeddings: tensor indexed by subword position along its first axis.
            return_tokens: when true return ``(token, vector)`` pairs, otherwise
                only the vectors.

        Returns:
            A list with one numpy vector (or pair) per token.

        Raises:
            ValueError: if ``nlm_config['subword_op']`` is not 'mean' or 'first'.
        """
        subword_op = self.nlm_config['subword_op']
        if subword_op not in ('mean', 'first'):
            raise ValueError('Invalid subword_op - %s' % subword_op)

        tok_embeddings = []
        encoding_idx = 0
        for tok, tok_encodings in zip(tokens, encodings):
            n_subwords = len(tok_encodings)

            if subword_op == 'mean':
                # avg of subword embs; stays on whatever device `embeddings` is on
                span = embeddings[encoding_idx:encoding_idx + n_subwords]
                tok_embedding = span.sum(dim=0) / n_subwords
            else:
                tok_embedding = embeddings[encoding_idx]
            encoding_idx += n_subwords

            tok_embedding = tok_embedding.detach().cpu().numpy()
            tok_embeddings.append((tok, tok_embedding) if return_tokens else tok_embedding)

        return tok_embeddings

    def get_num_features(self, tokens, n_special_toks=2):
        """Return the number of subword ids for ``tokens`` plus ``n_special_toks``."""
        return self.get_num_subtokens(tokens) + n_special_toks

    def get_num_subtokens(self, tokens):
        """Return the total number of subword ids the tokens are split into.

        ``get_encodings`` is nested per token, so the lengths of the inner lists
        are summed; taking ``len`` of the outer list would count tokens instead.
        """
        return sum(len(tok_encodings) for tok_encodings in self.get_encodings(tokens))

    def get_token_embeddings_batch(self, batch_sent_tokens, return_tokens=True):
        """Embed a batch of tokenized sentences.

        Args:
            batch_sent_tokens: list of sentences, each a list of token strings.
            return_tokens: when true each entry is ``(lemma, vector)`` where the
                lemma is the pymorphy2 normal form of the token; otherwise each
                entry is just the vector.

        Returns:
            A single-element list wrapping the batch: ``result[0][s][t]`` is the
            entry for token ``t`` of sentence ``s``.

        Raises:
            ValueError: if no layers are configured, or several layers are
                selected with a ``layer_op`` other than 'sum' or 'mean'.
        """
        layers = self.nlm_config['layers']
        layer_op = self.nlm_config['layer_op']
        if len(layers) == 0:
            raise ValueError('No layers selected in nlm_config.')
        if len(layers) > 1 and layer_op not in ('sum', 'mean'):
            raise ValueError('Invalid layer_op - %s' % layer_op)

        if not batch_sent_tokens:
            return [[]]

        batch_sent_encodings = [self.get_encodings(sent_tokens) for sent_tokens in batch_sent_tokens]
        batch_flat_encodings = [self.flatten_encodings(e) for e in batch_sent_encodings]
        batch_max_len = max(len(e) for e in batch_flat_encodings) + 2  # + CLS and SEP

        # prepare nlm input
        input_ids, input_mask = [], []
        for flat_encodings in batch_flat_encodings:
            sent_ids = self.add_special_encodings(flat_encodings)
            sent_ids = self.add_padding_encodings(sent_ids, batch_max_len)
            input_ids.append(sent_ids)
            input_mask.append(self.get_attention_mask(sent_ids))

        input_ids = th.tensor(input_ids).to(self.device)
        input_mask = th.tensor(input_mask).to(self.device)
        with th.no_grad():
            model_output = self.nlm_model(input_ids, attention_mask=input_mask)
        # Only the per-layer hidden states are needed; they are requested via
        # output_hidden_states=True when the model is loaded.
        batch_hidden_states = model_output.hidden_states

        # merge subword embeddings, one list of sentences per selected layer
        merged_layers = []
        for layer_idx in layers:
            merged_sents = []
            for sent_idx, sent_embeddings in enumerate(batch_hidden_states[layer_idx]):
                sent_embeddings = sent_embeddings[1:-1]  # ignoring special tokens
                merged_sents.append(self.merge_subword_embeddings(
                    batch_sent_tokens[sent_idx], batch_sent_encodings[sent_idx],
                    sent_embeddings, return_tokens=False))
            merged_layers.append(merged_sents)

        # combine layers
        combined_batch_embeddings = []
        for sent_idx, sent_tokens in enumerate(batch_sent_tokens):
            combined_sent_embeddings = []
            for tok_idx, tok in enumerate(sent_tokens):
                tok_layer_vecs = [layer[sent_idx][tok_idx] for layer in merged_layers]

                if len(tok_layer_vecs) == 1:
                    tok_combined_vec = tok_layer_vecs[0]
                elif layer_op == 'sum':
                    tok_combined_vec = np.array(tok_layer_vecs).sum(axis=0)
                else:
                    tok_combined_vec = np.array(tok_layer_vecs).mean(axis=0)

                if return_tokens:
                    lemma = self.morph.parse(tok)[0].normal_form
                    combined_sent_embeddings.append((lemma, tok_combined_vec))
                else:
                    combined_sent_embeddings.append(tok_combined_vec)

            combined_batch_embeddings.append(combined_sent_embeddings)

        return [combined_batch_embeddings]

    def token_embeddings(self, batch_sent_tokens, return_tokens=True):
        """Alias of :meth:`get_token_embeddings_batch`."""
        return self.get_token_embeddings_batch(batch_sent_tokens, return_tokens=return_tokens)

    def is_valid(self, tokens):
        """Return True when subwords plus CLS/SEP fit in ``nlm_config['max_seq_len']``."""
        return self.get_num_features(tokens) <= self.nlm_config['max_seq_len']


if __name__ == '__main__':
    # For checking NLM_encoder with different models
    encoder_cfg = dict(
        model_name_or_path='bert-base-multilingual-cased',  # Choices: ['DeepPavlov/rubert-base-cased','bert-base-multilingual-cased']
        weights_path='',
        min_seq_len=0,
        max_seq_len=32,
        layers=[-1, -2, -3, -4],
        layer_op='sum',
        subword_op='mean',
    )

    enc = TransformerEncoder(encoder_cfg)
    ru_tokenized_s = [['делу', 'время', ',', 'потехе', 'час', '.']]
    embs = enc.get_token_embeddings_batch(ru_tokenized_s)

    # embs[0] holds one list of (token, vector) pairs per input sentence
    len_embds = len(embs[0])
    print(len_embds)
    for i in range(len_embds):
        for emb in embs[0][i]:
            token, vector = emb[0], emb[1]
            print(token, np.shape(vector))


# WSD_reader

for Russian & English

In [None]:
import json
from collections import defaultdict
import os
import pymorphy2
from transliterate import translit


class WSD_reader():
    """Reader for the WSD data set stored under ``path2data``.

    Files read (all relative to ``path2data``):

    * ``senses.tsv`` - ``<ambiguous word>\\t<sense>`` per line.
    * ``wn_mappings.tsv`` - five tab-separated columns with a header line.
    * ``<setname>/<word>/classes_map.txt`` - JSON mapping class key -> sense.
    * ``<setname>/<word>/<split>.data.txt`` - ``<target index>\\t<tokens>`` per line.
    * ``<setname>/<word>/<split>.gold.txt`` - one class key per line, aligned
      with the data file.
    """

    # Directory names for which the target word is taken from this table
    # instead of from `translit`. Add more custom mappings here as needed.
    CUSTOM_MAPPINGS = {
        "zaschita": "защита",
    }

    def __init__(self, path2data):
        self.path2data = path2data
        self.morph = pymorphy2.MorphAnalyzer()

    def load_coarse_senses(self):
        """Read ``senses.tsv``.

        Returns:
            ``(senses, words)`` where ``senses`` maps each ambiguous word to its
            list of senses (file order) and ``words`` lists the ambiguous words
            in order of first appearance. Blank lines are ignored.
        """
        senses = defaultdict(list)
        with open(os.path.join(self.path2data, 'senses.tsv'), 'r', encoding='utf-8') as senses_f:
            for line in senses_f:
                line = line.strip()
                if not line:
                    continue
                amb_word, sense = line.split('\t', 1)
                senses[amb_word].append(sense)
        return dict(senses), list(senses)

    def get_word_classes(self, word, setname):
        """Return the parsed JSON content of ``<setname>/<word>/classes_map.txt``."""
        file_path = os.path.join(self.path2data, setname, word, 'classes_map.txt')
        with open(file_path, encoding='utf-8') as classes_json_f:
            return json.load(classes_json_f)

    def sense2word(self, sense):
        """Return the part of ``sense`` that precedes the first underscore."""
        return sense.split('_')[0]

    def get_sk_mappings(self):
        """Read ``wn_mappings.tsv`` and map its fifth column to its second.

        The first line of the file is skipped as a header.
        """
        sk_mappings = {}
        with open(os.path.join(self.path2data, 'wn_mappings.tsv'), encoding='utf-8') as f:
            next(f, None)  # header
            for line in f:
                _word, coarse_sense, _syn_offset, _syn_name, sk = line.strip().split('\t')
                sk_mappings[sk] = coarse_sense
        return sk_mappings

    def load_instances(self, word, split, setname='WSD_full', mode='regular'):
        """Load the instances of one word for one split.

        Args:
            word: directory name of the word (looked up in ``CUSTOM_MAPPINGS``,
                otherwise converted with ``translit(word, "ru", reversed=False)``
                and lower-cased to obtain the target word).
            split: file prefix, e.g. ``'train'`` or ``'test'``.
            setname: data set directory under ``path2data``.
            mode: ``'mfs'`` / ``'lfs'`` keep only instances whose class is the
                value under the first / last class key in sorted order; any
                other value keeps all instances.

        Returns:
            A list of dicts with keys ``'tokens'`` (token list in which the
            target position holds the target word), ``'idx'`` (target position)
            and ``'class'`` (sense from ``classes_map.txt``, or ``None`` when
            the gold file has no line for the instance).
        """
        word_dir = os.path.join(self.path2data, setname, word)

        # The target form depends only on `word`, so resolve it once.
        tr_word = self.CUSTOM_MAPPINGS.get(word)
        if tr_word is None:
            tr_word = translit(word, "ru", reversed=False).lower()

        instances = []
        with open(os.path.join(word_dir, '%s.data.txt' % split),
                  'r', encoding='utf-8') as split_data_f:
            for line in split_data_f:
                word_idx, tokens = line.strip().split('\t')
                word_idx = int(word_idx.lstrip('\ufeff'))  # first line may carry a BOM
                tokens = tokens.split()
                # The target slot always ends up holding `tr_word`, whatever the
                # surface form was, so no morphological analysis is needed here.
                tokens[word_idx] = tr_word
                instances.append({'tokens': tokens, 'idx': word_idx, 'class': None})

        word_classes = self.get_word_classes(word, setname)
        # NOTE(review): keys are sorted as strings; the first/last key is assumed
        # to denote the most/least frequent sense - confirm against the data.
        class_keys = sorted(word_classes)
        mfs_class = word_classes[class_keys[0]]    # most frequent sense
        lfs_class = word_classes[class_keys[-1]]   # least frequent sense

        with open(os.path.join(word_dir, '%s.gold.txt' % split), encoding='utf-8') as split_gold_f:
            for line_idx, line in enumerate(split_gold_f):
                line_class = line.strip().lstrip('\ufeff')
                instances[line_idx]['class'] = word_classes[line_class]

        if mode == 'mfs':
            instances = [inst for inst in instances if inst['class'] == mfs_class]
        elif mode == 'lfs':
            instances = [inst for inst in instances if inst['class'] == lfs_class]
        return instances


if __name__ == '__main__':
    # Quick check of the reader: load the least-frequent-sense training
    # instances of one word and list the ambiguous words found in senses.tsv.
    data = WSD_reader(path2data='/content/drive/MyDrive/RD_project/MERGED_DATA')
    insts = data.load_instances(
        word="zaschita",
        split='train',
        setname='WSD_full',
        mode='lfs',
    )
    senses, amb_w = data.load_coarse_senses()
    print(amb_w)

# Creating_1nn_vecs

In [None]:
import json
import os
import sys
import logging
import numpy as np
from time import time
from transliterate import translit
from argparse import Namespace


# Define a function to create the vectors
def create_vecs(args):
    """Build one averaged embedding per sense from the training split and save them.

    Relies on three names defined at notebook level: ``ambiguous_words`` (words
    to process), ``data`` (a ``WSD_reader``) and ``encoder`` (a
    ``TransformerEncoder``).

    Args:
        args: namespace providing ``dataset_id``, ``max_seq_len``, ``out_path``
            and ``nlm_id``.

    Writes ``<out_path>/<dataset_id>/<nlm_id>.txt`` with one line per sense:
    the sense label followed by the vector components rounded to 6 decimals.
    Instances that are too long or fail to encode are reported and skipped.
    """
    sense_vecs = {}
    for word in ambiguous_words:
        print('Processing \'%s\' ...' % word)
        instances = data.load_instances(translit(word, "ru", reversed=True),
                                        split='train', setname=args.dataset_id)
        for inst_idx, inst in enumerate(instances):
            # get_num_features adds the two special tokens the encoder inserts,
            # so this compares the real input length with the limit.
            if encoder.get_num_features(inst['tokens']) > args.max_seq_len:
                print('%s:%d exceeds max_seq_len (%d).' % (word, inst_idx, args.max_seq_len))
                continue

            try:
                inst_vecs = encoder.token_embeddings([inst['tokens']])[0][0]
            except Exception as err:  # best effort: report and move on to the next instance
                print('ERROR: %s:%d (%r)' % (word, inst_idx + 1, err))
                continue

            assert inst_vecs[inst['idx']][0] == word  # sanity check

            word_vec = inst_vecs[inst['idx']][1]
            word_cls = inst['class']

            if word_cls in sense_vecs:
                sense_vecs[word_cls]['vecs_sum'] += word_vec
                sense_vecs[word_cls]['vecs_num'] += 1
            else:
                # copy so the running sum never aliases an encoder output buffer
                sense_vecs[word_cls] = {'vecs_sum': word_vec.copy(), 'vecs_num': 1}

    print('Writing Sense Vectors to %s ...' % args.out_path)
    output_file = os.path.join(args.out_path, args.dataset_id, args.nlm_id + '.txt')
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    # Sense labels may be non-ASCII; write the file as UTF-8 explicitly.
    with open(output_file, 'w', encoding='utf-8') as vecs_f:
        for sense, vecs_info in sense_vecs.items():
            vec = vecs_info['vecs_sum'] / vecs_info['vecs_num']
            vec_str = ' '.join(str(round(v, 6)) for v in vec.tolist())
            vecs_f.write('%s %s\n' % (sense, vec_str))


if __name__ == '__main__':
    # Settings for building the sense vectors.
    args = Namespace(
        nlm_id= 'bert-base-multilingual-cased',  # choices: ['DeepPavlov/rubert-base-cased','bert-base-multilingual-cased']
        dataset_id= 'WSD_full',
        max_seq_len=512,
        subword_op='mean',
        layers='-1 -2 -3 -4',
        layer_op='sum',
        out_path='/content/drive/MyDrive/RD_project/ru_mBERT_f_ex'
    )

    args.layers = [int(n) for n in args.layers.split(' ')]

    # Take layer_op / subword_op from args so the settings above are the single
    # source of truth (the values are the same as the previously inlined ones).
    encoder_cfg = {
        'model_name_or_path': args.nlm_id,
        'min_seq_len': 0,
        'max_seq_len': args.max_seq_len,
        'layers': args.layers,
        'layer_op': args.layer_op,
        'subword_op': args.subword_op
    }

    # create_vecs reads `encoder`, `data` and `ambiguous_words` as globals.
    print('Loading NLM ...')
    encoder = TransformerEncoder(encoder_cfg)
    data = WSD_reader('/content/drive/MyDrive/RD_project/MERGED_DATA')
    senses, ambiguous_words = data.load_coarse_senses()
    create_vecs(args)

# Evaluation

In [11]:
import numpy as np


class VSM(object):
    """Small vector-space model loaded from a text file.

    Each non-blank line of the file is ``<label> <v1> <v2> ...`` separated by
    whitespace. Vectors are stored as rows of a float32 matrix.
    """

    def __init__(self, vecs_path, normalize=True):
        """Load ``vecs_path`` and, when ``normalize`` is true, L2-normalize the rows."""
        self.labels = []
        self.vectors = np.array([], dtype=np.float32)
        self.indices = {}
        self.ndims = 0
        self.load_txt(vecs_path)
        if normalize:
            self.normalize()

    def load_txt(self, vecs_path):
        """(Re)load labels and vectors from ``vecs_path``.

        All previous content is replaced, so calling this again keeps
        ``labels``, ``indices`` and ``vectors`` consistent. The loaded vectors
        are not normalized here.

        Raises:
            ValueError: if the file contains no vectors.
        """
        labels, vectors = [], []
        with open(vecs_path, encoding='utf-8') as vecs_f:
            for line in vecs_f:
                elems = line.split()
                if not elems:  # skip blank lines
                    continue
                labels.append(elems[0])
                vectors.append(np.array(list(map(float, elems[1:])), dtype=np.float32))
        if not vectors:
            raise ValueError('No vectors found in %s' % vecs_path)

        self.labels = labels
        self.vectors = np.vstack(vectors)
        self.indices = {l: i for i, l in enumerate(self.labels)}
        self.ndims = self.vectors.shape[1]

    def normalize(self):
        """Scale every row to unit L2 norm; all-zero rows are left as zeros."""
        norms = np.linalg.norm(self.vectors, axis=1, keepdims=True)
        # Dividing a zero row by its norm would give NaNs, which then sort to
        # the top of most_similar_vec.
        norms[norms == 0] = 1.0
        self.vectors = self.vectors / norms

    def get_vec(self, label):
        """Return the stored vector for ``label`` (KeyError if unknown)."""
        return self.vectors[self.indices[label]]

    def similarity(self, label1, label2):
        """Return the dot product of the vectors stored for the two labels."""
        v1 = self.get_vec(label1)
        v2 = self.get_vec(label2)
        return np.dot(v1, v2).tolist()

    def most_similar_vec(self, vec, topn=100):
        """Return ``(label, score)`` pairs sorted by decreasing dot product with ``vec``.

        At most ``topn`` pairs are returned; ``topn=None`` returns all of them.
        """
        sims = np.dot(self.vectors, vec)
        sims_ = sims.tolist()
        ranked = sims.argsort().tolist()[::-1][:topn]
        return [(self.labels[top_i], sims_[top_i]) for top_i in ranked]

    def sims(self, vec):
        """Return the dot product of ``vec`` with every stored vector, in label order."""
        return np.dot(self.vectors, np.array(vec)).tolist()

if __name__ == "__main__":
    # The constructor already loads (and normalizes) the file. Calling load_txt
    # again would only reload the raw vectors, and it returns None, so the
    # matrix is read from the instance instead.
    vsm = VSM("/content/drive/MyDrive/RD_project/ru_deepPavlov_f_ex/WSD_full/DeepPavlov/rubert-base-cased.txt")
    vectors = vsm.vectors

In [None]:
import argparse
import json
from time import time
from datetime import datetime
from collections import defaultdict
from collections import Counter
from argparse import Namespace
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score
from transliterate import translit


# Overrides applied by eval_nn to the word used for filtering sense predictions:
# when the transliterated word is a key here, the mapped value is used instead.
# NOTE(review): eval_nn looks up translit(amb_word, "ru", reversed=True), which
# presumably yields Latin text, whereas the key below is Cyrillic - verify that
# this entry can actually match.
custom_mappings = {
    "засчита": "защита"
    # Add more custom mappings here as needed
}

def eval_nn(args):
    """Evaluate 1-nearest-neighbour sense prediction on the test split.

    Relies on notebook-level names: ``encoder`` (TransformerEncoder),
    ``senses_vsm`` (VSM with the sense vectors), ``custom_mappings``,
    ``WSD_reader``, ``translit`` and ``os``.

    Args:
        args: namespace providing ``dataset_id``, ``nlm_id``, ``mode`` and
            ``max_seq_len``. Two optional attributes are honoured:
            ``data_path`` (data set root) and ``results_path`` (output root);
            both default to the project's Google Drive locations.

    Writes, under ``<results_path>/<dataset_id>/1nn/<nlm_id>/``:

    * ``summary_f1[.<mode>].csv`` - one row per sense and an ``ALL`` row per word.
      Per-sense rows hold the sense accuracy in the ``f1_micro`` and
      ``f1_weighted`` columns and the per-sense F1 in ``f1_macro``; the ``ALL``
      row holds word accuracy, weighted F1 and macro F1.
    * ``<word>[.<mode>].jsonl`` - one JSON object per test instance.

    Instances longer than ``max_seq_len`` get the placeholder prediction
    ``('NULL', -1)``; they count towards the accuracies but are left out of the
    F1 computations.
    """
    data_path = getattr(args, 'data_path', '/content/drive/MyDrive/RD_project/MERGED_DATA')
    results_root = getattr(args, 'results_path', '/content/drive/MyDrive/RD_project/results')
    results_dir = '%s/%s/1nn/%s' % (results_root, args.dataset_id, args.nlm_id)
    mode_suffix = '' if args.mode == 'regular' else '.%s' % args.mode

    all_sense_preds = defaultdict(list)   # gold sense -> list of ranked predictions
    all_results = defaultdict(list)       # word -> list of (instance, ranked predictions)
    GOLD_test = defaultdict(list)         # word -> gold senses (encoded instances only)
    PREDICTIONS = defaultdict(list)       # word -> top predicted senses, aligned with GOLD_test
    data = WSD_reader(data_path)
    senses, amb_w = data.load_coarse_senses()

    # matching test instances
    for amb_word in amb_w:
        print('Evaluating %s ...' % amb_word)
        word_dir = translit(amb_word, "ru", reversed=True)
        # prefix a sense label must carry to be a candidate for this word
        sense_prefix = custom_mappings.get(word_dir, word_dir)

        test_insts = data.load_instances(word_dir, split='test',
                                         setname=args.dataset_id, mode=args.mode)
        for inst_idx, test_inst in enumerate(test_insts):
            gold_sense = test_inst['class']

            # get_num_features includes the two special tokens added by the encoder
            if encoder.get_num_features(test_inst['tokens']) > args.max_seq_len:
                print('%s:%d exceeds max_seq_len (%d).' % (amb_word, inst_idx, args.max_seq_len))

                preds = [('NULL', -1)]
                all_sense_preds[gold_sense].append(preds)
                all_results[amb_word].append((test_inst, preds))
                continue

            inst_vecs = encoder.token_embeddings([test_inst['tokens']])[0][0]

            assert inst_vecs[test_inst['idx']][0] == amb_word  # sanity check

            amb_word_vec = inst_vecs[test_inst['idx']][1]
            amb_word_vec = amb_word_vec / np.linalg.norm(amb_word_vec)

            preds = senses_vsm.most_similar_vec(amb_word_vec, topn=None)

            # filter preds for target word
            preds = [(sense, score) for sense, score in preds
                     if sense.split('_')[0] == sense_prefix]
            if not preds:
                raise ValueError("No sense vectors with prefix '%s' found for '%s'."
                                 % (sense_prefix, amb_word))

            all_sense_preds[gold_sense].append(preds)
            all_results[amb_word].append((test_inst, preds))
            GOLD_test[amb_word].append(gold_sense)
            PREDICTIONS[amb_word].append(max(preds, key=lambda x: x[1])[0])

    # computing accuracies
    all_senses_accs = {}
    all_words_accs = {}
    for amb_word in senses:
        n_word_correct, n_word_insts = 0, 0
        for sense in senses[amb_word]:
            sense_preds = all_sense_preds.get(sense, [])
            if not sense_preds:
                continue
            n_sense_correct = sum(1 for preds in sense_preds if preds[0][0] == sense)
            all_senses_accs[sense] = n_sense_correct / len(sense_preds)

            n_word_correct += n_sense_correct
            n_word_insts += len(sense_preds)

        # words without any test instance (possible in 'mfs'/'lfs' mode) are skipped
        if n_word_insts:
            all_words_accs[amb_word] = n_word_correct / n_word_insts

    # writing summary
    summary_path = '%s/summary_f1%s.csv' % (results_dir, mode_suffix)
    os.makedirs(results_dir, exist_ok=True)

    with open(summary_path, 'w', encoding='utf-8') as summary_f:
        summary_f.write('word,sense,n_insts,f1_micro,f1_weighted,f1_macro\n')
        for amb_word in senses:
            if amb_word not in all_words_accs:
                continue

            word_senses = senses[amb_word]
            gold, pred = GOLD_test[amb_word], PREDICTIONS[amb_word]
            if gold:
                word_f1_macro = np.round(f1_score(gold, pred, average='macro'), 3)
                word_f1_weighted = np.round(f1_score(gold, pred, average='weighted'), 3)
                # labels= fixes the order of the per-class scores so that
                # position i really belongs to word_senses[i]
                sense_f1 = np.round(f1_score(gold, pred, labels=word_senses,
                                             average=None, zero_division=0), 3)
            else:
                # every instance of this word was too long to be encoded
                word_f1_macro = word_f1_weighted = 0.0
                sense_f1 = np.zeros(len(word_senses))

            n_word_insts = 0
            for i, sense in enumerate(word_senses):
                if sense not in all_senses_accs:
                    continue
                sense_acc = all_senses_accs[sense]
                n_sense_insts = len(all_sense_preds[sense])
                n_word_insts += n_sense_insts
                summary_f.write('%s,%s,%d,%f,%f,%f\n' % (amb_word, sense,
                                                         n_sense_insts,
                                                         sense_acc,
                                                         sense_acc,
                                                         sense_f1[i]))

            summary_f.write('%s,%s,%d,%f,%f,%f\n' % (amb_word, 'ALL',
                                                     n_word_insts,
                                                     all_words_accs[amb_word],
                                                     word_f1_weighted,
                                                     word_f1_macro))

    # store full results for further analysis
    for amb_word, word_results in all_results.items():
        word_results_path = '%s/%s%s.jsonl' % (results_dir, amb_word, mode_suffix)
        # ensure_ascii=False emits raw non-ASCII text, so pin the file encoding
        with open(word_results_path, 'w', encoding='utf-8') as word_results_f:
            for inst_idx, (test_inst, inst_matches) in enumerate(word_results):
                jsonl_results = {'idx': inst_idx, 'matches': inst_matches,
                                 'gold': test_inst['class'], 'tokens': test_inst['tokens']}
                word_results_f.write('%s\n' % json.dumps(jsonl_results, ensure_ascii=False, sort_keys=True))


if __name__ == '__main__':
    # Evaluation settings.
    args = Namespace(
        nlm_id='DeepPavlov/rubert-base-cased',  # choice = ['bert-base-multilingual-cased', 'DeepPavlov/rubert-base-cased']
        dataset_id='WSD_full',
        sv_path='/content/drive/MyDrive/RD_project/ru_deepPavlov_f_ex/WSD_full/DeepPavlov/rubert-base-cased.txt',
        subword_op='mean',  # choices=['mean', 'first', 'sum']
        layers='-1 -2 -3 -4',
        max_seq_len=512,
        layer_op='sum',  # choices=['mean', 'first', 'sum'])
        mode='regular',  # choices=['regular', 'mfs', 'lfs']
    )

    # '-1 -2 -3 -4' -> [-1, -2, -3, -4]
    args.layers = list(map(int, args.layers.split(' ')))

    # catch mismatched nlms/sense_vecs: the model's base name has to be one of
    # the dot-separated parts of the sense-vector file name
    if args.nlm_id.rsplit('/', 1)[-1] not in args.sv_path.rsplit('/', 1)[-1].split('.'):
        logging.fatal("Provided sense vectors don't seem to match nlm_id (%s)." % args.nlm_id)
        raise SystemExit('Fatal Error.')

    encoder_cfg = dict(
        model_name_or_path=args.nlm_id,
        min_seq_len=0,
        max_seq_len=args.max_seq_len,
        layers=args.layers,
        layer_op=args.layer_op,
        subword_op=args.subword_op,
    )

    # eval_nn reads `encoder` and `senses_vsm` as globals
    print('Loading NLM ...')
    encoder = TransformerEncoder(encoder_cfg)

    print('Loading VSM ...')
    senses_vsm = VSM(args.sv_path, normalize=True)

    eval_nn(args)