In [None]:
import json
import xml.etree.ElementTree as ET
from om.ont import tokenize
from py_stringmatching import SoftTfIdf, JaroWinkler
from itertools import chain
from pymagnitude import Magnitude
import numpy as np
from transformers import AutoTokenizer, AutoModel
from transformers import BertTokenizer, BertModel
import pandas as pd
import torch
from tqdm.auto import tqdm

In [None]:
def get_xml_paths(base_path, node, out):
    """Append the dotted path of every leaf element under ``node`` to ``out``.

    Args:
        base_path: Prefix for this element's path; either empty or ending in '.'.
        node: Element to walk (anything exposing ``tag``, ``attrib`` and
            iteration over children, such as an ElementTree element).
        out: List that receives the paths; it is mutated in place.

    Only childless elements are recorded. Each of them contributes its own
    path followed by one ``path.attribute`` entry per attribute. Elements
    that have children contribute nothing themselves (their attributes are
    not recorded); the walk just descends into the children.
    """
    # Keep only what follows the last '}' so a '{namespace}' prefix is dropped.
    local_name = node.tag.rsplit('}', 1)[-1]
    full_path = base_path + local_name
    children = list(node)

    if not children:
        out.append(full_path)
        for attribute_name in node.attrib:
            out.append(full_path + '.' + attribute_name)
        return

    child_prefix = full_path + '.'
    for child in children:
        get_xml_paths(child_prefix, child, out)


def get_json_paths(base_path, node, out):
    """Append the dotted key path of every scalar leaf under ``node`` to ``out``.

    Args:
        base_path: Prefix accumulated so far; either empty or ending in '.'.
        node: Parsed JSON value (dict, list or scalar).
        out: List that receives the paths; it is mutated in place.

    Lists do not add a path segment: their items are visited under the same
    prefix, so a list of similar objects yields the same path once per item.
    Empty dicts and empty lists produce no path.
    """
    node_type = type(node)

    if node_type is dict:
        for key, value in node.items():
            get_json_paths(base_path + key + '.', value, out)
        return

    if node_type is list:
        for item in node:
            get_json_paths(base_path, item, out)
        return

    # Scalar leaf: drop the trailing '.' left by the enclosing dict key.
    out.append(base_path[:-1])


In [None]:
# Source side: leaf paths of the XML document.
root = ET.parse('./nat/odatis.xml').getroot()
source = []
get_xml_paths('', root, source)

# Target side: leaf paths of the JSON document.
# Decode explicitly as UTF-8 instead of relying on the platform default encoding.
# The parsed document gets its own name so it is not confused with the `data`
# list that collects the similarity rows later in the notebook.
with open('./nat/aeris.json', 'r', encoding='utf-8') as f:
    target_document = json.load(f)

target = []
get_json_paths('', target_document, target)

print(f'source paths count: {len(source)}, target paths count: {len(target)}')

In [None]:
# Collects one [technique, source_path, target_path, similarity] row per scored pair.
# The scoring cells further down only append to this list, so re-run this cell
# before re-running any of them; otherwise their rows are added a second time.
data = []

In [None]:
# Corpus for the TF-IDF statistics: the lower-cased token list of every
# source and target path, in that order.
slist = [
    [token.lower() for token in tokenize(path)]
    for path in chain(source, target)
]

# Soft TF-IDF with Jaro-Winkler as the token-level similarity and 0.8 as its threshold.
soft_metric = SoftTfIdf(slist, sim_func=JaroWinkler().get_raw_score, threshold=0.8)

In [None]:
def jaccard(a, b):
    """Return the Jaccard similarity of two sets: |a & b| / |a | b|.

    Args:
        a: First set.
        b: Second set.

    Returns:
        A float in [0, 1]. Two empty sets are treated as identical and score
        1.0 instead of raising ZeroDivisionError.
    """
    union_size = len(a.union(b))
    if union_size == 0:
        # Both sets are empty, so the ratio is undefined; score them as identical.
        return 1.0
    return len(a.intersection(b)) / union_size


# Tokenise every target path once instead of once per source path.
# Assumes `tokenize` depends only on the path it is given.
target_tokens = [list(map(str.lower, tokenize(target_path))) for target_path in target]
target_token_sets = [set(tokens) for tokens in target_tokens]

for source_path in source:
    n1 = list(map(str.lower, tokenize(source_path)))
    s1 = set(n1)
    for target_path, n2, s2 in zip(target, target_tokens, target_token_sets):
        # Jaccard works on the token sets; Soft TF-IDF needs the token lists (bags).
        data.append(['jaccard', source_path, target_path, jaccard(s1, s2)])
        data.append(['soft_tf_idf', source_path, target_path, soft_metric.get_raw_score(n1, n2)])


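GloVe vectors in Magnitude format. Download the file from <http://magnitude.plasticity.ai/glove/medium/glove.840B.300d.magnitude> and save it as `embs/glove.840B.300d.magnitude` before running the next cell.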

In [None]:
# Local Magnitude file holding the pre-trained GloVe vectors.
GLOVE_MAGNITUDE_PATH = "embs/glove.840B.300d.magnitude"
glove = Magnitude(GLOVE_MAGNITUDE_PATH)

In [None]:
def get_vectors(paths):
    """Embed each path as the mean GloVe vector of its unique lower-cased tokens.

    Args:
        paths: Iterable of path strings.

    Returns:
        A pair ``(elements, matrix)``: ``elements`` is the list of paths in
        input order and ``matrix`` stacks one averaged vector per path, in
        the same order.
    """
    elements = list(paths)

    vectors = []
    for path in elements:
        unique_tokens = list({token.lower() for token in tokenize(path)})
        token_vectors = glove.query(unique_tokens)
        # keepdims=True keeps a leading axis of size 1 so the rows can be concatenated.
        vectors.append(token_vectors.mean(0, keepdims=True))

    return elements, np.concatenate(vectors)


def get_np_similarity_matrix(v1, v2):
    """Cosine similarity between every row of ``v1`` and every row of ``v2``.

    Args:
        v1: 2-D array, one vector per row.
        v2: 2-D array, one vector per row, same width as ``v1``.

    Returns:
        Array of shape ``(len(v1), len(v2))`` whose entry ``[i, j]`` is the
        dot product of ``v1[i]`` and ``v2[j]`` divided by the product of
        their Euclidean norms.
    """
    row_norms = np.linalg.norm(v1, axis=1, keepdims=True)
    col_norms = np.linalg.norm(v2, axis=1, keepdims=True).T
    return (v1 @ v2.T) / (row_norms * col_norms)



In [None]:
# Embed both path lists with GloVe and score every source/target pair.
source_paths, source_vectors = get_vectors(source)
target_paths, target_vectors = get_vectors(target)

similarity = get_np_similarity_matrix(source_vectors, target_vectors)

# Row i of the matrix belongs to source_paths[i], column j to target_paths[j].
for source_path, scores in zip(source_paths, similarity):
    for target_path, score in zip(target_paths, scores):
        data.append(['glove', source_path, target_path, score])


In [None]:

def encode_with_language_model(paths, tokenizer, model, pooling='pooler'):
    """Embed each path with a Hugging Face encoder, one vector per path.

    Each path is tokenised with ``tokenize``, lower-cased and joined with
    spaces before being passed to ``tokenizer``.

    Args:
        paths: Iterable of path strings.
        tokenizer: Hugging Face tokenizer matching ``model``.
        model: Encoder whose output exposes ``pooler_output`` (and
            ``last_hidden_state`` when ``pooling='mean'``).
        pooling: ``'pooler'`` (default, the original behaviour) returns the
            model's ``pooler_output``. ``'mean'`` returns the attention-mask
            weighted mean of the token embeddings.
            NOTE(review): the sentence-transformers/all-MiniLM-L6-v2 model
            card builds sentence embeddings with mean pooling; confirm which
            pooling the experiment intends for that model.

    Returns:
        Tensor with one row per path, computed without gradient tracking.

    Raises:
        ValueError: If ``pooling`` is neither ``'pooler'`` nor ``'mean'``.
    """
    if pooling not in ('pooler', 'mean'):
        raise ValueError(f"pooling must be 'pooler' or 'mean', got {pooling!r}")

    sentences = [' '.join(map(str.lower, tokenize(path))) for path in paths]

    # truncation=True clips inputs that exceed the tokenizer's maximum length
    # instead of letting the encoder fail on them; shorter inputs are unaffected.
    encoded_input = tokenizer(sentences, return_tensors='pt', padding=True, truncation=True)

    with torch.no_grad():
        output = model(**encoded_input)

        if pooling == 'pooler':
            return output['pooler_output']

        # Mean pooling: average the token embeddings, ignoring padding positions.
        token_embeddings = output['last_hidden_state']
        mask = encoded_input['attention_mask'].unsqueeze(-1).to(token_embeddings.dtype)
        return (token_embeddings * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)


def get_torch_similarity_matrix(v1, v2):
    """Cosine similarity between every row of ``v1`` and every row of ``v2``.

    Args:
        v1: 2-D tensor, one vector per row.
        v2: 2-D tensor, one vector per row, same width as ``v1``.

    Returns:
        Tensor of shape ``(len(v1), len(v2))`` whose entry ``[i, j]`` is the
        dot product of ``v1[i]`` and ``v2[j]`` divided by the product of
        their norms.
    """
    row_norms = v1.norm(dim=1, keepdim=True)
    col_norms = v2.norm(dim=1, keepdim=True).t()
    return v1.matmul(v2.t()) / (row_norms * col_norms)


# Score every source/target pair with embeddings from bert-base-uncased.
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert = BertModel.from_pretrained("bert-base-uncased")
bert.eval()  # switch the model to evaluation mode

source_vectors = encode_with_language_model(source_paths, bert_tokenizer, bert)
target_vectors = encode_with_language_model(target_paths, bert_tokenizer, bert)

similarity = get_torch_similarity_matrix(source_vectors, target_vectors)

# tolist() turns the matrix into nested Python floats, one inner list per source path.
for source_path, scores in zip(source_paths, similarity.tolist()):
    for target_path, score in zip(target_paths, scores):
        data.append(['bert', source_path, target_path, score])

In [None]:
# Score every source/target pair with embeddings from all-MiniLM-L6-v2.
mini_lm_tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
mini_lm = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
mini_lm.eval()  # switch the model to evaluation mode

source_vectors = encode_with_language_model(source_paths, mini_lm_tokenizer, mini_lm)
target_vectors = encode_with_language_model(target_paths, mini_lm_tokenizer, mini_lm)

similarity = get_torch_similarity_matrix(source_vectors, target_vectors)

# One row per pair; the matrix rows follow source_paths and the columns target_paths.
data.extend(
    ['all-MiniLM-L6-v2', source_path, target_path, score]
    for source_path, scores in zip(source_paths, similarity.tolist())
    for target_path, score in zip(target_paths, scores)
)

In [None]:
def metrics(correct, tries, total):
    """Compute precision, recall and F-measure from raw counts.

    Args:
        correct: Number of predictions that are right.
        tries: Number of predictions made.
        total: Number of expected (reference) items.

    Returns:
        Tuple ``(precision, recall, f_measure)``. Precision is 0 when
        ``tries`` is 0 and recall is 0 when ``total`` is 0.
    """
    precision = correct / tries if tries != 0 else 0
    recall = correct / total if total != 0 else 0

    denominator = precision + recall
    if denominator == 0:
        # Avoid dividing by zero; the numerator decides the result.
        denominator = 1

    fm = 2 * (precision * recall) / denominator
    return precision, recall, fm

In [None]:
# Gather the rows of every technique into one long-format table and save it as CSV.
similarity_columns = ['name', 'e1', 'e2', 'sim']
similarity_table = pd.DataFrame(data, columns=similarity_columns)
similarity_table.to_csv('./nat/path_sim.csv', encoding='utf-8', index=False)

In [None]:
# Preview the first rows of the table (columns: name, e1, e2, sim).
similarity_table.head()

In [None]:
# Reference alignments: one (key, match) pair per CSV row. These pairs are later
# compared against the predicted (e1, e2) pairs.
raw_alignments = pd.read_csv('./nat/odatis-aeris.csv')

alignments = set(zip(raw_alignments['key'], raw_alignments['match']))

In [None]:

# Technique names in order of first appearance. Iterating a set of strings here
# would make the row order of the exported metrics vary between kernel sessions.
techniques = similarity_table['name'].unique().tolist()

# Split the table once per technique instead of re-scanning every row for
# each (threshold, technique) combination.
rows_by_technique = {
    name: similarity_table.loc[similarity_table['name'] == name, ['e1', 'e2', 'sim']]
    for name in techniques
}

total = len(alignments)
similarity_map = {name: [] for name in techniques}

for threshold in tqdm(np.arange(0, 1.0, 0.05)):

    for name in techniques:
        candidates = rows_by_technique[name]
        kept = candidates.loc[candidates['sim'] >= threshold]

        # A pair counts as predicted when its similarity reaches the threshold.
        predicted = set(zip(kept['e1'], kept['e2']))

        predicted_count = len(predicted)
        correct_count = len(predicted.intersection(alignments))

        # One (threshold, precision, recall, f-measure) tuple per threshold, in sweep order.
        similarity_map[name].append((threshold,) + metrics(correct_count, predicted_count, total))


In [None]:
# Flatten {technique: [(threshold, precision, recall, f-measure), ...]} into one
# row per (technique, threshold).
# The list is deliberately not named `metrics`: that name belongs to the scoring
# function used by the threshold sweep, and rebinding it to a list makes
# re-running the sweep cell fail with "'list' object is not callable".
metric_rows = [
    [name, *scores]
    for name, per_threshold in similarity_map.items()
    for scores in per_threshold
]

df = pd.DataFrame(metric_rows, columns=['name', 'threshold', 'precision', 'recall', 'f-measure'])
df.to_csv('./nat/metrics.csv')