In [1]:
from google.colab import drive
drive.mount('/gdrive', force_remount=True)

Mounted at /gdrive


### Можно почитать:

выравнивание деревьев: https://research.utwente.nl/en/publications/normalized-alignment-of-dependency-trees-for-detecting-textual-en

нечеткое сравнение деревьев: https://www.researchgate.net/publication/236164307_Kto_vinovat_i_gde_sobaka_zaryta_Metod_validacii_otvetov_na_osnove_netocnogo_sravnenia_semanticeskih_grafov_v_voprosno-otvetnoj_sisteme_Who_is_to_blame_and_Where_the_dog_is_buried_Method_of_answers_val

In [2]:
import os
import numpy as np
import json
import csv
import math

from scipy.spatial import distance as dst

os.chdir('/gdrive/My Drive/Грант 19-20/')

In [3]:
class State:
    def __init__(self, ftr, idx):
        self.ftr = ftr # вектор признаков узла
        self.isFinal = False # для последнего слова во фразе
        self.nextStates = []
        self.idx = idx
        self.best_token = None # на будущее для сравнения графов

def print_state(state, print_vectors=True):
    nextStatesIdxs = [s.idx for s in state.nextStates]
    if print_vectors:
        print("State: idx={} isFinal={} nextStatesIdxs={} ftr={} ".format(
            state.idx, state.isFinal, nextStatesIdxs, state.ftr))
    else:
        print("State: idx={} isFinal={} nextStatesIdxs={} ".format(
            state.idx, state.isFinal, nextStatesIdxs))
    
    
def add_children(v, prevState, features, parents, visited, graph):
    if v in visited:  # Если вершина уже посещена, выходим
        return
    visited.add(v)  # Посетили вершину v
    state = State(features[v-1], v)
    # state.nextStates.append(state)  # добавили петлю
    prevState.nextStates.append(state)
    graph.append(state)
    children = [i+1 for i, item in enumerate(parents) if item == v]
    for child in children:  # Все смежные с v вершины
        if not child in visited:                
            add_children(child, state, features, parents, visited, graph)
    return graph

def get_tree(features, parents): # формирование графа из по признакам (вершинам) и спискам родителей вершин
    startState = State(0, 0)
    graph = [startState, ]
    prevState = startState
    visited = set()
    main_word = parents.index(0)+1
    add_children(main_word, prevState, features, parents, visited, graph)
    return graph

def print_graph(graph, print_vectors=True):
    print("*** DEBUG. GRAPH ***")
    np.set_printoptions(formatter={'float': '{: 0.1f}'.format})
    for state in graph:
        print_state(state, print_vectors)
    print("*** END DEBUG. GRAPH ***")

Тест на "игрушечном" графе

In [4]:
parents = [0, 3, 1, 6, 6, 1, 8, 1]
features = [[1],[2],[3],[4],[5],[6],[7],[8]]
graph_test = get_tree(features, parents)
print_graph(graph_test)

*** DEBUG. GRAPH ***
State: idx=0 isFinal=False nextStatesIdxs=[1] ftr=0 
State: idx=1 isFinal=False nextStatesIdxs=[3, 6, 8] ftr=[1] 
State: idx=3 isFinal=False nextStatesIdxs=[2] ftr=[3] 
State: idx=2 isFinal=False nextStatesIdxs=[] ftr=[2] 
State: idx=6 isFinal=False nextStatesIdxs=[4, 5] ftr=[6] 
State: idx=4 isFinal=False nextStatesIdxs=[] ftr=[4] 
State: idx=5 isFinal=False nextStatesIdxs=[] ftr=[5] 
State: idx=8 isFinal=False nextStatesIdxs=[7] ftr=[8] 
State: idx=7 isFinal=False nextStatesIdxs=[] ftr=[7] 
*** END DEBUG. GRAPH ***


Работа с реальными предобработанными ответами

In [5]:
with open('./answers_udify.json', 'r') as file_read:
    data = json.load(file_read)

all_vectors = [d['RuBERT'] for d in data]
all_parents = [d['parents'] for d in data]
               
print(len(all_vectors))

n_questions = len(all_vectors)//5

130


In [6]:
answer_graphs = []
for k in range(n_questions):
    answer_graphs_temp = []
    for i in range(5):
        graph = get_tree(all_vectors[k*5+i], all_parents[k*5+i])
        answer_graphs_temp.append(graph)
    answer_graphs.append(answer_graphs_temp)

Заготовки для алгоритма сравнения графов

In [7]:
# Вычисляем (евклидово) расстояние между вершинами

def eucl_distance(v1, v2): # нужно для игрушечного примера
    x = v1.ftr 
    y = v2.ftr
    return math.sqrt(np.sum(pow(np.array(x)-np.array(y),2)))

def cosine_distance(v1, v2):
    x = v1.ftr 
    y = v2.ftr
    return dst.cosine(x,y)

In [8]:
def match_graphs_eucl(graph1, graph2):
    v1 = graph1[0]
    v2 = graph2[0]
    full_score = match_children_eucl(v1,v2)
    return full_score/(len(graph1)+len(graph2))

In [9]:
def match_graphs_cosine(graph1, graph2):
    v1 = graph1[1]
    v2 = graph2[1]
    depth = 1
    full_score = match_children_cosine(v1,v2,depth)
    return full_score/(1+0.05*(len(graph1)+len(graph2)))

In [10]:
def match_children_eucl(v1, v2):

    children1 = [c1.idx for c1 in v1.nextStates]
    children2 = [c2.idx for c2 in v2.nextStates]
    cur_score = eucl_distance(v1, v2)

    if (len(children1) != 0) & (len(children2) != 0):
        v2_rest_children = v2.nextStates
        for v3 in v1.nextStates:
            if len(v2_rest_children) == 0:
                cur_score += eucl_distance(v3, v1)
            else:
                subscores = [match_children_eucl(v3,v4) for v4 in v2_rest_children]
                arg_min = np.argmin(subscores)
                cur_score += subscores[arg_min]
                v2_rest_children.pop(arg_min)
        
    if (len(children1) == 0) & (len(children2) != 0): 
        subscores = [match_children_eucl(v1,v3) for v3 in v2.nextStates]
        cur_score += np.sum(subscores)*0.01

    if (len(children1) != 0) & (len(children2) == 0):
        subscores = [match_children_eucl(v3, v2) for v3 in v1.nextStates]
        cur_score += np.sum(subscores)*5
    
    return cur_score

In [11]:
def match_children_cosine(v1, v2, depth):

    children1 = [c1.idx for c1 in v1.nextStates]
    children2 = [c2.idx for c2 in v2.nextStates]
    cur_score = cosine_distance(v1, v2)/depth
    depth += 1

    if (len(children1) != 0) & (len(children2) != 0):
        v2_rest_children = v2.nextStates
        for v3 in v1.nextStates:
            if len(v2_rest_children) == 0:
                cur_score += cosine_distance(v3, v1)
            else:
                subscores = [match_children_cosine(v3,v4,depth) for v4 in v2_rest_children]
                arg_min = np.argmin(subscores)
                cur_score += subscores[arg_min]
                v2_rest_children.pop(arg_min)
        
    if (len(children1) == 0) & (len(children2) != 0): 
        subscores = [match_children_cosine(v1,v3,depth) for v3 in v2.nextStates]
        cur_score += np.sum(subscores)*0.6

    if (len(children1) != 0) & (len(children2) == 0):
        subscores = [match_children_cosine(v3, v2,depth) for v3 in v1.nextStates]
        cur_score += np.sum(subscores)*2
    
    return cur_score

In [12]:
print(match_graphs_eucl(graph_test,graph_test))

0.0


Косинусная мера

In [13]:
score_table_cosine = np.zeros((n_questions, 5))
for i in range(n_questions):
    for j in range(5):
        i_j_score = match_graphs_cosine(answer_graphs[i][0],answer_graphs[i][j])
        score_table_cosine[i][j] = i_j_score

print(score_table_cosine)
print("\n")
print(np.mean(score_table_cosine, axis=0))

[[ 0.0  0.3  0.3  0.6  0.9]
 [ 0.0  0.5  0.6  0.9  0.9]
 [ 0.0  0.6  0.6  0.7  0.5]
 [ 0.0  0.4  0.7  0.8  0.5]
 [ 0.0  1.2  0.5  0.9  1.1]
 [ 0.0  1.1  0.7  1.1  1.0]
 [ 0.0  0.6  0.6  0.8  0.8]
 [ 0.0  0.7  0.5  0.5  0.7]
 [ 0.0  0.8  0.7  0.4  0.8]
 [ 0.0  0.4  1.1  1.4  1.0]
 [ 0.0  0.9  1.2  1.5  6.0]
 [ 0.0  0.6  0.7  0.6  1.0]
 [ 0.0  1.0  0.8  1.2  0.9]
 [ 0.0  1.2  1.1  3.0  0.9]
 [ 0.0  0.9  0.9  0.6  0.7]
 [ 0.0  0.5  0.6  0.5  2.4]
 [ 0.0  0.8  0.9  0.9  1.0]
 [ 0.0  0.1  0.3  0.7  0.6]
 [ 0.0  1.0  1.2  1.3  1.3]
 [ 0.0  0.8  0.7  0.8  1.2]
 [ 0.0  0.8  0.7  1.0  1.5]
 [ 0.0  0.7  1.1  1.0  3.6]
 [ 0.0  0.6  0.7  0.9  3.7]
 [ 0.0  0.7  0.7  0.5  2.0]
 [ 0.0  0.8  1.0  1.3  0.8]
 [ 0.0  0.7  1.0  1.4  1.0]]


[ 0.0  0.7  0.8  1.0  1.4]
