# Implementasi Levenshtein Automaton untuk Spell Checker

- Nama: Syahrul Apriansyah
- Mata Kuliah: IR Gasal 2023/2024
- NPM: 2106708311

Sebelumnya di kelas kita sudah belajar bagaimana cara kerja dari Demerau Levenshtein Distance. Di bawah ini saya buat implementasinya berdasarkan pseudocode yang ada di slide.

In [None]:
def DL_dist(a, b):
    # Initialize the wagner-fischer matrix
    maxdist = len(a) + len(b)
    dist = [[0 for _ in range(len(b) + 2)] for _ in range(len(a) + 2)]

    dist[-1][-1] = maxdist
    for i in range(len(a) + 1):
        dist[i][-1] = maxdist
        dist[i][0] = i

    for j in range(len(b) + 1):
        dist[-1][j] = maxdist
        dist[0][j] = j

    lastrow = dict()  # A map or dictionary
    for i in range(1, len(a) + 1):
        lastcol = 0
        for j in range(1, len(b) + 1):
            lmr = lastrow.get(b[j - 1], 0)
            lmc = lastcol

            if a[i - 1] == b[j - 1]:
                cost = 0
                lastcol = j
            else:
                cost = 1

            dist[i][j] = min(
                dist[i - 1][j - 1] + cost,
                dist[i][j - 1] + 1,
                dist[i - 1][j] + 1,
                dist[lmr - 1][lmc - 1] + 1 + (i - lmr - 1) + (j - lmc - 1)
            )

        lastrow[a[i - 1]] = i

    return dist[len(a)][len(b)]

In [None]:
DL_dist("fasasilkom" , "fasilkom")

2

In [None]:
DL_dist("abcxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxd","abcadasdasdasdxd")

40

Namun di slide juga disebutkan bahwa ada ada cara yang lebih cepat lagi yaitu dengan menggunakan Levenshtein Automata.

Di slide diberikan sebuah web sebagai referensi: http://blog.notdot.net/2010/07/Damn-Cool-Algorithms-Levenshtein-Automata

Pada diperkenalkan pendekatan alternatif: Levenshtein Automata. Pendekatan ini memungkinkan pembuatan automaton yang dapat mengenali string berdasarkan jarak Levenshtein dari kata target. Keuntungannya, automata ini dapat bekerja dalam waktu O(n), lebih cepat dibandingkan dengan algoritma Levenshtein Dynamic Programming yang memerlukan waktu O(mn).

## Levenshtein Automata

Levenshtein Automata adalah pendekatan untuk mengenali set kata-kata yang berada dalam jarak Levenshtein tertentu dari kata target. Ini adalah alternatif dari metode tradisional yang menggunakan algoritma Dynamic Programming untuk menghitung jarak Levenshtein, yang memiliki kompleksitas waktu O(mn).

![Levenshtein Automata](http://lh4.ggpht.com/_23zDbjk-dKI/TFAMHm_FQUI/AAAAAAAABrI/jpf9QkVoZUk/levenstein-nfa-food.png)

**NFA untuk 'food'**:
   - Sebagai ilustrasi, sebuah NFA (Non-deterministic Finite Automaton) untuk kata 'food' dengan jarak edit maksimum 2 diperlihatkan. Diagramnya memiliki struktur yang teratur dan mudah dibuat.
   - Setiap state dalam NFA diberi label dengan notasi khusus yang menunjukkan jumlah karakter yang telah diproses dan jumlah kesalahan yang ditemukan.
   - Ada berbagai jenis transisi antara state: karakter yang tidak diubah, sisipan, penggantian, dan penghapusan.

In [None]:
import bisect

class NFA(object):
    EPSILON = object()
    ANY = object()

    def __init__(self, start_state):
        self.transitions = {}
        self.final_states = set()
        self._start_state = start_state

    @property
    def start_state(self):
        return frozenset(self._expand(set([self._start_state])))

    def add_transition(self, src, input, dest):
        self.transitions.setdefault(src, {}).setdefault(input, set()).add(dest)

    def add_final_state(self, state):
        self.final_states.add(state)

    def is_final(self, states):
        return self.final_states.intersection(states)

    def _expand(self, states):
        frontier = set(states)
        while frontier:
            state = frontier.pop()
            new_states = self.transitions.get(state, {}).get(NFA.EPSILON, set()).difference(states)
            frontier.update(new_states)
            states.update(new_states)
        return states

    def next_state(self, states, input):
        dest_states = set()
        for state in states:
            state_transitions = self.transitions.get(state, {})
            dest_states.update(state_transitions.get(input, []))
            dest_states.update(state_transitions.get(NFA.ANY, []))
        return frozenset(self._expand(dest_states))

    def get_inputs(self, states):
        inputs = set()
        for state in states:
            inputs.update(self.transitions.get(state, {}).keys())
        return inputs

    def to_dfa(self):
        dfa = DFA(self.start_state)
        frontier = [self.start_state]
        seen = set()
        while frontier:
            current = frontier.pop()
            inputs = self.get_inputs(current)
            for input in inputs:
                if input == NFA.EPSILON: continue
                new_state = self.next_state(current, input)
                if new_state not in seen:
                    frontier.append(new_state)
                    seen.add(new_state)
                    if self.is_final(new_state):
                        dfa.add_final_state(new_state)
                if input == NFA.ANY:
                    dfa.set_default_transition(current, new_state)
                else:
                    dfa.add_transition(current, input, new_state)
        return dfa

In [None]:
def levenshtein_automata(term, k):
    nfa = NFA((0, 0))
    for i, c in enumerate(term):
        for e in range(k + 1):
            # Correct character
            nfa.add_transition((i, e), c, (i + 1, e))
            if e < k:
                # Deletion
                nfa.add_transition((i, e), NFA.ANY, (i, e + 1))
                # Insertion
                nfa.add_transition((i, e), NFA.EPSILON, (i + 1, e + 1))
                # Substitution
                nfa.add_transition((i, e), NFA.ANY, (i + 1, e + 1))
    for e in range(k + 1):
        if e < k:
            nfa.add_transition((len(term), e), NFA.ANY, (len(term), e + 1))
        nfa.add_final_state((len(term), e))
    return nfa

   - Fungsi Python `levenshtein_automata` diberikan untuk mengkonstruksi NFA ini. Fungsi ini menerima kata dan jarak edit maksimum sebagai input dan mengembalikan NFA yang sesuai.
   - Fungsi ini membangun transisi berdasarkan karakter dalam kata dan jarak edit yang diberikan.

**Evaluasi NFA**:
   - Meskipun NFA dapat merepresentasikan automata Levenshtein, evaluasi langsung terhadapnya dapat menjadi mahal dari segi komputasi. Hal ini disebabkan oleh keberadaan banyak state aktif dan transisi epsilon (transisi yang tidak memerlukan simbol input).
   - Oleh karena itu, praktek standar adalah mengkonversi NFA menjadi DFA (Deterministic Finite Automaton) menggunakan konstruksi powerset.

![DFA untuk 'food'](http://lh6.ggpht.com/_23zDbjk-dKI/TFAMHtbPPAI/AAAAAAAABrE/cF_WDpCVCCg/levenstein-dfa-food.png)

- DFA untuk kata 'food' dengan satu kesalahan diizinkan diperlihatkan sebagai contoh. DFA ini lebih sederhana daripada NFA yang sesuai dan dapat dengan mudah mengevaluasi apakah kata tertentu berada dalam jarak edit yang diizinkan dari kata target.
- DFA ini dapat dibangun dari NFA dengan mengkonstruksi powerset dari state NFA. Setiap state dalam DFA adalah himpunan state NFA yang sesuai.
- Meskipun konstruksi NFA dapat dilakukan dalam waktu O(kn), konversi ke DFA memiliki worst case O(2n). Namun, ada optimasi yang telah dikembangkan oleh para peneliti untuk membuat proses ini lebih cepat dan efisien.

In [None]:
class DFA(object):
    def __init__(self, start_state):
        self.start_state = start_state
        self.transitions = {}
        self.defaults = {}
        self.final_states = set()

    def add_transition(self, src, input, dest):
        self.transitions.setdefault(src, {})[input] = dest

    def set_default_transition(self, src, dest):
        self.defaults[src] = dest

    def add_final_state(self, state):
        self.final_states.add(state)

    def is_final(self, state):
        return state in self.final_states

    def next_state(self, src, input):
        state_transitions = self.transitions.get(src, {})
        return state_transitions.get(input, self.defaults.get(src, None))

    def next_valid_string(self, input):
        state = self.start_state
        stack = []

        # Evaluate the DFA as far as possible
        for i, x in enumerate(input):
            stack.append((input[:i], state, x))
            state = self.next_state(state, x)
            if not state: break
        else:
            stack.append((input[:i+1], state, None))

        if self.is_final(state):
            # Input word is already valid
            return input

        # Perform a 'wall following' search for the lexicographically smallest
        # accepting state.
        while stack:
            path, state, x = stack.pop()
            x = self.find_next_edge(state, x)
            if x:
                path += x
                state = self.next_state(state, x)
                if self.is_final(state):
                    return path
                stack.append((path, state, None))
        return None

    def find_next_edge(self, s, x):
        if x is None:
            x = u'\0'
        else:
            x = chr(ord(x) + 1)
        state_transitions = self.transitions.get(s, {})
        if x in state_transitions or s in self.defaults:
            return x
        labels = sorted(state_transitions.keys())
        pos = bisect.bisect_left(labels, x)
        if pos < len(labels):
            return labels[pos]
        return None

- Banyak literatur, mencatat bahwa dictionary (yaitu, set dari record yang ingin dicari) dapat direpresentasikan sebagai DFA. Struktur seperti Trie atau DAWG, yang sering digunakan untuk menyimpan kamus, pada dasarnya adalah kasus khusus dari DFA.

**Intersect dua DFA**:
   - Mengingat dictionary dan kriteria pencarian (Automata Levenshtein) keduanya dapat direpresentasikan sebagai DFA, kita dapat menggabungkan kedua DFA tersebut untuk menemukan kata-kata dalam kamus yang sesuai dengan kriteria kita.
   - Fungsi `intersect` diberikan sebagai contoh. Fungsi ini menelusuri kedua DFA secara bersamaan, hanya mengikuti edge yang dimiliki oleh kedua DFA, dan mencatat jalur yang diikuti. Setiap kali kedua DFA berada dalam final state, kata tersebut termasuk dalam output set dan oleh karena itu dihasilkan.

In [None]:
def intersect(dfa1, dfa2):
    stack = [("", dfa1.start_state, dfa2.start_state)]
    while stack:
        s, state1, state2 = stack.pop()
        for edge in set(dfa1.edges(state1)).intersect(dfa2.edges(state2)):
            state1 = dfa1.next(state1, edge)
            state2 = dfa2.next(state2, edge)
            if state1 and state2:
                s = s + edge
                stack.append((s, state1, state2))
                if dfa1.is_final(state1) and dfa2.is_final(state2):
                    yield s

**Menghadapi Indeks Non-DFA**:
   - Meskipun pendekatan di atas bekerja dengan baik untuk indeks yang disimpan sebagai DFA, banyak indeks yang tidak disimpan dalam format ini. Misalnya, indeks dalam memori mungkin disimpan dalam sorted list, sementara indeks di disk mungkin disimpan dalam struktur seperti BTree.
   - Pertanyaannya adalah apakah kita dapat memodifikasi skema ini untuk bekerja dengan jenis indeks ini dan tetap lebih cepat daripada pendekatan brute-force.
---

- Dengan kriteria pencarian yang dinyatakan sebagai DFA, kita dapat menemukan string berikutnya (dalam urutan leksikografis) yang cocok jika string input tidak cocok.
- Intuisinya adalah kita mengevaluasi string input terhadap DFA sampai kita tidak bisa melanjutkan lagi. Kemudian, kita mengikuti edge dengan label leksikografis terkecil hingga kita mencapai keadaan akhir. Ada beberapa kasus khusus yang perlu diperhatikan, seperti saat tidak ada transisi yang valid untuk karakter berikutnya atau saat mencapai keadaan tanpa edge keluar yang valid.
- Proses ini mirip dengan algoritma 'wall following', tetapi diterapkan pada DFA.

**Contoh dengan DFA untuk 'food(1)'**:
- Saat mempertimbangkan input 'foogle', kita dapat memproses empat karakter pertama tanpa masalah, tetapi kemudian kita menghadapi masalah dengan karakter 'l'. Kita harus backtrack satu langkah dan mengikuti edge lain untuk mencapai state akhir dengan output string 'fooh', yang merupakan string berikutnya setelah 'foogle' dalam urutan leksikografis.

In [None]:
  def next_valid_string(self, input):
    state = self.start_state
    stack = []

    # Evaluate the DFA as far as possible
    for i, x in enumerate(input):
        stack.append((input[:i], state, x))
        state = self.next_state(state, x)
        if not state: break
    else:
        stack.append((input[:i+1], state, None))

    if self.is_final(state):
        # Input word is already valid
        return input

    # Perform a 'wall following' search for the lexicographically smallest
    # accepting state.
    while stack:
        path, state, x = stack.pop()
        x = self.find_next_edge(state, x)
        if x:
            path += x
            state = self.next_state(state, x)
            if self.is_final(state):
                return path
            stack.append((path, state, None))
    return None

In [None]:
def find_next_edge(self, s, x):
    if x is None:
        x = u'\0'
    else:
        x = chr(ord(x) + 1)
    state_transitions = self.transitions.get(s, {})
    if x in state_transitions or s in self.defaults:
        return x
    labels = sorted(state_transitions.keys())
    pos = bisect.bisect_left(labels, x)
    if pos < len(labels):
        return labels[pos]
    return None

- Fungsi `next_valid_string` mengevaluasi DFA sejauh mungkin dengan mempertahankan stack dari state yang dikunjungi. Jika tidak menemukan match yang tepat, fungsi tersebut melakukan backtracking untuk menemukan set transisi terkecil yang dapat diikuti untuk mencapai state akhir.
- Fungsi `find_next_edge` menemukan edge keluar leksikografis terkecil dari state yang lebih besar dari input tertentu.

**Optimasi**:
   - Dengan beberapa preprocessing, pencarian edge dapat dibuat lebih efisien, misalnya dengan menghasilkan pemetaan dari setiap karakter ke edge keluar pertama yang lebih besar daripadanya.

**Algoritma Pencarian Indeks**:
   - Dapatkan elemen pertama dari indeks Anda sebagai string 'current'.
   - Masukkan string 'current' ke dalam algoritma 'DFA successor' untuk mendapatkan string 'next'.
   - Jika 'next' sama dengan 'current', Anda telah menemukan match. Ambil elemen berikutnya dari indeks sebagai string 'current' dan ulangi.
   - Jika 'next' tidak sama dengan 'current', cari dalam indeks Anda string yang lebih besar atau sama dengan 'next'. Jadikan ini string 'current' baru dan ulangi.

In [None]:
def find_all_matches(word, k, lookup_func):
    """Uses lookup_func to find all words within levenshtein distance k of word.

    Args:
      word: The word to look up
      k: Maximum edit distance
      lookup_func: A single argument function that returns the first word in the
        database that is greater than or equal to the input argument.
    Yields:
      Every matching word within levenshtein distance k from the database.
    """
    lev = levenshtein_automata(word, k).to_dfa()
    match = lev.next_valid_string(u'\0')
    while match:
        next = lookup_func(match)
        if not next:
            return
        if match == next:
            yield match
            next = next + u'\0'
        match = lev.next_valid_string(next)

- Fungsi `find_all_matches` menggunakan fungsi pencarian untuk menemukan semua kata dalam jarak Levenshtein k dari kata yang diberikan. Ini menggunakan fungsi `lookup_func` untuk mendapatkan kata berikutnya dari database yang lebih besar atau sama dengan kata yang diberikan.
1. **Pandangan Umum**:
   - Algoritma ini dapat dilihat sebagai proses yang membandingkan DFA Levenshtein dan indeks sebagai sorted list. Prosedurnya mirip dengan strategi "zigzag merge join" dari App Engine. Kita terus-menerus mencari string di satu sisi, lalu menggunakan hasil tersebut untuk melompat ke tempat yang sesuai di sisi lain. Jika tidak ada entri yang cocok, kita menggunakan hasil pencarian untuk melompat lebih jauh di sisi pertama, dan seterusnya. Keuntungan dari pendekatan ini adalah kita dapat melewati banyak entri indeks atau string Levenshtein yang tidak cocok, sehingga menghemat usaha untuk mengenumerasi keduanya secara eksklusif.

2. **Efisiensi**:
   - Diharapkan dari deskripsi ini, prosedur ini memiliki potensi untuk menghindari kebutuhan untuk mengevaluasi semua entri indeks atau semua string kandidat Levenshtein.

3. **Catatan Tambahan**:
   - Tidak semua DFA memungkinkan kita untuk menemukan penerus leksikografis minimal untuk setiap string. Sebagai contoh, pertimbangkan penerus string 'a' dalam DFA yang mengenali pola 'a+b'. Tidak ada jawaban yang pasti karena harus terdiri dari jumlah karakter 'a' yang tak terbatas diikuti oleh satu karakter 'b'. Namun, dengan modifikasi sederhana, kita dapat mengembalikan string yang dijamin menjadi awalan dari string berikutnya yang dikenali oleh DFA, yang cukup untuk tujuan kita. Karena DFA Levenshtein selalu terbatas dan selalu memiliki penerus berpanjang finit (kecuali string terakhir), ekstensi seperti itu ditinggalkan sebagai latihan bagi pembaca. Ada potensi aplikasi menarik untuk pendekatan ini, seperti pencarian ekspresi reguler yang diindeks, yang mungkin memerlukan modifikasi ini.

In [None]:
import bisect
import random


class Matcher(object):
    def __init__(self, l):
        self.l = l
        self.probes = 0
    def __call__(self, w):
        self.probes += 1
        pos = bisect.bisect_left(self.l, w)
        if pos < len(self.l):
            return self.l[pos]
        else:
            return None


words = [x.strip().lower() for x in open('/content/words_alpha.txt', 'r')]
words.sort()
words10 = [x for x in words if random.random() <= 0.1]
words100 = [x for x in words if random.random() <= 0.01]


m = Matcher(words)
assert len(list(find_all_matches('food', 1, m))) == 21
print(m.probes)

m = Matcher(words)
assert len(list(find_all_matches('food', 2, m))) == 388
print(m.probes)


def levenshtein(s1, s2):
    if len(s1) < len(s2):
        return levenshtein(s2, s1)
    if not s1:
        return len(s2)

    previous_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1 # j+1 instead of j since previous_row and current_row are one character longer
            deletions = current_row[j] + 1     # than s2
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return previous_row[-1]

class BKNode(object):
    def __init__(self, term):
        self.term = term
        self.children = {}

    def insert(self, other):
        distance = levenshtein(self.term, other)
        if distance in self.children:
            self.children[distance].insert(other)
        else:
            self.children[distance] = BKNode(other)

    def search(self, term, k, results=None):
        if results is None:
            results = []
        distance = levenshtein(self.term, term)
        counter = 1
        if distance <= k:
            results.append(self.term)
        for i in range(max(0, distance - k), distance + k + 1):
            child = self.children.get(i)
            if child:
                counter += child.search(term, k, results)
        return counter

153
3477


In [None]:
# Membangun pohon BK
root = BKNode(words[0])
for word in words[1:]:
    root.insert(word)


Words within distance 1 of 'food':
['fod', 'foo', 'bood', 'feod', 'fold', 'fond', 'food', 'ford', 'foud', 'good']
Number of nodes examined: 3856


In [None]:

# Mencari kata dalam pohon BK
search_word = "food"
max_distance = 1
results = []
num_nodes_examined = root.search(search_word, max_distance, results)
print(f"Words within distance {max_distance} of '{search_word}':")
print(results[:10])
print(f"Number of nodes examined: {num_nodes_examined}")


Words within distance 1 of 'food':
['fod', 'foo', 'bood', 'feod', 'fold', 'fond', 'food', 'ford', 'foud', 'good']
Number of nodes examined: 3856


In [None]:
search_word = "food"
max_distance = 2
results = []
num_nodes_examined = root.search(search_word, max_distance, results)
print(f"Words within distance {max_distance} of '{search_word}':")
print(results[:5])
print(f"Number of nodes examined: {num_nodes_examined}")

Words within distance 2 of 'food':
['fad', 'oad', 'od', 'fo', 'adod']
Number of nodes examined: 29825


In [None]:
search_word = "food"
max_distance = 4
results = []
num_nodes_examined = root.search(search_word, max_distance, results)
print(f"Words within distance {max_distance} of '{search_word}':")
print(results[:5])
print(f"Number of nodes examined: {num_nodes_examined}")

Words within distance 4 of 'food':
['a', 'aa', 'ab', 'ac', 'ad']
Number of nodes examined: 150010


actual code: https://gist.github.com/Arachnid/491973
dictionary: https://github.com/dwyl/english-words/blob/master/words_alpha.txt