In [15]:
from openai import OpenAI
from random import randint, choice
from os import environ
from pathlib import Path
from json import loads, dumps
# Read the API key from a file in the user's home directory and publish it via
# the OPENAI_API_KEY environment variable, so that no key is hard-coded here.
# Raises if the key file does not exist.
environ["OPENAI_API_KEY"] = Path("~/.openaiapikey").expanduser().read_text().strip()

# Module-wide client used by the completion helpers defined in this cell.
# Presumably OpenAI() picks the key up from OPENAI_API_KEY - confirm against the client docs.
openaiClient = OpenAI()
def _chat_completion(model, query):
    """Send ``query`` as a single system message to ``model`` and return the reply text.

    Parameters
    ----------
    model : str
        Model name passed to the chat completions endpoint.
    query : str
        Prompt text; it is sent as the content of one ``system`` message.

    Returns
    -------
    The ``content`` of the first choice of the response.
    """
    response = openaiClient.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": query
            }
        ],
        # A fresh random seed is drawn for every request.
        seed=randint(0, 1000000)
    )
    return response.choices[0].message.content


def gpt_3_5_turbo_completion(query):
    """Return the reply of the ``gpt-3.5-turbo`` model to ``query``."""
    return _chat_completion("gpt-3.5-turbo", query)


def gpt_4_turbo_completion(query):
    """Return the reply of the ``gpt-4-turbo`` model to ``query``."""
    return _chat_completion("gpt-4-turbo", query)

def tryRecieveAnswer(query, completionFunction = gpt_4_turbo_completion, answerConversion = lambda x: x, maxTries = 10):
    """Query the model repeatedly until its answer can be converted, or give up.

    Parameters
    ----------
    query : str
        Prompt handed unchanged to ``completionFunction`` on every attempt.
    completionFunction : callable
        Called as ``completionFunction(query)``; returns the raw answer.
        Exceptions raised by it are NOT caught here and propagate to the caller.
    answerConversion : callable
        Called as ``answerConversion(rawAnswer)``; must raise an exception to
        signal that the raw answer is unusable (this triggers a retry).
    maxTries : int
        Maximum number of attempts. With ``maxTries <= 0`` no request is made.

    Returns
    -------
    tuple
        ``(convertedAnswer, True)`` for the first answer that converts
        successfully, otherwise ``(None, False)`` after printing a failure message.
    """
    # Initialised up front so the failure message below also works when the
    # loop body never runs (maxTries <= 0).
    answer = None
    tryNumber = 0
    while tryNumber < maxTries:
        answer = completionFunction(query)
        try:
            return (answerConversion(answer), True)
        except Exception:
            # Unusable answer: fall through and retry. ``Exception`` rather than
            # a bare ``except`` so KeyboardInterrupt / SystemExit still stop the loop.
            pass
        tryNumber += 1
    print(f"Failed to recieve answer for query: {query}. Last answer: {answer}")
    return (None, False)

def listAnswerConversion(answer):
    """Parse ``answer`` as JSON and check that it is a list of strings.

    Returns the parsed list. Raises whatever ``loads`` raises for invalid JSON,
    and ``AssertionError`` if the parsed value is not a list or contains a
    non-string element.
    """
    parsed = loads(answer)
    assert isinstance(parsed, list)
    assert all(isinstance(entry, str) for entry in parsed)
    return parsed

In [9]:
from rdflib import Graph, URIRef, Literal, Namespace, term

In [7]:
from pathlib import Path
# Directory with the generated ontology files; the path is relative, so it is
# resolved against the current working directory of the kernel.
datapath = Path("../master-database-files/master-experimental/generated_ontology/")
# Fail early, before any model call is made, if the directory is missing.
assert datapath.exists()

In [11]:
def getAllClasses(graph):
    """Return the IRIs (as strings) of all subjects typed ``owl:Class`` in ``graph``.

    Subjects that are blank nodes are left out. The query declares no PREFIX,
    so the ``owl:`` prefix has to be resolvable by ``graph`` itself.
    """
    rows = graph.query(
        """SELECT ?s
           WHERE {
              ?s a owl:Class
              }""")
    return [str(row.s) for row in rows if type(row.s) != term.BNode]

def getAllPropertiesBetweenClasses(graph):
    """Return ``(domain, property, range)`` string triples for object properties.

    One tuple is produced per result row of the query, i.e. per combination of
    an ``owl:ObjectProperty`` with one of its ``rdfs:domain`` and one of its
    ``rdfs:range`` values. The query declares no PREFIX, so ``owl:`` and
    ``rdfs:`` have to be resolvable by ``graph`` itself.
    """
    rows = graph.query(
        """SELECT ?subject ?property ?object
           WHERE {
                ?property a owl:ObjectProperty.
                ?property rdfs:domain ?subject.
                ?property rdfs:range ?object.
              }""")
    return [(str(row.subject), str(row.property), str(row.object)) for row in rows]

def getAllSubclassRelationsBetweenClasses(graph):
    """Return ``(subclass, superclass)`` string pairs for every ``rdfs:subClassOf`` triple.

    The query declares no PREFIX, so ``rdfs:`` has to be resolvable by ``graph`` itself.
    """
    rows = graph.query(
        """SELECT ?subclass ?superclass
           WHERE {
                ?subclass rdfs:subClassOf ?superclass.
              }""")
    return [(str(row.subclass), str(row.superclass)) for row in rows]

def getAllNamedIndividualsOfClass(graph, className):
    """Return the IRIs (as strings) of all subjects typed with the class ``className``.

    ``className`` is the full class IRI; it is inserted between ``<`` and ``>``
    in the query text without any escaping. Subjects that are blank nodes are
    left out.
    """
    rows = graph.query(
        f"""SELECT ?s
           WHERE {{
              ?s a <{className}>
              }}""")
    return [str(row.s) for row in rows if type(row.s) != term.BNode]

def getAllNamedIndividuals(graph):
    """Return the IRIs (as strings) of all subjects typed ``owl:NamedIndividual``.

    Subjects that are blank nodes are left out. The query declares no PREFIX,
    so the ``owl:`` prefix has to be resolvable by ``graph`` itself.
    """
    rows = graph.query(
        """SELECT ?s
           WHERE {
              ?s a owl:NamedIndividual
              }""")
    return [str(row.s) for row in rows if type(row.s) != term.BNode]

def getAllConnectionsBetweenNamedIndividuals(graph):
    """Return ``(subject, predicate, object)`` string triples linking named individuals.

    Only triples whose subject and object are both typed
    ``owl:NamedIndividual`` are selected. The query declares no PREFIX, so the
    ``owl:`` prefix has to be resolvable by ``graph`` itself.
    """
    rows = graph.query(
        """SELECT ?subject ?predicate ?object
           WHERE {
                ?subject ?predicate ?object.
                ?subject a owl:NamedIndividual.
                ?object a owl:NamedIndividual.
              }""")
    return [(str(row.subject), str(row.predicate), str(row.object)) for row in rows]

def getNodeName(graph, iri):
    """Return the part of ``iri`` after its last ``/`` or ``#``.

    If ``iri`` contains neither character it is returned unchanged.
    ``graph`` is accepted but not used.
    """
    # rfind yields -1 when a separator is absent, so the slice then starts at 0.
    lastSeparator = max(iri.rfind("/"), iri.rfind("#"))
    return iri[lastSeparator + 1:]

In [1]:
def addNamedIndividualsForClass(graph, classURI, numberOfIndividuals = 5, baseURI = "http:quantsimulant.de/rdf/auto-generated/"):
    """Ask the language model for new instances of a class and add them to ``graph``.

    Parameters
    ----------
    graph
        Graph that is queried for existing instances and receives the new triples.
    classURI : str
        Full IRI of the class to populate.
    numberOfIndividuals : int
        Number of new instance names requested in the prompt.
    baseURI : str
        Prefix prepended to each returned name to form the instance IRI.
        NOTE(review): the default has no ``//`` after ``http:`` - looks like a
        typo, but changing it would change every generated IRI; confirm first.

    Returns
    -------
    bool
        ``False`` if no usable answer was received, ``True`` otherwise.
    """
    className = getNodeName(graph, classURI)
    knownNames = [getNodeName(graph, iri) for iri in getAllNamedIndividualsOfClass(graph, classURI)]
    # Only mention existing instances in the prompt when there are any.
    if knownNames:
        existingNote = f" The following instances already exist: {', '.join(knownNames)}."
    else:
        existingNote = ""
    query = f'''
I want to build a knowledge graph, that is based on an ontology.
Therefor I want to collect instances of the class {className}.
{existingNote}
I want to add {numberOfIndividuals} new instances of this class.
The instances should have unique names that are written in camel case.
Return the names of the new instances in the format ["first instance name", "second instance name", ...].
Return nothing but this list.
'''
    newNames, received = tryRecieveAnswer(query, answerConversion = listAnswerConversion)
    if not received:
        return False
    rdfType = URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")
    owlNamedIndividual = URIRef("http://www.w3.org/2002/07/owl#NamedIndividual")
    for name in newNames:
        # Names the class already has are reported and skipped.
        if name in knownNames:
            print(f"Name {name} already exists for class {className}")
            continue
        individual = URIRef(baseURI + name)
        graph.add((individual, rdfType, URIRef(classURI)))
        graph.add((individual, rdfType, owlNamedIndividual))
    return True

In [3]:
def addNamedIndividualsToAllClasses(graph, numberOfIndividuals = 5, baseURI = "http:quantsimulant.de/rdf/auto-generated/"):
    """Call ``addNamedIndividualsForClass`` once for every class returned by ``getAllClasses``.

    ``numberOfIndividuals`` and ``baseURI`` are forwarded unchanged. The
    per-class result is not inspected, and nothing is returned.
    """
    for currentClass in getAllClasses(graph):
        addNamedIndividualsForClass(graph, currentClass, numberOfIndividuals, baseURI)

In [16]:
def addConnectionsBetweenIndividuals(graph, propertyURI, baseURI = "http:quantsimulant.de/rdf/auto-generated/"):
    """Ask the language model which individuals to link via ``propertyURI`` and add the triples.

    The domain and range classes of the property are read from ``graph``; the
    existing individuals of both classes are listed in the prompt. For every
    returned ``[subject, object]`` pair the triple
    ``(baseURI + subject, propertyURI, baseURI + object)`` is added. Individuals
    that are not yet known for the domain/range class are typed with that class
    and with ``owl:NamedIndividual`` first.

    Parameters
    ----------
    graph
        Graph that is queried and receives the new triples.
    propertyURI : str
        Full IRI of the property; inserted between ``<`` and ``>`` in the query
        text without escaping.
    baseURI : str
        Prefix prepended to each returned name to form an individual's IRI.

    Returns
    -------
    bool
        ``True`` when the connections were added; ``False`` when the property
        has no domain/range pair in ``graph`` or no usable answer was received.
    """
    # Get the domain and range of the property
    qres = graph.query(
        f"""SELECT ?subject ?object
           WHERE {{
                <{propertyURI}> rdfs:domain ?subject.
                <{propertyURI}> rdfs:range ?object.
              }}""")
    domain = None
    rangeClass = None  # not called ``range`` so the builtin is not shadowed
    for row in qres:
        # If several rows come back, the last one wins.
        domain = str(row.subject)
        rangeClass = str(row.object)
    if domain is None or rangeClass is None:
        # Without this guard the code below would fail on unbound variables.
        print(f"No domain and range found for property {propertyURI}; no connections added.")
        return False
    # Get all named individuals of the domain class
    domainIndividuals = getAllNamedIndividualsOfClass(graph, domain)
    # Get all named individuals of the range class
    rangeIndividuals = getAllNamedIndividualsOfClass(graph, rangeClass)

    query = f'''
I want to build a knowledge graph, that is based on an ontology.
Therefor I want to connect instances of the classes {getNodeName(graph, domain)} and {getNodeName(graph, rangeClass)} with the relation {getNodeName(graph, propertyURI)}.
The individuals of the domain class {getNodeName(graph, domain)} are {', '.join([getNodeName(graph, iri) for iri in domainIndividuals])}.
The individuals of the range class {getNodeName(graph, rangeClass)} are {', '.join([getNodeName(graph, iri) for iri in rangeIndividuals])}.
Which of these individuals should be connected with each other using the relation {getNodeName(graph, propertyURI)}?
Return the connections in the format [["first subject name", "first object name"], ["second subject name", "second object name"], ...].
If non of the listed instances should be connected, you can introduce new instance names to connect them with the already existing ones.
They should be written in camel case.
Return all possible connections.
Return nothing but this list.
'''
    def answerConversion(answer):
        # Accept only a JSON list of two-element lists of strings; any
        # violation raises, which makes tryRecieveAnswer retry.
        result = loads(answer)
        assert isinstance(result, list)
        for item in result:
            assert isinstance(item, list)
            assert len(item) == 2
            assert isinstance(item[0], str)
            assert isinstance(item[1], str)
        return result
    answer, success = tryRecieveAnswer(query, answerConversion = answerConversion)
    if not success:
        return False
    rdfType = URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")
    owlNamedIndividual = URIRef("http://www.w3.org/2002/07/owl#NamedIndividual")
    for subjectName, objectName in answer:
        subjectURI = baseURI + subjectName
        objectURI = baseURI + objectName
        # Individuals introduced by the model are typed before they are linked,
        # and remembered so that they are typed only once.
        if subjectURI not in domainIndividuals:
            graph.add((URIRef(subjectURI), rdfType, URIRef(domain)))
            graph.add((URIRef(subjectURI), rdfType, owlNamedIndividual))
            domainIndividuals.append(subjectURI)
        if objectURI not in rangeIndividuals:
            graph.add((URIRef(objectURI), rdfType, URIRef(rangeClass)))
            graph.add((URIRef(objectURI), rdfType, owlNamedIndividual))
            rangeIndividuals.append(objectURI)
        graph.add((URIRef(subjectURI), URIRef(propertyURI), URIRef(objectURI)))
    # Report success explicitly, like addNamedIndividualsForClass does.
    return True

In [17]:
def addConnectionsBetweenAllIndividuals(graph, baseURI = "http:quantsimulant.de/rdf/auto-generated/"):
    """Call ``addConnectionsBetweenIndividuals`` once for every object property in ``graph``.

    ``getAllPropertiesBetweenClasses`` yields one ``(domain, property, range)``
    tuple per domain/range combination, so a property can appear several times.
    ``addConnectionsBetweenIndividuals`` only receives the property IRI, so each
    property is processed once to avoid repeating the same model request.
    ``baseURI`` is forwarded unchanged; nothing is returned.
    """
    # dict.fromkeys removes duplicates while keeping first-seen order.
    propertyURIs = dict.fromkeys(
        relation[1] for relation in getAllPropertiesBetweenClasses(graph)
    )
    for propertyURI in propertyURIs:
        addConnectionsBetweenIndividuals(graph, propertyURI, baseURI)

In [21]:
def createKnowledgeGraphBasedOnOntology(ontologyPath, knowledgeGraphPath = None, baseURI = "http:quantsimulant.de/rdf/auto-generated/"):
    """Populate an ontology with model-generated individuals and save the result.

    Parameters
    ----------
    ontologyPath : str or Path
        Turtle file containing the ontology.
    knowledgeGraphPath : str or Path, optional
        Turtle file the resulting graph is written to. Defaults to
        ``knowledge_graph.ttl`` next to the ontology file. If the file already
        exists, its content is loaded into the graph before new data is added.
    baseURI : str
        Bound to the empty prefix of the graph and used as IRI prefix for all
        generated individuals.

    Returns
    -------
    The populated graph (after it has been serialized to ``knowledgeGraphPath``).
    """
    if knowledgeGraphPath is None:
        knowledgeGraphPath = Path(ontologyPath).parent / "knowledge_graph.ttl"
    graph = Graph()
    graph.bind("", Namespace(baseURI))
    graph.parse(str(ontologyPath), format="turtle")
    if Path(knowledgeGraphPath).exists():
        graph.parse(str(knowledgeGraphPath), format="turtle")
    # Forward baseURI explicitly; otherwise both helpers fall back to their own
    # default and a custom baseURI would only affect the prefix binding above.
    addNamedIndividualsToAllClasses(graph, baseURI = baseURI)
    addConnectionsBetweenAllIndividuals(graph, baseURI = baseURI)
    graph.serialize(str(knowledgeGraphPath), format="turtle")
    return graph

In [22]:
# Populate the "researchAreaSeeded" ontology. No output path is given, so the
# graph is written to knowledge_graph.ttl in the same directory as the ontology.
# This cell sends requests to the OpenAI API every time it is run.
createKnowledgeGraphBasedOnOntology(datapath / "researchAreaSeeded"/ "ontology.ttl")

<Graph identifier=N6be9edc292c14d37afa3785a699d20f6 (<class 'rdflib.graph.Graph'>)>