# Эвристики для расстояния Левенштейна
Расстояние Левенштейна двух последовательностей символов -- это количество вставок, замен и уалений, которые нужно произвести над отдельными символами первой последовательности, чтобы получить вторую. Основные понятия и базовый способ подсчета через динамическое программирование описан например [здесь](https://ru.wikipedia.org/wiki/%D0%A0%D0%B0%D1%81%D1%81%D1%82%D0%BE%D1%8F%D0%BD%D0%B8%D0%B5_%D0%9B%D0%B5%D0%B2%D0%B5%D0%BD%D1%88%D1%82%D0%B5%D0%B9%D0%BD%D0%B0) В этом ноутбуке несколько примеров эвристик для его подсчета. Начнем со вспомогательного кода для визуализации состояния вычислений

In [1]:
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt

def edit_distance_state(s: str, t: str, last: List[Tuple[int, int]], values: List[Dict[Tuple[int, int], int]]):
    n = len(s) + 2
    m = len(t) + 2
    
    grid = [['*' for j in range(m)] for i in range(n)]
    grid[0][0] = ''
    grid[1][0] = '#'
    grid[0][1] = '#'
    
    for i in range(n - 2):
        grid[i + 2][0] = s[i]
    for i in range(m - 2):
        grid[0][i + 2] = t[i]
    
    for (i, j), value in values.items():
        grid[i + 1][j + 1] = str(value)
    
    scaling_factor = max(n, m) / 20
    #matplotlib code
    fig = plt.figure(figsize=(m * scaling_factor, n * scaling_factor))

    ax = fig.add_axes([0, 0, 1, 1])
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    for name, spine in ax.spines.items():
        spine.set_visible(False)
        spine.set_visible(False)

    for i in range(m + 1):
        ax.plot([0, n], [i, i], color='black')
    for i in range(n + 1):
        ax.plot([i, i], [0, m], color='black')
        
    for (i, j), value in values.items():
        i = i + 1
        j = m - j - 2
        ax.fill([i, i, i + 1, i + 1, i], [j, j + 1, j + 1, j, j], color='lightcoral')
    for x, y in last:
        x += 1
        y = m - y - 2
        ax.fill([x, x, x + 1, x + 1, x],
                [y, y + 1, y + 1, y, y], color='cyan')
    for i in range(n):
        for j in range(m):
            ax.text(i + 0.5, m - j - 1 + 0.5, grid[i][j], ha='center', va='center', fontsize=20)
        
    plt.close(fig)
    return fig

Следующий код содержит базовую рекурсивную реализацию расстояния Левенштейна со сложностью по времени и памяти $\mathcal{O}(nm)$, где $n, m$ -- длины последовательностей.

In [2]:
def levenstein(i, j, s, t, cache, states = None):
    if i == 0 or j == 0:
        return max(i, j)
    if (i, j) in cache:
        return cache[(i, j)]
        
    cache[(i, j)] = min(
        levenstein(i - 1, j - 1, s, t, cache, states)
             + (0 if s[i - 1] == t[j - 1] else 1),
        levenstein(i - 1, j, s, t, cache, states) + 1,
        levenstein(i, j - 1, s, t, cache, states) + 1)
    # Visualization code
    if states is not None:
        states.append(edit_distance_state(s, t, [(i, j)], cache))
    ####################
    return cache[(i, j)]

In [3]:
s = 'canon in d'
t = 'cant inn d'
states = []
cache = dict()
levenstein(len(s), len(t), s, t, cache, states)

3

В последующей визуализации красным обозначены значений, которые вычислены на текущем шаге, голубым -- значение, вычисленное последним.

In [4]:
from interactive_visualization.animation_utils import animate_list
animate_list(states, play=True);

HBox(children=(Play(value=0, interval=200), Button(description='Prev', style=ButtonStyle()), Button(descriptio…

interactive(children=(IntSlider(value=0, description='step', max=99), Output()), _dom_classes=('widget-interac…

В следующем коде сделана простая модификация: если разница длин превосходит некий порог $k$, то вычислений не производится. В таком варианте мы вычислим точно расстояние только если оно не превосходит $k$.

In [5]:
def levenstein_bounded(i, j, s, t, k, cache, states = None):
    if i == 0 or j == 0:
        return max(i, j)
    if (i, j) in cache:
        return cache[(i, j)]
    if abs(i - j) >= k:
        return k
        
    cache[(i, j)] = min(
        levenstein_bounded(i - 1, j - 1, s, t, k, cache, states)
             + (0 if s[i - 1] == t[j - 1] else 1),
        levenstein_bounded(i - 1, j, s, t, k, cache, states) + 1,
        levenstein_bounded(i, j - 1, s, t, k, cache, states) + 1)
    # Visualization code
    if states is not None:
        states.append(edit_distance_state(s, t, [(i, j)], cache))
    ####################
    return cache[(i, j)]

In [6]:
states = []
cache = dict()
levenstein_bounded(len(s), len(t), s, t, 3, cache, states)

3

В этом варианте сложность по времени и памяти $\mathcal{O}(\min(n, m)k)$

In [7]:
animate_list(states, play=True);

HBox(children=(Play(value=0, interval=200), Button(description='Prev', style=ButtonStyle()), Button(descriptio…

interactive(children=(IntSlider(value=0, description='step', max=43), Output()), _dom_classes=('widget-interac…

Следующая модификация представляет собой онлайн вариант: вычислительная сложность $\mathcal{O}(nm)$, сложность по памяти -- $\mathcal{O}(m)$, кроме вторую последовательность можно обрабатывать в онлайн режиме.

In [8]:
def levenstein_online(s, t, states = None):
    distances = [i for i in range(len(t))]
    
    for k, c in enumerate(s):
        # initialization with deletion of current symbol
        new_distances = [d + 1 for d in distances]
        # Subs or corrects
        for i, d in enumerate(new_distances[1:]):
            distance_sub = distances[i] + (0 if c == t[i] else 1)
            new_distances[i + 1] = min(distance_sub, d)
        # Insertions
        for i, d in enumerate(new_distances[:-1]):
            new_distances[i + 1] = min(new_distances[i + 1],
                                       new_distances[i] + 1)
            
        # Visualization code
        if states is not None:
            values = dict()
            for j, x in enumerate(distances):
                values[(k, j + 1)] = x
                values[(k + 1, j + 1)] = new_distances[j]
            last = [(k + 1, j + 1) for j in range(len(distances))]
            states.append(edit_distance_state(s, t, last, values))
        ####################
        
        distances = new_distances
        
    return distances[-1]

In [9]:
states = []
levenstein_online(s, t, states)

4

In [10]:
animate_list(states, play=True, interval=500);

HBox(children=(Play(value=0, interval=500), Button(description='Prev', style=ButtonStyle()), Button(descriptio…

interactive(children=(IntSlider(value=0, description='step', max=9), Output()), _dom_classes=('widget-interact…

Комбинирую последние два метода можно получить онлайн алгоритм со сложностью $\mathcal{O}(min(n, m)k)$ по времени и $\mathcal{O}(k)$ по памяти, этот вариант очень схож с эвристикой **beam search**, используемой в распознавании речи

In [11]:
def levenstein_beam_search(s, t, k, max_tokens=5, states = None):
    tokens = [(0, 0)] # position in t, value
    
    best_value = 0
    for k, c in enumerate(s):
        new_tokens = dict()
        
        cur = 0
        best_value += 1
        tokens_bak = [x for x in tokens]
        while cur < len(tokens):
            pos, value = tokens[cur]
            cur += 1
            if pos not in new_tokens or new_tokens[pos] > value + 1 :
                new_tokens[pos] = value + 1
            if pos < len(t):
                cost = 1 if c != t[pos] else 0
                if value + cost < best_value + k:
                    if pos + 1 not in new_tokens or new_tokens[pos + 1] > value + cost:
                        new_tokens[pos + 1] = value + cost       
        
        new_tokens = list(sorted(new_tokens.items(), key=lambda x: x[1]))[:max_tokens]
        new_tokens = sorted(new_tokens)
        for i, (pos, value) in enumerate(new_tokens):
            if i == len(new_tokens) - 1:
                break
            next_pos = new_tokens[i + 1][0]
            new_tokens[i + 1] = (next_pos, min(new_tokens[i + 1][1], value + next_pos - pos))
        pos, value = new_tokens[-1]
        while pos + 1 <= len(t) and value + 1 <= best_value + k:
            new_tokens.append((pos + 1, value + 1))
            pos += 1
            value += 1
        new_tokens = list(sorted(new_tokens, key=lambda x: x[1]))[:max_tokens]
        new_tokens = sorted(new_tokens)
        
        # Visualization code
        if states is not None:
            values = dict()
            for j, x in tokens_bak:
                values[(k, j)] = x
            last = []
            for j, x in new_tokens:
                values[(k + 1, j)] = x
                last.append((k + 1, j))
            states.append(edit_distance_state(s, t, last, values))
        ####################
        tokens = new_tokens

                
    for pos, value in tokens:
        if pos == len(s):
            return value
        
    return k

In [12]:
states = []
levenstein_beam_search(s, t, 3, 5, states)

3

In [13]:
animate_list(states, play=True)

HBox(children=(Play(value=0, interval=200), Button(description='Prev', style=ButtonStyle()), Button(descriptio…

interactive(children=(IntSlider(value=0, description='step', max=9), Output()), _dom_classes=('widget-interact…

<function interactive_visualization.animation_utils.animation.step_slice(lst, step)>

## Дерево гипотез
Во всех рассматриваемых вариантах мы делали только вычисление расстояния Левенштейна, но не самих операций, которые нужно проделать, чтобы получить вторую строку из первой. Для восстановления этих операций в онлайн вариантах необходимо как-то хранить предыдущие вычисления. Один из вариантов -- хранить все промежуточные вычисления и делать это обычным способом. К сожалению с таким способом расход памяти будет $\mathcal{O}(nk)$, а не $\mathcal{k}$. Альтернативно можно хранить только те промежуточные гипотезы, которые участвовали в цепочки создания тех, которые рассматриваются сейчас. Это можно осуществить с помощью **дерева гипотез** -- структуры данных, поддерживающей следующие операции:
* Инициализировать дерево одним активным узлом, соответствующим пустой гипотезе
* Породить новый узел из существующей гипотезы и пометить его активным
* Пометить узел как неактивный
* Удалить все неактивные узлы, которые не содержатся на пути от корня к некоторому активному узлу

В зависимости от задачи узлы дерева гипотез могут хранить дополнительную информацию о том, как тот или иной узел был порожден, например в нашем случае необходимо будет хранить позиции в строках и при желании операцию, с помощью которой он был получен (одна операция -- один узел/переход). Идея активных/неактивных узлов заключается в том, что активными мы помечаем те гипотезы, которые рассматриваются на текузей итерации, а неактивные -- созданные на предыдущих итерациях гипотезы, которые необходимы нам для восстановления действий, которое обычно можно получить пройдя от узла лучшей гипотезы к корню.

Оказывается, что для эффективного менеджмента ненужных гипотез достаточно использовать стандартный сборщик мусора по количеству ссылок. Если в узле хранится ссылка узла-предка, а также активные узлы хранятся непосредственно, то ненужные неактивные гипотезы будут удаляться автоматически.

In [14]:
from weakref import WeakValueDictionary

class HypothesisTreeNode:
    def __init__(self, parent, pos_s, pos_t, value, op):
        self.parent = parent
        self.pos_s = pos_s
        self.pos_t = pos_t
        self.value = value
        self.op = op
        self.next = WeakValueDictionary()

    def prolong(self, op):
        if op == "del":
            new_node = HypothesisTreeNode(self, self.pos_s + 1, self.pos_t, self.value + 1, op)
        elif op == "ins":
            new_node = HypothesisTreeNode(self, self.pos_s, self.pos_t + 1, self.value + 1, op)
        elif op == "sub":
            new_node = HypothesisTreeNode(self, self.pos_s + 1, self.pos_t + 1, self.value + 1, op)
        else:
            new_node = HypothesisTreeNode(self, self.pos_s + 1, self.pos_t + 1, self.value, op)
        
        self.next[op] = new_node
        return new_node

    def backtrace(self):
        cur = self
        result = []
        prev = '_'
        while cur is not None:
            if cur.op is not None:
                result.append((cur.pos_s, cur.pos_t, cur.op))
            cur = cur.parent

        return list(reversed(result))

In [15]:
from interactive_visualization.graph_utils import Graph, Arc
from queue import Queue

def hypothesis_tree(tokens):
    arcs = []
    tmp = tokens[0][1]
    while tmp is not None and tmp.parent is not None:
        tmp = tmp.parent
    
    q = Queue()
    q.put((tmp, 0))
    state_num = dict()
    states = 1
    state_num[tmp] = states
    while not q.empty():
        history_item, num = q.get()
        for word, next_item in history_item.next.items():
            if next_item in state_num:
                state = state_num[next_item]
            else:
                state = states
                state_num[next_item] = state
                states += 1
                q.put((next_item, state))
            arcs.append(Arc(num, state, f"({next_item.pos_s}, {next_item.pos_t})/{next_item.op}"))
            #print(f'Added arc: {num}, {state}, ""{word}""')
                   
    tokens_set = {state_num[token[1]] for token in tokens}

    result = Graph(arcs)
    for i, node in result.nodes.items():
        node.SetShape('point')
        if i in tokens_set:
            node.SetColor('red')
    return result.Visualize()

In [16]:
def levenstein_beam_search_with_ht(s, t, k, max_tokens=5, states = None):
    tokens = [(0, HypothesisTreeNode(None, 0, 0, 0, None))] # position in t, value
    
    # Visualization code
    states.append(hypothesis_tree(tokens))
    ####################
    
    best_value = 0
    for k, c in enumerate(s):
        new_tokens = dict()
        
        cur = 0
        best_value += 1
        for pos, token in tokens:
            if pos not in new_tokens or new_tokens[pos].value > token.value + 1 :
                new_tokens[pos] = token.prolong("del")
            if pos < len(t):
                cost = 1 if c != t[pos] else 0
                if token.value + cost < best_value + k:
                    if pos + 1 not in new_tokens or new_tokens[pos + 1].value > token.value + cost:
                        new_tokens[pos + 1] = token.prolong("sub" if cost == 1 else "cor")       
        
        new_tokens = list(sorted(new_tokens.items(), key=lambda x: x[1].value))[:max_tokens]
        new_tokens = sorted(new_tokens)
        for i, (pos, token) in enumerate(new_tokens):
            if i == len(new_tokens) - 1:
                break
            next_pos = new_tokens[i + 1][0]
            if new_tokens[i + 1][1].value > token.value + next_pos - pos:
                new_tokens[i + 1] = (next_pos, token.prolong("ins"))
        pos, token = new_tokens[-1]
        while pos + 1 <= len(t) and token.value + 1 <= best_value + k:
            token = token.prolong("ins")
            pos += 1
            new_tokens.append((pos, token))
            
        new_tokens = list(sorted(new_tokens, key=lambda x: x[1].value))[:max_tokens]
        new_tokens = sorted(new_tokens)
        
        # Visualization code
        states.append(hypothesis_tree(new_tokens))
        ####################
        tokens = new_tokens

                
    for pos, token in tokens:
        if pos == len(s):
            return token.value, token
        
    return k, None

In [17]:
states = []
value, token = levenstein_beam_search_with_ht(s, t, 3, 5, states)
print(value)

3


In [18]:
animate_list(states, play=True, interval=500);

HBox(children=(Play(value=0, interval=500), Button(description='Prev', style=ButtonStyle()), Button(descriptio…

interactive(children=(IntSlider(value=0, description='step', max=10), Output()), _dom_classes=('widget-interac…

Теперь давайте построем выравнивание Левенштейна из полученного дерева

In [19]:
aligned = token.backtrace()
s_aligned = []
t_aligned = []
s_op = []
for i, j, op in aligned:
    if op in { "cor", "sub" }:
        s_aligned.append(s[i - 1])
        t_aligned.append(t[j - 1])
    elif op == "ins":
        s_aligned.append('*')
        t_aligned.append(t[j - 1])
    else:
        s_aligned.append(s[i - 1])
        t_aligned.append('*')
    s_op.append(op[0].upper())

In [20]:
print(''.join(s_aligned))
print(''.join(t_aligned))
print(''.join(s_op))

canon i*n d
can*t inn d
CCCDSCCICCC
