In [2]:
from rdflib import URIRef

WD = rdflib.Namespace('http://www.wikidata.org/entity/')
WDT = rdflib.Namespace('http://www.wikidata.org/prop/direct/')
DDIS = rdflib.Namespace('http://ddis.ch/atai/')
RDFS = rdflib.namespace.RDFS
SCHEMA = rdflib.Namespace('http://schema.org/')

class NamespaceWrapper:
    WD = WD
    WDT = WDT
    DDIS = DDIS
    RDFS = RDFS
    SCHEMA = SCHEMA

    @staticmethod
    def uri_to_prefixed(uri):
        uri_ref = URIRef(uri)
        if str(uri_ref).startswith(str(WD)):
            return getattr(NamespaceWrapper, "WD")[str(uri_ref).replace(str(WD), '')]
        elif str(uri_ref).startswith(str(WDT)):
            return getattr(NamespaceWrapper, "WDT")[str(uri_ref).replace(str(WDT), '')]
        elif str(uri_ref).startswith(str(DDIS)):
            return getattr(NamespaceWrapper, "DDIS")[str(uri_ref).replace(str(DDIS), '')]
        elif str(uri_ref).startswith(str(RDFS)):
            return getattr(NamespaceWrapper, "RDFS")[str(uri_ref).replace(str(RDFS), '')]
        elif str(uri_ref).startswith(str(SCHEMA)):
            return getattr(NamespaceWrapper, "SCHEMA")[str(uri_ref).replace(str(SCHEMA), '')]
        else:
            return uri_ref

In [16]:
from transformers import BertTokenizer, BertModel
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import rdflib
import pandas as pd
from sklearn.metrics import pairwise_distances
import csv
import pickle
import os
from nltk.tokenize import word_tokenize
import rdflib
from rdflib import URIRef

# Define your dictionary
graph = rdflib.Graph()
graph.parse(r'C:\Users\dli0305\Downloads\ddis-movie-graph.nt\14_graph.nt', format='turtle')

# Initialize BERT tokenizer and model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')

entity_emb = np.load(r'C:\Users\dli0305\Downloads\ddis-graph-embeddings\ddis-graph-embeddings\entity_embeds.npy')
relation_emb = np.load(r'C:\Users\dli0305\Downloads\ddis-graph-embeddings\ddis-graph-embeddings\relation_embeds.npy')

# load the dictionaries
with open(r'C:\Users\dli0305\Downloads\ddis-graph-embeddings\ddis-graph-embeddings\entity_ids.del', 'r') as ifile:
    ent2id = {rdflib.term.URIRef(ent): int(idx) for idx, ent in csv.reader(ifile, delimiter='\t')}
    id2ent = {v: k for k, v in ent2id.items()}
with open(r'C:\Users\dli0305\Downloads\ddis-graph-embeddings\ddis-graph-embeddings\relation_ids.del', 'r') as ifile:
    rel2id = {rdflib.term.URIRef(rel): int(idx) for idx, rel in csv.reader(ifile, delimiter='\t')}
    id2rel = {v: k for k, v in rel2id.items()}

ent2lbl = {ent: str(lbl) for ent, lbl in graph.subject_objects(NamespaceWrapper.RDFS.label)}
lbl2ent = {lbl: ent for ent, lbl in ent2lbl.items()}

# Define your dictionary
my_dict = lbl2ent

rel2name = {}
for _, pred, _ in graph:
    label = graph.value(pred, NamespaceWrapper.RDFS.label)
    if label:
        rel2name[pred] = str(label)
    else:
        rel2name[pred] = pred.split('/')[-1]

rel2name_str = {rdflib.term.URIRef(rel): name for rel, name in rel2name.items()}
name2rel = {name: rel for rel, name in rel2name_str.items()}

if os.path.exists(r"C:\Users\dli0305\Desktop\ATAIChatbot\ATAIChatbot\relationship_bert.pkl"):
    with open(r"C:\Users\dli0305\Desktop\ATAIChatbot\ATAIChatbot\relationship_bert.pkl", "rb") as f:
        relationship_bert = pickle.load(f)
    print("Loaded key_embeddings from file.")
else:
    relationship_bert = {key: get_embedding(key) for key in name2rel.keys()}
    with open(r"C:\Users\dli0305\Desktop\ATAIChatbot\ATAIChatbot\relationship_bert.pkl", "wb") as f:
        pickle.dump(relationship_bert, f)
    print("Computed and saved key_embeddings.")

if os.path.exists(r"C:\Users\dli0305\Desktop\ATAIChatbot\ATAIChatbot\key_embeddings.pkl"):
    # Load existing key_embeddings from the file
    with open(r"C:\Users\dli0305\Desktop\ATAIChatbot\ATAIChatbot\key_embeddings.pkl", "rb") as f:
        key_embeddings = pickle.load(f)
    print("Loaded key_embeddings from file.")
else:
    # Compute and save key_embeddings if it doesn't exist
    #key_embeddings = {key: get_embedding(key) for key in my_dict.keys()}
    #with open(r"C:\Users\dli0305\Desktop\ATAIChatbot\ATAIChatbot\key_embeddings.pkl", "wb") as f:
        #pickle.dump(key_embeddings, f)
    print("Computed and saved key_embeddings.")

# Helper function to get the BERT embedding for a phrase
def get_embedding(text):
    inputs = tokenizer(text, return_tensors='pt')
    outputs = model(**inputs)
    return outputs.last_hidden_state[:, 0, :].detach().numpy()

# Encode dictionary keys
#key_embeddings = {key: get_embedding(key) for key in my_dict.keys()}

# Function to identify entities using spaCy's NER and match with dictionary keys
def find_best_entity_matches(sentence, threshold=0.5):
    words = word_tokenize(sentence)
    
    # Extract potential movie titles by matching dictionary keys with sentence tokens
    found_titles = [title for title in key_embeddings.keys() if all(word in words for word in word_tokenize(title))]
    longest_title = max(found_titles, key=len) if found_titles else None
    
    # If we found potential titles, compute similarity
    matches = []
    if longest_title:
        entity_embedding = get_embedding(longest_title)
        similarities = {
            key: cosine_similarity(entity_embedding, emb.reshape(1, -1)).item()
            for key, emb in key_embeddings.items()
        }
        best_match_key = max(similarities, key=similarities.get)
        
        # Only consider matches above threshold
        if similarities[best_match_key] >= threshold:
            matches.append((longest_title, best_match_key, similarities[best_match_key], my_dict[best_match_key]))
    
    return matches

def find_best_relationship_matches(sentence, threshold=0.5):
    # Tokenize the text
    words = word_tokenize(sentence)
    
    # Extract potential movie titles by matching dictionary keys with sentence tokens
    found_titles = [title for title in relationship_bert.keys() if all(word in words for word in word_tokenize(title))]
    
    # If we found potential titles, compute similarity
    matches = []
    if found_titles:
        entity_embedding = get_embedding(found_titles)
        similarities = {
            key: cosine_similarity(entity_embedding, emb.reshape(1, -1)).item()
            for key, emb in relationship_bert.items()
        }
        best_match_key = max(similarities, key=similarities.get)
        
        # Only consider matches above threshold
        if similarities[best_match_key] >= threshold:
            matches.append((found_titles, best_match_key, similarities[best_match_key], name2rel[best_match_key]))
    
    return matches

def get_embedding_answer(sentence):
    try:
        entity_text, entity_matched, entity_similarity, entity_uri = find_best_entity_matches(sentence)[0]
        relationship_text, relationship_matched, relationship_similarity, relationship_uri = find_best_relationship_matches(sentence)[0]
    except IndexError:
        return "I regret to inform you that there is no answer available at this time."

    
    if entity_uri != None or relationship_uri != None:
        head = entity_emb[ent2id[NamespaceWrapper.uri_to_prefixed(entity_uri)]]
        pred = relation_emb[rel2id[NamespaceWrapper.uri_to_prefixed(relationship_uri)]]

        lhs = head + pred
        dist = pairwise_distances(lhs.reshape(1, -1), entity_emb).reshape(-1)
        most_likely = dist.argsort()
        entity_uri = id2ent[most_likely[0]]
        label = ent2lbl.get(entity_uri, "I regret to inform you that there is no answer available at this time.")
        return label


Loaded key_embeddings from file.
Loaded key_embeddings from file.


In [17]:
get_embedding_answer('When was "The Godfather" released? ')

'I regret to inform you that there is no answer available at this time.'

In [19]:
# Extract entity and relationship matches
entity_text, entity_matched, entity_similarity, entity_uri = find_best_entity_matches(
    'Who is the screenwriter of The Masked Gang: Cyprus? ')[0]
relationship_text, relationship_matched, relationship_similarity, relationship_uri = find_best_relationship_matches(
    'Who is the screenwriter of The Masked Gang: Cyprus? ')[0]

# Print debug information
print("Entity URI:", entity_uri)
print("Relationship URI:", relationship_uri)

# Generate SPARQL query
query = f"""
SELECT ?ans WHERE {{ 
    ?a rdfs:label <{NamespaceWrapper.uri_to_prefixed(entity_uri)}>.  
    ?a <{NamespaceWrapper.uri_to_prefixed(relationship_uri)}> ?b . 
    ?b rdfs:label ?ans . 
}}
LIMIT 1
"""

# Print query for debugging
print("SPARQL Query:")
print(query)

# Run the query
try:
    results = graph.query(query)
    # Return first answer or default response if none found
    found_answer = False
    for row in results:
        # Convert the answer URI to string and extract the final part
        answer = str(row[0]) if isinstance(row[0], URIRef) else row[0]
        print(f"I think it is {answer.split('/')[-1]}.")
        found_answer = True
    if not found_answer:
        print("No answer found.")
except Exception as e:
    print("Error running query:", e)


Entity URI: http://www.wikidata.org/entity/Q7750525
Relationship URI: http://www.wikidata.org/prop/direct/P58
SPARQL Query:

SELECT ?ans WHERE { 
    ?a rdfs:label <http://www.wikidata.org/entity/Q7750525>.  
    ?a <http://www.wikidata.org/prop/direct/P58> ?b . 
    ?b rdfs:label ?ans . 
}
LIMIT 1

No answer found.
