In [1]:
import spacy
import spacy.cli

import os

# Fetch the spaCy pipeline only when it is not installed yet, so that
# Restart & Run All does not re-download the model on every run.
if not spacy.util.is_package("en_core_web_trf"):
    spacy.cli.download("en_core_web_trf")

from neo4j import GraphDatabase

# Connection settings come from the environment so credentials do not have to
# live in the notebook; the previous hard-coded values remain the defaults.
driver = GraphDatabase.driver(
    os.environ.get("NEO4J_URI", "bolt://192.168.0.40:7687"),
    auth=(
        os.environ.get("NEO4J_USER", "neo4j"),
        os.environ.get("NEO4J_PASSWORD", "neo4j"),
    ),
)
# Fail fast in this cell if the database cannot be reached.
driver.verify_connectivity()

Collecting en-core-web-trf==3.7.3
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.7.3/en_core_web_trf-3.7.3-py3-none-any.whl (457.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m457.4/457.4 MB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_trf')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [2]:
class EntityType:
    """String constants naming the node labels this notebook works with."""

    # Each value doubles as the Neo4j label passed to get_items() and as a key
    # of the entity_count dictionary.
    PLAYER, CLUB, LEAGUE = "Player", "Club", "League"

In [3]:
def get_items(name: str, db_driver=None):
    """Return the non-null ``name`` property of every node labelled ``name``.

    Args:
        name: Node label to match (e.g. ``EntityType.PLAYER``).
        db_driver: Object exposing ``execute_query``; defaults to the
            notebook-level ``driver``.

    Returns:
        A list of the ``n.name`` values, skipping records where it is null.
    """
    if db_driver is None:
        db_driver = driver

    # Labels cannot be passed as query parameters, so quote the label with
    # backticks (doubling any embedded backtick) instead of splicing it in raw.
    label = name.replace("`", "``")
    result = db_driver.execute_query(f"match (n:`{label}`) return n.name")

    # No explicit session is opened: execute_query is called on the driver.
    values = (record.get("n.name") for record in result.records)
    return [value for value in values if value is not None]

# Fetch the known names for each entity type once; the fuzzy matching in
# named_entity_recognize() compares against these lists.
player_list, club_list, league_list = (
    get_items(label)
    for label in (EntityType.PLAYER, EntityType.CLUB, EntityType.LEAGUE)
)

In [37]:
import difflib
import numpy as np
# Load the spaCy pipeline downloaded in the first cell; named_entity_recognize()
# calls it to extract entities from the input sentence.
processor = spacy.load("en_core_web_trf")

def get_most_similar_item(item: str, items: list, cutoff: float = 0.0):
    """Return the entry of ``items`` that is closest to ``item``.

    Args:
        item: Text to look up.
        items: Candidate strings.
        cutoff: Minimum difflib similarity (0..1) a candidate must reach.
            The default of 0 keeps the previous behaviour, where any
            non-empty ``items`` yields a match however dissimilar; pass a
            higher value to get ``None`` for poor matches instead.

    Returns:
        The best-matching candidate, or ``None`` when nothing qualifies.
    """
    matches = difflib.get_close_matches(item, items, n=1, cutoff=cutoff)
    return matches[0] if matches else None

def rank_similarity(item: str, club_match: str | None, league_match: str | None):
    """Decide whether ``item`` is better explained by the club or league match.

    Each candidate is scored with ``difflib.SequenceMatcher(...).ratio()``
    against ``item``; a missing candidate (``None``) scores 0.

    Returns:
        ``(EntityType.CLUB, club_match)`` when the club score is at least the
        league score (ties go to the club), otherwise
        ``(EntityType.LEAGUE, league_match)``.
    """
    def _score(candidate):
        if candidate is None:
            return 0
        return difflib.SequenceMatcher(None, item, candidate).ratio()

    if _score(club_match) >= _score(league_match):
        return EntityType.CLUB, club_match
    return EntityType.LEAGUE, league_match


def named_entity_recognize(document):
    """Map the entities spaCy finds in ``document`` to database names.

    PERSON entities are matched against ``player_list``. Every other entity
    label is matched against both ``club_list`` and ``league_list`` and
    assigned whichever candidate ``rank_similarity`` prefers.

    Args:
        document: The input sentence as a string.

    Returns:
        A tuple ``(return_dict, document)``:
        ``return_dict`` maps database name -> entity type; the key is ``None``
        for an entity that had no candidate at all. ``document`` is the input
        with each recognised entity span replaced by its database name.
        Only the spans spaCy tagged are replaced.
    """
    return_dict = dict()
    pieces = []
    cursor = 0  # end offset of the last span already copied into `pieces`

    for ent in processor(document).ents:
        entity_text = ent.text

        if ent.label_ == "PERSON":
            entity_type = EntityType.PLAYER
            entity_db = get_most_similar_item(entity_text, player_list)
        else:
            club = get_most_similar_item(entity_text, club_list)
            league = get_most_similar_item(entity_text, league_list)
            entity_type, entity_db = rank_similarity(entity_text, club, league)

        return_dict[entity_db] = entity_type

        # Splice by character span rather than chaining str.replace calls:
        # chained replaces can rewrite text inserted by an earlier replacement
        # (e.g. "Milan" inside an already substituted "Inter Milan").
        if entity_db is not None:
            pieces.append(document[cursor:ent.start_char])
            pieces.append(entity_db)
            cursor = ent.end_char

    pieces.append(document[cursor:])
    return return_dict, "".join(pieces)


In [35]:
# The natural-language question to answer; it is passed to
# named_entity_recognize() in the next cell.
input_sentence = "Name a player who has played for Napoli, PSG, and Chelsea."

In [38]:
# Identify all named entities and rewrite the sentence with database names.
entities, replaced_input_sentence = named_entity_recognize(input_sentence)

# Count the number of entities of each type; this dictionary is later compared
# against the entity-count templates.
entity_count = {
    EntityType.PLAYER: 0,
    EntityType.CLUB: 0,
    EntityType.LEAGUE: 0
}
for db_name, entity_type in entities.items():
    # `entities` maps database name -> entity type, so an entity that could
    # not be matched shows up as a None *key*; the type itself is never None.
    if db_name is None:
        raise KeyError(f"An entity in {input_sentence!r} was not found in database")
    entity_count[entity_type] += 1

print(replaced_input_sentence)
print(entities)

Name a player who has played for Napoli, PSG, and Chelsea.
Name a player who has played for S.S.C. Napoli, Sporting Gijón, and Chelsea F.C..
{'S.S.C. Napoli': 'Club', 'Sporting Gijón': 'Club', 'Chelsea F.C.': 'Club'}


In [85]:
# get templates
from pathlib import Path
import json
# The three template files are parallel dictionaries keyed by template id.
entity_count_template_path = Path("templates/entity_count.json")
cypher_template_path = Path("templates/cypher_template.json")
sentence_template_path = Path("templates/sentence_template.json")


def _load_json(path: Path):
    """Parse the JSON file at ``path``, closing the file handle afterwards."""
    with path.open("r") as handle:
        return json.load(handle)


entity_count_template = _load_json(entity_count_template_path)
cypher_template = _load_json(cypher_template_path)
sentence_template = _load_json(sentence_template_path)

# Every template id needs an entity count, a Cypher query and sentences.
assert len(entity_count_template) == len(cypher_template) == len(sentence_template), \
    "template files must contain the same number of entries"

In [86]:
# Keep the ids of every template whose expected entity counts equal the counts
# extracted from the input sentence.
template_ids = [
    key
    for key, value in entity_count_template.items()
    if value == entity_count
]

if not template_ids:
    # fall back to chatgpt or something
    raise ValueError("No template found for the given entity count.")
print(template_ids)

['2', '3']


In [87]:
import re
import json

def extract_tag(sentence: str):
    """Return every ``<word_word>`` placeholder in ``sentence``, in order.

    A placeholder is an angle-bracketed run of word characters containing at
    least one underscore, e.g. ``<club_1>``. Repeated placeholders are
    returned once per occurrence.
    """
    tag_regex = re.compile(r'(<\w+_\w+>)')
    return [match.group(1) for match in tag_regex.finditer(sentence)]

def dequeue_entity(entity_type: str, entities_copy: dict):
    """Remove and return the first entity of ``entity_type``.

    Args:
        entity_type: Type to look for; compared case-insensitively and
            ignoring surrounding whitespace.
        entities_copy: Mapping of entity name -> entity type. The matched
            entry is removed from this dictionary in place.

    Returns:
        The name (key) of the first entry whose type matches.

    Raises:
        ValueError: If no entry of the requested type is left.
    """
    wanted = entity_type.lower().strip()
    for e_name, e_type in entities_copy.items():
        if str(e_type).lower().strip() == wanted:
            # Safe to mutate here: we return immediately, so the iterator is
            # never advanced after the deletion.
            del entities_copy[e_name]
            return e_name
    raise ValueError(f"No entity found for the given entity type {entity_type}")

# Generate the candidate sentences for every matching template id.
# potential_sentences: filled-in sentence -> template id
# sentence_tag_values: filled-in sentence -> {tag: entity inserted for it}
potential_sentences = dict()
sentence_tag_values = dict()

for tid in template_ids:
    sentences: list = sentence_template[tid]
    if not isinstance(sentences, list) or len(sentences) == 0:
        raise ValueError("No sentence found for the given template id")

    for s in sentences:
        # A tag may occur several times in one sentence. Handle each distinct
        # tag once, otherwise a repeat would dequeue a second entity and
        # overwrite tag_values[tag] with a value the sentence does not contain.
        tags = list(dict.fromkeys(extract_tag(s)))
        inserted_sentence = s
        entities_copy = entities.copy()
        tag_values = dict()
        for tag in tags:
            # Split on the last underscore only, so a tag with more than one
            # underscore no longer fails the two-value unpacking.
            # NOTE(review): assumes the part after the last underscore is an
            # index/suffix and everything before it is the entity type.
            entity_type, _ = tag[1:-1].rsplit("_", 1)
            entity = dequeue_entity(entity_type, entities_copy)
            inserted_sentence = inserted_sentence.replace(tag, entity)
            tag_values[tag] = entity

        sentence_tag_values[inserted_sentence] = tag_values
        potential_sentences[inserted_sentence] = tid

print(json.dumps(potential_sentences, indent=4))
print(json.dumps(sentence_tag_values, indent=4))

{
    "Who are the players that have played for AC Monza?": "2",
    "Can you name everyone who's played for AC Monza?": "2",
    "Which players have had stints at AC Monza?": "2",
    "What is the roster history of AC Monza?": "2",
    "Who has AC Monza fielded over the years?": "2",
    "Can you list all individuals that have been part of AC Monza?": "2",
    "What players have donned AC Monza's jersey?": "2",
    "Who are all the athletes that played for AC Monza?": "2",
    "Which footballers have been associated with AC Monza?": "2",
    "What is the complete list of players for AC Monza?": "2",
    "Who has appeared in matches for AC Monza?": "2",
    "What league does AC Monza belong to?": "3",
    "Which league is AC Monza a member of?": "3",
    "In what league does AC Monza compete?": "3",
    "What is the competitive tier of AC Monza?": "3",
    "Can you specify AC Monza's league affiliation?": "3",
    "Which football league includes AC Monza?": "3",
    "What division does

In [94]:
from sentence_transformers import SentenceTransformer, util
import numpy as np

# Sentence encoder used to compare the input with the candidate sentences.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Candidate sentences produced from the matching templates.
s = [*potential_sentences]

embeddings = model.encode(s, convert_to_tensor=True)
input_embedding = model.encode([replaced_input_sentence], convert_to_tensor=True)

# Only one input sentence was encoded, so row 0 holds its similarity to every
# candidate sentence.
cosine_scores = util.pytorch_cos_sim(input_embedding, embeddings)[0].cpu()

# Pick the best-scoring candidate and reject it when it falls below the
# similarity threshold.
highest_score_index = np.argmax(cosine_scores)
highest_score = cosine_scores[highest_score_index]
if highest_score < 0.5:
    raise ValueError("No template exists for the given input sentence")
print(f"Similarity Score: {highest_score}")

# Look up the template id and the tag -> entity mapping of the winner.
best_sentence = s[highest_score_index]
template_id = potential_sentences[best_sentence]
tags = sentence_tag_values[best_sentence]
print(f"Query sentence: {best_sentence}")
print(f"Template id: {template_id}")

Similarity Score: 0.9308320879936218
Query sentence: Who are the players that have played for AC Monza?
Template id: 2


In [95]:
# Obtain the Cypher query for the chosen template and fill in the entities.
def _escape_cypher_string(value: str) -> str:
    """Backslash-escape backslashes and quotes for use inside a Cypher string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')


cypher_query = cypher_template[str(template_id)]
for tag, entity in tags.items():
    # Entity names may contain quotes (e.g. "Andrea D'Errico"); inserted raw
    # they would terminate the string literal and break the query.
    # NOTE(review): assumes every tag sits inside a quoted string literal in
    # the template, as in `{name: '<tag>'}` — confirm against the templates.
    cypher_query = cypher_query.replace(tag, _escape_cypher_string(entity))

print(cypher_query)

MATCH (p:Player)-[:PLAYED_FOR]->(c:Club {name: 'AC Monza'}) RETURN p.name


In [96]:
result = driver.execute_query(cypher_query)

print("Results:")

# Right-align the running number to the width of the largest index so the
# values line up in one column.
records = result.records
justify_length = len(str(len(records)))

for position, record in enumerate(records, start=1):
    # Each record is expected to carry the requested value in its first column.
    value = record.values()[0]
    print(f"{position:>{justify_length}}: {value}")

Results:
  1: Patrice Evra
  2: Marco Baroni
  3: Alessandro Di Munno
  4: Nicola Rauti
  5: Andrea Colpani
  6: Giacomo Tomaselli
  7: Cidimar Aparecido Ernegas
  8: Hervé Otelé
  9: Vittorio Panucci
 10: Giorgio Galli
 11: Luca Guidetti
 12: Lorenzo Carissoni
 13: Ruggero Riva
 14: Marco Perini
 15: Alberto Tentardini
 16: Pietro Cogliati
 17: Federico Del Frate
 18: Marco Gasparri
 19: Luca Palesi
 20: Andrea D'Errico
 21: Luca Giudici
 22: Luca Liverani
 23: Stefano Negro
 24: Giuseppe Ponsat
 25: Manuel Di Paola
 26: Marco Costa
 27: Tommaso Cazzaniga
 28: Nicholas Battaiola
 29: Vincenzo Vivarini
 30: Ernesto Corno
 31: Dany Mota
 32: Giulio Sironi
 33: Maximiliano Uggè
 34: Angelo Piffarerio
 35: Bruno Petrachi
 36: Matteo Pessina
 37: Francesco Margiotta
 38: Andrea Beduschi
 39: Franco Cacciatori
 40: Enrico Perego
 41: Francesco Alberti
 42: Alessio Vita
 43: Stefano Marra
 44: Virginio Costa
 45: Angelo Ferraris
 46: Ernesto Torregrossa
 47: Mario Riva
 48: Carlo Villa
 49: 