<a href="https://colab.research.google.com/github/IsaacFigNewton/DisCoFuzz/blob/main/Compositional_Model_MVP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports and Configuration

In [3]:
import nltk
nltk.download('wordnet')
from nltk.corpus import wordnet as wn

from sentence_transformers import SentenceTransformer
import numpy as np
import spacy
from typing import Optional, Iterable

[nltk_data] Downloading package wordnet to /root/nltk_data...


In [10]:
class LemmaVectorizer:
    """Map a lemma string to a sentence-transformer embedding vector.

    The vectorizer works in one of two modes, chosen at construction:

    * Precomputed (``jit_vectorization=False``): every WordNet lemma is
      embedded up front and ``__call__`` is a dictionary lookup. Lemmas that
      are not in the table yield ``None``.
    * Just-in-time (``jit_vectorization=True``): nothing is precomputed and
      ``__call__`` encodes the requested string on demand.

    In both modes a successful call returns a 1-D float array, so callers can
    mix vectors from either mode without caring about batch dimensions.
    """

    def __init__(self, jit_vectorization=False):
        """Load the embedding model and, unless JIT mode is on, the lemma table.

        Args:
            jit_vectorization: When true, skip the (slow) WordNet-wide
                precomputation and encode strings lazily in ``__call__``.
        """
        self.embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        # None means "no lookup table; encode on demand".
        self.keyed_vectors = None
        if not jit_vectorization:
            self.keyed_vectors = self.build_wordnet_lemma_embeddings()

    def build_wordnet_lemma_embeddings(
        self,
        batch_size: int = 64
    ):
        """
        Loads all WordNet lemmas, embeds them using a SentenceTransformer model,
        and returns a dictionary mapping lemma → embedding vector.

        Args:
            batch_size: Batch size for model.encode().

        Returns:
            dict[str, np.ndarray]: mapping from lemma string to embedding vector.
        """
        # Collect unique lemma names; WordNet joins multi-word lemmas with
        # underscores, which are normalised to spaces before encoding.
        lemma_list = sorted(
            {
                lemma.name().replace("_", " ")
                for syn in wn.all_synsets()
                for lemma in syn.lemmas()
            }
        )

        # Encode lemmas in batches
        embeddings = self.embedding_model.encode(
            lemma_list,
            batch_size=batch_size,
            show_progress_bar=True,
            convert_to_numpy=True
        )

        # Rows of `embeddings` are in the same order as `lemma_list`.
        return dict(zip(lemma_list, embeddings))

    def __call__(self, X: str) -> Optional[np.ndarray]:
        """Return the embedding for ``X`` as a 1-D float array.

        Args:
            X: The string (typically a lower-cased lemma) to embed.

        Returns:
            A 1-D float array, or ``None`` when a precomputed table is in use
            and ``X`` is not one of its keys.
        """
        # Test against None rather than truthiness so that an empty table is
        # still treated as "precomputed mode" instead of silently falling
        # through to on-demand encoding.
        if self.keyed_vectors is not None:
            v = self.keyed_vectors.get(X)
            # A missing lemma contributes no embedding; the caller decides
            # how to compose without it.
            return None if v is None else np.asarray(v, dtype=float)

        # encode() on a one-element list returns a one-row batch; take that
        # row so the result has the same rank and dtype as a table lookup.
        encoded = self.embedding_model.encode([X])
        return np.asarray(encoded[0], dtype=float)

In [13]:
# Stand-alone sentence-transformer model instance.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
# Lemma vectorizer in just-in-time mode: passing True for jit_vectorization
# skips the WordNet-wide precomputation, so lemmas are encoded on demand.
lemma_vectorizer = LemmaVectorizer(jit_vectorization=True)

# Helpers

In [14]:
def _is_ignored(token):
    return token.is_punct or token.pos_ == "DET"

def node_embedding(token, cache):
    """Compose an embedding for the dependency subtree rooted at ``token``.

    Verbs with at least one usable child are embedded as the mean of the
    pairwise (verb, child-subtree) midpoints. Every other token - and any
    verb for which no pair could be formed - is embedded as the plain mean of
    its own vector (unless the token is ignored) and its children's subtree
    embeddings.

    Args:
        token: Node to embed; its ``i``, ``pos_``, ``lemma_`` and ``children``
            attributes are read.
        cache: Dict from token index to an already computed embedding. It is
            filled in place as the recursion proceeds.

    Returns:
        A ``(embedding, cache)`` tuple, where ``cache`` is the same dict that
        was passed in.
    """
    idx = token.i
    if idx in cache:
        return cache[idx], cache

    # Verb case: blend the verb vector with each child subtree, then average
    # the blends.
    if token.pos_ == "VERB":
        head_vec = lemma_vectorizer(token.lemma_.lower())
        blends = []
        for kid in token.children:
            kid_vec, _ = node_embedding(kid, cache)
            if head_vec is None or kid_vec is None:
                continue
            blends.append((head_vec + kid_vec) / 2)

        if blends:
            verb_result = np.vstack(blends).mean(axis=0)
            cache[idx] = verb_result
            return verb_result, cache
        # No blend could be formed: continue with the generic averaging below.

    # Generic case: average the token's own vector with its children's.
    parts = []
    if not _is_ignored(token):
        own_vec = lemma_vectorizer(token.lemma_.lower())
        if own_vec is not None:
            parts.append(own_vec)

    subtree_vecs = (node_embedding(kid, cache)[0] for kid in token.children)
    parts.extend(vec for vec in subtree_vecs if vec is not None)

    # With nothing to average, fall back to a 384-dimensional zero vector
    # (presumably the embedding model's output width - confirm).
    pooled = np.vstack(parts).mean(axis=0) if parts else np.zeros(384)

    cache[idx] = pooled
    return pooled, cache


# Loaded spaCy pipelines keyed by model name, so repeated calls do not reload
# the model from disk.
_SPACY_PIPELINES = {}


def _get_spacy_pipeline(name="en_core_web_sm"):
    """Load the named spaCy pipeline on first use and reuse it afterwards."""
    if name not in _SPACY_PIPELINES:
        _SPACY_PIPELINES[name] = spacy.load(name)
    return _SPACY_PIPELINES[name]


def embed_doc_by_dependency(
    text: str,
    nlp=None,
):
    """Embed ``text`` by composing embeddings along its dependency parse.

    Each sentence is reduced to the embedding of its root token's subtree
    (via ``node_embedding``), and the document embedding is the mean of those
    per-sentence vectors.

    Args:
        text: Raw text to parse and embed.
        nlp: Optional callable that turns ``text`` into a parsed document
            exposing ``sents``. When omitted, the ``en_core_web_sm`` spaCy
            pipeline is loaded once and reused across calls.

    Returns:
        A 1-D array; a 384-dimensional zero vector when the document yields
        no sentence embeddings.
    """
    if nlp is None:
        nlp = _get_spacy_pipeline()
    doc = nlp(text)
    # One cache per document: token indices are only unique within a doc.
    cache = {}

    # Aggregate sentence root embeddings
    root_embeddings = []
    for sent in doc.sents:
        emb, cache = node_embedding(sent.root, cache)
        if emb is not None:
            root_embeddings.append(emb)

    if not root_embeddings:
        return np.zeros(384)

    return np.vstack(root_embeddings).mean(axis=0)

# Test embedding composition

In [15]:
# Two sentences that differ only by swapping "fox" and "dog", plus one
# unrelated sentence, each embedded through the dependency composition.
v1, v2, v3 = (
    embed_doc_by_dependency(sentence)
    for sentence in (
        "the quick brown fox jumps over the lazy dog.",
        "the quick brown dog jumps over the lazy fox.",
        "alice loves bob.",
    )
)

In [18]:
from sklearn.metrics.pairwise import cosine_similarity

# Compare the composed embeddings. Each vector is wrapped in a list so it is
# passed to cosine_similarity as a single-row input; the printed value is
# therefore a 1x1 matrix.
# v1 vs v2: same words with "fox" and "dog" swapped.
print(f"similarity of similar sentences: {cosine_similarity([v1], [v2])}")
# v1 vs v3: unrelated sentence.
print(f"similarity of dissimilar sentences: {cosine_similarity([v1], [v3])}")

similarity of similar sentences: [[0.99968085]]
similarity of dissimilar sentences: [[0.45042074]]
