# Teilautomatisierter Prozess zur Erfassung von Sicherheitswissen in einer Ontologie


Der hier beschriebene Prozess ist dabei auf das Grundschutzkompendium bezogen, kann aber für andere Quellen in abgewandelter Form genauso verwendet werden. Damit der Wissensgraph immer auf dem neusten Stand ist, müssen die Bausteine im GC Storage Bucket jedes Jahr nach Erscheinen einer neuen Version des Grundschutzkompendiums aktualisiert werden und der Prozess wie folgt wiederholt werden. 

_Hilfsfunktionen_ werden in dem Prozess der in der Aufgabenstellung verlangt wurde nicht verwendet, dienen aber aus unserer Perspektive dazu das Tool als Baukasten für Entwickler einfacher nutzbar zu machen. 

Als Hilfestellung um weiter Funktionen zu entwerfen dient die [Dokumentation von owl-ready-2](https://owlready2.readthedocs.io/en/latest/)

 ### Zur Vorbereitung werden folgende Schritte benötigt:
 - Google Cloud Developer Account erstellen und lokal anmelden dabei den Schritten unter https://cloud.google.com/vertex-ai/docs/start/cloud-environment folgen
 - Vertex API und Umgebung einrichten und ein Projekt anlegen: https://cloud.google.com/vertex-ai/docs/start/cloud-environment 
 - Erstellung eines Google Cloud Storage Buckets nach: https://cloud.google.com/storage/docs/creating-buckets?hl=de
 - Entwicklungsumgebung in Python 
 - Installation der requirements

## Der Prozess
1. Herunterladen der Kreuzreferenztabelle, den Bausteinen in einzelnen PDFs und des Grundschutzkompendiums in Buchformat
2. Hochladen der Bausteine in den Google Cloud Storage Bucket. 
3. Liste der Dokumente laden über die Google Cloud shell per : gsutil ls gs://DEIN_BUCKET/ und in ein Textdokument abspeichern
4. Überprüfen ob die hier gelisteten Gefahren noch den im Grundschutzkompendium geführten Gefahren entsprechen

In [1]:
gefahren = [{'handle': 'G 0.1', 'text': 'Feuer'}, {'handle': 'G 0.2', 'text': 'Ungünstige klimatische Bedingungen'}, {'handle': 'G 0.3', 'text': 'Wasser'}, {'handle': 'G 0.4', 'text': 'Verschmutzung, Staub, Korrosion'}, {'handle': 'G 0.5', 'text': 'Naturkatastrophen'}, {'handle': 'G 0.6', 'text': 'Katastrophen im Umfeld'}, {'handle': 'G 0.7', 'text': 'Großereignisse im Umfeld'}, {'handle': 'G 0.8', 'text': 'Ausfall oder Störung der Stromversorgung'}, {'handle': 'G 0.9', 'text': 'Ausfall oder Störung von Kommunikationsnetzen'}, {'handle': 'G 0.10', 'text': 'Ausfall oder Störung von Versorgungsnetzen'}, {'handle': 'G 0.11', 'text': 'Ausfall oder Störung von Dienstleistungsunternehmen'}, {'handle': 'G 0.12', 'text': 'Elektromagnetische Störstrahlung'}, {'handle': 'G 0.13', 'text': 'Abfangen kompromittierender Strahlung'}, {'handle': 'G 0.14', 'text': 'Ausspähen von Informationen (Spionage)'}, {'handle': 'G 0.15', 'text': 'Abhören'}, {'handle': 'G 0.16', 'text': 'Diebstahl von Geräten, Datenträgern oder Dokumenten'}, {'handle': 'G 0.17', 'text': 'Verlust von Geräten, Datenträgern oder Dokumenten'}, {'handle': 'G 0.18', 'text': 'Fehlplanung oder fehlende Anpassung'}, {'handle': 'G 0.19', 'text': 'Offenlegung schützenswerter Informationen'}, {'handle': 'G 0.20', 'text': 'Informationen oder Produkte aus unzuverlässiger Quelle'}, {'handle': 'G 0.21', 'text': 'Manipulation von Hard- oder Software'}, {'handle': 'G 0.22', 'text': 'Manipulation von Informationen'}, {'handle': 'G 0.23', 'text': 'Unbefugtes Eindringen in IT-Systeme'}, {'handle': 'G 0.24', 'text': 'Zerstörung von Geräten oder Datenträgern'}, {'handle': 'G 0.25', 'text': 'Ausfall von Geräten oder Systemen'}, {'handle': 'G 0.26', 'text': 'Fehlfunktion von Geräten oder Systemen'}, {'handle': 'G 0.27', 'text': 'Ressourcenmangel'}, {'handle': 'G 0.28', 'text': 'Software-Schwachstellen oder -Fehler'}, {'handle': 'G 0.29', 'text': 'Verstoß gegen Gesetze oder Regelungen'}, {'handle': 'G 0.30', 'text': 'Unberechtigte Nutzung oder Administration von Geräten und Systemen'}, {'handle': 'G 0.31', 'text': 'Fehlerhafte Nutzung oder Administration von Geräten und Systemen'}, {'handle': 'G 0.32', 'text': 'Missbrauch von Berechtigungen'}, {'handle': 'G 0.33', 'text': 'Personalausfall'}, {'handle': 'G 0.34', 'text': 'Anschlag'}, {'handle': 'G 0.35', 'text': 'Nötigung, Erpressung oder Korruption'}, {'handle': 'G 0.36', 'text': 'Identitätsdiebstahl'}, {'handle': 'G 0.37', 'text': 'Abstreiten von Handlungen'}, {'handle': 'G 0.38', 'text': 'Missbrauch personenbezogener Daten'}, {'handle': 'G 0.39', 'text': 'Schadprogramme'}, {'handle': 'G 0.40', 'text': 'Verhinderung von Diensten (Denial of Service)'}, {'handle': 'G 0.41', 'text': 'Sabotage'}, {'handle': 'G 0.42', 'text': 'Social Engineering'}, {'handle': 'G 0.43', 'text': 'Einspielen von Nachrichten'}, {'handle': 'G 0.44', 'text': 'Unbefugtes Eindringen in Räumlichkeiten'}, {'handle': 'G 0.45', 'text': 'Datenverlust'}, {'handle': 'G 0.46', 'text': 'Integritätsverlust schützenswerter Informationen'}, {'handle': 'G 0.47', 'text': 'Schädliche Seiteneffekte IT-gestützter Angriffe'}]

Laden benötigter Importe 

In [None]:
import base64, json
import pandas as pd
import vertexai
from vertexai.generative_models import GenerativeModel, Part, FinishReason
import vertexai.preview.generative_models as generative_models
import os 
import fileinput
import re
from owlready2 import *
import logging
from rapidfuzz import fuzz
import locale

In [3]:
locale.setlocale(locale.LC_ALL, 'de_DE')

'de_DE'

Hier bitte alle persönlichen benötigten Variablen abspeichern: 
- Vertex project id
- Pfad zur Kreuzreferenztabelle 
- Pfad zur Liste mit den Links zu den Bausteinen im Google Cloud Bucket. 
- Link zum Storage Bucket für zwischenergebnisse.
Damit unterschiedliche Versuche nicht überlappen und zum Zweck der Versionierung, sollte auch bei jedem Durchlauf ein resultname (Name der Outputdatei OHNE Endung) nach einem Versionierungsschema gewählt werden.

In [4]:
projectid = "gen-lang-client-0489091220"
path_to_excel_file = r'krt2023_Excel.xlsx'
file_path = "example copy.txt"
storage_bucket = "gs://bausteinebsi/"
resultname = "cybersecurity_complete_v14"


In der Funktion **generate()** wird das Dictionary, das die Inhalte aus dem Grundschutzkompendium enthält, durch Gemini erstellt. Das Modell wird zuerst konfiguriert und anschließend werden die Prompts gespeichert. Da es zu Problemen durch Rezitation oder als gefährlich eingestufte Inhalte kommen kann, gibt es zwei Versionen der Prompts: eine Version, die die Beschreibungen enthält und im ersten Durchlauf des Prozesses abgerufen wird, und eine Version ohne diese Beschreibungen. Die Struktur des Dictionaries wird in den Prompts definiert; durch Anpassung dieser Prompts ist es auch möglich, andere Quellen einzulesen. Die Antworten der API werden anschließend aufbereitet.

In [5]:
def generate(path, failed=False):
        vertexai.init(project = projectid, location="northamerica-northeast1")

        model = GenerativeModel(
            "gemini-1.5-flash-001",
        )
        if not failed:
          prompt_extract1 = """You are an ontology assistant tasked with extracting core information from a single chapter of a German cybersecurity recommendation book. Your focus is on accurately identifying and establishing the relationships between elements.

                            **Extremely Important:**  

                            * **For fields with specific allowed values, ensure you ONLY use those values. Do not invent or guess values.**
                            * **The JSON output must be valid and ready for parsing. Do not include any extra formatting or explanations.**

                            **The Source Material:**

                            * You will be provided with a single chapter, which should be treated as one "Asset" in the JSON.

                            **Extraction Rules:**

                            1. **Identify Asset:**
                              - The chapter's main heading is the name of the Asset.
                              - The Asset "handle" is the first part of the subheading (e.g., "SYS.1.6").
                              - The Asset "text" is a summary of the description of the asset, found in the paragraph under the "Beschreibung" heading. 

                            2. **Identify Vulnerabilities:**
                              - Locate subheadings directly under the section titled "Gefährdungslage." These are the Vulnerabilities. 
                              - The Vulnerability "handle" is the first part of the subheading.
                              - The Vulnerability "name" is the remaining text of the subheading.
                              - The Vulnerability "text" is a summary of the text below the subheading describing the vulnerability.

                            3. **Identify Example Threats (For Each Vulnerability):**
                              - Within each Vulnerability section, find paragraphs that describe Threats exploiting that Vulnerability.
                              - The "text" is a summary of the text describing the example threat.
                              - **Optional Information (If Present in the Text):**
                                - **Threat Origin:** "human" or "natural". 
                                - **Threat Source:** "deliberate" or "accidental". 

                            **Output JSON Structure:**

                            ```json
                            {
                              "Informationsource": "BSI GSK 2023",
                              "name": "[Asset Name]",
                              "handle": "[Asset Handle]",
                              "text": "[Asset Description]",
                              "vulnerableTo": [
                                {
                                  "type": "specific",
                                  "Informationsource": "BSI GSK 2023",
                                  "name": "[Vulnerability Name]",
                                  "handle": "[Vulnerability Handle]",
                                  "text": "[Vulnerability Description]",
                                  "exploitedBy_examples": [
                                    {
                                      "Informationsource": "BSI GSK 2023",
                                      "name": "[Threat Name]", 
                                      "type": "Threat",
                                      "text": "[Threat Description]",
                                      "hasOrigin": [
                                        "[ThreatOrigin]", 
                                        ...
                                      ], // Optional: "human", "natural", or both
                                      "hasSource": [
                                        "[ThreatSource]",
                                        ...
                                      ] // Optional: "deliberate", "accidental", or both
                                    },
                                    ... 
                                  ],
                                  "mitigatedBy": [ 
                                    "[Control Handle 1]",
                                    "[Control Handle 2]",
                                    ... 
                                  ] 
                                },
                                // ... More Vulnerabilities ...
                              ]
                            }
                                """
        
          prompt_extract2 = """
              You are an ontology assistant tasked with extracting control information from a single chapter of a German cybersecurity recommendation book. Your focus is on accurately identifying and establishing the relationships between elements.

                **Extremely Important:**  

                * **For fields with specific allowed values, ensure you ONLY use those values. Do not invent or guess values.**
                * **The JSON output must be valid and ready for parsing. Do not include any extra formatting or explanations.**

                **The Source Material:**

                * You will be provided with a single chapter, which should be treated as one "Asset" in the JSON. 

                **Extraction Rules:**

                1. **Identify Controls (For Each Vulnerability):**
                  - Within each Vulnerability section, extract subheadings from these sections to identify relevant Controls:
                    - "Basis Anforderungen" (Low-Level Controls)
                    - "Standard Anforderungen" (Medium-Level Controls)
                    - "Anforderungen bei erhöhtem Schutzbedarf" (High-Level Controls)
                  - **Important:** Skip subheadings that contain the word "entfallen."
                  - The Control "handle" is the first part of the subheading (e.g., "SYS.1.6.A2").
                  - The Control "name" is the remaining text of the subheading.
                  - The Control "text" is the description of the control.
                  - The Control "isresponsible" is the responsible entity for the control, Optional: if mentioned it can be found in [] in the control heading.

                **Output JSON Structure:**

                ```json
                {
                  "Controls": [ 
                    {
                      "Informationsource": "BSI GSK 2023",
                      "name": "[Control Name 1]",
                      "handle": "[Control Handle 1]",
                      "level": "[Low | Medium | High]",
                      "affects": [], // Optional: "Availability", "Confidentiality", and/or "Integrity" ,
                      "text": "[Control Description]", 
                      "isresponsible": "[Control Responsible Entity]" 

                    },
                    // ... More Controls ...
                  ]
                }
                ```

                **Provide the JSON output directly. Do not include any additional text or explanations.** 
        """
        else: 
          prompt_extract1 = """You are an ontology assistant tasked with extracting core information from a single chapter of a German cybersecurity recommendation book. Your focus is on accurately identifying and establishing the relationships between elements.

                            **Extremely Important:**  

                            * **For fields with specific allowed values, ensure you ONLY use those values. Do not invent or guess values.**
                            * **The JSON output must be valid and ready for parsing. Do not include any extra formatting or explanations.**

                            **The Source Material:**

                            * You will be provided with a single chapter, which should be treated as one "Asset" in the JSON.

                            **Extraction Rules:**

                            1. **Identify Asset:**
                              - The chapter's main heading is the name of the Asset.
                              - The Asset "handle" is the first part of the subheading (e.g., "SYS.1.6").

                            2. **Identify Vulnerabilities:**
                              - Locate subheadings directly under the section titled "Gefährdungslage." These are the Vulnerabilities. 
                              - The Vulnerability "handle" is the first part of the subheading.
                              - The Vulnerability "name" is the remaining text of the subheading.

                            3. **Identify Example Threats (For Each Vulnerability):**
                              - Within each Vulnerability section, find paragraphs that describe Threats exploiting that Vulnerability.
                              - **Optional Information (If Present in the Text):**
                                - **Threat Origin:** "human" or "natural". 
                                - **Threat Source:** "deliberate" or "accidental". 

                            **Output JSON Structure:**

                            ```json
                            {
                              "Informationsource": "BSI GSK 2023",
                              "name": "[Asset Name]",
                              "handle": "[Asset Handle]",
                              "vulnerableTo": [
                                {
                                  "type": "specific",
                                  "Informationsource": "BSI GSK 2023",
                                  "name": "[Vulnerability Name]",
                                  "handle": "[Vulnerability Handle]",
                                  "exploitedBy_examples": [
                                    {
                                      "Informationsource": "BSI GSK 2023",
                                      "name": "[Threat Name]", 
                                      "type": "Threat",
                                      "hasOrigin": [
                                        "[ThreatOrigin]", 
                                        ...
                                      ], // Optional: "human", "natural", or both
                                      "hasSource": [
                                        "[ThreatSource]",
                                        ...
                                      ] // Optional: "deliberate", "accidental", or both
                                    },
                                    ... 
                                  ],
                                  "mitigatedBy": [ 
                                    "[Control Handle 1]",
                                    "[Control Handle 2]",
                                    ... 
                                  ] 
                                },
                                // ... More Vulnerabilities ...
                              ]
                            }
                                """
        
          prompt_extract2 = """
              You are an ontology assistant tasked with extracting control information from a single chapter of a German cybersecurity recommendation book. Your focus is on accurately identifying and establishing the relationships between elements.

                **Extremely Important:**  

                * **For fields with specific allowed values, ensure you ONLY use those values. Do not invent or guess values.**
                * **The JSON output must be valid and ready for parsing. Do not include any extra formatting or explanations.**

                **The Source Material:**

                * You will be provided with a single chapter, which should be treated as one "Asset" in the JSON. 

                **Extraction Rules:**

                1. **Identify Controls (For Each Vulnerability):**
                  - Within each Vulnerability section, extract subheadings from these sections to identify relevant Controls:
                    - "Basis Anforderungen" (Low-Level Controls)
                    - "Standard Anforderungen" (Medium-Level Controls)
                    - "Anforderungen bei erhöhtem Schutzbedarf" (High-Level Controls)
                  - **Important:** Skip subheadings that contain the word "entfallen."
                  - The Control "handle" is the first part of the subheading (e.g., "SYS.1.6.A2").
                  - The Control "name" is the remaining text of the subheading.
                  - The Control "isresponsible" is the responsible entity for the control, Optional: if mentioned it can be found in [] in the control heading.

                **Output JSON Structure:**

                ```json
                {
                  "Controls": [ 
                    {
                      "Informationsource": "BSI GSK 2023",
                      "name": "[Control Name 1]",
                      "handle": "[Control Handle 1]",
                      "level": "[Low | Medium | High]",
                      "affects": [], // Optional: "Availability", "Confidentiality", and/or "Integrity",
                      "isresponsible": "[Control Responsible Entity]" 
                    },
                    // ... More Controls ...
                  ]
                }
                ```

                **Provide the JSON output directly. Do not include any additional text or explanations.** 
        """      
        document1 = Part.from_uri(
        mime_type="application/pdf",
        uri=path)
    

        generation_config = {
            "top_p": 0.95,
            "temperature": 0.1
        }

        safety_settings = {
            generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
            generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
            generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
            generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
        }
        
        response1 = model.generate_content(
            [document1, prompt_extract1],
            generation_config=generation_config,
            safety_settings=safety_settings,
            #stream=True,
        )
        response1_txt = response1.text.replace("`json", "").replace("`", "").replace("Ä", "Ae").replace("Ö", "Oe").replace("Ü", "Ue").replace("ß", "ss").replace("ä", "ae").replace("ö", "oe").replace("ü", "ue")
        response2 = model.generate_content(
            [document1, prompt_extract2],
            generation_config=generation_config,
            safety_settings=safety_settings,
            #stream=True,
        )

        response2_txt = response2.text.replace("`json", "").replace("`", "").replace("Ä", "Ae").replace("Ö", "Oe").replace("Ü", "Ue").replace("ß", "ss").replace("ä", "ae").replace("ö", "oe").replace("ü", "ue")
        
        return [response1.usage_metadata, response1_txt, response2.usage_metadata, response2_txt]

Die Funktion **llm_output_to_json()** bereitet den Output der generate()-Funktion auf, kodiert und dekodiert ihn und speichert das Ergebnis im neu erstellten Ordner working_files ab. Dadurch kann bei Fehlern in den nächsten Prozessschritten auf diese Dateien zurückgegriffen werden, wodurch der rechenaufwändige Schritt der Neuerstellung vermieden wird.

In [6]:
def llm_output_to_json(output_data):
    output_json = json.loads(output_data[1])
    output_json.update(json.loads(output_data[3]))
    working_dir = os.getcwd()  # Get current working directory
    output_folder = os.path.join(working_dir, "working_files")
    os.makedirs(output_folder, exist_ok=True) 
    output_file = os.path.join(output_folder, f"{output_json['name']}.json")  # Assuming "name" is in the JSON
    # Save the JSON file
    with open(output_file, "w", encoding='utf-8') as output_file:
        output_file.write(output_data[1].join(output_data[3]))
        print(f"Txt file saved: {output_file}")
    return output_json


**read_excel_sheet()** gibt das entsprechende Blatt zum angefragten Baustein aus der Kreuzreferenztabelle als Dataframe zurück

In [7]:
def read_excel_sheet(sheet_name, file_path = path_to_excel_file):
  sheet_name = "KRT_" + sheet_name + ".xlsx"  # Remove any spaces from the sheet name
  df = pd.read_excel(file_path, sheet_name=sheet_name)
  return df

**populate_json_with_excel_data()** liest die Kreuzrefferenztabelle eines Bausteinsaus und fügt die Ergebnisse dem Dictionary des entsprechenden Bausteins hinzu, die Ergebnisse überschreiben die zugehörige JSON Datei in working_files als Zwischenspeicher

In [8]:
def populate_json_with_excel_data(json_data, dataframe):
    for control in json_data["Controls"]:
        control["affects"] = []  # 
        control_handle = control["handle"]
        row = dataframe.loc[dataframe[json_data["handle"]] == control_handle]
        row_cia_value = row["CIA"].tolist()[0] if len(row["CIA"].tolist()) > 0 else []  # Get the "CIA" value from the row
        x_indices = []
        count = 0
        try:
            for value in row.values[0]:
                if value == "X":
                    x_indices.append(count)
                count += 1
            Gefahren = [dataframe.iloc[:, index].name for index in x_indices]
            control["mitigates"] = Gefahren
        except Exception as e:
            logging.error(f"Appending Gefahren failed due to {e}")
        if type(row_cia_value) == str:
            control["affects"] = row_cia_value.split() if len(row_cia_value) > 0 else []
        

    
    #correct for mitigatedBy until now
    for vulnerability in json_data["vulnerableTo"]:
        vulnerability["mitigatedBy"] = []

    working_dir = os.getcwd()  # Get current working directory
    output_folder = os.path.join(working_dir, "working_files")
    os.makedirs(output_folder, exist_ok=True) 

    # Construct the JSON file path within the "working_files" folder
    output_file = os.path.join(output_folder, f"{json_data['name']}.json")  # Assuming "name" is in the JSON
    # Save the JSON file
    with open(output_file, "w", encoding='utf-8') as outfile:
        json.dump(json_data, outfile, indent=4, ensure_ascii=False)  # Use indent for pretty formatting
    return json_data

**read_links_to_array()** liest die Links aus dem Bucket ein und stellt sie für nachfolgende Funktionen zur Verfügung. Bitte Überprüfe ob deine Links um Output der nachfolgenden Zelle angezeigt werden.

In [None]:
def read_links_to_array(file_path):
    with open(file_path, 'r') as file:
        links = file.read().splitlines()  # Read lines, remove newline characters
    return links

# Example usage:
links_array = read_links_to_array(file_path)
print(links_array)  # Output: ['https://example.com/1', 'https://example.com/2', 'https://example.com/3']

_Hilfsfunktion_ **load_json_files()** Wird dazu verwendet um die Zwischenergebnisse aus "working_files" falls benötigt zu laden.

In [10]:
def load_json_files(file_paths):
    json_data = {}
    for file_path in file_paths:
        with open(file_path) as file:
            json_variable = json.load(file)
            filename = os.path.basename(file_path).replace(".json", "")  # Get filename without the path
            json_data[filename] = json_variable
    return json_data

_Hilfsfunktion_ **modify_filename()** ist eine Hilfsfunktion die benötigt wird um die entsprechenden Datein aus dem Zwischenspeicher auf zu rufen. 

In [11]:
def modify_filename(filename):
    # Remove "_Edition_2023.pdf" suffix
    filename = filename.replace("_Edition_2023.pdf", "").replace(storage_bucket, "")  # Remove suffix

    # Split remaining string by underscores
    parts = filename.split("_")  

    # Join first two parts with a dot
    modified_filename = parts[0] + "." + parts[1]  # Join first two with '.'

    # Process remaining parts based on their type
    for part in parts[2:]:
        if part.isdigit():  # If part is a number, join with a dot
            modified_filename += "." + part
        else:  # If part is not a number (assumed to be a word), join with a space
            modified_filename += " " + part

    return modified_filename


_Hilfsfunktion_ **load_json_from_link()** sucht aus zu einem gegebenen Link mit Hilfe der Funktionen *load_json_files* und *modify_filename* die Zwischenergebnisse zu Links und gibt deren Dictionaries zurück, um auf diese im Fall von Komplikationen zugreifen zu können.

In [12]:
def load_json_from_link(link):
    try: 
        link_text = modify_filename(link)
        link_text = " ".join(link_text.split(" ")[1:]).lower()
        subdirectory = 'working_files'  
        file_paths = [os.path.join(subdirectory, f) for f in os.listdir(subdirectory) if f.endswith('.json')]
        json_data = load_json_files(file_paths)
        json_data_keys = list(json_data.keys())
        threshold=70
        best_match_key = None
        best_match_ratio = 0
        for key in json_data_keys:
            normalized_key = key.lower().replace("-", " ")
            ratio = fuzz.partial_ratio(link_text, normalized_key)
            if ratio >= threshold and ratio > best_match_ratio:
                best_match_key = key
                best_match_ratio = ratio

        if best_match_key is not None:
            return json_data[best_match_key]
        else:
            logging.error(f"Error loading JSON from link: No match found for link: {link}")
            return None
    except Exception as e:
        logging.error(f"Error loading JSON from link: {e}")
        return None



* **intiiere_Ontologie()** - Initialisiert die Klassen, Eigenschaften und Annotationen der Ontologie.
* **create_baustein()** - Erweitert die Ontologie mit den vorbereiteten Informationen aus einem JSON-Objekt. Zuerst werden die notwendigen Dictionary-Dateien erstellt, falls diese noch nicht existieren, und dann in die Ontologie geladen. Sollte das Erstellen einzelner Attribute fehlschlagen, wird dies in den Logs vermerkt und sollte manuell überprüft werden.

In [13]:
def intiiere_Ontologie():
    logging.info("Initializing ontology...")
    onto = get_ontology(f"http://test.org/{resultname}.owl")
    with onto:
        # Define classes
        class Asset(Thing):
            pass

        class Vulnerability(Thing):
            pass

        class Threat_example(Thing):
            pass

        class Control(Thing):
            pass
        
        class Attribute(Thing): 
            pass
        
        class InformationSource(Thing):
            pass
        # Define control level subclasses
        class LowLevelControl(Control):
            pass

        class MediumLevelControl(Control):
            pass

        class HighLevelControl(Control):
            pass
        
        # define 

            
        class ExplicitVulnerability(Vulnerability):
            pass
        
        class GeneralClassAxiomenericVulnerability(Vulnerability):
            pass
        
        class ControlType(Attribute): 
            pass

        class SecurityAttribute(Attribute): 
            pass

        class ThreatOrigin(Attribute): 
            pass

        class ThreatSource(Attribute): 
            pass
        

        # Define object properties
        class implementedBy(Control >> Asset):
            pass

        class hasInformationSource(Thing >> InformationSource):
            pass
            
        class vulnerableTo(Asset >> Vulnerability):
            pass

        class exploitedBy_exapmle(Vulnerability >> Threat_example):
            pass

        class mitigatedBy(Vulnerability >> Control):
            pass
        
        class mitigates(Control >> Vulnerability):
            pass

        class hasOrigin(Threat_example >> ThreatOrigin): 
            pass
        
        class hasSource(Threat_example >> ThreatSource): 
            pass

        class affects(Control >> SecurityAttribute): 
            pass  

        class isresponsible(AnnotationProperty):
            pass 

        class description(AnnotationProperty):
            pass 
    logging.info("Ontology initialized successfully.")
    return onto
def create_baustein (link, onto, failed=False):
    #logging.info(f"Processing baustein from link: {link}")
    generated = generate(link, failed)
    try:
        json_data = llm_output_to_json(generated)
        logging.info(f"Successfully generated JSON data from link: {link}")
    except Exception as e:
        logging.error(f"Failed to generate JSON data from link: {link}. Error: {e}")
        return onto
    print("generating finished")
    json_data = populate_json_with_excel_data(json_data, read_excel_sheet(json_data["handle"]))
    GefahrenQuelle = [{'handle': 'G 0.1', 'text': 'Feuer'}, {'handle': 'G 0.2', 'text': 'Ungünstige klimatische Bedingungen'}, {'handle': 'G 0.3', 'text': 'Wasser'}, {'handle': 'G 0.4', 'text': 'Verschmutzung, Staub, Korrosion'}, {'handle': 'G 0.5', 'text': 'Naturkatastrophen'}, {'handle': 'G 0.6', 'text': 'Katastrophen im Umfeld'}, {'handle': 'G 0.7', 'text': 'Großereignisse im Umfeld'}, {'handle': 'G 0.8', 'text': 'Ausfall oder Störung der Stromversorgung'}, {'handle': 'G 0.9', 'text': 'Ausfall oder Störung von Kommunikationsnetzen'}, {'handle': 'G 0.10', 'text': 'Ausfall oder Störung von Versorgungsnetzen'}, {'handle': 'G 0.11', 'text': 'Ausfall oder Störung von Dienstleistungsunternehmen'}, {'handle': 'G 0.12', 'text': 'Elektromagnetische Störstrahlung'}, {'handle': 'G 0.13', 'text': 'Abfangen kompromittierender Strahlung'}, {'handle': 'G 0.14', 'text': 'Ausspähen von Informationen (Spionage)'}, {'handle': 'G 0.15', 'text': 'Abhören'}, {'handle': 'G 0.16', 'text': 'Diebstahl von Geräten, Datenträgern oder Dokumenten'}, {'handle': 'G 0.17', 'text': 'Verlust von Geräten, Datenträgern oder Dokumenten'}, {'handle': 'G 0.18', 'text': 'Fehlplanung oder fehlende Anpassung'}, {'handle': 'G 0.19', 'text': 'Offenlegung schützenswerter Informationen'}, {'handle': 'G 0.20', 'text': 'Informationen oder Produkte aus unzuverlässiger Quelle'}, {'handle': 'G 0.21', 'text': 'Manipulation von Hard- oder Software'}, {'handle': 'G 0.22', 'text': 'Manipulation von Informationen'}, {'handle': 'G 0.23', 'text': 'Unbefugtes Eindringen in IT-Systeme'}, {'handle': 'G 0.24', 'text': 'Zerstörung von Geräten oder Datenträgern'}, {'handle': 'G 0.25', 'text': 'Ausfall von Geräten oder Systemen'}, {'handle': 'G 0.26', 'text': 'Fehlfunktion von Geräten oder Systemen'}, {'handle': 'G 0.27', 'text': 'Ressourcenmangel'}, {'handle': 'G 0.28', 'text': 'Software-Schwachstellen oder -Fehler'}, {'handle': 'G 0.29', 'text': 'Verstoß gegen Gesetze oder Regelungen'}, {'handle': 'G 0.30', 'text': 'Unberechtigte Nutzung oder Administration von Geräten und Systemen'}, {'handle': 'G 0.31', 'text': 'Fehlerhafte Nutzung oder Administration von Geräten und Systemen'}, {'handle': 'G 0.32', 'text': 'Missbrauch von Berechtigungen'}, {'handle': 'G 0.33', 'text': 'Personalausfall'}, {'handle': 'G 0.34', 'text': 'Anschlag'}, {'handle': 'G 0.35', 'text': 'Nötigung, Erpressung oder Korruption'}, {'handle': 'G 0.36', 'text': 'Identitätsdiebstahl'}, {'handle': 'G 0.37', 'text': 'Abstreiten von Handlungen'}, {'handle': 'G 0.38', 'text': 'Missbrauch personenbezogener Daten'}, {'handle': 'G 0.39', 'text': 'Schadprogramme'}, {'handle': 'G 0.40', 'text': 'Verhinderung von Diensten (Denial of Service)'}, {'handle': 'G 0.41', 'text': 'Sabotage'}, {'handle': 'G 0.42', 'text': 'Social Engineering'}, {'handle': 'G 0.43', 'text': 'Einspielen von Nachrichten'}, {'handle': 'G 0.44', 'text': 'Unbefugtes Eindringen in Räumlichkeiten'}, {'handle': 'G 0.45', 'text': 'Datenverlust'}, {'handle': 'G 0.46', 'text': 'Integritätsverlust schützenswerter Informationen'}, {'handle': 'G 0.47', 'text': 'Schädliche Seiteneffekte IT-gestützter Angriffe'}]
    print("processing " + json_data["name"])
    with (onto): 
        accidental = onto.ThreatSource("Accidental")
        malicious = onto.ThreatSource("malicious")
        human = onto.ThreatOrigin("human")
        natural = onto.ThreatOrigin("natural")
        gefahren = [onto.Threat_example(gefahr["text"].replace('"',"'")) for gefahr in GefahrenQuelle]
        confidentiality = onto.SecurityAttribute("confidentiality")
        integrity = onto.SecurityAttribute("integrity")
        availability = onto.SecurityAttribute("availability")
        # Asset specific
        asset = onto.Asset(json_data["name"].replace('"',"'"))
        asset.hasInformationSource =  [onto.InformationSource(json_data["Informationsource"])]
        if not failed: asset.description = json_data["text"].replace('"',"'")
        vulnerabilities = []
        for vuln_data in json_data["vulnerableTo"]:
            vulnerability = onto.Vulnerability(vuln_data["name"].replace('"',"'"))
            if not failed:vulnerability.description = vuln_data["text"].replace('"',"'")
            vulnerability.hasInformationSource.append(onto.InformationSource(vuln_data["Informationsource"]))
            threats = []
            if len(vuln_data["exploitedBy_examples"]) > 0:
                for threat_json in vuln_data["exploitedBy_examples"]:
                    threat =  onto.Threat_example(threat_json["name"].replace('"',"'"))
                    if not failed: threat.description = threat_json["text"].replace('"',"'")
                    threat.hasInformationSource =  [onto.InformationSource(vuln_data["Informationsource"])]
                    try:
                        for origin in threat_json["hasOrigin"]:
                            if origin in ["human", "natural"]:
                                threat.hasOrigin.append({"human": human, "natural": natural}[origin])
                    except Exception as e: 
                        threat.hasOrigin = []
                        logging.warning(f"Failed to add origin to threat: {e} on {threat_json['name']}")
                    try:
                        for source in threat_json["hasSource"]:
                            if source in ["accidental", "malicious"]:
                                threat.hasSource.append({"accidental": accidental, "deliberate": malicious}[source])
                    except Exception as e:
                        logging.warning(f"Failed to add source to threat: {e} on {threat_json['name']}")
                        threat.hasSource = []
                    threats.append(threat)    
                vulnerability.exploitedBy_exapmle = threats
        asset.vulnerableTo = vulnerabilities

        for control_data in json_data["Controls"]:
            level = control_data["level"].lower()  # Convert to lowercase for matching
            control_class = {"low":  onto.LowLevelControl, "medium":  onto.MediumLevelControl, "high":  onto.HighLevelControl}[level]
            control = control_class(str(control_data["handle"]+  " "+ control_data["name"].replace('"',"'")))
            control.hasInformationSource =  [onto.InformationSource(control_data["Informationsource"])]
            for affect in control_data["affects"]:
                try: 
                    for letter in affect:
                        control.affects.append({"A": availability, "C": confidentiality, "I": integrity }[letter])
                    control.mitigates = [gefahren[int(gefahrtext.split(".")[1])-1] for gefahrtext in control_data["mitigates"]]
                except Exception as e: 
                    print(f"failed to add affect to control: {e} of {control_data['name']}")
                    logging.warning(f"failed to add affect to control: {e} of {control_data['name']}")
            try:
                if not failed: 
                    content = str(control_data["text"])
                    content = content.replace('"',"'")
                    control.description = content

            except Exception as e:
                logging.error(f"Failed to encode description to control: {e} on {control_data['name']}")
                control.description = ""
            try:
                control.isresponsible = control_data["isresponsible"]
            except Exception as e:
                logging.error(f"Failed to add responsible entity to control: {e} on {control_data['name']}")
                control.isresponsible = []
            control.implementedBy.append(asset)
    return onto

def ontologie_speichern(onto):
    with onto:
        onto.save(file=f"{resultname}.owl", format="rdfxml")




In diesem Block werden alle vorherigen Schritte aufgerufen. Für jeden Link wird versucht, ein vollständiges Dictionary zu erstellen. Sollte dies fehlschlagen, wird in einer zweiten Iteration für alle fehlgeschlagenen Links neue JSONs ohne Beschreibungen erzeugt. Dies liegt daran, dass die Beschreibungen aufgrund der Einschränkungen von Gemini häufig zu Fehlern führen. Das finale Ergebnis wird anschließend durch die Funktion **ontologie_speichern** gespeichert.

In [None]:
#create seperate trace and a log files
logging.basicConfig(filename='ontology_creation2.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')


failed = []
onto = intiiere_Ontologie()

for link in links_array:
    try:
        onto = create_baustein(link,onto) 
    except Exception as e:
        failed.append(link)
logging.info(f"Failed links: {len(failed)}")
for i in failed:
    try: 
        onto = create_baustein(i, onto, True)
    except Exception as e:
        logging.error(f"Retry failed for link: {i} - Error: {e}")
ontologie_speichern(onto)

Diese Funktion hilft dabei, Formatierungsfehler in der resultierenden .owl-Datei in Protege vorzubeugen, indem sie Anführungszeichen (”) in Textblöcken ersetzt.
Dennoch sollte die Datei vor Verwendung in einer Ontologie Software wie Protege geöffnet werden, da dabei noch Fehler auftreten können die nicht durch diese Vorbehandlung verhindert werden können. Um diese zu beheben: Überprüfen Sie die genannten Stellen in der OWL Datei in einem Texteditor, oft müssen hier Sonderzeichen oder Kodierungsfehler ausgebessert werden. 

In [15]:
def replace_quotes_in_file(file_path):
    """
    Replaces nested double quotes within rdf:about attributes with single quotes
    in an RDF/XML file, modifying the file in place.

    Args:
        file_path (str): Path to the RDF/XML file to be modified.
    """
    try:
        for line in fileinput.input(file_path, inplace=True):
            # Step 1: Handle nested double quotes
            line = re.sub(r'(rdf:about="#)([^"]*)(")([^"]*)(")', r'\1\2&quot;\4&quot;', line)

            # Step 2: Replace &quot; entities with '
            line = line.replace("&quot;", "'")
            line = line.replace("%20", "_")
            
            print(line, end="")  # Print the modified line (fileinput.input will write it back)

        print(f"Quotes replaced successfully in file: {file_path}")

    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
    except Exception as e:
        print(f"An error occurred: {e}")

# Example usage
file_path = f"{resultname}.owl"  # Replace with your file's path
replace_quotes_in_file(file_path)

Quotes replaced successfully in file: cybersecurity_complete_v14.owl


## Abschließend müssen zur Sicherstellung der Qualität noch folgende Schritte erfolgen:
1. Durchgehen der .log-Datei: Am häufigsten treten Warnungen auf, da optionale Informationen nicht gefunden werden. Diese können ignoriert werden.
2. Fehlerbehandlung bei maximaler Outputlänge: Übersteigen Beschreibungen von Gefahren oder Kontrollen die maximale Outputlänge des LLMs, wird der Fehler “Error: Expecting ‘,’ delimiter: …” zurückgegeben. Um zu verhindern, dass dadurch ganze Bausteine nicht im Graphen abgebildet sind, werden im zweiten Schritt nur die Objekte ohne Beschreibungen erstellt. Falls gewünscht, müssen die Beschreibungen der entsprechenden Bausteine dann manuell in einer Bearbeitungssoftware wie Protege nachträglich hinzugefügt werden.
3. Umgang mit sicherheitsbedenklichen Bausteinen: Dasselbe gilt auch für Bausteine, die aufgrund von Sicherheitsbedenken durch die API zurückgehalten werden.
4. Manueller Vergleich zur Gewährleistung der Korrektheit: Um die vollständige Korrektheit des Wissensgraphen gewährleisten zu können, ist ein manueller Vergleich des Graphen mit dem Grundschutzkompendium notwendig.