In [3]:
import json
import re
from tqdm import tqdm


In [1]:

from typing import Dict, List, Tuple, Set, Iterable, Optional
import networkx as nx
import numpy as np
from sentence_transformers import SentenceTransformer, util
from tqdm import tqdm
import math


  from .autonotebook import tqdm as notebook_tqdm


In [6]:
import spacy
# Load the `en_core_web_sm` spaCy pipeline once; extract_keywords_spacy() reads this
# module-level `nlp` object on every call.
nlp = spacy.load("en_core_web_sm")

In [13]:
# instantiate embedder (will download model if necessary)
# NOTE(review): `Embedder` is defined in the pipeline cell further down (In [26]); on a
# fresh kernel that cell has to be executed before this one.
embedder = Embedder(model_name="all-MiniLM-L6-v2", device="cpu")

In [7]:
def extract_keywords_spacy(question):
    """
    Return the distinct content-word lemmas of `question`, lower-cased, in first-seen order.

    A token is kept when it is neither a stopword nor punctuation and its coarse
    part-of-speech tag is one of NOUN, PROPN, VERB or ADJ. Parsing is done with the
    module-level `nlp` pipeline.
    """
    wanted_pos = {"NOUN", "PROPN", "VERB", "ADJ"}

    # A dict doubles as an insertion-ordered set: the first occurrence fixes the position.
    ordered_lemmas = {}
    for tok in nlp(question):
        if tok.is_stop or tok.is_punct:
            continue
        if tok.pos_ not in wanted_pos:
            continue
        ordered_lemmas.setdefault(tok.lemma_.lower(), None)

    return list(ordered_lemmas)


# Example: run the keyword extractor on a sample question and print the resulting lemmas
question = "Why would someone bring an umbrella outside on a sunny day?"
print(extract_keywords_spacy(question))

['bring', 'umbrella', 'sunny', 'day']


In [5]:
# Load the ConceptNet relations keyed by concept; later cells index this mapping with
# the question concept and the answer choices.
# Explicit UTF-8 so the read does not depend on the platform's default encoding.
with open("data/conceptnet_relations.json", "r", encoding="utf-8") as fp:
    conceptnet_relations = json.load(fp)

len(conceptnet_relations)

5230

In [26]:
"""
kg_to_context.py

End-to-end pipeline:
  Knowledge Graph -> Text Triples -> Embeddings -> Ranking -> Context

Requirements:
  pip install networkx sentence-transformers numpy tqdm

By default this uses sentence-transformers "all-MiniLM-L6-v2".
You may swap the embedder to any model that returns dense vectors.

Usage:
  - Put your ConceptNet-like data in a dict: {entity: {relation: [(target, score), ...], ...}, ...}
  - Call build_graph_from_conceptnet_dicts(...)
  - Call create_context_for_question(...)
"""

# -------------------------
# Types
# -------------------------
CNNode = str  # concept string; used as a graph node
Relation = str  # relation label stored on edges as 'rel'
Score = float  # score attached to each (target, score) pair in the input data
# Relations of one concept: {relation: [(target, score), ...]}
CNData = Dict[Relation, List[Tuple[CNNode, Score]]]
Triple = Tuple[str, str, str]  # (subject, relation, object)

# -------------------------
# Graph construction
# -------------------------
def build_graph_from_conceptnet_dicts(conceptnet_dict_by_word: Dict[str, CNData], add_reverse_edges: bool=True
                                     ) -> nx.MultiDiGraph:
    """
    Turn a {concept: {relation: [(target, score), ...]}} mapping into a MultiDiGraph.

    Every concept and every target becomes a node. Each (concept, relation, target)
    entry becomes a directed edge carrying the attributes 'rel' (the relation name)
    and 'score'. When `add_reverse_edges` is true, a mirrored edge target -> concept
    labelled 'rev:<REL>' with the same score is added as well, which makes path
    finding direction-agnostic.
    """
    graph = nx.MultiDiGraph()
    for head, relation_map in conceptnet_dict_by_word.items():
        graph.add_node(head)
        for relation, targets in relation_map.items():
            reverse_label = f"rev:{relation}"
            for tail, weight in targets:
                graph.add_node(tail)
                graph.add_edge(head, tail, rel=relation, score=weight)
                if not add_reverse_edges:
                    continue
                graph.add_edge(tail, head, rel=reverse_label, score=weight)
    return graph


# -------------------------
# Triple extraction utilities
# -------------------------
def triples_from_edge(u: str, v: str, attrs: Dict) -> List[Triple]:
    """
    Build the triple for a single edge u -> v from its attribute dict.

    The relation comes from attrs['rel']; an edge without that attribute is
    labelled "RelatedTo". The result is always a one-element list, so callers that
    iterate over the parallel edges of a MultiDiGraph can simply concatenate.
    """
    fact: Triple = (u, attrs.get("rel", "RelatedTo"), v)
    return [fact]

def triples_from_path(G: nx.MultiDiGraph, path: List[str]) -> List[Triple]:
    """
    Expand a node path [n0, n1, n2, ...] into the triples of its consecutive hops.

    For every adjacent pair (u, v) one triple is produced per parallel edge u -> v,
    using that edge's 'rel' attribute ("RelatedTo" when the attribute is missing).
    A pair with no edge data at all contributes a single (u, "RelatedTo", v) triple.
    """
    facts: List[Triple] = []
    for src, dst in zip(path, path[1:]):
        # On a MultiDiGraph this is {edge_key: attr_dict}; the default covers a missing edge.
        parallel_edges = G.get_edge_data(src, dst, default={})
        if not parallel_edges:
            # fallback - not expected for paths that were found in G
            facts.append((src, "RelatedTo", dst))
            continue
        facts.extend(
            (src, edge_attrs.get("rel", "RelatedTo"), dst)
            for edge_attrs in parallel_edges.values()
        )
    return facts


def extract_candidate_triples(
    G: nx.MultiDiGraph,
    question_concept: str,
    keywords: Iterable[str],
    choices: Iterable[str],
    max_hops: int = 2,
    include_1hop_from_choices: bool = True,
    include_1hop_from_question_concept: bool = True,
    top_k_paths_per_pair: int = 40
) -> Dict[str, Set[Triple]]:
    """
    Collect, for every answer choice, the candidate triples that connect it to the question.

    Sources of candidates (all purely structural; ranking happens elsewhere):
      1. paths question_concept -> choice with at most `max_hops` edges
      2. paths keyword -> choice with at most `max_hops` edges, for every keyword
      3. every edge that leaves or enters the choice node (1-hop neighbourhood)

    Args:
        G: graph whose edges carry a 'rel' attribute.
        question_concept: source node for step 1.
        keywords: additional source nodes for step 2.
        choices: answer choices; each becomes a key of the result.
        max_hops: maximum number of edges per path.
        include_1hop_from_choices: enables step 3.
        include_1hop_from_question_concept: despite its name, this flag gates step 1
            (the question_concept -> choice path search); when False those paths
            are skipped entirely.
        top_k_paths_per_pair: per (source, choice) pair, keep only this many of the
            shortest paths.

    Returns:
        {choice: set_of_triples}. Sources or choices that are not nodes of G simply
        contribute nothing.
    """
    choices = list(choices)
    keywords = list(keywords)
    facts_by_choice: Dict[str, Set[Triple]] = {c: set() for c in choices}

    nodes_present = set(G.nodes())

    def shortest_paths(src, tgt):
        """Up to top_k_paths_per_pair simple paths src -> tgt, shortest first."""
        if src not in nodes_present or tgt not in nodes_present:
            return []
        try:
            # all_simple_paths considers number of edges <= cutoff
            paths = list(nx.all_simple_paths(G, source=src, target=tgt, cutoff=max_hops))
        except nx.NetworkXNoPath:
            return []
        paths.sort(key=len)
        return paths[:top_k_paths_per_pair]

    def add_path_facts(src, choice):
        """Add the triples of every retained path src -> choice to that choice."""
        for path in shortest_paths(src, choice):
            facts_by_choice[choice].update(triples_from_path(G, path))

    # 1) paths from question_concept -> choice
    if include_1hop_from_question_concept:
        for c in choices:
            add_path_facts(question_concept, c)

    # 2) paths from each keyword -> choice
    for kw in keywords:
        for c in choices:
            add_path_facts(kw, c)

    # 3) direct edges touching the choice (1-hop outgoing & incoming)
    if include_1hop_from_choices:
        for c in choices:
            if c not in nodes_present:
                continue
            # The edge views already yield one entry per parallel edge together with
            # its attributes, so the relation is read directly instead of looking up
            # all parallel edges again for every single edge.
            for u, v, attrs in G.out_edges(c, data=True):
                facts_by_choice[c].add((u, attrs.get("rel", "RelatedTo"), v))
            for u, v, attrs in G.in_edges(c, data=True):
                facts_by_choice[c].add((u, attrs.get("rel", "RelatedTo"), v))

    return facts_by_choice


# -------------------------
# Text conversion for triples
# -------------------------
def triple_to_text(triple: Triple) -> str:
    """
    Render a (subject, relation, object) triple as one space-separated fact string.

    Surrounding whitespace of each component is removed first.
    Example: ("scissors", "UsedFor", "cut") -> "scissors UsedFor cut"
    """
    subject, relation, obj = triple
    return " ".join(part.strip() for part in (subject, relation, obj))


# -------------------------
# Embedding utilities
# -------------------------
class Embedder:
    """
    Thin adapter around a sentence-transformers model (or any object exposing a
    compatible encode() method) that hands back unit-length embeddings.
    """
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
        self.model = SentenceTransformer(model_name, device=device)

    def embed_texts(self, texts: List[str], batch_size: int = 64) -> np.ndarray:
        """
        Encode `texts` and return the row-wise L2-normalized embeddings, shape (N, D).
        Rows whose norm is zero are returned unchanged (all zeros).
        """
        raw = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=False,
            convert_to_numpy=True,
        )
        lengths = np.linalg.norm(raw, axis=1, keepdims=True)
        # A zero-length row gets divisor 1 so it stays zero instead of turning into NaN.
        zero_rows = lengths == 0.0
        lengths[zero_rows] = 1.0
        return raw / lengths


def cosine_sim_matrix(A: np.ndarray, B: np.ndarray) -> np.ndarray:
    """
    Pairwise cosine similarity between the rows of A and the rows of B.

    Both inputs are expected to be row-normalized already, so the similarity is
    just the dot product. The result has shape (A_rows, B_rows).
    """
    b_columns = B.T
    similarities = np.dot(A, b_columns)
    return similarities


# -------------------------
# Ranking & context builder
# -------------------------
def rank_triples_for_choices(
    question: str,
    keywords: Iterable[str],
    facts_by_choice: Dict[str, Set[Triple]],
    embedder: Embedder,
    top_k_per_choice: int = 8,
    combine_with_keywords: bool = True,
    weight_question: float = 0.6,
    weight_keywords: float = 0.4
) -> Dict[str, List[Tuple[Triple, float]]]:
    """
    Rank triples per choice by semantic similarity to the question (and optionally keywords).

    Scoring:
        score = weight_question * sim(triple, question)
              + weight_keywords * max over keywords of sim(triple, keyword)
    The keyword term is dropped when `combine_with_keywords` is False or when there
    are no keywords.

    Args:
        question: question text to compare the triples against.
        keywords: keyword strings; only used when `combine_with_keywords` is True.
        facts_by_choice: candidate triples per choice.
        embedder: object whose embed_texts(list_of_str) returns row-normalized embeddings.
        top_k_per_choice: maximum number of triples kept per choice.
        combine_with_keywords: whether the keyword term takes part in the score.
        weight_question: weight of the question similarity.
        weight_keywords: weight of the best keyword similarity.

    Returns:
        {choice: [(triple, score), ...]} sorted by descending score, at most
        `top_k_per_choice` entries each. Ties are broken by the triple itself so the
        result does not depend on set iteration order.

    Notes:
        - Embeddings are computed once per distinct triple (deduplicated across choices).
        - When no choice has any candidate triple, every choice maps to an empty list
          and the embedder is not called at all (an empty batch has no embedding axis
          to normalize over).
    """
    # deterministic order, deduplicated across choices
    all_triples = sorted(set().union(*facts_by_choice.values()))
    if not all_triples:
        return {c: [] for c in facts_by_choice}

    # embed triples and question
    triple_embs = embedder.embed_texts([triple_to_text(t) for t in all_triples])
    q_emb = embedder.embed_texts([question])  # shape (1, D)
    scores = weight_question * cosine_sim_matrix(triple_embs, q_emb).squeeze(axis=1)

    # optional keyword term: best-matching keyword per triple
    kw_texts = list(keywords) if combine_with_keywords else []
    if kw_texts:
        kw_embs = embedder.embed_texts(kw_texts)
        sim_kw_max = cosine_sim_matrix(triple_embs, kw_embs).max(axis=1)  # (num_triples,)
        scores = scores + weight_keywords * sim_kw_max

    # map back to choices
    triple_to_score = {t: float(s) for t, s in zip(all_triples, scores)}

    ranked_by_choice: Dict[str, List[Tuple[Triple, float]]] = {}
    for c, facts in facts_by_choice.items():
        scored = [(t, triple_to_score[t]) for t in facts]
        # highest score first; equal scores fall back to the triple's own ordering
        scored.sort(key=lambda pair: (-pair[1], pair[0]))
        ranked_by_choice[c] = scored[:top_k_per_choice]
    return ranked_by_choice


def build_choice_grouped_context(
    question: str,
    choices: List[str],
    ranked_triples_by_choice: Dict[str, List[Tuple[Triple, float]]],
    facts_per_choice_limit: int = 6
) -> str:
    """
    Assemble the full prompt: instruction, question, lettered choices, then the
    facts of each choice (at most facts_per_choice_limit, each with its score),
    and a closing task line.
    """
    def letter(position: int) -> str:
        # 0 -> 'A', 1 -> 'B', ...
        return chr(ord('A') + position)

    header = [
        "Use the facts below (extracted from ConceptNet) to answer the question.",
        "",
        "Question:",
        question,
        "",
        "Choices:",
    ]
    menu = [f"{letter(position)}. {choice}" for position, choice in enumerate(choices)]

    fact_section = ["\nRelevant facts (grouped by choice):"]
    for position, choice in enumerate(choices):
        fact_section.append(f"\nChoice {letter(position)}: {choice}")
        ranked_facts = ranked_triples_by_choice.get(choice, [])
        if not ranked_facts:
            fact_section.append("  - (no facts found)")
            continue
        fact_section.extend(
            f"  - {triple_to_text(fact)}  [score={fact_score:.3f}]"
            for fact, fact_score in ranked_facts[:facts_per_choice_limit]
        )

    footer = ["\nTask: Based only on the facts above, choose the best answer and briefly explain using those facts."]
    return "\n".join(header + menu + fact_section + footer)


def build_choice_grouped_context_modified_2(
    question: str,
    choices: List[str],
    ranked_triples_by_choice: Dict[str, List[Tuple[Triple, float]]],
    facts_per_choice_limit: int = 5
) -> str:
    """
    Build only the facts section of the prompt, grouped by choice.

    Each choice gets a "Choice <letter>: <choice>" header (preceded by a blank line)
    followed by at most facts_per_choice_limit fact lines without scores, or a
    "(no facts found)" marker. No instruction, question or task text is emitted;
    `question` is accepted for signature parity with the other builders and is not used.
    """
    rendered = []
    for position, choice in enumerate(choices):
        rendered.append(f"\nChoice {chr(ord('A') + position)}: {choice}")
        ranked_facts = ranked_triples_by_choice.get(choice, [])
        if not ranked_facts:
            rendered.append("  - (no facts found)")
            continue
        rendered.extend(
            f"  - {triple_to_text(fact)}"
            for fact, _score in ranked_facts[:facts_per_choice_limit]
        )
    return "\n".join(rendered)

def build_choice_grouped_context_modified(
    question: str,
    choices: List[str],
    ranked_triples_by_choice: Dict[str, List[Tuple[Triple, float]]],
    facts_per_choice_limit: int = 6,
    top_k_total_facts: int = 5
) -> List[str]:
    """
    Pool the top facts of all choices and return the globally best ones as lines.

    For each choice the first facts_per_choice_limit ranked facts are pooled; the
    pool is ordered by descending score and the first top_k_total_facts distinct
    triples are rendered as "<fact text> [score=<score>]".

    A triple that was collected for several choices appears only once (its
    highest-scored occurrence), so duplicates do not use up top-k slots.

    Returns:
        A list of fact lines (NOT a joined string); empty when no choice has facts.
        `question` is accepted for signature parity with the other builders and is
        not used.
    """
    pooled: List[Tuple[Triple, float]] = []
    for c in choices:
        pooled.extend(ranked_triples_by_choice.get(c, [])[:facts_per_choice_limit])

    # stable sort: among equal scores the order of `choices` is kept
    pooled.sort(key=lambda pair: pair[1], reverse=True)

    unique: List[Tuple[Triple, float]] = []
    seen = set()
    for t, score in pooled:
        if t in seen:
            continue
        seen.add(t)
        unique.append((t, score))

    return [f"{triple_to_text(t)} [score={score:.3f}]" for t, score in unique[:top_k_total_facts]]

# -------------------------
# End-to-end wrapper
# -------------------------
def create_context_for_question(
    G: nx.MultiDiGraph,
    conceptnet_map: Dict[str, CNData],
    question: str,
    question_concept: str,
    keywords: Iterable[str],
    choices: Iterable[str],
    embedder: Optional[Embedder] = None,
    max_hops: int = 2,
    top_k_paths_per_pair: int = 40,
    top_k_triples_per_choice: int = 10,
    facts_per_choice_limit: int = 6
) -> str:
    """
    End-to-end: extract candidate triples from KG -> embed/rank -> build grouped context.

    Args:
        G: knowledge graph to search.
        conceptnet_map: not used by this function; kept so existing callers keep working.
        question: question text used for ranking.
        question_concept: concept node the question is about.
        keywords: keyword nodes/texts used for path search and ranking.
        choices: answer choices.
        embedder: embedding wrapper; when None a default Embedder() is constructed on
            every call, so pass a shared instance when calling this in a loop.
        max_hops: maximum path length (in edges) for the path search.
        top_k_paths_per_pair: paths kept per (source, choice) pair.
        top_k_triples_per_choice: triples kept per choice after ranking.
        facts_per_choice_limit: fact lines emitted per choice.

    Returns:
        The facts section produced by build_choice_grouped_context_modified_2: one
        "Choice <letter>: <choice>" group per choice with its top facts. The question
        and task instructions are not part of the returned text.
    """
    # Both arguments are consumed by several stages below; materialize them once so
    # one-shot iterables (generators) are not exhausted after the first stage.
    keywords = list(keywords)
    choices = list(choices)

    if embedder is None:
        embedder = Embedder()

    # 1) build facts_by_choice using graph (structure only)
    facts_by_choice = extract_candidate_triples(
        G,
        question_concept,
        keywords,
        choices,
        max_hops=max_hops,
        include_1hop_from_choices=True,
        include_1hop_from_question_concept=True,
        top_k_paths_per_pair=top_k_paths_per_pair
    )

    # 2) rank triples using embeddings
    ranked = rank_triples_for_choices(
        question=question,
        keywords=keywords,
        facts_by_choice=facts_by_choice,
        embedder=embedder,
        top_k_per_choice=top_k_triples_per_choice,
        combine_with_keywords=True
    )

    # 3) build grouped context
    prompt = build_choice_grouped_context_modified_2(
        question=question,
        choices=choices,
        ranked_triples_by_choice=ranked,
        facts_per_choice_limit=facts_per_choice_limit
    )

    return prompt

In [27]:
# Single-question demo of the pipeline.
question = "A revolving door is convenient for two direction travel, but it also serves as a security measure at a what?"
question_concept = "revolving door"
choices = [ "bank", "library", "department store", "mall", "new york" ]

# Alternative example (swap in by uncommenting):
# question = "What do people aim to do at work?"
# question_concept = "people"
# choices = ['complete job', 'learn from each other', 'kill animals', 'wear hats', 'talk to each other']

# Restrict ConceptNet to the question concept and the choices; the direct indexing
# raises if one of them has no entry in conceptnet_relations.
conceptnet_map = {
    c: conceptnet_relations[c] for c in [question_concept] + choices
}
keywords = extract_keywords_spacy(question)          # extracted keywords

# Build graph (forward edges only, no 'rev:' edges)
G = build_graph_from_conceptnet_dicts(conceptnet_map, add_reverse_edges=False)

# create prompt/context
knowledge_context = create_context_for_question(
    G=G,
    conceptnet_map=conceptnet_map,
    question=question,
    question_concept=question_concept,
    keywords=keywords,
    choices=choices,
    embedder=embedder,
    max_hops=2,
    top_k_paths_per_pair=40,
    top_k_triples_per_choice=12,
    facts_per_choice_limit=6
)

print(knowledge_context)



Choice A: bank
  - revolving door AtLocation bank
  - bank AtLocation secure place
  - bank UsedFor keeping money safe
  - bank RelatedTo vault
  - bank RelatedTo safe
  - bank RelatedTo robbery

Choice B: library
  - library AtLocation house
  - library AtLocation computers
  - library DerivedFrom cyberlibrary
  - library AtLocation human
  - library AtLocation literature
  - library Antonym book

Choice C: department store
  - revolving door AtLocation department store
  - department store AtLocation escalator
  - department store AtLocation changing room
  - department store UsedFor anchor mall
  - department store RelatedTo gum
  - department store AtLocation fitting room

Choice D: mall
  - revolving door AtLocation department store
  - revolving door AtLocation bank
  - revolving door AtLocation mall
  - mall AtLocation escalator
  - mall RelatedTo mall walker
  - mall UsedFor concentrated foot traffic

Choice E: new york
  - revolving door AtLocation department store
  - new yo

In [15]:
dataset_path = "data/commonsenseqa_validation.json"

# Explicit UTF-8 so the read does not depend on the platform's default encoding.
with open(dataset_path, "r", encoding="utf-8") as fp:
    csqa_dataset = json.load(fp)

len(csqa_dataset)

1221

In [16]:
# Peek at the first record; the loop below reads its 'id', 'question',
# 'question_concept', 'choices' and 'answerKey' fields.
csqa_dataset[0]

{'id': '1afa02df02c908a558b4036e80242fac',
 'question': 'A revolving door is convenient for two direction travel, but it also serves as a security measure at a what?',
 'question_concept': 'revolving door',
 'choices': {'label': ['A', 'B', 'C', 'D', 'E'],
  'text': ['bank', 'library', 'department store', 'mall', 'new york']},
 'answerKey': 'A'}

In [28]:
# Build one knowledge context per CommonsenseQA item.
# NOTE: this loop rebinds the notebook-level names question, question_concept, choices,
# conceptnet_map, keywords, G and knowledge_context on every iteration.
csqa_knowledge_data = []

for item in tqdm(csqa_dataset):
    question = item["question"]
    question_concept = item["question_concept"]
    choices = item["choices"]["text"]

    # Per-question slice of ConceptNet: only the question concept and the choices.
    # Direct indexing raises if one of them has no entry in conceptnet_relations.
    conceptnet_map = {
        c: conceptnet_relations[c] for c in [question_concept] + choices
    }
    keywords = extract_keywords_spacy(question)          # extracted keywords

    # Build graph (forward edges only, no 'rev:' edges)
    G = build_graph_from_conceptnet_dicts(conceptnet_map, add_reverse_edges=False)

    # create prompt/context
    # NOTE(review): facts_per_choice_limit is 5 here but 6 in the single-question demo
    # cell above - confirm which value is intended.
    knowledge_context = create_context_for_question(
        G=G,
        conceptnet_map=conceptnet_map,
        question=question,
        question_concept=question_concept,
        keywords=keywords,
        choices=choices,
        embedder=embedder,
        max_hops=2,
        top_k_paths_per_pair=40,
        top_k_triples_per_choice=12,
        facts_per_choice_limit=5
    )

    # Keep the original item fields next to the generated context.
    csqa_knowledge_data.append({
        "id": item["id"],
        "question": question,
        "concept": question_concept,
        "choices": item["choices"],
        "answerKey": item["answerKey"],
        "knowledge_context": knowledge_context
    })

100%|██████████| 1221/1221 [05:54<00:00,  3.44it/s]


In [30]:
import pickle

# Persist the list of per-question records built above.
# NOTE(review): pickle files must only be loaded from trusted sources; the records are
# dicts of strings/lists, so JSON would presumably work as well - confirm with the consumer.
with open("data/csqa_validation_knowledge_context.pkl", "wb") as fp:
    pickle.dump(csqa_knowledge_data, fp)