In [135]:
import numpy as np
def levenshtein_distance(word1, word2):
    len1 = len(word1) + 1
    len2 = len(word2) + 1

    # Create the distance matrix
    dp = [[0 for _ in range(len2)] for _ in range(len1)]

    # Initialize base cases 
    for i in range(len1):
        dp[i][0] = i
    for j in range(len2):
        dp[0][j] = j

    # Calculate minimum edit operations
    for i in range(1, len1):
        for j in range(1, len2):
            if word1[i - 1] == word2[j - 1]:
                cost = 0  # Substitution cost if characters match
            else:
                cost = 1  # Substitution cost

            dp[i][j] = min(
                dp[i - 1][j] + 1,  # Deletion
                dp[i][j - 1] + 1,  # Insertion
                dp[i - 1][j - 1] + cost,  # Substitution
            )

    return dp[len1 - 1][len2 - 1]  # Result at the bottom right of the matrix



class TrieNode:
    def __init__(self):
        self.children = {}
        self.path = None


class AppTrie:
    def __init__(self):
        self.root = TrieNode()

    def insert(self, path):
        current_node = self.root
        if current_node.path is None:
            current_node.path = path.split(" > ")[0]

        increasing_path = ""
        for word in path.split(" > "):
            increasing_path = increasing_path + " > " + word if increasing_path else word
            if word not in current_node.children:
                current_node.children[word] = TrieNode()
            current_node.children[word].path = increasing_path
            current_node = current_node.children[word]

    def search(self, query, threshold=10):
        results = []

        def _dfs_query(node: TrieNode, query: str):
            if node.children == {}:
                return
            
            print("Searching", node.children)

            for child_word, child_node in node.children.items():
                distance = levenshtein_distance(query, child_word)
                print("Distance", distance, child_word, query)
                if distance <= threshold:
                    results.append(child_node.path)
                else :
                    _dfs_query(child_node, query)
        
        _dfs_query(self.root, query)
        return results
    
# Construct the trie for testing
# trie = AppTrie()
# trie.insert("select_post > new_comment > add_image")
# trie.insert("settings > battery > toggle_battery_saver")
# trie.insert("select_post > new_comment")
# trie.insert("select_post")

# print("output:", trie.search("comment_on_image", 10))

In [136]:
# print(trie.root)

In [137]:
# # Test Cases from Your Description
# test_cases = [
#     {"input":"toggle_battery_saver","output" :"settings > battery > toggle_battery_saver"},
#     {"input":"turn_on_battery_saver","output":"settings > battery > toggle_battery_saver"},
#     {"input":"turn_of_battery_saver","output":"settings > battery > toggle_battery_saver"},
#     {"input":"comment_on_image","output": "select_post > new_comment > add_image"},
#     {"input":"comment_on_post","output": "select_post > new_comment"},
#     {"input":"see_post","output": "select_post"}
# ]

# # Run tests
# for case in test_cases:
#     result = trie.search(case["input"])
#     assert result == [case["output"]], f"Failed for input: {case['input']}"

In [138]:
# !pip install gensim
import gensim.downloader

In [139]:
existing_models = list(gensim.downloader.info()['models'].keys())

In [140]:
import os
import gensim.downloader as api

# Directory where you want to save the downloaded models
dir = "pretrained_word2vec_gensim"

# Ensure the directory exists
if not os.path.exists(dir):
    os.makedirs(dir)

models = api.info()['models'].keys()

for model in models:
    # Define the path for the model to be saved
    model_path = os.path.join(dir, model + ".model")
    
    # Check if the model file already exists
    if not os.path.exists(model_path):
        # Download the model only if it does not exist
        print(f"Downloading {model}...")
        embedding_file = api.load(model)
        # Save the model locally
        embedding_file.save(model_path)
        print(f"{model} has been downloaded and saved to {model_path}")
    else:
        print(f"{model} already exists. Skipping download.")

fasttext-wiki-news-subwords-300 already exists. Skipping download.
conceptnet-numberbatch-17-06-300 already exists. Skipping download.
word2vec-ruscorpora-300 already exists. Skipping download.
word2vec-google-news-300 already exists. Skipping download.
glove-wiki-gigaword-50 already exists. Skipping download.
glove-wiki-gigaword-100 already exists. Skipping download.
glove-wiki-gigaword-200 already exists. Skipping download.
glove-wiki-gigaword-300 already exists. Skipping download.
glove-twitter-25 already exists. Skipping download.
glove-twitter-50 already exists. Skipping download.
glove-twitter-100 already exists. Skipping download.
glove-twitter-200 already exists. Skipping download.
__testing_word2vec-matrix-synopsis already exists. Skipping download.


In [141]:
dir = "pretrained_word2vec_gensim"
model_paths = [os.path.join(dir, model + ".model") for model in api.info()['models'].keys()]
model_paths

['pretrained_word2vec_gensim/fasttext-wiki-news-subwords-300.model',
 'pretrained_word2vec_gensim/conceptnet-numberbatch-17-06-300.model',
 'pretrained_word2vec_gensim/word2vec-ruscorpora-300.model',
 'pretrained_word2vec_gensim/word2vec-google-news-300.model',
 'pretrained_word2vec_gensim/glove-wiki-gigaword-50.model',
 'pretrained_word2vec_gensim/glove-wiki-gigaword-100.model',
 'pretrained_word2vec_gensim/glove-wiki-gigaword-200.model',
 'pretrained_word2vec_gensim/glove-wiki-gigaword-300.model',
 'pretrained_word2vec_gensim/glove-twitter-25.model',
 'pretrained_word2vec_gensim/glove-twitter-50.model',
 'pretrained_word2vec_gensim/glove-twitter-100.model',
 'pretrained_word2vec_gensim/glove-twitter-200.model',
 'pretrained_word2vec_gensim/__testing_word2vec-matrix-synopsis.model']

In [142]:
model = model_paths[0]

# load model
model = gensim.models.KeyedVectors.load(model)

# query and path
query = "comment_on_image"
path = "select_post > new_comment > add_image"

query = query.split("_")
path = [i.split("_") for i in path.split(">")]
print(query, path)

threshold = 0.7

import time

start = time.time()
# get the most similar word of each word in the query
for word in query:
    word = word.strip()
    if not word:
        continue
    for path_word in path:
        path_word = [v.strip() for v in path_word]
        if not path_word:
            continue
        # print(word, path_word)
        similarity = model.n_similarity([word], path_word)
        # print(similarity)
        if similarity > threshold:
            print (word, path_word, similarity)
end = time.time()
print("Time taken:", end - start)



['comment', 'on', 'image'] [['select', 'post '], [' new', 'comment '], [' add', 'image']]
comment ['new', 'comment'] 0.72556317
image ['add', 'image'] 0.7751524
Time taken: 0.0006873607635498047


In [143]:
import heapq
import math
from itertools import chain

#hyperparameters
model = model_paths[0]
model = gensim.models.KeyedVectors.load(model)
w = 1
b = 0
sigma = 1
mu = 0


def sigmoid(x):
    return 1 / (1 + math.exp(-x))


def sim(word1, word2):
    if word1 == word2:
        return 1.0
    try:
        return sigmoid(w * model.similarity(word1, word2) + b)
    except:
        return 0
    
def insert_cost(word, target, string):
    score = 0
    for i in range(len(string)):
        if string[i] == target:
            continue
        if sim(word, string[i]) > score:
            score = sim(word, string[i])
    
    return 1 - sigma * score + mu

def substitute_cost(word, target):
    return 2 - 2*sim(word, target)


# https://arxiv.org/pdf/1810.10752.pdf
def distance_string(query, path):
    query = query.split("_")
    path = [i.split("_") for i in path.split(">")]
    path = list(chain.from_iterable(path))
    
    # print(query, path)
    dp = [[0 for _ in range(len(path) + 1)] for _ in range(len(query) + 1)]

    # base case
    for i in range(len(query) + 1):
        dp[i][0] = i
    for j in range(len(path) + 1):
        dp[0][j] = j

    # calculate minimum edit operations
    for i in range(1, len(query) + 1):
        for j in range(1, len(path) + 1):
            insertion = dp[i][j - 1] + insert_cost(query[i-1], path[j-1], path)
            deletion = dp[i - 1][j] + insert_cost(path[j-1], query[i-1], query)
            substitution = dp[i - 1][j - 1] + substitute_cost(query[i-1], path[j-1])
            dp[i][j] = min(insertion, deletion, substitution)

    return dp[len(query)][len(path)]


In [153]:
def a_star_search(self, query, threshold_lower=1, threshold_upper=1.5):
    h = []
    heapq.heappush(h, (0, 0, self.root))
    results = []

    while len(h) > 0:
        current_cost, step, current_node = heapq.heappop(h)
        a = 0.8
        b = 0.2

        for child_word, child_node in current_node.children.items():
            distance_path = distance_string(query, child_node.path)
            distance_current = distance_string(query, child_word)
            distance = a * distance_path + b * distance_current

            distance = distance_string(query, child_node.path)
            distance = distance / (step + 1)

            # print stat
            # print("Searching in path:", child_node.path, "with distance:", distance, "and step:", step + 1)

            if distance <= threshold_upper:
                results.append(child_node.path)
                if distance <= threshold_lower:
                    return results

            heapq.heappush(h, (distance, step + 1, child_node))

    return results


AppTrie.a_star_search = a_star_search

In [154]:
# Construct the trie for testing
trie = AppTrie()
trie.insert("select_post > new_comment > add_image")
trie.insert("settings > battery > toggle_battery_saver")
trie.insert("select_post > new_comment")
trie.insert("select_post")

print(trie.root.children["settings"].children['battery'].children)

{'toggle_battery_saver': <__main__.TrieNode object at 0x7f0c71281220>}


In [155]:
test_input = [
    "turn_on_battery_saver",
    "turn_of_battery_saver",
    "comment_on_image",
    "comment_on_post",
    "see_post"
]

for case in test_input:
    result = trie.a_star_search(case)
    print(result)

['select_post > new_comment > add_image', 'settings > battery > toggle_battery_saver']
['select_post > new_comment > add_image', 'settings > battery > toggle_battery_saver']
['select_post > new_comment']
['select_post', 'select_post > new_comment']
['select_post']
