<a href="https://colab.research.google.com/github/BowieSteutel/acc-nlp-firecodes/blob/main/1C_Regulatory_Information_Processing_cleared.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



# **Module 1C - Regulatory Information Extraction & Semantic Parsing**



This module works with CSV files and ontologies.

The CSV file of the regulations to be processed should include information about classifications, either manually assigned or included via the Regulation Classification module (1B).

---
# **Load libraries**

In [None]:
# Import standard libraries
import re # for pattern matching regular expressions
import pandas as pd # for dataframes
import json # for export

In [None]:
# Prepare rdflib (for SPARQL querying)
!pip install rdflib --quiet
import rdflib
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.namespace import RDF, RDFS, OWL

In [None]:
# Prepare spacy (for NLP pipeline)
!pip install spacy --quiet
import spacy
from spacy import displacy # for visualization
from spacy.matcher import Matcher # for pattern matching
from spacy import Language # for custom pipeline components
from spacy.tokens import Token, Span, Doc # for information extraction]

# Prepare gliner_spacy (for NER)
!pip install gliner-spacy --quiet
from gliner_spacy.pipeline import GlinerSpacy

---
# **Load inputs**

In [None]:
# @title Change root directory (update after downloading)

root_directory = "/content/drive/MyDrive/FINAL_CODE_THESIS" #  @param {"type":"string", "placeholder":""}
import sys
from pathlib import Path
if 'google.colab' in sys.modules:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=False)
    %cd {root_directory}

In [None]:
# @title Load files
path_hierarchy = "output/BBL_hier_elements.csv" #  @param {type:"string", "placeholder":"path to hierarchy, relative to file_path (csv)"}
path_subset_big = "output/BBL_subset_final_big.csv" #  @param {type:"string", "placeholder":"path to subset, relative to file_path (csv)"}
path_subset_small = "output/BBL_subset_final_small.csv" #  @param {type:"string", "placeholder":"path to subset, relative to file_path (csv)"}

df_hierarchy = pd.read_csv(path_hierarchy)#, encoding='windows-1252')
df_subset_big = pd.read_csv(path_subset_big)#, encoding='windows-1252')
df_subset_small = pd.read_csv(path_subset_small)#, encoding='windows-1252')
df_subset_big.head()

In [None]:
hier_codes = [reg['code'] for index, reg in df_hierarchy.iterrows()]
hier_codes[:10]

---
# **Load default spaCy pipeline**

*disabled NER (since custom NER will be added)*

In [None]:
# Load standard model

# small:
nlp = spacy.load("en_core_web_sm", disable=["ner"])

# # medium:
# !spacy download en_core_web_md
# nlp = spacy.load("en_core_web_md", disable=["ner"])

# # large:
# !spacy download en_core_web_lg
# nlp = spacy.load("en_core_web_lg", disable=["ner"])

# # Load transformer model for better parsing:
# !pip install spacy-transformers # restart kernel afterwards?
# import spacy_transformers

# # !spacy download en_core_web_trf
# nlp = spacy.load("en_core_web_trf", disable=["ner"])

---
# **Load statistical NER (GLiNER)**

***Requires a Hugging Face token in the environment variables!***


## Prepare NER labels

Defined based on literature and trial & error

In [None]:
# define NER labels
# These exact strings are handed to GLiNER as the entity types to detect (via the
# "labels" entry of the GLiNER config), so every group list below has to spell a
# label exactly as it appears here in order to match an entity's `label_`.
labels = ["building object",  "building component",  #BEO
          "attribute",
          "unit", #"measurement", "value",
          "site", "building", "building storey", "building room", "building space", #BOT
          "escape route", "fire compartment", #FSE
          "fire system", "fire safety device", #FSE
          "material", "layer", "geometry",
          "classification", # FSE
          "use function", #"area", "usage function"
          "building type", "construction permanence", #BBL
          "reference", "standard", #REGULATIONS
]


# group NER labels
NER_bot = ["site", "building", "building storey", "building space"]
NER_custom_spaces = ["fire compartment", "escape route"]
NER_spatial = NER_bot + NER_custom_spaces

# Fixed: this used to read "building_room" (underscore), which is not one of the
# labels declared above and therefore could never match a recognized entity.
NER_rooms = ["building room", "use function"] #"building space"?

NER_elements = ["building component", "building object"]
NER_MEP = ["fire system", "fire safety device"]
NER_physical = NER_elements + NER_MEP
#NER_physical = NER_elements + ["fire safety device"]
#NER_physical = ["building component", "building object", "fire safety device"]

NER_parts = ["material", "layer"]

NER_props_default = ["attribute"]
NER_props_custom = [ "classification"]# "construction type"? "use function"?
NER_props = NER_props_default + NER_props_custom

NER_quantity = ["unit"]
NER_quality = ["classification", "material", "use function"]

# NOTE(review): the rule-based recognizer further down labels its spans
# "CROSS_REFERENCE", while this list contains "cross-reference" -- confirm that the
# code consuming NER_references normalizes labels before comparing.
NER_references = ["reference", "cross-reference", "standard"] # includes cross-reference identified later

## Add GLiNER to pipeline

In [None]:
# Configuration handed to the "gliner_spacy" pipeline component.
custom_spacy_config = { "gliner_model": "urchade/gliner_multi",
                            "chunk_size": 1000,
                            "labels":labels,
                            "style": "ent"}

# Make the cell safe to re-run: drop a previously added instance explicitly instead
# of relying on a bare `except`, which would also hide unrelated failures (e.g. the
# model could not be loaded) behind a confusing follow-up error from `remove_pipe`.
if "gliner_spacy" in nlp.pipe_names:
  nlp.remove_pipe("gliner_spacy")
ner = nlp.add_pipe("gliner_spacy", config=custom_spacy_config)

## Prepare NER visualization

In [None]:
# Prepare labels & colors
options = {"colors": {},"ents": []}

ner_options = {"colors": {},"ents": []}

# Extra labels for cross-references and quality (identified with rule-based NER)
ent_labels = custom_spacy_config.get("labels")+['CROSS_REFERENCE', 'QUALITY']

# Function for automatic label color generation below
import colorsys
def HSV2HEX(h, s, v):
  """Convert an HSV colour (all components in the range 0-1) to a '#rrggbb' hex string."""
  channels = colorsys.hsv_to_rgb(h, s, v)
  # Scale each channel to 0-255 (truncating) and write it as two lowercase hex digits.
  return '#' + ''.join('{:02x}'.format(int(255 * channel)) for channel in channels)
def getDistinctColors(n):
  """Lazily yield `n` hex colours whose hues are evenly spaced (saturation .5, value 1).

  The hue circle is divided into n + 1 steps, and the first n of them are used.
  Returns a generator, not a list.
  """
  hue_step = 1 / (n + 1)
  return (HSV2HEX(hue_step * position, .5, 1) for position in range(n))
l_colors = [color for color in getDistinctColors(len(ent_labels))]

for i, label in enumerate(ent_labels):
  options["colors"][label.lower()] = l_colors[i]
  options["ents"].append(label.lower())

# Prepare NER visualization
def display_ner_score(ner_doc):#, ner_options):
  """Render the entities of `ner_doc` with displaCy, with each score shown in the label.

  Every entity label is rewritten to "<label> (<score as percentage>)" and the colour
  registered for the original (lower-cased) label in the module-level `options` dict
  is copied to that new label in the module-level `ner_options` dict.

  Side effects:
    * the labels of the entities on `ner_doc` are overwritten with the decorated
      text, so the document should not be used for label-based logic afterwards;
    * `ner_options` is shared between calls and keeps growing (labels are appended
      to its "ents" list on every call, including repeats).

  Reads `ent._.score`, so a "score" extension attribute must exist on spans
  (presumably registered by the gliner_spacy component -- TODO confirm).
  """
  ents = ner_doc.ents
  for ent in ents:
    # e.g. "unit (100%)"
    new_label = f"{ent.label_} ({ent._.score:.0%})"
    # Labels without a registered colour get None here (dict.get default).
    ner_options["colors"][new_label] = options["colors"].get(ent.label_.lower())
    ner_options["ents"].append(new_label)
    # ner_options['colors']['CROSS_REFERENCE (100%)'] = '#BBBBBB'
    ent.label_ = new_label
  # Write the relabelled spans back to the document before rendering.
  ner_doc.ents = ents
  displacy.render(ner_doc, style="ent", options=ner_options)

## Test NER on subset

In [None]:
for text in df_subset_small['text_translated']:
  doc = nlp(text)
  #print(len(doc))
  display_ner_score(doc)
  #display_ner_score(doc, options)
  #displacy.render(doc, style="ent")

In [None]:
for text in df_subset_big['text_translated'][:10]:
  doc = nlp(text)
  #print(len(doc))
  display_ner_score(doc)
  #display_ner_score(doc, options)
  #displacy.render(doc, style="ent")

---
# **Load rule-based NER**


## Cross-reference recognition

### Prepare regex patterns

In [None]:
# Structural elements of the regulation, each with
#   re_untranslated : regex fragment for the Dutch term
#   re_translated   : regex fragment for the English term
#   code            : short code used for the element type
hier_labels = {"chapter": {"re_untranslated"    : r"hoofdstuk(?:ken)?",
                          "re_translated"       : r"chapter[s]?",
                          "code": "C"},
               "section": {"re_untranslated"    : r"afdeling(?:en)?",
                          "re_translated"       : r"section[s]?",
                          "code": "S"},
               "paragraph": {"re_untranslated"  : r"(?:paragra(?:af|fen)|§)",
                          "re_translated"       : r"paragraph[s]?",
                          "code": "P"},
               "article": {"re_untranslated"    : r"artikel(?:en|s)?",
                          "re_translated"       : r"article[s]?",
                          "code": "A"},
               # No Dutch term is defined for sub-articles. An empty pattern matches the
               # empty string, so skip this entry when combining the untranslated patterns.
               "sub-article": {"re_untranslated": r"",
                          "re_translated"       : r"sub[-]?article[s]?",
                          "code": "SUB"},
               "table": {"re_untranslated"      : r"tabel(?:len)?",
                          "re_translated"       : r"table[s]?",
                          "code": "TABLE"},
               "appendix": {"re_untranslated"   : r"bijlage(?:n|s)?",
                          "re_translated"       : r"appendi(?:x|ces)",
                          "code": "APPX"},
               # Fixed: was r"figu(?:ur|en)?", which matched "figuen"/"figu" but not the
               # actual plural "figuren".
               "figure": {"re_untranslated"     : r"figu(?:ur|ren)",
                          "re_translated"       : r"figure[s]?",
                          "code": "FIG"}
          }
print(list(hier_labels))
for element_name in hier_labels:
    print(hier_labels[element_name])

In [None]:
# Tens written out in full -> their numeric value
tens_map_full = {
    "twenty": "20", "thirty": "30", "forty": "40", "fifty": "50",
    "sixty": "60", "seventy": "70", "eighty": "80", "ninety": "90",
}

# Tens used as the first part of a compound number (e.g. "twenty-three") -> leading digit.
# Hyphenated forms come first, followed by the bare forms.
# Fixed: the hyphenated row used to contain "forty" and "eighty" without the hyphen,
# so "forty-" and "eighty-" were missing and the bare keys were simply duplicated.
tens_map_part = {
    "twenty-": "2", "thirty-": "3", "forty-": "4", "fifty-": "5",
    "sixty-": "6", "seventy-": "7", "eighty-": "8", "ninety-": "9",
    "twenty": "2", "thirty": "3", "forty": "4", "fifty": "5",
    "sixty": "6", "seventy": "7", "eighty": "8", "ninety": "9",
}

# Ordinals written out in full -> numeric ordinal
ordinal_map = {
    "first": "1st", "second": "2nd", "third": "3rd", "fourth": "4th", "fifth": "5th",
    "sixth": "6th", "seventh": "7th", "eighth": "8th", "ninth": "9th", "tenth": "10th",
    "eleventh": "11th", "twelfth": "12th", "thirteenth": "13th", "fourteenth": "14th", "fifteenth": "15th",
    "sixteenth": "16th", "seventeenth": "17th", "eighteenth": "18th", "nineteenth": "19th",
    "twentieth": "20th", "thirtieth": "30th", "fortieth": "40th", "fiftieth": "50th",
    "sixtieth": "60th", "seventieth": "70th", "eightieth": "80th", "ninetieth": "90th",
}

# Cardinal numbers below twenty written out in full -> numeric value
numerical_map = {
    "one": "1", "two": "2", "three": "3", "four": "4", "five": "5",
    "six": "6", "seven": "7", "eight": "8", "nine": "9", "ten": "10",
    "eleven" : "11", "twelve": "12", "thirteen": "13", "fourteen": "14", "fifteen": "15",
    "sixteen": "16", "seventeen": "17", "eighteen": "18", "nineteen": "19",
}

# regex patterns for finding numbers
re_tens = f'(?:{"|".join(re.escape(k) for k in tens_map_full.keys())})'
re_numerical = f'(?:{"|".join(re.escape(k) for k in numerical_map.keys())})'
re_ordinal = f'(?:{"|".join(re.escape(k) for k in ordinal_map.keys())})'
# optional tens + optional hyphen + cardinal, or a bare tens word; must not be followed by a hyphen
re_numerical_text = f'\\b(?:{re_tens}?-?{re_numerical}|{re_tens})\\b(?!-)'
# optional tens + optional hyphen + ordinal; must not be followed by a hyphen
re_ordinal_text = f'\\b{re_tens}?-?{re_ordinal}\\b(?!-)'

# example
print(re.findall(re_numerical_text, """first, fourth and fifth, as well as twentieth twenty-first to twenty-three,
        twenty-third and thirtyfourth and tenth three and thirtyfive and stwenty twenty sfirst""", flags=re.IGNORECASE))
print(re.findall(re_ordinal_text, """first, fourth and fifth, as well as twentieth twenty-first to twenty-three,
        twenty-third and thirtyfourth and tenth three and thirtyfive and stwenty twenty sfirst""", flags=re.IGNORECASE))

In [None]:
# RegEx for enumerations
# Raw strings are used for every fragment containing a backslash, so that regex escapes
# such as \b, \. and \, reach the regex engine unchanged instead of being treated as
# (invalid) Python string escapes, which newer Python versions warn about.
re_enum_number = r"(?:[0-9]{1,5})" # Match number enumerations: 1 2 3 up to 99997 99998 99999
re_enum_letter = r"(?:[A-Z]{1,3})" # Match letter enumerations A B C up to ZZX ZZY ZZZ
re_enum_letter_ci = r"(?:[A-Z]{1,3}|[a-z]{1,3})" # also includes a b c up to zzx zzy zzz (but not mixed uppercase and lowercase!)
re_enum_roman = r"(?:\bM{0,4}(?:CM|CD|D?C{0,3})(?:XC|XL|L?X{0,3})(?:IX|IV|V?I{0,3})\b)" # Match roman numerals (I-II-III)
re_enum_roman_ci = f"(?:{re_enum_roman}|{re_enum_roman.lower()})" # Also match lowercase roman numerals (i-ii-iii)
re_enum_mix = f"(?:{re_enum_number}{re_enum_letter_ci}|{re_enum_letter_ci}{re_enum_number})" ## Match mixed enumerations: 1a 1b 1c up to 99999zzx 99999zzy 99999zzz and a1 a2 a3 up to zzz99997 zzz99998 zzz99999, or with uppercase letters


# RegEx for finding cross-reference codes, up to 6 levels, starting with the dividable enumerations and ending with any enumeration:
# 1.1 1.2 1.3   1.1.1 1.1.2 1.1.3   etc.
# (the "{1,5}" quantifier is kept outside the f-strings so its braces are not read as a replacement field)
re_hier_codes = (
    r"\b(?:"
    + f"(?:{re_enum_mix}|{re_enum_number}|{re_enum_letter})"
    + r"\.){1,5}"
    + f"(?:{re_enum_mix}|{re_enum_number}|{re_enum_roman}|{re_enum_letter})"
    + r"\b\.?"
)

# match all references to structural elements, such as article
# Fixed: the alternation is now wrapped in its own group. Before, the word boundaries only
# applied to the first and the last alternative ("\bchapter[s]?|...|figure[s]?\b"), so e.g.
# "this tablet" was recognized as a reference to "this table".
# The fragment also has its own name now; `re_hier_cref` is only assigned once, at the end.
re_hier_label = r"(?:\b(?:" + "|".join(hier_labels[x]['re_translated'] for x in hier_labels) + r")\b)"

# matches (sub)articles specifically
re_hier_art = "(?:"+hier_labels["article"]['re_translated']+")"
re_hier_sub = "(?:"+hier_labels["sub-article"]['re_translated']+")"

# matches ordinal numbers, either numeric (1st, 2nd) or in text format, such as first, seventh, twentyfifth, etc.
re_hier_ordinal_num = r"(?:\b(?:[0-9]+(?:st|nd|rd|th))\b)"
re_hier_ordinal = f"(?:{re_hier_ordinal_num}|{re_ordinal_text})"

# matches relative paths, e.g. "this article", "current paragraph"
re_hier_relative = r"\b(?:this|that|current)\b"


# matches all relevant ways to make conjunctions
re_hier_conj = r"(?:\, |(?:\,)? and | (?:up )?to(?: and including)? )"

# matches all possible identifiers and combinations at the start of a cross-reference
re_hier_id_ord = f"{re_hier_ordinal}(?:{re_hier_conj}{re_hier_ordinal})*"

# matches all possible identifiers and combinations at the end of a cross-reference
re_hier_id_num = f"{re_hier_codes}(?:{re_hier_conj}{re_hier_codes})*"

# ensures match starts at the beginning of a word (also not after a hyphen)
re_start = r"(?<!\-)\b"

# Full regex patterns. looks for four distinct patterns:
# 1. ordinal followed by subarticle
re_hier_cref1 = f"{re_start}(?P<id1_sub>{re_hier_id_ord}) (?P<label1_sub>{re_hier_sub})"
# 2. relative label followed by label
re_hier_cref2 = f"{re_start}(?P<rel2>{re_hier_relative}) (?P<label2>{re_hier_label})"
# 3. label followed by code
re_hier_cref3 = f"{re_start}(?P<label3>{re_hier_label}) (?P<id3>{re_hier_id_num})"
# 4. article code followed by ordinal+subarticle/subarticle + number
re_hier_cref4 = rf"{re_start}(?P<label4_art>{re_hier_art}) (?P<id4_art>{re_hier_id_num})(?:\,)? (?:(?P<id4_sub1>{re_hier_id_ord}) (?P<label4_sub1>{re_hier_sub})|(?P<label4_sub2>{re_hier_sub}) (?P<id4_sub2>{re_hier_id_num}))"

# The most specific pattern (4) is tried first, the least specific (1) last.
re_hier_cref = f"({re_hier_cref4}|{re_hier_cref3}|{re_hier_cref2}|{re_hier_cref1})"

# example:
matches = re.finditer(re_hier_cref, "The current section. First and second subarticle. table 1.2 and 1.2.3 and article 1.2. Article 4.6 to 4.8. Article 4.53, first sub-article. This paragraph.", flags=re.IGNORECASE)
[(m.start(), m.end(), m[0]) for m in matches]

### Validation (before)

In [None]:
l_sentences_cref_eval = [
    "First and second subarticle.",
    "This paragraph.",
    "Table 1.2 and 1.2.3, and article 1.2.",
    "Article 4.43 to 4.45a.",
    "Article 4.53, first sub-article."
]

In [None]:
for text in l_sentences_cref_eval:
  doc = nlp(text)
  display_ner_score(doc)

### Add to pipeline

In [None]:
# Define the custom pipeline component
def crossref_recognizer(doc, regex_pattern):
    """Label every regex match in `doc.text` as a "CROSS_REFERENCE" entity.

    Args:
        doc: the document to annotate. Its `text`, `char_span` and `ents` are used.
        regex_pattern: pattern searched case-insensitively in `doc.text`.

    Returns:
        The same `doc`. Its `ents` are the previously existing entities that do not
        overlap a cross-reference, followed by the new cross-reference spans
        (each with `_.score` set to 1).
    """
    custom_spans = []
    for m in re.finditer(regex_pattern, doc.text, flags=re.IGNORECASE):
        span = doc.char_span(m.start(), m.end())

        # If the match does not line up with token boundaries, retry without trailing
        # whitespace/punctuation. The retry is anchored at the match position itself;
        # searching the text for the trimmed string (as was done before) returns the
        # FIRST occurrence of that text, which is the wrong place for repeated references.
        if span is None:
            trimmed = m.group(0).rstrip(" ,.")
            if trimmed and len(trimmed) < len(m.group(0)):
                span = doc.char_span(m.start(), m.start() + len(trimmed))

        if span is not None:
            span.label_ = "CROSS_REFERENCE"
            span._.score = 1
            custom_spans.append(span)

    # Drop existing entities that overlap a cross-reference. Span ends are exclusive,
    # so the comparison is strict: an entity that merely touches a cross-reference
    # (ends where it starts, or starts where it ends) is kept. This matches the overlap
    # test used by the other rule-based recognizers in this notebook.
    kept_spans = [
        ent for ent in doc.ents
        if not any(span.start < ent.end and span.end > ent.start for span in custom_spans)
    ]

    # Add custom spans to the document's entities without overwriting the remaining ones
    doc.ents = tuple(kept_spans) + tuple(custom_spans)
    return doc

In [None]:
@Language.component("cref_recognizer")#, assigns=["doc.ents"])
def entity_linker(doc):
    """Pipeline component: mark regulation cross-references using `re_hier_cref`."""
    return crossref_recognizer(doc, re_hier_cref)

# Add the custom component to the pipeline.
# Remove the old version first when re-running this cell; checking `pipe_names`
# explicitly avoids a bare `except` that would swallow unrelated errors as well.
if "cref_recognizer" in nlp.pipe_names:
    nlp.remove_pipe("cref_recognizer")
# nlp.add_pipe("cref_recognizer", after="gliner_spacy")
nlp.add_pipe("cref_recognizer", last=True)

### Validation (after)

In [None]:
for text in l_sentences_cref_eval:
  doc = nlp(text)
  display_ner_score(doc)

## Quantity recognition

### Prepare regex patterns

In [None]:
for index, reg in df_subset_big[:10].iterrows():
    print(reg['text_translated'])
    print(re.findall(r'(?<!\w)(([+-]?\d+(?:\.\d+)?(?:\s*\/\s*\d+)?(?:\s*[°μΩ²³⁴⁵⁶23456]?\s*[a-zA-Z²³⁴⁵⁶μΩ\d]+(?:\s*\/\s*[a-zA-Z²³⁴⁵⁶23456μΩ\d]+)*))|(?:[0-9]+(?:\.[0-9]+)? ?%))', reg['text_translated']))
    print()

### Validation (before)

In [None]:
l_sentences_quant_eval = [
    "The heat radiation that can occur is larger than 2 kW/m2.",
    "A temperature can occur that is higher than 90 ° C.",
    "A sub-fire compartment with an internal cross-section larger than 0.015 m2, complies with fire class A2, determined according to NEN-EN 13501-1.",
    "At most 5% of the total area ...",
]

In [None]:
for text in l_sentences_quant_eval:
  doc = nlp(text)
  display_ner_score(doc)

### Add to pipeline

In [None]:
# Define the custom pipeline component
def quantity_recognizer(doc):
    """Label numeric quantities (number, optionally with % and/or a unit) as "unit" entities.

    A quantity replaces every existing entity it overlaps, unless one of those entities
    is a reference or classification ("CROSS_REFERENCE", "reference", "standard",
    "classification"); in that case the quantity is skipped and the existing entities
    are left untouched.

    Args:
        doc: the document to annotate. Its `text`, `char_span` and `ents` are used.

    Returns:
        The same `doc`, with `ents` sorted by position and every new span carrying
        `_.score == 1.0`.
    """
    regex_pattern = r'(?<!\w)(([+-]?\d+(?:\.\d+)?(?:\s?\/\s*\d+)?\s?%?(?:[°μΩ23456²³⁴⁵⁶]?\s?[a-zA-ZμΩ\d²³⁴⁵⁶]+(?:\s?\/\s?[a-zA-ZμΩ\d²³⁴⁵⁶]+)*))|(?:[0-9]+(?:\.[0-9]+)?))'
    protected_labels = {"CROSS_REFERENCE", "reference", "standard", "classification"}

    def _overlaps(a, b):
        # span ends are exclusive, hence the strict comparison
        return a.start < b.end and a.end > b.start

    updated_ents = list(doc.ents)
    # The search is case-sensitive on purpose (no re.IGNORECASE).
    for m in re.finditer(regex_pattern, doc.text):
        if m[0] == '':
            continue
        new_span = doc.char_span(m.start(), m.end())

        # If no span found, try trimming trailing whitespace or punctuation. The retry stays
        # anchored at the match position: looking the trimmed text up with `str.find` (as
        # before) returns its first occurrence, so a repeated quantity such as
        # "2 m ... 2 m." was attached to the first mention a second time.
        if new_span is None:
            trimmed = m.group(0).rstrip(" ,.")
            if trimmed and len(trimmed) < len(m.group(0)):
                new_span = doc.char_span(m.start(), m.start() + len(trimmed))

        # If still no span is found, try increasing the span length (needed for cases such as
        # "90 ° C.", where spacy recognizes the period as part of the token)
        if new_span is None:
            new_span = doc.char_span(m.start(), m.end()+1)

        if new_span is None:
            continue

        # Look at ALL overlapping entities. Previously the scan stopped at the first one,
        # so a second overlapping entity could stay next to the new span and produce
        # overlapping `doc.ents`.
        overlapping = [ent for ent in updated_ents if _overlaps(new_span, ent)]
        if any(ent.label_ in protected_labels for ent in overlapping):
            continue  # never override a reference or classification

        updated_ents = [ent for ent in updated_ents if not _overlaps(new_span, ent)]

        # Label the span "unit" and assign a 100% score
        new_span.label_ = "unit"
        new_span._.score = 1.0
        updated_ents.append(new_span)

    # Sort to maintain proper order
    updated_ents = sorted(updated_ents, key=lambda x: (x.start, x.end))
    doc.ents = updated_ents

    return doc

In [None]:
@Language.component("quantity_recognizer")#, assigns=["doc.ents"])
def entity_linker(doc):
    """Pipeline component: mark numeric quantities via `quantity_recognizer`."""
    return quantity_recognizer(doc)

# Add the custom component to the pipeline.
# Remove the old version first when re-running this cell; checking `pipe_names`
# explicitly avoids a bare `except` that would swallow unrelated errors as well.
if "quantity_recognizer" in nlp.pipe_names:
    nlp.remove_pipe("quantity_recognizer")
# nlp.add_pipe("quantity_recognizer", after="cref_recognizer")
nlp.add_pipe("quantity_recognizer", last=True)

### Validation (after)

In [None]:
for text in l_sentences_quant_eval:
  doc = nlp(text)
  display_ner_score(doc)

## **Quality recognition**



In [None]:
# Sample sentence
text = "A cell unit located in an enclosed building space that is enclosed is a protected sub-fire compartment."

# Process the text
doc = nlp(text)

# Find nouns with outgoing 'relcl' relation to a verb -- could also try using token.dep_ = "quant" or token.dep_ = "attr"?
for token in doc:
    if token.pos_ in ["NOUN", "PROPN"]:  # Check if token is a noun
        for child in token.children:  # Iterate over its children
            if child.dep_ == "relcl" and child.pos_ in ["VERB", "AUX"]:
                # get span for new entity
                print(f"Noun: {token.text}, Relcl Verb: {child.text}")


In [None]:
# Define the custom pipeline component
def quality_recognizer(doc):
    """Label verbs heading a relative clause of a noun as "QUALITY" entities.

    For every NOUN/PROPN token, each child with dependency "relcl" and part of speech
    VERB or AUX becomes a one-token "QUALITY" entity with `_.score == 1.0`. Existing
    entities overlapping that token are replaced by it.

    Previously the overlapping entities were removed but the QUALITY span was only added
    when nothing overlapped, so an overlap left the token with no entity at all.

    Args:
        doc: a parsed document (token `pos_`, `dep_`, `children`, `idx` are used).

    Returns:
        The same `doc`, with `ents` sorted by position.
    """
    updated_ents = list(doc.ents)
    # Find nouns with outgoing 'relcl' relation to a verb -- could also try using token.dep_ = "quant" or token.dep_ = "attr"?
    for token in doc:
        if token.pos_ not in ("NOUN", "PROPN"):
            continue
        for child in token.children:
            if child.dep_ != "relcl" or child.pos_ not in ("VERB", "AUX"):
                continue

            # span covering exactly the relative-clause verb
            new_span = doc.char_span(child.idx, child.idx+len(child))
            if new_span is None:
                continue

            # Replace whatever was recognized on this token before (ends are exclusive).
            updated_ents = [
                ent for ent in updated_ents
                if not (new_span.start < ent.end and new_span.end > ent.start)
            ]
            new_span.label_ = "QUALITY"
            new_span._.score = 1.0
            updated_ents.append(new_span)

    # Sort to maintain proper order
    updated_ents = sorted(updated_ents, key=lambda x: (x.start, x.end))
    doc.ents = updated_ents
    return doc

In [None]:
# Sample sentence
text = "A cell unit located in a building space that is enclosed is a protected sub-fire compartment."

doc = nlp(text)
# Process the text
doc = quality_recognizer(doc)

for ent in doc.ents:
    print(ent.text, ent.label_)

### Validation (before)

In [None]:
l_sentences_qual_eval = ["A cell unit located in a building space that is enclosed is a protected sub-fire compartment."]

In [None]:
for text in l_sentences_qual_eval:
  doc = nlp(text)
  display_ner_score(doc)

### Add to pipeline

In [None]:
@Language.component("quality_recognizer")#, assigns=["doc.ents"])
def entity_linker(doc):
    """Pipeline component: mark relative-clause verbs via `quality_recognizer`."""
    return quality_recognizer(doc)

# Add the custom component to the pipeline.
# Remove the old version first when re-running this cell; checking `pipe_names`
# explicitly avoids a bare `except` that would swallow unrelated errors as well.
if "quality_recognizer" in nlp.pipe_names:
    nlp.remove_pipe("quality_recognizer")
# nlp.add_pipe("quality_recognizer", after="cref_recognizer")
nlp.add_pipe("quality_recognizer", last=True)

### Validation (after)

In [None]:
for text in l_sentences_qual_eval:
  doc = nlp(text)
  display_ner_score(doc)

# NER pipeline Validation

In [None]:
for index, reg in df_subset_big[:10].iterrows():
    doc = nlp(reg['text_translated'])
    print(doc)
    for span in doc.ents:
        print(f'{span.text:25} --> {span.label_}')
    print()

In [None]:
for index, reg in df_subset_big[:10].iterrows():
    # doc = nlp(text_to_num(reg['text_translated']))
    doc = nlp(reg['text_translated'])
    display_ner_score(doc)

---
# **Prepare Knowledge Bases**

Knowledge Bases are created per entity type, mostly using ontologies.

*Mostly using labels only, but information like descriptions can be included as well*


Not yet included:
*  systems & devices
*  quality data
*  materials & layers
*  construction permanence



Functions for converting ontologies to knowledge bases

In [None]:
# Convert ontology to knowledge base
def ONT2KB(ontology_url, prefix, rdf_type=OWL.Class, splitter="#"): # splitter is sometimes "/"
    """Convert an ontology (Turtle format) into a knowledge-base dictionary.

    Args:
        ontology_url: location of the ontology, parsed with rdflib as Turtle.
        prefix: prefix used to build the keys ("<prefix>:<local name>"); it is also
            stripped from the start of labels that are generated from the local name.
        rdf_type: only subjects with this rdf:type are included.
        splitter: separator between the namespace and the local name in the
            subject URIs ("#" or "/").

    Returns:
        dict mapping "<prefix>:<local name>" to a dict with the optional keys
        "name" (rdfs:label, or a label derived from the local name), "description"
        (rdfs:comment), "domain" (schema.org domainIncludes) and "range" (rdfs:range).
        Subjects whose identifier does not contain `splitter` are skipped.
    """
    kb = {} # Initialize kb as an empty dictionary within the function
    # Parse graph
    g = Graph()
    g.parse(ontology_url, format="turtle")

    # Predicate is the same for every subject, so build it once
    domain_includes = Namespace("http://schema.org/").domainIncludes

    # Find literals (either the english item or, failing that, the first item)
    def get_best_literal(literals):
        if not literals:
            return None
        for lit in literals:
            # objects without a language attribute are simply not "en"
            if getattr(lit, "language", None) == "en":
                return str(lit)
        return str(next(iter(literals)))  # return first one

    # Check each subject of the requested type in the ontology
    for c in g.subjects(RDF.type, rdf_type):

        # Get URI
        if splitter not in c:
            continue # skip element entirely if no URI is present
        fragment = str(c.split(splitter)[-1])
        uri = prefix+":"+fragment

        # Get label (name)
        label = get_best_literal(list(g.objects(c, RDFS.label)))
        if not label: # Create label from the local name if no explicit label is present
            # Split the PascalCase local name into separate words.
            # Fixed: this used to split on a hard-coded "#", which raised an IndexError
            # for ontologies whose URIs use "/" as separator (splitter="/").
            label = re.sub(r'(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])', ' ', fragment)
            # Also remove prefixes like "Ifc" from name
            if label.lower().startswith(prefix.lower()):
                label = label[len(prefix):]
            # Also change underscores to spaces
            label = label.replace("_", " ")
            # Remove leading & trailing spaces
            label = label.strip()

        # Get description (if any)
        comment = get_best_literal(list(g.objects(c, RDFS.comment)))

        # Get domain (if any)
        domain = [str(d) for d in g.objects(c, domain_includes)]

        # Get range (if any); named so that the builtin `range` is not shadowed
        value_range = [str(r) for r in g.objects(c, RDFS.range)]

        # Add info to knowledge base
        entry = kb.setdefault(uri, {}) # Get existing value or initialize as empty dict
        if label: # Assign name (if present)
            entry["name"] = label
        if comment: # Assign description (if present)
            entry["description"] = comment
        if domain: # Assign domain (if present)
            entry["domain"] = domain
        if value_range: # Assign range (if present)
            entry["range"] = value_range

    return kb # Return the populated kb dictionary

## Spatial elements

In [None]:
# all spatial elements
kb_spatial_bot = {
    "bot:Site" : {"descriptors": ["building site", "construction site"]},
    "bot:Building" : {"descriptors": ["building", "construction"]},
    "bot:Storey" : {"descriptors": ["storey", "building storey", "building story"]},
    "bot:Space" : {"descriptors": ["building space", "space"]},
    "bot:Element" : {"descriptors": ["building element", "built element"]},
}
kb_spatial_custom = {
    "ex:ProtectedSubFireCompartment" : {"descriptors": ["protected sub-fire compartment", "protected subfire compartment"]},
    "ex:SubFireCompartment" : {"descriptors": ["sub-fire compartment", "subfire compartment"]},
    "ex:FireCompartment" : {"descriptors": ["fire compartment"]},
    "ex:ExtraProtectedEscapeRoute" : {"descriptors": ["additionally protected escape route", "extra protected escape route"]},
    "ex:ProtectedEscapeRoute" : {"descriptors": ["protected escape route"]},
    "ex:EscapeRoute" : {"descriptors": ["escape route"]},
}
kb_spatial = {**kb_spatial_bot, **kb_spatial_custom}

In [None]:
kb_spatial

## Building elements

Using both BEO and ifcOWL

BEO knowledge base will contain name, URI and description

In [None]:
# load BEO ontology as knowledge base
kb_BEO = ONT2KB("https://cramonell.github.io/beo/actual/ontology.ttl", "beo")
kb_BEO["beo:Door"]

ifcOWL knowledge base will only contain names

In [None]:
# load ifcOWL ontology as knowledge base
# kb_IFC = ONT2KB("https://standards.buildingsmart.org/IFC/DEV/IFC4/ADD2_TC1/OWL/ontology.ttl", "ifc")
kb_IFC = ONT2KB("https://cramonell.github.io/ifc/ifcowl/IFC4X3_ADD2/actual/ontology.ttl", "ifc")
kb_IFC['ifc:IfcDoor']

## Materials

simple workaround for now

In [None]:
kb_materials = {
    "Glass": {"descriptors": ["glass", "tempered glass", "laminated glass"]},
              #"pset": "pset:Identity", "property": "props:Category"},
    "Concrete": {"descriptors": ["concrete", "reinforced concrete", "precast concrete"]},
                 #"pset": "pset:Identity", "property": "props:Category"},
    "Steel": {"descriptors": ["steel", "structural steel", "stainless steel"]},
              #"pset": "pset:Identity", "property": "props:Category"},
    "Wood": {"descriptors": ["wood", "plywood", "timber", "laminated wood"]},
             #"pset": "pset:Identity", "property": "props:Category"},
    "Masonry": {"descriptors": ["masonry", "brick", "concrete blocks", "stone"]},
                #"pset": "pset:Identity", "property": "props:Category"},
    "Insulation": {"descriptors": ["insulation", "mineral wool", "EPS", "foam board"]},
                   #"pset": "pset:Identity", "property": "props:Category"},
    "Finishes": {"descriptors": ["finishes", "paint", "plaster", "tiles"]},
                 #"pset": "pset:Identity", "property": "props:Category"},
    "Composites": {"descriptors": ["composites", "fiberglass"]},
                   #"pset": "pset:Identity", "prop": "props:Category"},
    "Plastics": {"descriptors": ["plastic", "PVC", "polycarbonate", "acrylic"]},
                 #"pset": "pset:Identity", "prop": "props:Category"},
}

## Properties

*Correct psets are not yet taken into account*

In [None]:
# custom building properties
kb_custom_buildingprops = {
    'props:Usefunction' : {"pset": "pset:Other", "descriptors": ["use function", "use functions", "usage function", "usage functions", "use area", "area of use"]},
}

In [None]:
# Load dimensions kb
kb_dimensions = {
    "props:Width" : {"descriptors" : ["width", "wide"], "pset": "pset:Dimensions"},
    "props:Length": {"descriptors" : ["length", "long"], "pset": "pset:Dimensions"},
    "props:Height": {"descriptors" : ["height", "high"], "pset": "pset:Dimensions"},
    "props:Depth": {"descriptors" : ["depth", "deep"], "pset": "pset:Dimensions"},
    "props:Thickness": {"descriptors" : ["thickness", "thick"], "pset": "pset:Dimensions"},
    "props:Perimeter": {"descriptors": ["perimeter"], "pset": "pset:Dimensions"},
    "props:Area": {"descriptors": ["area"], "pset": "pset:Dimensions"},
    "props:Volume": {"descriptors": ["volume"], "pset": "pset:Dimensions"},
    # "props:ComputationHeight": {"name": "computation height", "pset": "pset:Dimensions"},

}

In [None]:
# Load the PROPS ontology as knowledge base
kb_PROPS = ONT2KB("https://raw.githubusercontent.com/maximelefrancois86/props/refs/heads/master/IFC4-output.ttl", "props", rdf_type=OWL.DatatypeProperty, splitter="/")
kb_PROPS['props:width']

In [None]:
kb_PROPS['props:nominalWidth']

## Qualities

*Simplified for now*

In [None]:
kb_qualities = {
    ("props:Isenclosed", True): {"pset": "pset:Other", "descriptors": ["enclosed", "closed"]},
    ("props:Isenclosed", False): {"pset": "pset:Other", "descriptors": ["unenclosed", "open"]},
    ("props:Isexternal", True): {"pset": "pset:Common", "descriptors": ["external", "externally", "exterior"]},
    ("props:Isexternal", False): {"pset": "pset:Common", "descriptors": ["internal", "internally", "interior"]},
    ("props:Loadbearing", True): {"pset": "pset:Common", "descriptors": ["load bearing", "load-bearing", "loadbearing", "structural"]},
    ("props:Loadbearing", False): {"pset": "pset:Common", "descriptors": ["non load bearing", "non-load bearing", "nonstructural", "non-structural"]},
}

kb_qualities.get(("props:Isenclosed", True))

## Classifications

*Will not work for all types of classifications!*

Turns classifications into numbers, so they are ordinal

In [None]:
# fire classifications
kb_classifications = {
    'props:Fireclass' : {"pset": "pset:Other",
                         "descriptors": ["fire class", "fire classification"],
                         "classes": ('F', 'E', 'D', 'C', 'B', 'A2', 'A1')},
    'props:Fireresistance' : {"pset": "pset:Other",
                              "descriptors": ["fire resistance", "fire resistant", "fire spread resistance", "fire resistance class",
                              "resistance to fire penetration and fire spread", "resistance to fire penetration", "resistance to fire spread",
                              "fire spread resistance class", "fire resistance classification", "fire spread resistance classification"],
                              "classes": ('15', '30', '60', '90', '120')}, # NEN 6068
    'props:Smokeresistance': {"pset": "pset:Other",
                              "descriptors": ["smoke control class", "smoke resistance", "resistance to smoke passage"],
                              "classes": ('Ra', 'R200')}
    }

In [None]:
kb_classifications.get('props:Fireclass')

## Quantities

In [None]:
# load unit vocabulary
kb_unit = Graph()
# kb_unit.parse("https://qudt.org/2.1/vocab/unit.ttl", format="turtle") # most recent version
kb_unit.parse("https://qudt.org/vocab/unit/", format="turtle") # using this version instead, which works better with quantity & unit normalization

---
# **Entity Linking** (rule-based)



Current script works mostly using rule-based approaches, but implementing statistical or neural approaches as well is recommended

## Prepare subfunctions

### Find by keyword

*Might return false positives for short keywords*

In [None]:
# Return entries where the keyword is found in a specific item of a database
def find_by_keyword(kb, keyword, key, exact=False, list=False, plurality=True):
    """Return the knowledge-base entries whose `key` field matches `keyword`.

    Matching is always case-insensitive.

    Args:
        kb: Mapping of identifier -> entry dict.
        keyword: Text to look for.
        key: Name of the entry field to inspect.
        exact: If True the field (or one of its list items) must equal the
            keyword; otherwise a substring match is sufficient.
        list: If True the field is treated as a list of strings and any item
            may match; otherwise the field is a single string. (The name
            shadows the builtin but is kept because callers pass it by keyword.)
        plurality: If True and an exact search finds nothing, retry once with
            the trailing "s" characters removed (a crude singular form).

    Returns:
        Dict of identifier -> entry for every match (empty when nothing matches).
    """
    needle = keyword.lower()

    def _matches(candidate):
        candidate = candidate.lower()
        return needle == candidate if exact else needle in candidate

    results = {}
    for curie, data in kb.items():
        if list:  # check items in a list
            if any(_matches(value) for value in data.get(key, [])):
                results[curie] = data
        elif _matches(data.get(key, "")):  # check an individual item
            results[curie] = data

    # If nothing was found with an exact match, retry without the plural "s".
    # The retry must search the same kind of field as the first attempt
    # (list vs. single string); it is skipped when there is no "s" to remove
    # because it would only repeat the search that just failed.
    if not results and exact and plurality:
        singular = keyword.rstrip('s')
        if singular != keyword:
            results = find_by_keyword(kb, singular, key, exact=exact, list=list, plurality=False)
    return results
find_by_keyword(kb_BEO, "sun", "description")

In [None]:
find_by_keyword(kb_spatial_bot, "building spaces", "descriptors", list=True, exact=True)

###  Material & quality match

*Simple workaround for now*

In [None]:
def cand_match_quality(text):
    """Look up `text` as a quality descriptor.

    Performs an exact (case-insensitive) search over the "descriptors" lists of
    `kb_qualities` and uses the first hit.

    Returns:
        {"pset": ..., "property": ...} for the first match, where "property" is
        the first element of the matched knowledge-base key, or None when the
        text is not a known quality descriptor.
    """
    hits = find_by_keyword(kb_qualities, text, "descriptors", list=True, exact=True)
    if not hits:
        return None
    first_key, first_entry = next(iter(hits.items()))
    # kb_qualities keys are indexable; only their first element is reported
    return {"pset": first_entry.get('pset'),
            "property": first_key[0]}
cand_match_quality("unenclosed")

In [None]:
def cand_match_material(text):
    """Look up `text` as a material descriptor.

    Performs an exact (case-insensitive) search over the "descriptors" lists of
    `kb_materials` and uses the first hit.

    Returns:
        A dict with the fixed material property set / property and the matched
        knowledge-base key as "material_category", or None when nothing matches.
    """
    hits = find_by_keyword(kb_materials, text, "descriptors", list=True, exact=True)
    if not hits:
        return None
    return {"material_pset": "pset:Identity",
            "material_property": "props:Category",
            "material_category": next(iter(hits))}
cand_match_material("PVC")

### Adjective match

Tries to match qualities and materials from adjectives (nested entities)

In [None]:
def cand_match_adjective(adjective):
    """Interpret an adjective as a quality or, failing that, as a material.

    Returns:
        The quality match if there is one, otherwise the material match,
        otherwise an empty dict (also for an empty adjective), so the result can
        always be unpacked into another dict with `**`.
    """
    if adjective == "":
        return {}
    # quality takes precedence over material
    return cand_match_quality(adjective) or cand_match_material(adjective) or {}

### Partial matching




In [None]:
def partial_match(kb, text, key, list=False, exact=True):
    """Find the longest part of `text` that matches an entry of `kb`.

    The text is split on single spaces. Words are first dropped one by one from
    the left, then (starting again from the full text) one by one from the
    right, until `find_by_keyword` reports a match.

    Args:
        kb: Knowledge base passed on to `find_by_keyword`.
        text: Entity text to match.
        key: Entry field to compare against.
        list: Passed on to `find_by_keyword` (field is a list of strings).
        exact: Passed on to `find_by_keyword` (require equality).

    Returns:
        (identifier, trimmed) where `identifier` is the first matching key and
        `trimmed` the words that were removed to obtain the match, or
        (None, "") when no part of the text matches.
    """
    words = text.split(" ")
    total = len(words)

    # Pass 1: remove tokens on the left
    for start in range(total):
        found = find_by_keyword(kb, " ".join(words[start:]), key, list=list, exact=exact)
        if found:
            return next(iter(found)), " ".join(words[:start]).strip(" ")

    # Pass 2: remove tokens on the right
    for end in range(total, 0, -1):
        found = find_by_keyword(kb, " ".join(words[:end]), key, list=list, exact=exact)
        if found:
            return next(iter(found)), " ".join(words[end:]).strip(" ")

    # Nothing matched
    return None, ""

match = partial_match(kb_BEO, "external doors", "name")
{"element": match[0], **cand_match_adjective(match[1])}

### Classification match

*Tries to make the classification ordinal*

In [None]:
kb_classifications['props:Fireresistance']['classes']

In [None]:
def cand_match_classification(text):
    """Match `text` against the known classifications and extract the class value.

    The descriptor part of the text is located with `partial_match`; whatever
    remains is treated as the class code.

    Returns:
        None when no classification descriptor is found. Otherwise a dict with
        'pset' and 'property', plus:
          * 'quantity' (int, minutes) for fire resistance when a number is present;
          * 'classification' and 'compliant_classes' when the code contains one
            of the property's known classes;
          * 'classification' (raw remaining text) when classes are defined for
            the property but none of them is recognised;
          * nothing else when there is no class value at all.
    """
    # first, find if there is a name and class from the classifications kb
    class_name, class_code = partial_match(kb_classifications, text, "descriptors", list=True)
    if not class_name:
        return None
    class_prop = kb_classifications[class_name]
    classes = class_prop.get('classes')

    # special case for fire resistance (which is not ordinal but in minutes)
    if class_name == 'props:Fireresistance':
        # Compare whole numbers only, so that e.g. "150" is not read as class "15".
        numbers = re.findall(r'\d+', class_code)
        known = [c for c in (classes or ()) if c in numbers]
        if known or numbers:
            return {'pset': 'pset:Other',
                    'property': 'props:Fireresistance',
                    'quantity': int(known[0] if known else numbers[0])}
        # descriptor without a value (e.g. just "fire resistance")
        return {'pset': 'pset:Other',
                'property': 'props:Fireresistance'}

    # try to make the classification ordinal, try to match class or part of class to property
    if classes:
        for i, clss in enumerate(classes):
            # Case-insensitive match on a whole token: a plain substring test
            # would find class "F" inside a word such as "of".
            token = r'(?<![A-Za-z0-9])' + re.escape(clss) + r'(?![A-Za-z0-9])'
            if re.search(token, class_code, flags=re.IGNORECASE):
                return {'pset': 'pset:Other',
                        'property': class_name,
                        'classification': clss,
                        'compliant_classes': classes[i:]} # store all classes that are also met for this specific class
        # if no ordinal match found, return normal matches
        if class_code:
            return {'pset': 'pset:Other',
                    'property': class_name,
                    'classification': class_code}

    # match without classification value
    return {'pset': 'pset:Other',
            'property': class_name}

print(cand_match_classification("fire class D"))
print(cand_match_classification("fire resistance of 90 minutes"))
print(cand_match_classification("fire resistant for 30 minutes"))

### Props match

*PROPS returns many false positives when checking for description, so this has been disabled*

In [None]:
# convert camelCase to PascalCase (which is used by IfcOpenShell for classes)
def camel2pascal(text):
    """Upper-case the first character of a name, keeping an optional prefix.

    For "prefix:name" the character right after the first colon is upper-cased
    and the rest of the string (including any further colons) is kept as is.
    Without a colon the first character of the whole text is upper-cased.
    An empty string, or an empty part after the colon, is returned unchanged.
    """
    prefix, sep, local = text.partition(':')
    if not sep:  # without prefix
        return text[:1].upper() + text[1:]
    return prefix + sep + local[:1].upper() + local[1:]

def cand_match_props(text):
    """Link an attribute mention to a property.

    Lookup order:
      1. exact descriptor match in the custom building properties, then in the
         dimensions knowledge base -> {'property', 'pset'};
      2. a classification match (see `cand_match_classification`);
      3. exact name match in PROPS (spaces removed) -> {'property'} in PascalCase,
         but only when exactly one entry matches;
      4. otherwise a generic 'props:<TitleCasedText>' name.

    Searching PROPS descriptions is intentionally not done (too many false
    positives).
    """
    # 1. manually defined properties, falling back to dimensions
    custom_hit = (find_by_keyword(kb_custom_buildingprops, text, "descriptors", list=True, exact=True)
                  or find_by_keyword(kb_dimensions, text, "descriptors", list=True, exact=True))
    if custom_hit:
        curie, entry = next(iter(custom_hit.items()))
        return {'property': curie,
                'pset': entry.get('pset')}

    # 2. classification
    as_classification = cand_match_classification(text)
    if as_classification:
        return as_classification

    # 3. name of a PROPS property; only an unambiguous (single) hit is accepted
    props_hit = find_by_keyword(kb_PROPS, text.replace(' ', ''), "name", exact=True)
    if len(props_hit) == 1:
        return {'property': camel2pascal(next(iter(props_hit)))}

    # 4. no match, or several matches: generic props name
    return {'property': 'props:'+text.title().replace(' ','')}

cand_match_props("wide")

### Quantity match

*Might return incorrect standardized units for some elements, but standardized values should be correct*

In [None]:
# Define namespaces
QUDT = Namespace("http://qudt.org/schema/qudt/")
UNIT = Namespace("http://qudt.org/vocab/unit/")

# # To deal with floating point errors:
# from decimal import Decimal, getcontext
# getcontext().prec = 4  # Set precision to 4 decimal places

# Function for finding base unit (after conversion)
def get_base_unit(qudt_unit) -> str:
    """Return the base unit that `qudt_unit` converts to.

    Args:
        qudt_unit: URI of a unit in the loaded unit vocabulary (`kb_unit`).

    Returns:
        Either a unit URI (for gram-based units, scaled units and units that are
        already a base) or a constructed local name such as "N-PER-M2" for
        compound units. Both are strings (URIRef is a str subclass).
    """
    local_name = str(qudt_unit).rsplit("/", 1)[-1]

    # Special case for grams, where kilograms is the base unit.
    # Only plain (prefixed) gram units qualify: a substring test on "GM" would
    # also collapse compound units such as GM-PER-M3 to kilograms.
    if local_name.endswith("GM") and "-" not in local_name:
        return UNIT["KiloGM"]

    # Option A: Try simple scaling unit (e.g., MilliM)
    scaled_from = kb_unit.value(subject=URIRef(qudt_unit), predicate=QUDT.scalingOf)
    if scaled_from:
        return scaled_from

    # Option B: Handle complex factor units
    # Construct base unit name (e.g., unit:N-M2)
    # This assumes the original vocabulary uses this naming convention
    numerators = []
    denominators = []
    for factor_unit in kb_unit.objects(subject=URIRef(qudt_unit), predicate=QUDT.hasFactorUnit):
        base = kb_unit.value(subject=factor_unit, predicate=QUDT.hasUnit)
        exponent = int(kb_unit.value(subject=factor_unit, predicate=QUDT.exponent))

        # Recursively find the base of this factor and keep its local name
        base_name = str(get_base_unit(base)).split("/")[-1]
        if exponent > 0:
            numerators.append(base_name if exponent == 1 else f"{base_name}{exponent}")
        elif exponent < 0:
            denominators.append(base_name if exponent == -1 else f"{base_name}{-exponent}")

    if not numerators and not denominators:
        # Neither scaled nor composed: treat the unit as its own base instead of
        # returning an empty name (which corrupted compound names built from it).
        # NOTE(review): for a top-level unit with a conversion multiplier but no
        # scaling/factor information this label may not be the true base unit.
        return qudt_unit

    base_name = "-".join(numerators)
    if denominators:
        base_name += "-PER-" + "-".join(denominators)
    return base_name


def standardize_quantity(value, unit):
    """Convert a value with a textual unit to the unit's base unit.

    The unit is looked up in `kb_unit` by symbol, by UCUM code and (for purely
    alphabetic units of three or more letters) by label.

    Args:
        value: Number or numeric string.
        unit: Unit as written in the text (e.g. "kW/m2", "cm", "meters").

    Returns:
        (value, unit) tuple:
          * (value, None) unchanged when the unit is not found;
          * (float, "unit:<name>") when the unit is a base unit or is converted;
          * (value, "minutes") unchanged for the literal unit "minutes";
          * (float, unit) when the unit has no conversion multiplier.
    """
    # Making sure metres is always specified as "m" instead of "M" (while not modifying M for mega)
    unit = re.sub(r'M(?![^\dΩ])', 'm', unit)

    def _lit(text):
        # Escape text for use inside a double-quoted SPARQL string literal
        return text.replace('\\', '\\\\').replace('"', '\\"')

    # Different unit representations to try:
    unit_1 = unit.replace("2", "Â²").replace("3", "Â³").replace("4", "Â⁴").replace("5", "Â⁵").replace("6", "Â⁶")
    unit_2 = unit.replace("2", "²").replace("3", "³").replace("4", "⁴").replace("5", "⁵").replace("6", "⁶")
    unit_3 = unit.replace("²", "2").replace("³", "3").replace("⁴", "4").replace("⁵", "5").replace("⁶", "6")

    # Alternative patterns; they are combined with UNION below.
    # Alternatively, case-insensitive version:  FILTER(LCASE(?symbol) = LCASE("{unit_X}"))
    # NOTE(review): confirm the exact spelling/casing of the UCUM code predicate
    # against the loaded vocabulary.
    branches = [
        f'?unitname qudt:symbol ?symbol . FILTER( ?symbol = "{_lit(unit_1)}" )',
        f'?unitname qudt:symbol ?symbol . FILTER( ?symbol = "{_lit(unit_2)}" )',
        f'?unitname qudt:uCumCode ?code . FILTER( STR(?code) = "{_lit(unit_3)}" )',
    ]
    # If unit has exponents in the denominator, replace those with a different notation for uCumCode
    if "/" in unit:
        unit_4 = unit_3.split('/')[0]+'.'+unit_3.split('/')[1].replace("2", "-2").replace("3", "-3").replace("4", "-4").replace("5", "-5").replace("6", "-6")
        branches.append(f'?unitname qudt:uCumCode ?code . FILTER( STR(?code) = "{_lit(unit_4)}" )')
    # Also try to find text labels if the unit is purely text-based
    if re.match('^[a-zA-Z]{3,}$', unit):
        unit_5 = unit.strip(' ')
        unit_6 = unit.strip(' ').rstrip('s')
        # ?label must be a variable here; quoting it would turn it into the
        # literal string "?label" and the branch could never match.
        branches.append(f'?unitname rdfs:label ?label . FILTER( LCASE(STR(?label)) = LCASE("{_lit(unit_5)}") )')
        branches.append(f'?unitname rdfs:label ?label . FILTER( LCASE(STR(?label)) = LCASE("{_lit(unit_6)}") )')

    query_units = (
        f'PREFIX qudt: <{QUDT}>\n'
        f'PREFIX rdfs: <{RDFS}>\n'
        'SELECT ?unitname ?factor ?kind\n'
        'WHERE {\n'
        + '\nUNION\n'.join('{ ' + branch + ' }' for branch in branches)
        + '\n}'
    )

    # Perform query; when several units match, the last row returned is used
    qudt_unit = None
    for row in kb_unit.query(query_units):
        if row.unitname:
            qudt_unit = str(row.unitname)
    if qudt_unit is None:
        print("error converting", value, unit)
        return value, None

    # Check conversion multiplier
    factor = kb_unit.value(subject=URIRef(qudt_unit), predicate=QUDT.conversionMultiplier)
    # Skip base unit retrieval if factor = 1 (assumes base unit for those cases)
    if factor and float(factor) == 1.0:
        return float(value), qudt_unit.replace(UNIT, 'unit:')
    # Special case for minutes, which are used for fire resistance classes and should not be standardized?
    elif unit == "minutes":
        return value, unit
    elif not factor:
        return float(value), unit
    else:
        return float(factor)*float(value), "unit:"+get_base_unit(qudt_unit).replace(UNIT, '')


# Example use:
print(standardize_quantity(90, "kW/m2"))
print(standardize_quantity("200", "MJ/m2"))
print(standardize_quantity("200", "MJ/M2"))
print(standardize_quantity("90", "cm"))
print(standardize_quantity("3", "meters"))
print(standardize_quantity("3", "centimeter"))
print(standardize_quantity("90", "g"))
print(standardize_quantity("90", "kW/m2"))
print(standardize_quantity("90", "minutes"))
print(standardize_quantity("5", "kN/m2"))
print(standardize_quantity("90", "°C"))

"*minutes*" *have been left out of the equation since they will always refer to a classification in case of fire safety and not a quantity.*

*Even if this is true, "90 minutes" should return unit SEC but returns MIN while also modifying value so this should be fixed.*

In [None]:
def cand_match_quantity(text):
    """Parse a quantity mention into a value and (standardised) unit.

    Commas are treated as thousands separators and removed before parsing.

    Returns:
        * {'quantity': '<number>'} (string) for a bare number;
        * {'quantity': fraction, 'unit': 'unit:PERCENT'} for percentages, with
          the value multiplied by 0.01;
        * a fire-resistance property dict with 'quantity' in minutes when the
          unit contains "minutes";
        * {'quantity': value, 'unit': unit} with the output of
          `standardize_quantity` (the raw unit text is kept when the unit could
          not be resolved);
        * {'quantity': '<number>'} when standardisation raises an error;
        * None when the text does not start with a number.
    """
    # Remove spaces & thousands separators first
    quantity = text.replace(",", "").strip(' ')
    # Check if quantity is unitless
    unitless = re.match(r'^-?\d+(?:\.\d+)?$', quantity)
    if unitless:
        return {'quantity': unitless[0]}
    # Check if quantity is a percentage and convert to number if true
    percentage = re.match(r'^(-?\d+(?:\.\d+)?) ?%', quantity)
    if percentage:
        return {'quantity': float(percentage[1])*0.01, 'unit': 'unit:PERCENT'}
    # Find value and unit part of the cleaned text (the raw text would still
    # contain thousands separators, e.g. "3,500 mm" would parse as value "3").
    match = re.match(r"(-?\d+(?:\.\d+)?) ?((?:° )?\S+)", quantity) # Also addresses degree formatting issues
    if match is None:
        return None
    value = match[1]
    unit = match[2].rstrip('.') #remove periods at the end, which happens when spaCy doesn't identify the entity correctly
    unit = unit.replace('° ', '°') # Also addresses degree formatting issues

    # special case for minutes (which is a fire class):
    if "minutes" in unit:
        minutes = float(value)
        return {'pset': 'pset:Other',
                'property': 'props:Fireresistance',
                'quantity': int(minutes) if minutes.is_integer() else minutes}

    # For other cases: standardisation is best effort, fall back to the bare value
    try:
        v, u = standardize_quantity(value, unit)
    except Exception:
        return {'quantity': value}
    return {'quantity': v, 'unit': u if u else unit}

print(cand_match_quantity('3500 mm'))
print(cand_match_quantity('60 minutes'))
print(cand_match_quantity("90 minutes"))
print(cand_match_quantity('90 ° C'))
print(cand_match_quantity("200 MJ/m2"))
print(cand_match_quantity("90 cm"))
print(cand_match_quantity("3 meters"))
print(cand_match_quantity("90 g"))
print(cand_match_quantity("90 kW/m2"))

### Cross-reference linking

*Basic implementation for now, will have to be improved for actual deployment*


1.   Find the parts of the text mentioning crefs & store it
2.   Replace the references in the text with placeholders, and store reference text separately
3.   Find the corresponding references for each reference text & store this with the reference text

In [None]:
# Function to replace ordinal words with numbers
def text_to_num(text):
    """Replace number words in `text` using the module-level word maps.

    Substitutions are applied in this order, each case-insensitively:
      1. tens (`tens_map_full` when not followed by a letter or hyphen,
         `tens_map_part` when followed by one);
      2. ordinals (`ordinal_map`), as whole words and directly after a digit;
      3. cardinals (`numerical_map`), as whole words and directly after a digit.

    Map keys are looked up in lower case.
    """
    def _alternatives(mapping):
        # One non-capturing group, so that the anchors placed around it apply
        # to every alternative and not only to the first or last one.
        return '(?:' + '|'.join(re.escape(k) for k in mapping.keys()) + ')'

    def _substitute(prefix, mapping, suffix, source):
        pattern = re.compile(prefix + _alternatives(mapping) + suffix, re.IGNORECASE)
        return pattern.sub(lambda match: mapping[match.group(0).lower()], source)

    # First replace all orders of 10 (except for 10 itself)
    text = _substitute(r'\b', tens_map_full, r'(?![a-z\-])', text)
    text = _substitute(r'\b', tens_map_part, r'(?=[a-z\-])', text)

    # Ordinal strings: standalone, then glued to a preceding digit
    text = _substitute(r'\b', ordinal_map, r'\b', text)
    text = _substitute(r'(?<=\b[0-9])', ordinal_map, r'\b', text)

    # Numerical strings: standalone, then glued to a preceding digit
    text = _substitute(r'\b', numerical_map, r'\b', text)
    text = _substitute(r'(?<=\b[0-9])', numerical_map, r'\b', text)

    return text

# Example usage
text = "first, fourth and fifth, as well as twentieth twenty-first to twenty-three, twenty-third and thirtyfourth and tenth three and thirtyfive and stwenty twenty sfirst"
converted_text = text_to_num(text)
print(text)
print(converted_text)

In [None]:
def replace_with_codes(text, mapping):
    """Replace hierarchy labels in `text` by their codes.

    For every entry of `mapping` (in order) hyphens and occurrences of "the "
    are removed from the text, after which whole-word, case-insensitive matches
    of the entry's "re_translated" regex are replaced by its "code".
    With an empty mapping the text is returned untouched.
    """
    for entry in mapping.values():
        label_regex = rf'\b{entry["re_translated"]}\b'
        replacement = entry["code"]
        cleaned = text.replace("-", "").replace("the ","")
        text = re.sub(label_regex, replacement, cleaned, flags=re.IGNORECASE)  # Case-insensitive replacement
    return text

# Example usage
for text in ["This chapter should be read along with the following sections.",
             "ARTICLES 4.43 and 4.45a, first and second sub-articles, subarticle 1"]:
    print(text_to_num(replace_with_codes(text, hier_labels)))

In [None]:
def find_cref_codes(cref_code, cref_text, hier_codes):
    """Expand a cross-reference expression into regulation codes.

    Args:
        cref_code: Codes joined by "+"; a range is written as "start--end".
        cref_text: Original reference text, only used in messages.
        hier_codes: All regulation codes, in document order.

    Returns:
        * the single code (str) when exactly one code results;
        * None (after printing a message) when the result covers more than half
          of `hier_codes` or more than 60 codes, which is assumed to be an error;
        * otherwise the list of codes, with ranges expanded inclusively.
        A range whose start or end cannot be found (in that order) is reported
        and left out.
    """
    hier_codes = list(hier_codes)
    resolved = []
    for cref in cref_code.split('+'):
        # plain code: keep as is
        if "--" not in cref:
            resolved.append(cref)
            continue
        # range: add every code from start to end (assumes that list is sorted)
        cref_start, _, cref_end = cref.partition('--')
        try:
            i_start = hier_codes.index(cref_start)
            i_end = hier_codes.index(cref_end, i_start)
        except ValueError:
            print("error converting:", cref_text)
            continue
        resolved.extend(hier_codes[i_start:i_end + 1])

    if len(resolved) == 1:
        return resolved[0]
    if len(resolved) > len(hier_codes)/2 or len(resolved) > 60: #probably an error
        print("error converting:", cref_text)
        return None
    return resolved

*Code below is unfinished:*

In [None]:
def get_cref_codes(text, reg_code):
    """Resolve the text of an internal cross-reference to regulation codes.

    Unfinished: only two reference types are converted.

    Args:
        text: Text of the cross-reference entity.
        reg_code: Code of the regulation in which the reference occurs.

    Returns:
        A code, a list of codes, or None when the reference cannot be resolved.
    """
    # Remove a leading "the"/"The" as a whole word. (str.lstrip takes a set of
    # characters and would also eat the start of words such as "third".)
    # The whitespace that follows is left in place.
    text = re.sub(r'^(?:the|The)\b', '', text)

    # pattern matching for type 4 (article code followed by ordinal+subarticle/subarticle + number)
    # (label4_art, id4_art, id4_sub1, label4_sub1, label4_sub2, id4_sub2)
    # (should be checked first since it overlaps with the others!)
    # Conversion of this type is not implemented yet; it is only reported.
    re_match = re.match(re_hier_cref4, text)
    if re_match:
        print(f'WIP! cref not converted: "{re_match[0]}"')

    # pattern matching for type 1 (ordinal followed by subarticle)
    re_match = re.match(re_hier_cref1, text)
    if re_match:
        cref_match = re_match['label1_sub'] + ' '+re_match['id1_sub']
        # replace hier labels with code (e.g. "subarticle" -> "SUB")
        cref_codes = replace_with_codes(text_to_num(cref_match), hier_labels)
        # replace ranked numbers with regular numbers
        cref_codes = re.sub(r"([0-9]+)(?:st|nd|rd|th)?", r"\g<1>", cref_codes, flags=re.IGNORECASE)
        # get base URI to search for
        base_URI = reg_code.split('SUB')[0]
        # replace "SUB X, Y and Z" with "SUB_X+SUB_Y+SUB_Z"
        cref_codes = re.sub(r"[ ]?(?:(?:\,(?:and )?)|and) ", f"+{base_URI}SUB", cref_codes, flags=re.IGNORECASE)
        # replace ranges with "--"
        cref_codes = re.sub(r" ?(?:up )?to(?: and including)? ", f"--{base_URI}SUB", cref_codes, flags=re.IGNORECASE)
        # add base URI and remove spaces
        cref_codes = base_URI + cref_codes.replace(" ", "")
        return find_cref_codes(cref_codes, text, hier_codes)

    # pattern matching for type 2
    re_match = re.match(re_hier_cref2, text)
    if re_match:
        matched_text = re_match[0]  # work on the matched string, not the Match object
        # skip coreference resolution for now
        if "that" in matched_text:
            print(f'coreference resolution needed for "{text}"')
            return None
        # code to look for in URI (second word after label replacement)
        code_parts = replace_with_codes(matched_text, hier_labels).split(" ")
        if len(code_parts) < 2 or not code_parts[1]:
            print(f'error converting "{text}"')
            return None
        cref_code = code_parts[1]

        # Check if cref_code was found in reg_code
        if cref_code not in reg_code:
            print(f"cref_code '{cref_code}' not found in reg_code '{reg_code}'")
            return None

        # get all matching codes
        # NOTE(review): the result is not yet narrowed to the current
        # regulation's own prefix, so this can return many codes.
        cref_codes = [x for x in hier_codes if cref_code in x]
        if len(cref_codes) == 1:
            cref_codes = cref_codes[0]
        return cref_codes

    return None


# Try the cross-reference conversion on the first 10 regulations of the subset
# and print, for every CROSS_REFERENCE entity, either the resolved code(s) or
# the raw entity text when nothing could be resolved.
for index, reg in df_subset_big[:10].iterrows():
    # doc = nlp(text_to_num(reg['text_translated']))
    doc = nlp(reg['text_translated'])
    crefs = []  # currently unused: results are only printed, not collected
    for ent in doc.ents:
        if ent.label_ == "CROSS_REFERENCE":
            print("current:", reg['code'])
            print("ent:", ent.text)
            # resolve relative to the code of the regulation being processed
            cref_codes = get_cref_codes(ent.text, reg['code'])
            if cref_codes:
                # crefs.append(cref_codes)
                print({'int_ref': cref_codes})
            else:
                print({'int_ref': str(ent.text)})
    #print(crefs)

## Entity link function

In [None]:
def link_entities_rb(ent):
    """Link a recognised entity to the knowledge bases with rule-based matching.

    The entity label decides which matcher is tried. When that matcher finds
    nothing, leading adjective modifiers are split off and linked separately,
    and the remainder of the entity is linked again.

    Relies on module-level state: the knowledge bases, the `nlp` pipeline and
    `current_code` (code of the regulation being processed, used to resolve
    cross-references).

    Args:
        ent: A spaCy Span (entity). Tokens are rejected.

    Returns:
        A dict describing the link (keys depend on the entity type, e.g.
        'element', 'property', 'pset', 'quantity', 'int_ref', 'ext_ref'), or
        None when the entity cannot be linked.
    """
    if isinstance(ent, Token):
        # if it is a token, return nothing (invalid entity)
        return None

    # Try matching spatial elements
    elif ent.label_ in NER_spatial:
        element, adjective = partial_match(kb_spatial, ent.text, "descriptors", list=True, exact=True)
        if element:  # only report a link when an element was actually found
            return {'element' : element, **cand_match_adjective(adjective)}

    # Try matching building elements
    elif ent.label_ in NER_elements:
        # Try partial ontology URI matching (to correct for mistakes).
        # If there is an adjective, try to find it as either a quality or material
        bot_candidate, adjective = partial_match(kb_spatial_bot, ent.text, "descriptors", list=True)
        if bot_candidate:
            return {'element' : bot_candidate, **cand_match_adjective(adjective)}
        beo_candidate, adjective = partial_match(kb_BEO, ent.text, "name")
        if beo_candidate:
            return {'element' : beo_candidate, **cand_match_adjective(adjective)}
        ifc_candidate, adjective = partial_match(kb_IFC, ent.text, "name")
        if ifc_candidate: #and ifc_candidate != "IfcSpace":
            return {'element' : ifc_candidate, **cand_match_adjective(adjective)}

    # Try matching materials
    elif ent.label_ == "material":
        material = cand_match_material(ent.text)
        if material:
            return material

    # Try matching attributes either to a known list of properties, or make a new property
    elif ent.label_ in NER_props_default:
        return cand_match_props(ent.text)

    # Try matching qualities (from rule-based quality recognition)
    elif ent.label_ == "QUALITY":
        quality = cand_match_quality(ent.text)
        if quality:
            return quality

    # Try matching classifications
    elif ent.label_ == "classification":
        classification = cand_match_classification(ent.text)
        if classification:
            return classification

    # Try matching use functions
    elif ent.label_ == "use function": # might have to be done via a list of possible types
        # If use functions in general are targetted, return this
        if ent.text in kb_custom_buildingprops['props:Usefunction']['descriptors']:
            return {'pset': 'pset:Other', 'property': 'props:Usefunction'}
        # Else, return the mentioned use function as a string
        else:
            return {'pset': 'pset:Other', 'property': 'props:Usefunction', 'value': f'"{ent.text}"^^xsd:string'}

    # Try matching quantities
    elif ent.label_ == "unit":
        quantity = cand_match_quantity(ent.text)
        if quantity:
            return quantity

    # Try matching references and norms
    if ent.label_ == "CROSS_REFERENCE":
        # extract code using function and current reg code
        cref_codes = get_cref_codes(ent.text, current_code)
        if cref_codes:
            return {'int_ref': cref_codes}
        else:
            return {'int_ref': str(ent.text)}
    elif ent.label_ == "standard" or ent.label_ == "reference":
        return {'ext_ref': ent.text}

    # If there are still no matches found, find adjectives at start of the entity
    # and try to find if they are in the knowledge base
    n_adjectives = 0
    while n_adjectives < len(ent) and ent[n_adjectives].dep_ == "amod": # stop at the first non-adjective, or at the end
        n_adjectives += 1
    # Strip the whitespace that follows the last adjective, otherwise the exact
    # knowledge-base lookup can never succeed.
    adjectives = ''.join(token.text_with_ws for token in ent[:n_adjectives]).strip()
    split_ent = ent[n_adjectives:]

    # Retry EL on the remainder of the entity (only if something remains)
    if adjectives and len(split_ent) > 0:
        adj_link = cand_match_adjective(adjectives)
        # If match is found, try entity linking again with split entities.
        if adj_link:
            # First try BOT (which has some problems with NER), then the rest
            bot_matches = find_by_keyword(kb_spatial_bot, split_ent.text, "descriptors", list=True, exact=True)
            if bot_matches:
                return {'element' : next(iter(bot_matches)), **adj_link}
            # Reanalyze NER of the remainder and link the entities found in it
            # (a bare token would be rejected at the top of this function)
            for new_ent in nlp(split_ent.text).ents:
                new_ent_match = link_entities_rb(new_ent)
                if new_ent_match:
                    return {**new_ent_match, **adj_link}

    # If there is no match, either return nothing or return a new custom element name which will have to be defined later
    if ent.label_ in NER_elements:
        return {'element' : 'ex:'+''.join(token.text.title() for token in ent)}
    else:
        return None

In [None]:
doc = nlp("An enclosed space is located in a fire compartment.")
for ent in doc.ents:
    ent_link = link_entities_rb(ent)
    print(f'{ent.label_:25} {ent.text:30} --> {ent_link}')

## Add entity linker to pipeline

In [None]:
#from spacy.tokens import Token
from spacy import Language
from spacy.tokens import Span, Doc

# Set the extension for the new information
Span.set_extension("links", default={}, force=True)

@Language.component("rb_entity_linker")#, assigns=["doc.ents"])
def entity_linker(doc):
    """Pipeline component: store the rule-based link of every entity.

    For each entity in `doc.ents` the result of `link_entities_rb` (a dict or
    None) is written to the entity's custom "links" extension attribute.
    The document itself is returned unchanged otherwise.
    """
    for span in doc.ents:
        link = link_entities_rb(span)
        span._.set("links", link)
    return doc


# Add the custom component to the pipeline
# (an earlier copy is removed first so the cell can be re-run; checking the pipe
# names avoids hiding unrelated errors behind a bare except)
if "rb_entity_linker" in nlp.pipe_names:
    nlp.remove_pipe("rb_entity_linker")
nlp.add_pipe("rb_entity_linker", last=True)


## Validation

Full subset validation:

In [None]:
# Run the full pipeline over every regulation of the subset and print each
# entity with its label, score and rule-based link.
for index, reg in df_subset_big.iterrows():
    doc = nlp(reg['text_translated'])
    # Derive a readable article number from the code: the part after the first
    # "A", with "_SUB" shown as "(" and remaining underscores as "."
    # NOTE(review): assumes every code contains an "A" — confirm against the data.
    print(f"Article {reg['code'].split('A')[1].replace('_SUB','(').replace('_','.')}): \"{doc}\"")
    for ent in doc.ents:
        # ent._.score is expected to be registered elsewhere in the notebook
        print(f"{str(ent.text):<25} = {ent.label_:<25} ({ent._.score:.0%}) --> {ent._.links}")
    print()

---
# **Relation Extraction & Semantic Parsing** (rule-based)

## Clause extraction

*Rudimentary approach, needs additional development for deployment*

In [None]:
# prepare checking of clauses based on index
def split_consecutive(lst):
    """Split a list of integers into runs of consecutive values.

    Example: [1, 2, 3, 7, 8, 10] -> [[1, 2, 3], [7, 8], [10]].
    An empty input gives an empty list.
    """
    result = []
    for value in lst:
        if result and value == result[-1][-1] + 1:  # If consecutive, extend the current run
            result[-1].append(value)
        else:  # Otherwise, start a new run
            result.append([value])
    return result

# function for clause extraction
def extract_clauses(doc):
    """Partition a parsed regulation into typed clauses.

    Args:
        doc: parsed document. Its tokens must expose ``i``, ``dep_``, ``subtree``,
            ``ancestors``, ``text``, ``text_with_ws``, ``lemma_`` and ``is_punct``,
            and ``doc[i]`` must return the token with index ``i``.

    Returns:
        A list of single-entry dicts ``{"<TYPE>_<n>": [token indices]}``. ``<TYPE>``
        is one of EXT_REF, CONDITION, TARGET_UNION, TARGET_MINUS, INT_REF or
        CONSTRAINT; ``<n>`` is a counter shared by all clauses of the document.
    """
    clauses = []
    tokens_used = set()  # indices that already belong to a clause
    n = 1  # counter used in the clause names

    # Pass 1: use the dependency tree to find adverbial clauses and subjects
    for token in doc:
        if token.i in tokens_used:
            continue

        if "advcl" in token.dep_:
            subtree = list(token.subtree)
            tokens_text = ''.join(child.text_with_ws for child in subtree)
            # NOTE(review): the original filter tested the clause head (not each child)
            # for punctuation, so the subtree is kept whole unless the head itself is
            # punctuation. Filtering per child may have been intended - confirm.
            tokens_i = [] if token.is_punct else [child.i for child in subtree]
            if not tokens_i:
                continue  # a clause without indices cannot be turned into a span later
            tokens_used.update(tokens_i)
            # an adverbial clause mentioning "NEN" is an external reference, otherwise a condition
            clause_type = "EXT_REF" if "NEN" in tokens_text else "CONDITION"
            clauses.append({f"{clause_type}_{n}": tokens_i})
            n += 1

        elif "subj" in token.dep_:
            ancestor_deps = [ancestor.dep_ for ancestor in token.ancestors]
            # a subject inside an adverbial clause belongs to that clause, not to the target
            if "advcl" not in ancestor_deps:
                tokens_i = [child.i for child in token.subtree]
                tokens_used.update(tokens_i)
                clauses.append({f"TARGET_UNION_{n}": tokens_i})
                n += 1

    # Pass 2: classify the remaining tokens, one clause per run of consecutive indices
    rest_i = [token.i for token in doc
              if token.i not in tokens_used and token.text not in [",", "."]]
    # guard: no leftover tokens means there is nothing to split
    for tokens_i in (split_consecutive(rest_i) if rest_i else []):
        clause = ''.join(doc[i].text_with_ws for i in tokens_i).rstrip(" ")
        lemmas = [doc[i].lemma_ for i in tokens_i]
        if "not applicable to" in clause or "not apply to" in clause or "not applied to" in clause:
            clause_type = "TARGET_MINUS"
        elif "applicable to" in clause or "apply to" in clause or "applied to" in clause:
            clause_type = "TARGET_UNION"
        elif any(lemma in ["except", "exception", "deviate", "deviation"] for lemma in lemmas):
            clause_type = "TARGET_MINUS"
        elif any(lemma in ["accord", "accordance", "unless", "contrary", "according"] for lemma in lemmas):
            clause_type = "INT_REF"
        else:
            clause_type = "CONSTRAINT"
        clauses.append({f"{clause_type}_{n}": tokens_i})
        # incremented for EVERY clause type (the lemma-based TARGET_MINUS branch used
        # to skip this, which allowed two clauses to share one name)
        n += 1

    return clauses

In [None]:
# Show the clause split for every regulation of the small subset:
# the parsed text first, then one line per clause with its type and its text
for index, reg in df_subset_small.iterrows():
    doc = nlp(reg['text_translated'])
    print(doc)
    clauses = extract_clauses(doc)
    for cl in clauses:
        # each clause is a single-entry dict: clause type -> list of token indices
        for cl_type, token_i in cl.items():
            print(f"{cl_type:15} = {''.join(doc[i].text_with_ws for i in token_i)}")
    print()

In [None]:
# Show the clause split for the first 10 regulations of the big subset:
# the parsed text first, then one line per clause with its type and its text
for index, reg in df_subset_big[:10].iterrows():
    doc = nlp(reg['text_translated'])
    print(doc)
    clauses = extract_clauses(doc)
    for cl in clauses:
        # each clause is a single-entry dict: clause type -> list of token indices
        for cl_type, token_i in cl.items():
            print(f"{cl_type:15} = {''.join(doc[i].text_with_ws for i in token_i)}")
    print()

## Predicate extraction & Semantic parsing

### Comparison operators

In [None]:
# Comparison operators and the phrases that express them in the regulation text.
# The lookups iterate this dict from top to bottom, so the order of the entries
# (and of the phrases inside an entry) is kept exactly as it was.
dict_comparisons = {
    '<=': [
        'no more than', 'no bigger than', 'not bigger than', 'no larger than',
        'not larger than', 'less than or equal to', 'maximum', 'most',
        'not exceeding', 'not exceed', 'limited to',
    ],
    '>=': [
        'no less than', 'no fewer than', 'no smaller than', 'not smaller than',
        'at least', 'more than or equal to', 'minimum',
    ],
    '<': [
        'less than', 'lower than', 'smaller than', 'fewer than', 'within', 'below',
    ],
    '>': [
        'more than', 'higher than', 'larger than', 'bigger than',
        'exceeds', 'exceeding', 'above',
    ],
    '!=': [
        'is not', 'not', 'does not meet', 'not exactly', 'not equal to', 'not equal',
    ],
    '=': [
        'is', 'of', 'meets', 'exactly', 'equal to', 'equals', 'exactly', 'precisely',
    ],
    # approximate values: not implemented yet
    '≈': [
        'approximately', 'roughly', 'close to', 'around', 'about',
    ],
    # outside a range: not implemented yet
    '<>': [
        'not between', 'not inbetween', 'not in the range of',
    ],
    # inside a range: not implemented yet
    '><': [
        'between', 'inbetween', 'in the range of',
    ],
}


# Validation: detect the comparison operator in front of every quantity in the
# first 10 regulations of the big subset and print the resulting links
for index, reg in df_subset_big[:10].iterrows():
    doc = nlp(reg['text_translated'])
    print(doc)
    for ent in doc.ents:
        # only quantities (label "unit") can carry a comparison operator
        if ent.label_ != "unit":
            continue
        # text of the document up to the current unit, without trailing spaces
        doc_before = str(doc[:ent.start]).rstrip(' ')
        # Keep the FIRST operator whose phrase ends the preceding text. Scanning on
        # after a hit let a later, shorter phrase overwrite it (for example
        # "no more than" was first linked to '<=' and then replaced by '>').
        for symbol, phrases in dict_comparisons.items():
            if doc_before.endswith(tuple(phrases)):
                ent._.links = {**ent._.links, 'comparison': str(symbol)}
                break

    for ent in doc.ents:
        if ent.label_ == "unit":
            print(f"{str(ent.text):<25} = {ent.label_:<25} ({ent._.score:.0%}) --> {ent._.links}")
    print()

## Final function & validation

In [None]:
# function for custom predicates
def extract_predicates(clause):
    """Turn spatial wording in front of an element into a custom predicate entry.

    Only the dict items of ``clause`` are returned; text items merely serve as the
    context of the dict item that follows them.

    Args:
        clause: list whose items are text (str) and entity information (dict).

    Returns:
        A list of dicts. An item with an ``'element'`` that is directly preceded by
        text containing ``"in "`` or ``"adjacent"`` is replaced by a path/class
        entry; every other dict item is passed through unchanged.
    """
    converted_clause = []
    for i, part in enumerate(clause):
        if not isinstance(part, dict):
            continue

        element = part.get('element')
        # The first item has no predecessor: clause[-1] would wrap around to the END
        # of the clause and mistake trailing text for the preceding predicate.
        preceding = clause[i - 1] if i > 0 else None

        if not element or not isinstance(preceding, str):
            converted_clause.append(part)
        elif "in " in preceding:
            # containment
            if 'FireCompartment' in element:
                converted_clause.append({'containment_path': 'ex:locatedInCompartment', 'containment_class': element})
            elif 'Space' in element:
                converted_clause.append({'containment_path': 'bot:hasSpace', 'containment_class': element})
            else:
                converted_clause.append(part)
        elif "adjacent" in preceding:
            # adjacency
            # NOTE(review): only the fire-compartment case uses 'adjacent_*' keys, the
            # other two reuse 'containment_*'. Kept as is because consumers of the
            # output are not visible here - confirm which key names are expected.
            if 'FireCompartment' in element:
                converted_clause.append({'adjacent_path': 'ex:adjacentCompartment', 'adjacent_class': element})
            elif 'Space' in element:
                converted_clause.append({'containment_path': 'bot:adjacentZone', 'containment_class': element})
            else:
                converted_clause.append({'containment_path': 'bot:adjacentElement', 'containment_class': element})
        else:
            converted_clause.append(part)

    return converted_clause

from itertools import groupby
def merge_consecutive_dicts(lst):
    """Collapse every run of neighbouring dicts and return the first resulting item.

    Args:
        lst: list of items; runs of consecutive dicts are merged into one dict
            (later keys overwrite earlier ones), other items are kept as they are.

    Returns:
        The first item of the collapsed list, or ``{}`` when ``lst`` is empty.
    """
    collapsed = []
    for dict_run, members in groupby(lst, key=lambda item: isinstance(item, dict)):
        if not dict_run:
            # non-dict items are copied over one by one
            collapsed += list(members)
            continue
        combined = {}
        for mapping in members:
            combined.update(mapping)
        collapsed.append(combined)

    # only the first item is handed back; an empty input yields an empty dict
    return collapsed[0] if collapsed else {}

# function for finding logical operators
def extract_logic(clause, clause_type):
    """Group the entities of a converted clause by the logical words between them.

    Args:
        clause: list whose items are text (str) and entity information (dict).
        clause_type: name of the clause; when it contains "TARGET" the set operators
            UNION / INTERSECT / MINUS are used, otherwise OR / AND / NOT.

    Returns:
        A list with the entities, the text that preceded them (kept only when it
        holds no logical word) and ``{"<OP>_<n>": ...}`` groups.

    Known limitations, left unchanged:
        * logical words are found by substring search, so e.g. "floor " also
          matches "or ";
        * entities collected after a comma/semicolon are only emitted once an
          "or"/"and" follows; without one they are dropped.
    """
    l_ents = []
    l_nest_ents = []  # entities of an enumeration that is still open
    # One counter for the whole clause. It used to be reset for every entity, so all
    # operators got the suffix _1 and overwrote each other when merged afterwards.
    n = 1

    for i, part in enumerate(clause):
        # only entities (dict items) are processed; text is read as their predicate
        if not isinstance(part, dict):
            continue

        # The first item has no predecessor: clause[-1] would wrap around to the END
        # of the clause and treat trailing text as the predicate.
        predicate = clause[i - 1] if i > 0 else None
        if not isinstance(predicate, str):
            l_ents.append(part)
            continue

        # map "or" / "and" to the operator name for target and non-target clauses
        if 'or ' in predicate or 'or;' in predicate:
            operator_names = ("UNION", "OR")
        elif 'and ' in predicate or 'and;' in predicate:
            operator_names = ("INTERSECT", "AND")
        else:
            operator_names = None

        if operator_names is not None:
            name = operator_names[0] if "TARGET" in clause_type else operator_names[1]
            op = f"{name}_{n}"
            n += 1
            if l_nest_ents:  # chained: close the open enumeration
                l_ents.append({op: l_nest_ents + [part]})
                l_nest_ents = []
            elif l_ents:  # simple: combine with the previous item
                del l_ents[-1]
                l_ents.append({op: [clause[i - 2], part]})
            else:  # nothing to combine with (replaces the former bare except)
                l_ents.append(part)
        elif 'not ' in predicate:
            op = f"MINUS_{n}" if "TARGET" in clause_type else f"NOT_{n}"
            n += 1
            if l_nest_ents:  # inside an open enumeration
                l_ents.append(l_nest_ents + [{op: part}])
            else:
                l_ents.append({op: part})
        elif ',' in predicate or ';' in predicate:
            if l_nest_ents:
                l_nest_ents.append(part)
            elif l_ents:
                # first item of an enumeration: move the previous item into it
                del l_ents[-1]
                l_nest_ents = [clause[i - 2], part]
            else:  # nothing precedes the separator
                l_ents.append(part)
        else:
            # no logical word: keep the predicate text, followed by the entity
            l_ents.append(predicate)
            l_ents.append(part)
    return l_ents

def link_comparisons(predicate, ent):
    """Attach a comparison operator to a quantity based on the text in front of it.

    Args:
        predicate: text that directly precedes the entity.
        ent: entity with a ``label_`` and a ``_.links`` dict.

    Returns:
        ``(predicate, ent)``. Entities whose label is not "unit" are returned
        untouched. For a unit whose preceding text (trailing spaces ignored) ends
        with a phrase from ``dict_comparisons``, the operator is stored under
        ``'comparison'`` in ``ent._.links`` and the predicate becomes ``' '``.
    """
    if ent.label_ != "unit":
        return predicate, ent

    for operator, phrases in dict_comparisons.items():
        # comparisons are expected at the very end of the preceding text
        if not predicate.rstrip(' ').endswith(tuple(phrases)):
            continue
        # NOTE(review): replacing the predicate by itself blanks the WHOLE text, not
        # only the comparison phrase - confirm whether replace(<phrase>, ' ') was meant.
        predicate = predicate.replace(predicate, ' ')
        ent._.links = {**ent._.links, 'comparison': str(operator)}
    return predicate, ent

# function for replacing entities with linked info
def convert_doc(doc):
    """Replace every entity of ``doc`` by its linked information.

    Args:
        doc: document or span; its tokens expose ``i`` and ``text_with_ws`` and its
            ``ents`` expose ``start``, ``end``, ``label_`` and ``_.links``.

    Returns:
        A list in reading order in which each entity is represented by its
        ``_.links`` value and the text between entities by one concatenated string.
        The text right before an entity is first passed to ``link_comparisons`` and
        dropped when nothing (or a single space) remains of it.
    """
    converted_doc = []
    entities = list(doc.ents)  # read once instead of once per token

    for token in doc:
        in_span = False
        for ent in entities:
            if token.i == ent.start:  # first token of the entity
                if converted_doc:
                    # try to link the preceding item (the predicate) to the entity
                    try:
                        predicate, ent = link_comparisons(converted_doc[-1], ent)
                    except Exception:
                        # Best effort, as before: the preceding item may be another
                        # entity's links instead of text. Leave it untouched then.
                        pass
                    else:
                        if not predicate or predicate == ' ':
                            del converted_doc[-1]
                        else:
                            converted_doc[-1] = predicate
                # replace the entity with its linked information
                converted_doc.append(ent._.links)
                in_span = True
                break
            elif ent.start < token.i < ent.end:  # later token of an entity: skip it
                in_span = True

        if not in_span:
            # Plain text: extend the running text item or start a new one. Only a str
            # is extended, so the links of the previous entity are never modified.
            if converted_doc and isinstance(converted_doc[-1], str):
                converted_doc[-1] += token.text_with_ws
            else:
                converted_doc.append(token.text_with_ws)
    return converted_doc

def extract_relations(doc):
    """Extract the linked entities and predicates of every clause of ``doc``.

    Each clause found by ``extract_clauses`` is cut out of ``doc`` (first to last
    token index) and passed through ``convert_doc``, ``extract_logic``,
    ``extract_predicates`` and ``merge_consecutive_dicts``.

    Args:
        doc: parsed regulation text.

    Returns:
        dict mapping clause names to the extracted result; clauses whose result is
        empty are left out.
    """
    extracted_dict = {}
    for cl in extract_clauses(doc):
        for cl_type, token_i in cl.items():
            if not token_i:
                # a clause without token indices has no span to convert
                continue
            # span from the first to the last token of the clause
            cl_doc = doc[token_i[0]:token_i[-1] + 1]
            # replace entities with linked info & find comparisons
            # (cardinality/quantifiers should also be added here!)
            converted_clause = convert_doc(cl_doc)
            # extract logical structure
            converted_clause = extract_logic(converted_clause, cl_type)
            # extract predicates
            converted_clause = extract_predicates(converted_clause)
            # join consecutive entities
            converted_clause = merge_consecutive_dicts(converted_clause)
            # add clause extraction to dictionary (if non-empty)
            if converted_clause:
                extracted_dict[cl_type] = converted_clause

    return extracted_dict

In [None]:
# Show the extracted relations for every regulation of the small subset:
# the parsed text first, then one line per clause name with its extracted content
for index, reg in df_subset_small.iterrows():
    doc = nlp(reg['text_translated'])
    print(doc)
    extracted_dict = extract_relations(doc)
    for key, value in extracted_dict.items():
        print(f"{key:15} = {value}")
    print()

In [None]:
# Show the extracted relations for every regulation of the big subset
for index, reg in df_subset_big.iterrows():
    doc = nlp(reg['text_translated'])
    # header: the part of the code after the first 'A', with '_SUB' -> '(' and '_' -> '.';
    # note that the closing ')' is printed for every regulation
    print(f"Article {reg['code'].split('A')[1].replace('_SUB','(').replace('_','.')}): \"{doc}\"")
    extracted_dict = extract_relations(doc)
    for key, value in extracted_dict.items():
        print(f"{key:15} = {value}")
    print()

---
# **Final information extraction**

Validation using small subset

In [None]:
# Display the names of the components currently in the pipeline, in order
nlp.pipe_names

Final pipeline order should be:
```
['tok2vec',
 'tagger',
 'parser',
 'attribute_ruler',
 'lemmatizer',
 'gliner_spacy',
 'cref_recognizer',
 'quantity_recognizer',
 'quality_recognizer',
 'rb_entity_linker']
```

## Extract information

*Conditional information extraction and cross-reference contextualization not yet implemented!*

The output should have a structure like:
```
[ { 'INFO': {
        'reg_CURIE': '__:___',
        'label': 'Article 00.00(00)',
        'seeAlso': 'https://_____',
        'text_en' : '___',
        'text_original' : '___'},
    'TARGETS'  : {
        ...},
    'CONSTRAINTS': {
        ...},
    'INT_REFERENCES' : [
        ...
    ],
    'EXT_REFERENCES': [
        ...
    ]
  },
  { 'INFO': {
        'reg_CURIE': '__:___',
        'label': 'Article X.X/X',
        'seeAlso': 'https://_____',
        'text_en' : '___',
        'text_original' : '___'},
    'CONDITION_1'  : {
        'TARGETS'  : {
            ...},
        'CONSTRAINTS': {
            ...}}},
]
```

In [None]:
def extract_information(df_reg):
    """Extract the regulatory information of every regulation in ``df_reg``.

    Side effects: sets the module-level ``current_code`` to the code of the
    regulation being processed (needed for cross-reference linking) and prints the
    extracted entry of every regulation as indented JSON.

    Args:
        df_reg: DataFrame with the columns ``code``, ``URL``, ``text_translated``
            and ``text_original``.

    Returns:
        A list with one dict per regulation. Each dict holds ``'INFO'`` and,
        depending on the clauses found, ``'TARGETS'``, ``'CONSTRAINTS'``,
        ``'INT_REFERENCES'`` and ``'EXT_REFERENCES'``.
    """
    global current_code  # needed for cross-reference linking
    extract = []
    # extract information and store in a list with a dictionary for each regulation
    for index, reg in df_reg.iterrows():
        code = reg["code"]
        # extract article (and subarticle) label from code as Article 00.00(0)
        # NOTE(review): only the sub-article branch strips the prefix before "_A";
        # confirm against the real code format whether the other branch should too.
        if 'SUB' in code:
            label = 'Article ' + code.split("_A")[1].replace('_SUB', '(').replace('_', '.') + ')'
        else:
            label = 'Article ' + code.replace('_', '.')

        entry = {
            # add meta information
            'INFO': {
                'reg_CURIE': 'bbl:' + code,
                'label': label,
                'seeAlso': reg['URL'],
                'text_en': reg['text_translated'],
                'text_original': reg['text_original'],
            },
        }
        extract.append(entry)

        current_code = code
        # extract information (tokens, POS, dependency tree, NER labels, linked entities, linked predicates, relations)
        doc = nlp(reg["text_translated"])
        extracted_information = extract_relations(doc)

        # First pass: create an empty container for every kind of clause present.
        # The constraint test used to look for "CONSTRAINTS", which never matches the
        # "CONSTRAINT_<n>" names, so constraints fell through to the ext_ref fallback;
        # both passes now route the clause names the same way.
        for key, values in extracted_information.items():
            if "TARGET" in key:
                entry.setdefault('TARGETS', {})
            elif "CONSTRAINT" in key:
                entry.setdefault('CONSTRAINTS', {})
            elif "INT_REF" in key:
                entry.setdefault('INT_REFERENCES', [])
            elif "EXT_REF" in key:
                entry.setdefault('EXT_REFERENCES', [])
            elif values.get('ext_ref'):
                entry.setdefault('EXT_REFERENCES', [])

            # INCLUDE CODE FOR CONDITIONS HERE

        n = 1  # counter to make sure dictionary keys are unique
        # Second pass: fill the containers with the corresponding information
        for key, values in extracted_information.items():
            if "TARGET_UNION" in key:
                # NOTE(review): this REPLACES the targets collected so far (including
                # earlier MINUS entries); a later constraint likewise replaces an
                # earlier one. Kept as is - confirm whether merging is wanted.
                entry['TARGETS'] = values
                n += 1
            elif "TARGET_MINUS" in key:
                entry['TARGETS'] = {**entry['TARGETS'], f'MINUS_{n}': values}
                n += 1
            elif "CONSTRAINT" in key:
                entry['CONSTRAINTS'] = values
            elif "INT_REF" in key:
                entry['INT_REFERENCES'].append(values)
            elif "EXT_REF" in key:
                entry['EXT_REFERENCES'].append(values)
            elif values.get('ext_ref'):
                entry['EXT_REFERENCES'].append({'ext_ref': values['ext_ref']})

        # display extracted information for current regulation
        print(json.dumps(entry, indent=4))
    return extract

In [None]:
# Extract the information of the small subset (each regulation is also printed as JSON)
reg_info_subset_small = extract_information(df_subset_small)

In [None]:
# Extract the information of the big subset (each regulation is also printed as JSON)
reg_info_subset_big = extract_information(df_subset_big)

## Export to JSON

In [None]:
# Write the extracted information of both subsets to JSON files in the output folder
# (paths are relative to the current working directory)
import json
with open("output/regulatory_information_subset_small.json", 'w') as json_file:
    json.dump(reg_info_subset_small, json_file)
with open("output/regulatory_information_subset_big.json", 'w') as json_file:
    json.dump(reg_info_subset_big, json_file)