<a href="https://colab.research.google.com/github/asantos2000/master-degree-santos-anderson/blob/main/code/src/chap_6_nlp2sbvr_elements_association_creation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# nlp2sbvr - elements association and creation

Chapter 6. Ferramentas de suporte
- Section 6.2 Implementação dos principais componentes
  - Section 6.2.4 nlp2sbvr
    - Section Algoritmo "elements association and creation"
    - Section Algoritmo "define vocabulary namespace"
    - Section Algoritmo "similarity search"

> Use this version if the best-scored elements will be inserted into the KG. To insert the last checkpoint instead, use the "chap_6_nlp2sbvr_elements_association_creation.ipynb" notebook. DO NOT USE BOTH.

## Google colab

> Before running, set your keys in the `/content/.env` file and any preferences in `/content/config.yaml`; use the `.env.example` and `config.colab.yaml` files of the git repo as examples.

In [1]:
# Reload edited local modules automatically before each cell execution.
%load_ext autoreload
%autoreload 2

import sys

# True when the notebook runs inside Google Colab (its module is preloaded there).
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
  from google.colab import drive
  drive.mount('/content/drive')
  # Start from a clean checkout: remove any previous clone and copied folders.
  !rm -rf cfr2sbvr configuration checkpoint
  !git clone https://github.com/asantos2000/master-degree-santos-anderson.git cfr2sbvr
  %pip install -r cfr2sbvr/code/requirements.txt
  # Copy the local packages and the Colab configuration next to the notebook.
  !cp -r cfr2sbvr/code/src/configuration .
  !cp -r cfr2sbvr/code/src/checkpoint .
  !cp -r cfr2sbvr/code/config.colab.yaml config.yaml
  DEFAULT_CONFIG_FILE="config.yaml"
else:
  # Outside Colab the configuration file is read from the parent directory.
  DEFAULT_CONFIG_FILE="../config.yaml"

## Imports

In [2]:
# Standard library imports
import re
from decimal import Decimal
from datetime import datetime
import os

# Third-party libraries
from pydantic import BaseModel
from typing import List, Dict, Optional, Any, Tuple
import spacy
from slugify import slugify

# Franz AllegroGraph (AG) imports
from franz.openrdf.connect import ag_connect
from franz.openrdf.repository.repository import RepositoryConnection
from franz.openrdf.query.query import QueryLanguage

# Database
import duckdb

# Local application/library-specific imports
import configuration.main as configuration
import logging_setup.main as logging_setup

# Development switch: when True, the local modules imported above are reloaded
# so that edits to them take effect without restarting the kernel.
DEV_MODE = True

if DEV_MODE:
    # Development mode
    import importlib

    importlib.reload(configuration)
    importlib.reload(logging_setup)

## Settings

Default settings; check them before running the notebook.

### Get configuration

In [50]:
# Load the configuration from the file chosen in the first cell
# (DEFAULT_CONFIG_FILE differs between Colab and local runs).
config = configuration.load_config(DEFAULT_CONFIG_FILE)

### Logging configuration

In [51]:
# Module-level logger used by every function below; directory and level come from the config.
logger = logging_setup.setting_logging(config["DEFAULT_LOG_DIR"], config["LOG_LEVEL"])

2025-02-10 23:42:45 - INFO - Logging is set up with daily rotation.


## General functions and data structures

### Database

In [52]:
def load_data(conn, table, checkpoints, doc_ids, statement_sources, process_selected):
    """
    Load rows from a table, optionally filtered by checkpoint, document and source.

    Each filter is applied only when its argument is non-empty; the filters are
    combined with AND.

    Args:
        conn: Open DuckDB connection used to run the query.
        table (str): Name of the table to read. It is inserted into the query
            verbatim, so it must come from trusted configuration.
        checkpoints: Iterable of checkpoint names to keep (``checkpoint in (...)``).
        doc_ids: Iterable of document ids to keep (``doc_id in (...)``).
        statement_sources: Iterable of sources; a row is kept when its
            ``statement_sources`` list shares at least one element with it.
        process_selected: Not used by this function; kept so existing callers
            keep working.

    Returns:
        pandas.DataFrame: The selected rows, ordered by all columns.
    """

    def _sql_list(values):
        # Double embedded single quotes so a value can never terminate its
        # own SQL string literal (e.g. a doc id containing an apostrophe).
        return ", ".join(
            "'" + str(value).replace("'", "''") + "'" for value in values
        )

    where_clause = ""
    if checkpoints:
        where_clause += f" AND checkpoint in ({_sql_list(checkpoints)})"

    if doc_ids:
        where_clause += f" AND doc_id in ({_sql_list(doc_ids)})"

    if statement_sources:
        where_clause += (
            f" AND list_has_any([{_sql_list(statement_sources)}], statement_sources)"
        )

    # "WHERE 1 = 1" lets every optional filter be appended uniformly as " AND ...".
    data_query = f"""
    SELECT *
    FROM {table}
    WHERE 1 = 1
    {where_clause}
    ORDER BY *
    ;
    """

    logger.debug(data_query)
    df = conn.sql(query=data_query).fetchdf()
    return df

In [53]:
def db_connection(db_name, default_data_dir="data"):
    """
    Open a read-only DuckDB connection.

    Args:
        db_name (str): Database name. A name starting with ``md:`` is opened
            with the token read from the ``MOTHER_DUCK_TOKEN`` environment
            variable appended; any other name is treated as a file inside
            ``default_data_dir``.
        default_data_dir (str): Directory that holds local database files.

    Returns:
        tuple: ``(connection, db_name)``.
    """
    if db_name.startswith("md:"):
        token = os.getenv("MOTHER_DUCK_TOKEN")
        target = f"{db_name}?motherduck_token={token}"
    else:
        target = f"{default_data_dir}/{db_name}"

    return duckdb.connect(target, read_only=True), db_name

### Utils

In [54]:
def now_as_xsd_dateTime():
    """
    Return the current UTC time formatted as an xsd:dateTime string.

    The value has second precision and the ``Z`` suffix, e.g.
    ``2025-02-10T23:42:45Z``.

    Returns:
        str: Current UTC timestamp as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    # Local import: the notebook's import cell only brings in `datetime` itself.
    from datetime import timezone

    # An aware "now" replaces the deprecated datetime.utcnow(); formatting with
    # strftime drops the microseconds and appends the UTC marker in one step.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

In [55]:
def remove_section_symbol(input_string: str) -> str:
    """
    Strip every '§' character from a string and trim surrounding whitespace.

    Args:
        input_string (str): Text that may contain the section symbol.

    Returns:
        str: The text with all '§' removed and no leading/trailing whitespace.

    Raises:
        TypeError: If 'input_string' is not a string.
    """
    if isinstance(input_string, str):
        # Splitting on the symbol and re-joining drops every occurrence.
        return "".join(input_string.split("§")).strip()
    raise TypeError("input_string must be a string")

In [56]:
def signifier_sources(sources: list) -> list:
    """
    Build one reference string per source by joining its section and paragraph.

    Args:
        sources (list): Mappings that may carry "section" and "paragraph" keys.

    Returns:
        list: For each source, ``str(section) + str(paragraph)``; a missing key
        contributes the text "None".
    """
    return [
        str(entry.get("section")) + str(entry.get("paragraph"))
        for entry in sources
    ]

In [57]:
def normalize_ns_string(input_string: str) -> str:
    """
    Turn a section identifier into a compact, namespace-friendly token.

    The section symbol is removed (via remove_section_symbol), every space is
    dropped, and hyphens and dots become underscores. Letter case is untouched.

    Args:
        input_string (str): The string to normalize.

    Returns:
        normalized_string (str): The normalized string.
    """
    without_symbol = remove_section_symbol(input_string)

    # One pass: delete spaces, map '-' and '.' to '_'.
    replacements = str.maketrans({" ": None, "-": "_", ".": "_"})
    return without_symbol.translate(replacements)

In [58]:
def get_metadata_cfr2sbvr(element):
    """
    Collect the transformation and classification metadata of an element.

    Args:
        element: Mapping (anything with ``.get``) describing one element.

    Returns:
        dict: Metadata keyed by the names used when building KG triples; each
        entry falls back to a default when the element lacks the source key.
    """
    # (output key, key read from the element, default when absent)
    field_map = (
        ("extract_original_statement", "statement_text", "missing"),
        ("transformation_semscore", "semscore", 0),
        ("transformation_similarity_score", "similarity_score", 0),
        ("transformation_similarity_score_confidence", "similarity_score_confidence", 0),
        ("transformation_accuracy", "transformation_accuracy", 0),
        ("transformation_grammar_syntax_accuracy", "grammar_syntax_accuracy", 0),
        ("transformation_findings", "findings", []),
        # from classification
        ("classification_type", "statement_classification_type", "missing"),
        ("classification_subtype", "statement_classification_subtype", "missing"),
        ("classification_type_confidence", "statement_classification_type_confidence", 0),
        ("classification_type_explanation", "statement_classification_type_explanation", "not available"),
        ("classification_subtype_confidence", "statement_classification_subtype_confidence", 0),
        ("classification_subtype_explanation", "statement_classification_subtype_explanation", "not available"),
        ("classification_templates_ids", "transformation_template_ids", []),
    )
    return {
        target_key: element.get(source_key, fallback)
        for target_key, source_key, fallback in field_map
    }

### KG functions

In [59]:
def upsert_section_to_kg(conn: Any,
                   section_chapter: str,
                   section_part: str,
                   section_title: str,
                   section_id: str,
                   section_content: str) -> bool:
    """
    Replace a CFR section node in the Knowledge Graph with the given values.

    A single SPARQL update deletes every triple whose subject is the section
    node and inserts the node again with its id, chapter, part, title and text.
    The node name is ``cfr-sbvr:`` followed by the normalized section id.

    Args:
        conn (RepositoryConnection): The connection to the Knowledge Graph.
        section_chapter (str): The chapter of the section.
        section_part (str): The part of the section.
        section_title (str): The title of the section.
        section_id (str): The ID of the section.
        section_content (str): The content of the section.

    Returns:
        bool: True if the update ran without raising, False otherwise (the
        error is logged, not re-raised).
    """
    # Upsert section into Knowledge Graph
    normalized_id = normalize_ns_string(section_id)
    # NOTE(review): all values are interpolated into the SPARQL text without
    # escaping; a backslash or a closing quote sequence in any of them would
    # presumably break the update — confirm against the data being loaded.
    query = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>

WITH cfr-sbvr:CFR_SBVR_Graph
DELETE {{
    cfr-sbvr:{normalized_id} ?p ?o .
}}
INSERT {{
    cfr-sbvr:{normalized_id} a cfr-sbvr:CFRSession ;
        cfr-sbvr:cfrId "{section_id}" ;
        cfr-sbvr:cfrChapter "{section_chapter}" ;
        cfr-sbvr:cfrPart "{section_part}" ;
        cfr-sbvr:cfrTitle "{section_title}" ;
        cfr-sbvr:cfrText \"""{section_content}\""" 
.
}}
WHERE {{
    # Match all existing triples
    OPTIONAL {{ cfr-sbvr:{normalized_id} ?p ?o . }}
}}
    """
    
    logger.debug(f"{query=}")

    # Any failure is logged and reported through the return value.
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query).evaluate()
        logger.info(f"Section '{normalized_id}' upserted successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to upsert section {normalized_id}: {e}")
        return False


In [60]:
def get_section_from_kg(conn: Any, section_id: str) -> Any:
    """
    Read the stored attributes of one CFR section from the Knowledge Graph.

    Args:
        conn (RepositoryConnection): The connection to the Knowledge Graph.
        section_id (str): The section identifier; it is normalized to form the
            node name that is queried.

    Returns:
        dict: Keys "section_text", "section_part", "section_chapter",
        "section_title" and "section_id". A value is the empty string when the
        matching predicate was not returned. Double quotes are removed from
        every value, and the two-character sequences ``\\n`` / ``\\t`` in the
        text are turned into real newline / tab characters.
    """
    normalized_id = normalize_ns_string(section_id)

    query = f"""
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>

SELECT ?p ?o
WHERE {{
    cfr-sbvr:{normalized_id} ?p ?o .
}}
    """
    result = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()

    logger.info(f"result.metadata: {result.metadata}")
    logger.info(f"result.variable_names: {result.variable_names}")

    # (output key, predicate text, whether quotes are stripped from the
    # predicate before it is compared)
    predicate_fields = (
        ("section_text", '<http://cfr2sbvr.com/cfr#cfrText>', False),
        ("section_part", '<http://cfr2sbvr.com/cfr#cfrPart>', True),
        ("section_chapter", '<http://cfr2sbvr.com/cfr#cfrChapter>', False),
        ("section_title", '<http://cfr2sbvr.com/cfr#cfrTitle>', False),
        ("section_id", '<http://cfr2sbvr.com/cfr#cfrId>', True),
    )
    collected = {key: "" for key, _, _ in predicate_fields}

    with result:
        for binding_set in result:
            for key, predicate, strip_quotes in predicate_fields:
                found = str(binding_set.getValue("p"))
                if strip_quotes:
                    found = found.replace('"', '')
                if found == predicate:
                    collected[key] = str(binding_set.getValue("o")).replace('"', '')

        return {
            "section_text": collected["section_text"].replace('\\n', '\n').replace('\\t', '\t'),
            "section_part": collected["section_part"],
            "section_chapter": collected["section_chapter"],
            "section_title": collected["section_title"],
            "section_id": collected["section_id"],
        }

In [61]:
def transform_to_rdf_subject(input_string: str) -> str:
    """
    Derive an RDF local name from free text.

    Each whitespace-separated word gets its first character upper-cased and
    the rest lower-cased, the words are concatenated, and every character
    other than ASCII letters, digits and underscore is removed.

    Args:
        input_string (str): The string to transform.

    Returns:
        rdf_subject (str): The transformed RDF subject name.
    """
    capitalized_words = map(str.capitalize, input_string.split())
    return re.sub(r'[^a-zA-Z0-9_]', '', "".join(capitalized_words))

In [62]:
class Term(BaseModel):
    """A term referenced by a rule or fact."""
    # Text of the term; it is converted to an RDF local name when triples are built.
    term: str
    classification: Optional[str]  # Allows additional information about the term

class RuleAndFact(BaseModel):
    """Input model for upsert_rule_and_fact_query: one rule or fact statement."""
    # NOTE(review): the Optional fields declare no default value; whether they
    # may be omitted depends on the installed pydantic major version — confirm.
    statement_id: str  # Used as the local name of the node in the KG
    statement: str
    concept_type: str  # Maps to "element_name" in the structure
    terms: Optional[List[Term]]  # Supports nested terms structure
    verb_symbols: Optional[List[str]]  # Supports verb symbols as a list of strings
    vocabulary_namespace: str  # Maps to a constant or inferred namespace
    sources: Optional[List[str]]  # Each source is appended to doc_id in a reference triple
    doc_id: Optional[str]
    metadata_cfr2sbvr: Optional[Dict[str, Any]]  # Scores and classification details written as triples

def upsert_rule_and_fact_query(rule_fact_model: RuleAndFact) -> str:
    """
    Build the SPARQL update that replaces a rule or fact node in the KG.

    The query deletes every triple of ``cfr-sbvr:<statement_id>`` and inserts
    the node again with its types, term / verb-symbol links, source references,
    statement text, namespace, metadata and creation date.

    Args:
        rule_fact_model (RuleAndFact): The rule or fact to write. Its
            ``metadata_cfr2sbvr`` must be a dict (``.get`` is called on it).

    Returns:
        str: The SPARQL update text; nothing is executed here.
    """
    statement = rule_fact_model.statement
    designation_class = rule_fact_model.statement_id
    concept_type = rule_fact_model.concept_type
    vocabulary_namespace = rule_fact_model.vocabulary_namespace
    doc_id = rule_fact_model.doc_id
    metadata_cfr2sbvr = rule_fact_model.metadata_cfr2sbvr

    # Facts are typed as definitional rules; everything else as behavioral rules.
    if concept_type == "Fact":
        designation_type = "DefinitionalRule"
    else:
        designation_type = "BehavioralBusinessRule"

    # Source reference triples (one per source, prefixed with the document id)
    sources_triples = ""
    if rule_fact_model.sources:
        for source in rule_fact_model.sources:
            sources_triples += f'sbvr:referenceSupportsMeaning "{doc_id}{source}" ;\n'

    # Links to the terms used by the statement
    terms_triples = ""
    if rule_fact_model.terms:
        for term in rule_fact_model.terms:
            terms_triples += f"cfr-sbvr:hasTerm cfr-sbvr:{transform_to_rdf_subject(term.term)} ;\n"

    # Links to the verb symbols used by the statement
    verb_symbols_triples = ""
    if rule_fact_model.verb_symbols:
        for verb_symbol in rule_fact_model.verb_symbols:
            verb_symbols_triples += f'cfr-sbvr:hasVerbSymbol cfr-sbvr:{transform_to_rdf_subject(verb_symbol)} ;\n'

    # NOTE(review): this key is not among those built by get_metadata_cfr2sbvr,
    # so this line normally logs None — confirm which key was intended.
    logger.debug(f'{metadata_cfr2sbvr.get("classification_classification_confidence")=}')

    # Only rules carry the type-level classification; facts are definitional
    # rules by definition.
    type_classification = ""
    if concept_type == "Rule":
        type_classification = f"""
            cfr-sbvr:classificationTypeConfidence {metadata_cfr2sbvr.get("classification_type_confidence")} ;
            cfr-sbvr:classificationTypeExplanation "{metadata_cfr2sbvr.get("classification_type_explanation")}" ;
        """

    # Both fragments are interpolated unconditionally below, so they must
    # exist even when the statement is "missing" (previously they were only
    # assigned inside the branch, raising UnboundLocalError otherwise).
    finding_triples = ""
    templates_triples = ""
    if statement != "missing":
        for find in metadata_cfr2sbvr.get("transformation_findings") or []:
            finding_triples += f'cfr-sbvr:transformationFinding "{find}" ;\n'

        for template in metadata_cfr2sbvr.get("classification_templates_ids") or []:
            templates_triples += f'cfr-sbvr:classificationTemplatesId "{template}" ;\n'

    metadata = f"""
        cfr-sbvr:extractOriginalStatement "{metadata_cfr2sbvr.get("extract_original_statement")}" ;
        cfr-sbvr:transformationSemscore {metadata_cfr2sbvr.get("transformation_semscore")} ;
        cfr-sbvr:transformationSimilarityScore {metadata_cfr2sbvr.get("transformation_similarity_score")} ;
        cfr-sbvr:transformationSimilarityScoreConfidence {metadata_cfr2sbvr.get("transformation_similarity_score_confidence")} ;
        cfr-sbvr:transformationAccuracy {metadata_cfr2sbvr.get("transformation_accuracy")} ;
        cfr-sbvr:transformationGrammarSyntaxAccuracy {metadata_cfr2sbvr.get("transformation_grammar_syntax_accuracy")} ;
        cfr-sbvr:classificationType "{metadata_cfr2sbvr.get("classification_type")}" ;
        cfr-sbvr:classificationSubtype "{metadata_cfr2sbvr.get("classification_subtype")}" ;
        {type_classification}
        {finding_triples}
        {templates_triples}
        cfr-sbvr:classificationSubtypeConfidence {metadata_cfr2sbvr.get("classification_subtype_confidence")} ;
        cfr-sbvr:classificationSubtypeExplanation "{metadata_cfr2sbvr.get("classification_subtype_explanation")}" ;
    """

    upsert_query = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

WITH cfr-sbvr:CFR_SBVR
DELETE {{
    cfr-sbvr:{designation_class} ?p ?o .
}}
INSERT {{
    cfr-sbvr:{designation_class} a sbvr:{concept_type},
            sbvr:{designation_type} ;
        {terms_triples}
        {verb_symbols_triples}
        {sources_triples}
        sbvr:isImplicitlyUnderstood "false"^^xsd:boolean ;
        sbvr:statement "{statement}" ;
        sbvr:designationIsInNamespace {vocabulary_namespace} ;
        {metadata} 
        cfr-sbvr:createDate "{now_as_xsd_dateTime()}"^^xsd:dateTime .
}}
WHERE {{
    # Match all existing triples related to {designation_class}
    OPTIONAL {{ cfr-sbvr:{designation_class} ?p ?o . }}
}}
    """

    return upsert_query

Class to represent a verb symbol, terms, and names.

In [63]:
class Designation(BaseModel):
    """Input model for the verb-symbol and term/name upsert query builders."""
    # NOTE(review): the Optional fields declare no default value; whether they
    # may be omitted depends on the installed pydantic major version — confirm.
    signifier: str  # Combined with doc_id to form the node's local name
    statement: str
    concept_type: str  # Written as an sbvr: type of the node
    closeMatch: Optional[List[str]]  # Each entry is written verbatim as the object of sbvr:closeMatch
    exactMatch: Optional[List[str]]  # Each entry is written verbatim as the object of sbvr:exactMatch
    vocabulary_namespace: str  # Written verbatim as the object of sbvr:designationIsInNamespace
    sources: Optional[List[str]]  # Each source is appended to doc_id in a reference triple
    doc_id: Optional[str]
    metadata_cfr2sbvr: Optional[Dict[str, Any]]  # Scores and classification details written as triples

In [64]:
def upsert_verb_symbol_query(designation: Designation) -> str:
    """
    Build the SPARQL update that replaces a verb-symbol node in the KG.

    The node name is derived from the signifier and the document id. The
    designation's statement is stored as ``cfr-sbvr:transformedStatement``,
    while ``sbvr:statement`` is always written as "missing".

    Args:
        designation (Designation): The verb symbol to write. ``doc_id`` must be
            a string and ``metadata_cfr2sbvr`` a dict.

    Returns:
        str: The SPARQL update text; nothing is executed here.
    """
    signifier = designation.signifier
    doc_id = designation.doc_id
    source_metadata = designation.metadata_cfr2sbvr
    transformed_statement = designation.statement
    # Verb symbols do not carry a statement of their own.
    statement = "missing"

    subject = transform_to_rdf_subject(f"{signifier}-{remove_section_symbol(doc_id)}")

    # One reference triple per source, each prefixed with the document id.
    sources_triples = "".join(
        f'sbvr:referenceSupportsMeaning "{doc_id}{source}" ;\n'
        for source in designation.sources or []
    )

    metadata = f"""
        cfr-sbvr:extractOriginalStatement "{source_metadata.get("extract_original_statement")}" ;
        cfr-sbvr:transformedStatement "{transformed_statement}" ;
        cfr-sbvr:createDate "{now_as_xsd_dateTime()}"^^xsd:dateTime ;
    """

    return f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

WITH cfr-sbvr:CFR_SBVR
DELETE {{
    cfr-sbvr:{subject} ?p ?o .
}}
INSERT {{
    cfr-sbvr:{subject} a sbvr:VerbSymbol,
            sbvr:{designation.concept_type} ;
        sbvr:signifier "{signifier}" ;
        {sources_triples}
        sbvr:isImplicitlyUnderstood "false"^^xsd:boolean ;
        sbvr:statement "{statement}" ;
        sbvr:designationIsInNamespace {designation.vocabulary_namespace} ;
        {metadata} .
}}
WHERE {{
    # Match all existing triples related to {subject}
    OPTIONAL {{ cfr-sbvr:{subject} ?p ?o . }}
}}
    """

In [65]:
def upsert_term_and_name_query(designation: Designation) -> str:
    """
    Build the SPARQL update that replaces a term or name node in the KG.

    The node name is derived from the signifier and the document id. A "Name"
    is typed as an individual noun concept, anything else as a general concept;
    a further type is chosen from the classification subtype. The metadata
    triples are written only when the statement is not "missing".

    Args:
        designation (Designation): The term or name to write. ``doc_id`` must
            be a string and ``metadata_cfr2sbvr`` a dict.

    Returns:
        str: The SPARQL update text; nothing is executed here.
    """
    signifier = designation.signifier
    statement = designation.statement
    concept_type = designation.concept_type
    doc_id = designation.doc_id
    source_metadata = designation.metadata_cfr2sbvr

    subject = transform_to_rdf_subject(f"{signifier}-{remove_section_symbol(doc_id)}")

    designation_type = "IndividualNounConcept" if concept_type == "Name" else "GeneralConcept"

    logger.info(f"Format {signifier} to {subject}.")

    # Rule type, selected by the classification subtype
    subtype = source_metadata.get("classification_subtype")
    if subtype == "Formal intensional definitions":
        rule_type = "sbvr:IntensionalDefinition"
    elif subtype == "Formal extensional definitions":
        rule_type = "sbvr:Extensionaldefinition"
    elif subtype == "Categorization scheme enumerations":
        rule_type = "sbvr:Categorizationscheme"
    else:
        rule_type = "sbvr:DefinitionalRule"

    # closeMatch / exactMatch entries are written verbatim as objects.
    close_matches_triples = "".join(
        f"sbvr:closeMatch {close_match} ;\n"
        for close_match in designation.closeMatch or []
    )
    exact_match_triples = "".join(
        f"sbvr:exactMatch {exact_match} ;\n"
        for exact_match in designation.exactMatch or []
    )

    # One reference triple per source, each prefixed with the document id.
    sources_triples = "".join(
        f'sbvr:referenceSupportsMeaning "{doc_id}{source}" ;\n'
        for source in designation.sources or []
    )

    # Metadata triples exist only for designations that have a statement.
    metadata = ""
    if statement != "missing":
        findings = source_metadata.get("transformation_findings")
        finding_triples = "".join(
            f'cfr-sbvr:transformationFinding "{find}" ;\n'
            for find in (findings if findings is not None else [])
        )

        templates = source_metadata.get("classification_templates_ids")
        templates_triples = "".join(
            f'cfr-sbvr:classificationTemplatesId "{template}" ;\n'
            for template in (templates if templates is not None else [])
        )

        metadata = f"""
            cfr-sbvr:extractOriginalStatement "{source_metadata.get("extract_original_statement")}" ;
            cfr-sbvr:transformationSemscore {source_metadata.get("transformation_semscore")} ;
            cfr-sbvr:transformationSimilarityScore {source_metadata.get("transformation_similarity_score")} ;
            cfr-sbvr:transformationSimilarityScoreConfidence {source_metadata.get("transformation_similarity_score_confidence")} ;
            cfr-sbvr:transformationAccuracy {source_metadata.get("transformation_accuracy")} ;
            cfr-sbvr:transformationGrammarSyntaxAccuracy {source_metadata.get("transformation_grammar_syntax_accuracy")} ;
            {finding_triples}
            {templates_triples}
            cfr-sbvr:classificationType "{source_metadata.get("classification_type")}" ;
            cfr-sbvr:classificationSubtype "{source_metadata.get("classification_subtype")}" ;
            cfr-sbvr:classificationSubtypeConfidence {source_metadata.get("classification_subtype_confidence")} ;
            cfr-sbvr:classificationSubtypeExplanation "{source_metadata.get("classification_subtype_explanation")}" ;
        """

    return f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

WITH cfr-sbvr:CFR_SBVR
DELETE {{
    cfr-sbvr:{subject} ?p ?o .
}}
INSERT {{
    cfr-sbvr:{subject} a sbvr:{designation_type},
            {rule_type},
            sbvr:{concept_type} ;
        sbvr:signifier "{signifier}" ;
        {exact_match_triples}
        {close_matches_triples}
        {sources_triples}
        sbvr:isImplicitlyUnderstood "false"^^xsd:boolean ;
        sbvr:statement "{statement}" ;
        sbvr:designationIsInNamespace {designation.vocabulary_namespace} ;
        {metadata}
        cfr-sbvr:createDate "{now_as_xsd_dateTime()}"^^xsd:dateTime .
}}
WHERE {{
    # Match all existing triples related to {subject}
    OPTIONAL {{ cfr-sbvr:{subject} ?p ?o . }}
}}
    """

In [66]:
def upsert_to_kg(conn: RepositoryConnection, query: str) -> bool:
    """
    Run a SPARQL update against the knowledge graph.

    Args:
        conn (RepositoryConnection): The connection to the knowledge graph database.
        query (str): The SPARQL update text to execute.

    Returns:
        bool: True if the update ran without raising, False otherwise (the
        error is logged, not re-raised).
    """
    succeeded = False
    try:
        update = conn.prepareUpdate(QueryLanguage.SPARQL, query)
        update.evaluate()
        logger.info("Upserted successfully.")
        succeeded = True
    except Exception as error:
        logger.error(f"Failed to upsert: {error}")
    return succeeded

In [67]:
def create_vocabulary(conn: RepositoryConnection, vocabulary_name: str) -> bool:
    """
    Create (or recreate) a vocabulary in the knowledge graph.

    Three SPARQL updates are executed in order:
      1. remove the association between the Part 275 vocabulary and this one
         (best effort: a failure is logged and execution continues);
      2. replace the vocabulary node ``cfr-sbvr:CFR_SBVR_<name>_VOC``;
      3. add the association again.

    Args:
        conn (RepositoryConnection): The connection to the knowledge graph database.
        vocabulary_name (str): The name of the vocabulary to create; it is
            embedded in the node name.

    Returns:
        True if the vocabulary was created and associated successfully,
        False otherwise. Errors are logged, not re-raised.
    """

    query_remove_association = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

DELETE DATA {{
GRAPH cfr-sbvr:CFR_SBVR {{
    fro-cfr:CFR_Title_17_Part_275_VOC sbvr:vocabulary1IncorporatesVocabulary2 cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC .
}}
}}
    """

    # The owl: prefix is declared explicitly so the update does not depend on
    # namespaces predefined by the server.
    query_add_triples = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

WITH cfr-sbvr:CFR_SBVR
DELETE {{
    cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC ?p ?o .
}}

INSERT {{
    cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC
        a owl:Class, sbvr:Vocabulary ;
        cfr-sbvr:createDate "{now_as_xsd_dateTime()}"^^xsd:dateTime .
}}
WHERE {{
    # Match all existing triples related to cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC
    OPTIONAL {{ cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC ?p ?o . }}
}}
    """

    query_add_association = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

INSERT DATA {{
GRAPH cfr-sbvr:CFR_SBVR {{
    fro-cfr:CFR_Title_17_Part_275_VOC sbvr:vocabulary1IncorporatesVocabulary2 cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC .
}}
}}
    """

    logger.debug(f"SPARQL Query: {query_remove_association}")
    logger.debug(f"SPARQL Query: {query_add_triples}")
    logger.debug(f"Vocabulary name: cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC")

    # Remove associated vocabulary (best effort: keep going on failure)
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query_remove_association).evaluate()
        logger.info(f"Vocabulary {vocabulary_name} delete associated successfully.")
    except Exception as e:
        logger.error(f"Failed to delete associated vocabulary {vocabulary_name}: {e}")

    # Create new vocabulary. If this fails the vocabulary does not exist, so
    # report failure instead of associating a node that was never written.
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query_add_triples).evaluate()
        logger.info(f"Vocabulary {vocabulary_name} created successfully.")
    except Exception as e:
        logger.error(f"Failed to create vocabulary {vocabulary_name}: {e}")
        return False

    # Add association with new vocabulary
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query_add_association).evaluate()
        logger.info(f"Vocabulary {vocabulary_name} associated successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to associate vocabulary {vocabulary_name}: {e}")
        return False

In [68]:
def create_vocabulary_namespace(conn: RepositoryConnection, vocabulary_name: str) -> bool:
    """
    Create (or recreate) a vocabulary namespace in the knowledge graph.

    Three SPARQL updates are executed in order:
      1. remove the association between the Part 275 namespace and this one
         (best effort: a failure is logged and execution continues);
      2. replace the namespace node ``cfr-sbvr:CFR_SBVR_<name>_NS``;
      3. add the association again.

    Args:
        conn (RepositoryConnection): The connection to the knowledge graph database.
        vocabulary_name (str): The name of the vocabulary whose namespace is
            created; it is embedded in the node name.

    Returns:
        True if the vocabulary namespace was created and associated
        successfully, False otherwise. Errors are logged, not re-raised.
    """

    query_remove_association = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

DELETE DATA {{
GRAPH cfr-sbvr:CFR_SBVR {{
    fro-cfr:CFR_Title_17_Part_275_NS sbvr:namespace1IncorporatesNamespace2 cfr-sbvr:CFR_SBVR_{vocabulary_name}_NS .
}}
}}
    """

    # The owl: prefix is declared explicitly so the update does not depend on
    # namespaces predefined by the server.
    query_add_triples = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX dct: <http://purl.org/dc/terms/>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

WITH cfr-sbvr:CFR_SBVR
DELETE {{
    cfr-sbvr:CFR_SBVR_{vocabulary_name}_NS ?p ?o .
}}

INSERT {{
cfr-sbvr:CFR_SBVR_{vocabulary_name}_NS
        a owl:Class, sbvr:VocabularyNamespace;
    sbvr:namespaceHasURI <http://cfr2sbvr.com/cfr/CFR_SBVR_{vocabulary_name}_NS#> ;
    sbvr:vocabularyIsExpressedInLanguage cfr-sbvr:EnglishLanguage ;
    sbvr:vocabularyNamespaceIsDerivedFromVocabulary cfr-sbvr:CFR_SBVR_{vocabulary_name}_VOC ;
    dct:title "Semantics of Business Vocabulary and Business Rules (SBVR) for Code of Federal Regulations (CFR)" ;
    skos:definition "SBVR-CFR is an adopted standard of the Object Management Group (OMG) intended to be the basis for formal and detailed natural language declarative description of CFR regulations" ;
    dct:source <https://github.com/asantos2000/dissertacao-santos-anderson-2024> ;
    cfr-sbvr:createDate "{now_as_xsd_dateTime()}"^^xsd:dateTime .
}}
WHERE {{
    # Match all existing triples related to cfr-sbvr:CFR_SBVR_{vocabulary_name}_NS
    OPTIONAL {{ cfr-sbvr:CFR_SBVR_{vocabulary_name}_NS ?p ?o . }}
}}
    """
    query_add_association = f"""
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

INSERT DATA {{
GRAPH cfr-sbvr:CFR_SBVR {{
    fro-cfr:CFR_Title_17_Part_275_NS sbvr:namespace1IncorporatesNamespace2 cfr-sbvr:CFR_SBVR_{vocabulary_name}_NS .
}}
}}
    """

    logger.debug(f"SPARQL Query: {query_remove_association}")
    logger.debug(f"SPARQL Query: {query_add_triples}")
    logger.debug(f"SPARQL Query: {query_add_association}")

    # Remove associated namespace (best effort: keep going on failure)
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query_remove_association).evaluate()
        logger.info(f"Vocabulary NS {vocabulary_name} delete associated successfully.")
    except Exception as e:
        logger.error(f"Failed to delete associated vocabulary NS {vocabulary_name}: {e}")

    # Create new namespace. If this fails the namespace does not exist, so
    # report failure instead of associating a node that was never written.
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query_add_triples).evaluate()
        logger.info(f"Vocabulary NS {vocabulary_name} created successfully.")
    except Exception as e:
        logger.error(f"Failed to create vocabulary NS {vocabulary_name}: {e}")
        return False

    # Add association with new namespace
    try:
        conn.prepareUpdate(QueryLanguage.SPARQL, query_add_association).evaluate()
        logger.info(f"Vocabulary NS {vocabulary_name} associated successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to associate vocabulary NS {vocabulary_name}: {e}")
        return False

In [69]:
def define_vocabulary_ns(conn: RepositoryConnection, doc_id: str, is_local_scope: bool) -> str:
    """
    Resolve the vocabulary namespace for a document, creating it when absent.

    The namespace is the section-specific ``cfr-sbvr:CFR_SBVR_<doc_id>_NS``
    when ``is_local_scope`` is truthy and the shared
    ``fro-cfr:CFR_Title_17_Part_275_NS`` otherwise. The ``cfr-sbvr:CFR_SBVR``
    graph is queried for triples whose subject is that namespace; when none
    exist, the vocabulary and its namespace are created for the normalized
    ``doc_id``.

    Args:
        conn (RepositoryConnection): The AllegroGraph repository connection.
        doc_id (str): The document (section) identifier. It is normalized with
            ``normalize_ns_string`` and ``remove_section_symbol`` before use.
        is_local_scope (bool): Selects the section-specific namespace when
            truthy; any falsy value (including ``None``) selects the shared one.

    Returns:
        str: The prefixed name of the selected namespace.

    Raises:
        Exception: If the vocabulary or its namespace could not be created.
    """

    doc_id = remove_section_symbol(normalize_ns_string(doc_id))

    if is_local_scope:
        ns = f"cfr-sbvr:CFR_SBVR_{doc_id}_NS"
    else:
        ns = "fro-cfr:CFR_Title_17_Part_275_NS"

    query = f"""
        PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
        PREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>

        select ?p ?o {{
            graph cfr-sbvr:CFR_SBVR {{
                {ns} ?p ?o 
            }}
        }}
    """

    tuple_query = conn.prepareTupleQuery(QueryLanguage.SPARQL, query)
    result = tuple_query.evaluate()

    logger.debug(f"result.metadata: {result.metadata}")
    logger.debug(f"result.variable_names: {result.variable_names}")

    # Count the rows while logging them, so the existence check below does not
    # rely on the truthiness of a result object that has already been iterated.
    row_count = 0
    with result:
        for binding in result:
            logger.debug(f"binding: {binding}")
            row_count += 1

    if row_count == 0:
        # Vocabulary
        logger.info("Vocabulary not found. Creating vocabulary and namespace...")
        if create_vocabulary(conn, doc_id):
            logger.info("Vocabulary created")
        else:
            raise Exception("Failed to create vocabulary")

        # Vocabulary namespace
        if create_vocabulary_namespace(conn, doc_id):
            logger.info("Vocabulary namespace created")
        else:
            raise Exception("Failed to create vocabulary namespace")
    else:
        logger.info("Vocabulary already exists")

    logger.info(f"Vocabulary namespace: {ns}")

    return ns

similarity search (P5)

Run a similarity search to find the entity in the graph. If it is not found, create a new entity and its corresponding embedding. If it exists, create a link between the two.

In [None]:
def _escape_sparql_literal(text: str) -> str:
    """Escape text so it can be embedded in a double-quoted SPARQL string literal."""
    return (
        text.replace("\\", "\\\\")  # backslash first, so later escapes are not doubled
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )


def get_from_kg(conn: RepositoryConnection, signifier: str, kg: str, vector_db: str, exact: bool = False) -> List[Dict[str, Any]]:
    """
    Queries the knowledge graph for entries matching the given signifier.

    Two query modes are available:

    - ``exact=False`` (default): nearest-neighbor search for ``signifier`` in
      the ``vector_db`` vector store, restricted to the ``kg`` graph and to the
      configured ``SIMILARITY_THRESHOLD`` (at most 5 neighbors).
    - ``exact=True``: case-insensitive comparison of ``signifier`` with the
      ``rdfs:label`` / ``skos:prefLabel`` of ``owl:Class`` and
      ``owl:NamedIndividual`` resources. This query always targets
      ``fibo:FIBO_Graph``; ``kg`` is only validated, and ``vector_db`` is unused.

    Args:
        conn (RepositoryConnection): The AllegroGraph repository connection.
        signifier (str): The text to search for.
        kg (str): The graph to query; must be the configured ``FIBO_GRAPH`` or
            ``CFR_SBVR_GRAPH``.
        vector_db (str): The name of the vector store used by the
            nearest-neighbor search.
        exact (bool): Run the exact-label query instead of the similarity one.

    Returns:
        List[Dict[str, Any]]: One dictionary per result row with the keys
        ``uri``, ``score_percent``, ``located_signifier_uri``,
        ``located_signifier_uri_local_name``, ``located_signifier_predicate``
        and ``definition``.

    Raises:
        ValueError: If ``kg`` is not one of the supported graphs.
        Exception: Any error raised while evaluating the query is logged and
            re-raised.
    """

    if kg not in {config["FIBO_GRAPH"], config["CFR_SBVR_GRAPH"]}:
        raise ValueError(f"Unsupported knowledge graph: {kg}")

    # The signifier ends up inside a quoted SPARQL literal; escape it so that
    # quotes or backslashes in the text cannot break or alter the query.
    close_literal = _escape_sparql_literal(signifier)
    # The exact query compares against LCASE(...), so the search text must be
    # lower-cased as well or a mixed-case signifier could never match.
    exact_literal = _escape_sparql_literal(signifier.lower())
    similarity_threshold = config["SIMILARITY_THRESHOLD"]

    query_string_close = f"""
PREFIX llm: <http://franz.com/ns/allegrograph/8.0.0/llm/>
PREFIX fibo: <https://spec.edmcouncil.org/fibo/ontology/master/2024Q2/QuickFIBOProd#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>
PREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>

SELECT ?uri (xsd:decimal(?score) as ?score_percent) ?s ?p ?definition
FROM {kg}
WHERE {{
    (?uri ?score ?originalText ?p) llm:nearestNeighbor ("{close_literal}" "{vector_db}" 5 {similarity_threshold}) .
    ?s ?p ?originalText .

    OPTIONAL {{ ?s skos:definition ?definition . }}
    OPTIONAL {{ ?s sbvr:Statement ?definition . }}
}}
ORDER BY DESC(?score)
    """

    # NOTE(review): this query declares a fro-cfr prefix with a different base
    # IRI than the other queries in this notebook and uses xsd:anyUri (the XSD
    # datatype is spelled anyURI); both are left untouched here - confirm.
    query_string_exact = f"""
PREFIX fibo: <https://spec.edmcouncil.org/fibo/ontology/master/2024Q2/QuickFIBOProd#>
PREFIX fro-cfr: <http://cfr2sbvr.com/fro/cfr/Code_Federal_Regulations.ttl#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ("https://spec.edmcouncil.org/fibo"^^xsd:anyUri as ?uri) (xsd:decimal(100) as ?score_percent) ?s (rdfs:label as ?p) ?definition ?originalText
FROM NAMED fibo:FIBO_Graph
WHERE {{
  GRAPH ?g {{
    ?s a ?type ;
    skos:definition ?definition ;
    (rdfs:label | skos:prefLabel) ?originalText .
    
    FILTER(?type IN (owl:Class, owl:NamedIndividual))
    FILTER(LCASE(STR(?originalText)) = "{exact_literal}")
  }}
}}
    """

    if exact:
        query_string = query_string_exact
    else:
        query_string = query_string_close

    logger.debug(f"SPARQL Query: {query_string}")

    tuple_query = conn.prepareTupleQuery(QueryLanguage.SPARQL, query_string)

    try:
        result = tuple_query.evaluate()
        logger.debug(f"Result metadata: {result.metadata}")

        with result:
            similar_signifiers = [
                {
                    "uri": str(binding.getValue("uri")),
                    "score_percent": Decimal(binding.getValue("score_percent").getLabel()),
                    "located_signifier_uri": str(binding.getValue("s")),
                    "located_signifier_uri_local_name": binding.getValue("s").getLocalName(),
                    "located_signifier_predicate": str(binding.getValue("p")),
                    # NOTE(review): ?definition is OPTIONAL in the similarity
                    # query; presumably an unbound value is stored as the
                    # string "None" here - confirm what getValue returns.
                    "definition": str(binding.getValue("definition"))
                }
                for binding in result
            ]
    except Exception as e:
        logger.error(f"Error evaluating SPARQL query: {e}")
        raise

    logger.info(f"Found {len(similar_signifiers)} similar signifier(s) for '{signifier}' on {kg}.")

    return similar_signifiers

In [None]:
def get_similar_signifiers(conn: RepositoryConnection, signifier: str) -> Tuple[List[str], List[str]]:
    """
    Collect the URIs of exact and close matches for a signifier.

    Three lookups are made through ``get_from_kg``: an exact-label query and a
    similarity query against the configured FIBO graph, and a similarity query
    against the configured CFR-SBVR graph.

    Args:
        conn (RepositoryConnection): The AllegroGraph repository connection.
        signifier (str): The signifier to search for.

    Returns:
        Tuple[List[str], List[str]]: ``(exact_match, close_match)``.
        ``exact_match`` holds the URIs from the FIBO exact query;
        ``close_match`` holds the URIs from the FIBO similarity query followed
        by those from the CFR-SBVR similarity query. Duplicates are kept.
    """
    fibo_exact =  get_from_kg(conn, signifier, config["FIBO_GRAPH"], config["FIBO_GRAPH_VECTOR_STORE"], True)
    fibo_similarity =  get_from_kg(conn, signifier, config["FIBO_GRAPH"], config["FIBO_GRAPH_VECTOR_STORE"], False)
    cfr_sbvr_similarity = get_from_kg(conn, signifier, config["CFR_SBVR_GRAPH"], config["CFR_SBVR_GRAPH_VECTOR_STORE"], False)

    exact_match = []
    close_match = []

    # Only the FIBO results are logged item by item.
    for item in fibo_similarity:
        logger.info(f"{item=}")
        close_match.append(item.get("located_signifier_uri"))

    for item in fibo_exact:
        logger.info(f"{item=}")
        exact_match.append(item.get("located_signifier_uri"))

    close_match.extend(item.get("located_signifier_uri") for item in cfr_sbvr_similarity)

    logger.info(f"Found {len(exact_match)} exact matche(s) and {len(close_match)} close matche(s) for '{signifier}'.")

    return exact_match, close_match

Generate an RDF subject from a statement.

In [72]:
def to_camel_case(snake_str):
    """
    Turn an underscore-separated string into CamelCase.

    Every underscore-delimited piece is title-cased with ``str.title`` and the
    pieces are concatenated without a separator.
    """
    pieces = snake_str.split("_")
    return "".join(map(str.title, pieces))


def generate_meaningful_rdf_subject(statement, domain_context=""):
    """
    Generate a meaningful and descriptive RDF subject from a statement in CamelCase.

    The statement is parsed with the spaCy ``en_core_web_sm`` pipeline. The
    first subject, first verb (lemma) and first direct/prepositional object
    are combined with selected named entities (ORG, GPE, LAW, EVENT), all
    adjectives and the optional domain context; the result is slugified and
    converted to CamelCase.

    Args:
        statement (str): The input sentence or statement.
        domain_context (str): Additional domain context for enrichment.

    Returns:
        str: A rich, meaningful RDF subject in CamelCase.
    """
    # Loading a spaCy pipeline is expensive, so it is loaded on the first call
    # only and kept on the function object for later calls.
    nlp = getattr(generate_meaningful_rdf_subject, "_nlp", None)
    if nlp is None:
        nlp = spacy.load("en_core_web_sm")
        generate_meaningful_rdf_subject._nlp = nlp
    doc = nlp(statement)

    # Initialize components
    main_subject = None
    predicate = None
    obj = None
    attributes = []

    # Extract subject, verb (predicate), and object; only the first of each is kept
    for token in doc:
        if token.dep_ in ("nsubj", "nsubjpass") and not main_subject:
            main_subject = token.text
        elif token.pos_ == "VERB" and not predicate:
            predicate = token.lemma_  # Base form of the verb
        elif token.dep_ in ("dobj", "pobj") and not obj:
            obj = token.text

    # Extract additional attributes (e.g., named entities or adjectives)
    attributes.extend(
        [ent.text for ent in doc.ents if ent.label_ in {"ORG", "GPE", "LAW", "EVENT"}]
    )
    attributes.extend([token.text for token in doc if token.pos_ == "ADJ"])

    # Combine extracted components
    components = [main_subject, predicate, obj] + attributes
    if domain_context:
        components.append(domain_context)

    logger.debug(f"{components=}")

    # Filter out None and empty values, and drop a component that repeats
    # (case-insensitively) the one immediately before it in the unfiltered
    # list. Non-adjacent repetitions are kept.
    components = [
        comp
        for i, comp in enumerate(components)
        if comp
        and isinstance(comp, str)
        and (
            i == 0
            or not components[i - 1]
            or isinstance(components[i - 1], str)
            and comp.lower() != components[i - 1].lower()
        )
    ]

    # Normalize to snake_case, then convert to CamelCase
    snake_case_subject = slugify("_".join(components), separator="_")
    camel_case_subject = to_camel_case(snake_case_subject)

    return camel_case_subject

## Datasets

### Connect KG

In [73]:
# Name of the AllegroGraph hosting profile; it is used as the key into `config`
# for the connection settings (repo, catalog, host, port, user, password).
hosting = config["ALLEGROGRAPH_HOSTING"]

Workaround to connect to AllegroGraph Cloud via stunnel.

> Configure the host using the `connect` option in the stunnel configuration below.

In [None]:
%%writefile agraph_stunnel.conf

[allegrograph_proxy]
client = yes
accept = 127.0.0.1:8443
connect = ag1eawvuu0p3zv35.allegrograph.cloud:443

In [74]:
# Settings shared by every hosting profile.
connect_kwargs = {
    "repo": config[hosting]["REPO"],
    "catalog": config[hosting]["CATALOG"],
    "user": config[hosting]["USER"],
    "password": config[hosting]["PASSWORD"],
}

if hosting == "ALLEGROGRAPH_CLOUD":
    # AllegroGraph Cloud is reached through a local stunnel proxy configured
    # in agraph_stunnel.conf; starting it requires sudo.
    import getpass
    import subprocess

    password = getpass.getpass()
    # Hand the password to `sudo -S` through stdin instead of interpolating it
    # into a shell pipeline: it never appears in a command line, and shell
    # metacharacters in it cannot break or alter the command.
    try:
        tunnel = subprocess.run(
            ["sudo", "-S", "stunnel", "agraph_stunnel.conf"],
            input=f"{password}\n",
            text=True,
            check=False,
        )
        if tunnel.returncode != 0:
            logger.warning(f"stunnel exited with status {tunnel.returncode}")
    except OSError as e:
        # Best effort, as before: a failed start surfaces when connecting below.
        logger.error(f"Failed to start stunnel: {e}")

    # Connect through the local end of the tunnel.
    config[hosting]["HOST"]="localhost"
    config[hosting]["PORT"]=8443
    connect_kwargs["protocol"] = config[hosting]["PROTOCOL"]

# Read host and port last so the tunnel override above is picked up.
connect_kwargs["host"] = config[hosting]["HOST"]
connect_kwargs["port"] = config[hosting]["PORT"]

kg_conn = ag_connect(**connect_kwargs)

logger.info(f"Connected to AllegroGraph: {hosting}")


2025-02-10 23:42:52 - INFO - Connected to AllegroGraph: ALLEGROGRAPH_LOCAL


### Connect DB

In [42]:
# Directory and file name handed to db_connection
DEFAULT_DATA_DIR = "../cfr2sbvr_inspect/data"
DATABASE = "cfr2sbvr_v4.db"

db_conn, db_name = db_connection(DATABASE, DEFAULT_DATA_DIR)

# Selection passed to load_data in the next cell: the table/view to read and
# four optional selectors, all left as None here
# (presumably None means "no filter" - TODO confirm against load_data).
table_selected = "RAW_LLM_VALIDATION_BEST_VW"
checkpoints_selected = doc_id_selected = statement_sources_selected = process_selected = None

### Load data

In [43]:
# Load the rows of the selected table/view into `data_df`, using the
# connection and the selection values defined in the "Connect DB" cell.
# `data_df` is later filtered by its 'source' column and converted to records.
data_df = load_data(
    db_conn,
    table_selected,
    checkpoints_selected,
    doc_id_selected,
    statement_sources_selected,
    process_selected,
)

Prepare lists of records (one dictionary per row) with the best results, split by element type.

In [44]:
def _records_by_source(frame, sources):
    """Return the rows of `frame` whose 'source' value is in `sources` as a list of dicts."""
    return frame[frame['source'].isin(sources)].to_dict(orient="records")


pred_terms_names = _records_by_source(data_df, ['Terms','Names'])
pred_operative_rules_fact_types = _records_by_source(data_df, ['Operative_Rules','Fact_Types'])

## Execution

### Terms and names

In [None]:
# Upsert one designation (Term or Name) per record into the knowledge graph.
for index, element in enumerate(pred_terms_names):

    logger.info(f"{index=}")

    # Values coming from the extraction step
    doc_id = element.get('doc_id')  # section
    statement_id = element.get('statement_id')
    sources = element.get("statement_sources")  # paragraphs
    is_local_scope = element.get("isLocalScope")

    # Use "missing" when there is no (or an empty) transformed statement
    statement = element.get('transformed') or "missing"

    # SBVR ontology: records from 'Terms' become Term, everything else Name
    if element.get('source') == "Terms":
        concept_type = "Term"
    else:
        concept_type = "Name"

    # Values coming from the transformation step
    metadata_cfr2sbvr = get_metadata_cfr2sbvr(element)

    # Make sure the vocabulary and its namespace exist
    vocabulary = define_vocabulary_ns(kg_conn, doc_id, is_local_scope)

    # Look up exact and close matches for the lower-cased signifier
    exact_match, close_match = get_similar_signifiers(kg_conn, statement_id.lower())

    # Build the designation model
    designation = Designation(
        signifier=statement_id,
        statement=statement,
        concept_type=concept_type,
        closeMatch=close_match,
        exactMatch=exact_match,
        vocabulary_namespace=vocabulary,
        sources=sources,
        doc_id=doc_id,
        metadata_cfr2sbvr=metadata_cfr2sbvr
    )

    # Build the upsert query and run it
    query = upsert_term_and_name_query(designation)
    logger.info(f"{query=}")
    status = upsert_to_kg(kg_conn, query)

    logger.info(f"{concept_type} '{statement_id}' done.")

2025-02-10 23:43:02 - INFO - index=0
2025-02-10 23:43:02 - INFO - Vocabulary already exists
2025-02-10 23:43:02 - INFO - Vocabulary namespace: cfr-sbvr:CFR_SBVR_275_0_5_NS
2025-02-10 23:43:03 - INFO - Found 0 similar signifier(s) for 'Commission' on fibo:FIBO_Graph.
2025-02-10 23:43:03 - ERROR - Error evaluating SPARQL query: Server returned 400: Unable to open triple-store "cfr-sbvr-3m-vec" (it does not appear to exist).


400 QUERY FAILED: Unable to open triple-store "cfr-sbvr-3m-vec" (it does not appear to exist).


RequestError: Server returned 400: Unable to open triple-store "cfr-sbvr-3m-vec" (it does not appear to exist).

### Rules and fact types

In [71]:
# Upsert one Rule or Fact per record into the knowledge graph.
for index, element in enumerate(pred_operative_rules_fact_types):

    logger.info(f"{index=}")

    # Values coming from the extraction step
    doc_id = element.get('doc_id')  # section
    statement_id = element.get('statement_id')

    # Use "missing" when there is no (or an empty) transformed statement
    statement = element.get('transformed') or "missing"

    # SBVR ontology: records from 'Operative_Rules' become Rule, everything else Fact
    if element.get('source') == "Operative_Rules":
        concept_type = "Rule"
    else:
        concept_type = "Fact"

    # The RDF subject is derived from the statement title
    statement_subject = transform_to_rdf_subject(element.get('statement_title'))

    sources = element.get("statement_sources")  # paragraph
    terms = element.get("terms")
    verb_symbols = element.get("verb_symbols")

    # Values coming from the transformation step
    metadata_cfr2sbvr = get_metadata_cfr2sbvr(element)

    # Make sure the vocabulary and its namespace exist (always local scope here)
    vocabulary = define_vocabulary_ns(kg_conn, doc_id, True)

    # Build the rule/fact model
    rule_fact_model = RuleAndFact(
        statement_id=statement_subject,
        statement=statement,
        concept_type=concept_type,
        terms=terms,
        verb_symbols=verb_symbols,
        vocabulary_namespace=vocabulary,
        sources=sources,
        doc_id=doc_id,
        metadata_cfr2sbvr=metadata_cfr2sbvr
    )

    # Build the upsert query and run it
    query = upsert_rule_and_fact_query(rule_fact_model)
    logger.info(f"{query=}")
    status = upsert_to_kg(kg_conn, query)

    logger.info(f"{concept_type} '{statement_subject}' done.")

2025-01-30 19:37:39 - INFO - index=0
2025-01-30 19:37:40 - INFO - Vocabulary already exists
2025-01-30 19:37:40 - INFO - Vocabulary namespace: cfr-sbvr:CFR_SBVR_275_0_7_NS
2025-01-30 19:37:40 - INFO - query='\nPREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>\nPREFIX skos: <http://www.w3.org/2004/02/skos/core#>\nPREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\nPREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>\nPREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>\n\nWITH cfr-sbvr:CFR_SBVR\nDELETE {\n    cfr-sbvr:PresumptionOfControlInPartnership ?p ?o .\n}\nINSERT {\n    cfr-sbvr:PresumptionOfControlInPartnership a sbvr:Fact,\n            sbvr:DefinitionalRule ;\n        cfr-sbvr:hasTerm cfr-sbvr:Dissolution ;\ncfr-sbvr:hasTerm cfr-sbvr:Partnership ;\ncfr-sbvr:hasTerm cfr-sbvr:Capital ;\ncfr-sbvr:hasTerm cfr-sbvr:RightToReceiveUponDissolution ;\ncfr-sbvr:hasTerm cfr-sbvr:Person ;\n\n        cfr-sbvr:hasVerbSymbol cfr-sbvr:IsPresumedToControl ;\ncfr-sbvr:hasVerbSym

### Verb symbols

In [72]:
# Upsert one VerbConcept designation per verb symbol of every rule / fact type.
for index, element in enumerate(pred_operative_rules_fact_types):

    logger.info(f"{index=}")

    # from extraction
    doc_id = element.get('doc_id') # section

    # There is no extracted / transformed statement for verb symbols, so the
    # statement of the owning fact or rule is stored with each verb symbol.
    statement = element.get('transformed')
    statement = statement if statement else "missing" # Change None to "missing"

    # SBVR ontology
    concept_type = "VerbConcept"

    sources = element.get("statement_sources") # paragraph

    # A record without verb symbols yields None; treat it as "nothing to
    # upsert" instead of failing the whole loop with a TypeError.
    verb_symbols = element.get("verb_symbols") or []

    # from transformation
    metadata_cfr2sbvr = get_metadata_cfr2sbvr(element)

    # create vocabulary and namespace if not exists (always local scope here)
    vocabulary = define_vocabulary_ns(kg_conn, doc_id, True)

    for verb_symbol in verb_symbols:
        # create the designation model for this verb symbol
        designation_model = Designation(
            signifier=verb_symbol,
            statement=statement, # statement of the owning fact or rule
            concept_type=concept_type,
            closeMatch=[],
            exactMatch=[],
            vocabulary_namespace=vocabulary,
            sources=sources,
            doc_id=doc_id,
            metadata_cfr2sbvr=metadata_cfr2sbvr
        )

        # upsert
        query = upsert_verb_symbol_query(designation_model)
        logger.info(f"{query=}")
        status = upsert_to_kg(kg_conn, query)
        logger.info(f"{concept_type} '{verb_symbol}' done.")

2025-01-30 19:39:56 - INFO - index=0
2025-01-30 19:39:57 - INFO - Vocabulary already exists
2025-01-30 19:39:57 - INFO - Vocabulary namespace: cfr-sbvr:CFR_SBVR_275_0_7_NS
2025-01-30 19:39:57 - INFO - query='\nPREFIX sbvr: <https://www.omg.org/spec/SBVR/20190601#>\nPREFIX skos: <http://www.w3.org/2004/02/skos/core#>\nPREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\nPREFIX cfr-sbvr: <http://cfr2sbvr.com/cfr#>\nPREFIX fro-cfr: <http://finregont.com/fro/cfr/Code_Federal_Regulations.ttl#>\n\nWITH cfr-sbvr:CFR_SBVR\nDELETE {\n    cfr-sbvr:IsPresumedToControl27507 ?p ?o .\n}\nINSERT {\n    cfr-sbvr:IsPresumedToControl27507 a sbvr:VerbSymbol,\n            sbvr:VerbConcept ;\n        sbvr:signifier "is presumed to control" ;\n        sbvr:referenceSupportsMeaning "§ 275.0-7(b)(1)(ii)" ;\n\n        sbvr:isImplicitlyUnderstood "false"^^xsd:boolean ;\n        sbvr:statement "missing" ;\n        sbvr:designationIsInNamespace cfr-sbvr:CFR_SBVR_275_0_7_NS ;\n        \n        cfr-sbvr:extractOrigina

## Close database connection

In [30]:
# Close both connections; the database connection is closed even when closing
# the knowledge-graph connection raises.
try:
    kg_conn.close()
finally:
    db_conn.close()