Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: renaming functions in preparation for TextValue verification #808

Merged
merged 8 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions src/dsp_tools/commands/xmlupload/check_consistency_with_ontology.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from lxml import etree
from regex import Pattern

from dsp_tools.commands.xmlupload.models.ontology_diagnose_models import InvalidOntologyElements, OntoCheckInformation
from dsp_tools.commands.xmlupload.models.ontology_lookup_models import ProjectOntosInformation
from dsp_tools.commands.xmlupload.models.ontology_problem_models import (
InvalidOntologyElementsInData,
)
from dsp_tools.commands.xmlupload.ontology_client import OntologyClient
from dsp_tools.models.exceptions import UserError

Expand All @@ -14,7 +17,7 @@
genericPrefixedOntology: Pattern[str] = regex.compile(r"^[\w\-]+:\w+$")


def do_xml_consistency_check(onto_client: OntologyClient, root: etree._Element) -> None:
def do_xml_consistency_check_with_ontology(onto_client: OntologyClient, root: etree._Element) -> None:
Nora-Olivia-Ammann marked this conversation as resolved.
Show resolved Hide resolved
"""
This function takes an OntologyClient and the root of an XML.
It retrieves the ontologies from the server.
Expand All @@ -28,21 +31,24 @@ def do_xml_consistency_check(onto_client: OntologyClient, root: etree._Element)
Raises:
UserError: if there are any invalid properties or classes
"""
onto_check_info = OntoCheckInformation(
default_ontology_prefix=onto_client.default_ontology, onto_lookup=onto_client.get_all_ontologies_from_server()
onto_check_info = ProjectOntosInformation(
default_ontology_prefix=onto_client.default_ontology,
onto_lookup=onto_client.get_all_ontologies_from_server(),
)
classes, properties = _get_all_classes_and_properties(root)
_find_problems_in_classes_and_properties(classes, properties, onto_check_info)
classes_in_data, properties_in_data = _get_all_classes_and_properties_from_data(root)
_find_if_all_classes_and_properties_exist_in_onto(classes_in_data, properties_in_data, onto_check_info)


def _find_problems_in_classes_and_properties(
classes: dict[str, list[str]], properties: dict[str, list[str]], onto_check_info: OntoCheckInformation
def _find_if_all_classes_and_properties_exist_in_onto(
classes_in_data: dict[str, list[str]],
properties_in_data: dict[str, list[str]],
onto_check_info: ProjectOntosInformation,
) -> None:
class_problems = _diagnose_all_classes(classes, onto_check_info)
property_problems = _diagnose_all_properties(properties, onto_check_info)
class_problems = _check_if_all_class_types_exist(classes_in_data, onto_check_info)
property_problems = _check_if_all_properties_exist(properties_in_data, onto_check_info)
if not class_problems and not property_problems:
return None
problems = InvalidOntologyElements(
problems = InvalidOntologyElementsInData(
classes=class_problems, properties=property_problems, ontos_on_server=list(onto_check_info.onto_lookup.keys())
)
msg, df = problems.execute_problem_protocol()
Expand All @@ -56,15 +62,17 @@ def _find_problems_in_classes_and_properties(
raise UserError(msg)


def _get_all_classes_and_properties(root: etree._Element) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
cls_dict = _get_all_class_types_and_ids(root)
def _get_all_classes_and_properties_from_data(
root: etree._Element,
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
cls_dict = _get_all_class_types_and_ids_from_data(root)
prop_dict: dict[str, list[str]] = {}
for resource in root.iterchildren(tag="resource"):
prop_dict = _get_all_property_names_and_resource_ids_one_resource(resource, prop_dict)
return cls_dict, prop_dict


def _get_all_class_types_and_ids(root: etree._Element) -> dict[str, list[str]]:
def _get_all_class_types_and_ids_from_data(root: etree._Element) -> dict[str, list[str]]:
cls_dict: dict[str, list[str]] = {}
for resource in root.iterchildren(tag="resource"):
restype = resource.attrib["restype"]
Expand All @@ -88,18 +96,18 @@ def _get_all_property_names_and_resource_ids_one_resource(
return prop_dict


def _diagnose_all_classes(
classes: dict[str, list[str]], onto_check_info: OntoCheckInformation
def _check_if_all_class_types_exist(
classes: dict[str, list[str]], onto_check_info: ProjectOntosInformation
) -> list[tuple[str, list[str], str]]:
problem_list = []
for cls_type, ids in classes.items():
if problem := _diagnose_class(cls_type, onto_check_info):
if problem := _check_if_one_class_type_exists(cls_type, onto_check_info):
problem_list.append((cls_type, ids, problem))
return problem_list


def _diagnose_class(cls_type: str, onto_check_info: OntoCheckInformation) -> str | None:
prefix, cls_ = _get_prefix_and_prop_or_cls_identifier(cls_type, onto_check_info.default_ontology_prefix)
def _check_if_one_class_type_exists(cls_type: str, onto_check_info: ProjectOntosInformation) -> str | None:
prefix, cls_ = _get_separate_prefix_and_iri_from_onto_prop_or_cls(cls_type, onto_check_info.default_ontology_prefix)
if not prefix:
return "Property name does not follow a known ontology pattern"
if onto := onto_check_info.onto_lookup.get(prefix):
Expand All @@ -108,18 +116,20 @@ def _diagnose_class(cls_type: str, onto_check_info: OntoCheckInformation) -> str
return "Unknown ontology prefix"


def _diagnose_all_properties(
properties: dict[str, list[str]], onto_check_info: OntoCheckInformation
def _check_if_all_properties_exist(
properties: dict[str, list[str]], onto_check_info: ProjectOntosInformation
) -> list[tuple[str, list[str], str]]:
problem_list = []
for prop_name, ids in properties.items():
if problem := _diagnose_property(prop_name, onto_check_info):
if problem := _check_if_one_property_exists(prop_name, onto_check_info):
problem_list.append((prop_name, ids, problem))
return problem_list


def _diagnose_property(prop_name: str, onto_check_info: OntoCheckInformation) -> str | None:
prefix, prop = _get_prefix_and_prop_or_cls_identifier(prop_name, onto_check_info.default_ontology_prefix)
def _check_if_one_property_exists(prop_name: str, onto_check_info: ProjectOntosInformation) -> str | None:
prefix, prop = _get_separate_prefix_and_iri_from_onto_prop_or_cls(
prop_name, onto_check_info.default_ontology_prefix
)
if not prefix:
return "Property name does not follow a known ontology pattern"
if onto := onto_check_info.onto_lookup.get(prefix):
Expand All @@ -128,7 +138,7 @@ def _diagnose_property(prop_name: str, onto_check_info: OntoCheckInformation) ->
return "Unknown ontology prefix"


def _get_prefix_and_prop_or_cls_identifier(
def _get_separate_prefix_and_iri_from_onto_prop_or_cls(
prop_or_cls: str, default_ontology_prefix: str
) -> tuple[str, ...] | tuple[None, None]:
if defaultOntologyColon.match(prop_or_cls):
Expand Down
57 changes: 57 additions & 0 deletions src/dsp_tools/commands/xmlupload/models/ontology_lookup_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class OntoInfo:
    """Frozen record of the class names and property names found in one ontology.

    Both fields fall back to a fresh empty list when not supplied.
    """

    classes: list[str] = field(default_factory=list)
    properties: list[str] = field(default_factory=list)


@dataclass
class ProjectOntosInformation:
    """Bundle of what the ontology consistency check needs to look things up.

    Holds the default ontology prefix together with a mapping from a string key
    to the OntoInfo stored for it.
    """

    default_ontology_prefix: str
    onto_lookup: dict[str, OntoInfo]


def extract_classes_properties_from_onto(onto_graph: list[dict[str, Any]]) -> OntoInfo:
    """
    Build an OntoInfo from an ontology graph delivered by the DSP-API.

    The classes and the properties are pulled out of the graph separately,
    each with its prefix removed, and stored together.

    Args:
        onto_graph: graph from DSP-API

    Returns:
        OntoInfo instance with the classes and properties
    """
    return OntoInfo(
        _get_all_cleaned_classes_from_graph(onto_graph),
        _get_all_cleaned_properties_from_graph(onto_graph),
    )


def _get_all_cleaned_classes_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
    """Return the class identifiers of the graph with their prefixes removed."""
    return _remove_prefixes(_get_all_classes_from_graph(onto_graph))


def _get_all_classes_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
return [elem["@id"] for elem in onto_graph if elem.get("knora-api:isResourceClass")]


def _get_all_cleaned_properties_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
    """Return the property identifiers of the graph with their prefixes removed."""
    return _remove_prefixes(_get_all_properties_from_graph(onto_graph))


def _get_all_properties_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
return [elem["@id"] for elem in onto_graph if not elem.get("knora-api:isResourceClass")]


def _remove_prefixes(ontology_elements: list[str]) -> list[str]:
return [x.split(":")[1] for x in ontology_elements]
Original file line number Diff line number Diff line change
@@ -1,32 +1,17 @@
import itertools
from dataclasses import dataclass, field
from dataclasses import dataclass

import pandas as pd

separator = "\n "
list_separator = "\n - "
grand_separator = "\n----------------------------\n"
medium_separator = "\n----------------------------\n"
grand_separator = "\n\n---------------------------------------\n\n"
maximum_prints = 50


@dataclass(frozen=True)
class OntoInfo:
"""This class saves the properties and the classes from an ontology."""

classes: list[str] = field(default_factory=list)
properties: list[str] = field(default_factory=list)


@dataclass
class OntoCheckInformation:
"""This class saves information needed to check the consistency with the ontology."""

default_ontology_prefix: str
onto_lookup: dict[str, OntoInfo]


@dataclass(frozen=True)
class InvalidOntologyElements:
class InvalidOntologyElementsInData:
"""This class saves and prints out the information regarding ontology classes and properties
that are in the XML but not the ontology."""

Expand All @@ -42,15 +27,14 @@ def execute_problem_protocol(self) -> tuple[str, pd.DataFrame | None]:
Returns:
the error message and a dataframe with the errors if they exceed 50 or None
"""
extra_separator = "\n\n---------------------------------------\n\n"
msg = (
f"\nSome property and/or class type(s) used in the XML are unknown.\n"
f"The ontologies for your project on the server are:{list_separator}"
f"{list_separator.join(self.ontos_on_server)}{extra_separator}"
f"{list_separator.join(self.ontos_on_server)}{grand_separator}"
)
cls_msg = self._compose_problem_string_for_cls()
if cls_msg:
msg += cls_msg + extra_separator
msg += cls_msg + grand_separator
prop_msg = self._compose_problem_string_for_props()
if prop_msg:
msg += prop_msg
Expand Down Expand Up @@ -108,7 +92,7 @@ def _format_cls(cls_tup: tuple[str, list[str], str]) -> str:

problems = [_format_cls(x) for x in self.classes]

return "The following resource(s) have an invalid resource type:\n\n" + grand_separator.join(problems)
return "The following resource(s) have an invalid resource type:\n\n" + medium_separator.join(problems)
else:
return None

Expand All @@ -126,6 +110,6 @@ def _format_prop(prop_tup: tuple[str, list[str], str]) -> str:
)

problems = [_format_prop(x) for x in self.properties]
return "The following resource(s) have invalid property type(s):\n\n" + grand_separator.join(problems)
return "The following resource(s) have invalid property type(s):\n\n" + medium_separator.join(problems)
else:
return None
63 changes: 19 additions & 44 deletions src/dsp_tools/commands/xmlupload/ontology_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pathlib import Path
from typing import Any, Protocol

from dsp_tools.commands.xmlupload.models.ontology_diagnose_models import OntoInfo
from dsp_tools.commands.xmlupload.models.ontology_lookup_models import OntoInfo, extract_classes_properties_from_onto
from dsp_tools.models.exceptions import BaseError, UserError
from dsp_tools.utils.connection import Connection
from dsp_tools.utils.create_logger import get_logger
Expand All @@ -21,7 +21,10 @@ class OntologyClient(Protocol):
ontology_names: list[str] = field(default_factory=list)

def get_all_ontologies_from_server(self) -> dict[str, OntoInfo]:
"""Get all the ontologies for a project and the knora-api ontology from the server."""
"""Get all the ontologies for a project from the server."""

def get_knora_api_ontology_from_server(self) -> list[dict[str, Any]]:
"""Get the knora-api ontology from the server."""


@dataclass
Expand All @@ -42,12 +45,14 @@ def get_all_ontologies_from_server(self) -> dict[str, OntoInfo]:
a dictionary with the ontology name as key and the ontology as value.
"""
ontologies = self._get_all_ontology_jsons_from_server()
return {onto_name: deserialize_ontology(onto_graph) for onto_name, onto_graph in ontologies.items()}
return {
onto_name: extract_classes_properties_from_onto(onto_graph) for onto_name, onto_graph in ontologies.items()
}

def _get_all_ontology_jsons_from_server(self) -> dict[str, list[dict[str, Any]]]:
self._get_ontology_names_from_server()
project_ontos = {onto: self._get_ontology_from_server(onto) for onto in self.ontology_names}
project_ontos["knora-api"] = self._get_knora_api_ontology_from_server()
project_ontos["knora-api"] = self.get_knora_api_ontology_from_server()
return project_ontos

def _get_ontology_names_from_server(self) -> None:
Expand Down Expand Up @@ -77,7 +82,16 @@ def _get_ontology_from_server(self, ontology_name: str) -> list[dict[str, Any]]:
raise BaseError(f"Unexpected response from server: {res}") from e
return onto_graph

def _get_knora_api_ontology_from_server(self) -> list[dict[str, Any]]:
def get_knora_api_ontology_from_server(self) -> list[dict[str, Any]]:
"""
This function returns the knora-api ontology from the server.

Returns:
knora-api ontology in json format

Raises:
BaseError: if an unexpected response from the server occurred
"""
url = "/ontology/knora-api/v2#"
try:
res = self.con.get(url)
Expand All @@ -88,42 +102,3 @@ def _get_knora_api_ontology_from_server(self) -> list[dict[str, Any]]:
except KeyError as e:
raise BaseError(f"Unexpected response from server when retrieving knora-api ontology: {res}") from e
return onto_graph


def deserialize_ontology(onto_graph: list[dict[str, Any]]) -> OntoInfo:
"""
This function takes an ontology graph from the DSP-API.
It extracts the classes and properties.
And saves them in an instance of the class Ontology.

Args:
onto_graph: graph from DSP-API

Returns:
Ontology instance with the classes and properties
"""
classes = _get_all_cleaned_classes_from_graph(onto_graph)
properties = _get_all_cleaned_properties_from_graph(onto_graph)
return OntoInfo(classes, properties)


def _get_all_cleaned_classes_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
classes = _get_all_classes_from_graph(onto_graph)
return _remove_prefixes(classes)


def _get_all_classes_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
return [elem["@id"] for elem in onto_graph if elem.get("knora-api:isResourceClass")]


def _get_all_cleaned_properties_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
props = _get_all_properties_from_graph(onto_graph)
return _remove_prefixes(props)


def _get_all_properties_from_graph(onto_graph: list[dict[str, Any]]) -> list[str]:
return [elem["@id"] for elem in onto_graph if not elem.get("knora-api:isResourceClass")]


def _remove_prefixes(ontology_elements: list[str]) -> list[str]:
return [x.split(":")[1] for x in ontology_elements]
10 changes: 3 additions & 7 deletions src/dsp_tools/commands/xmlupload/xmlupload.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from lxml import etree

from dsp_tools.commands.xmlupload.check_consistency_with_ontology import do_xml_consistency_check
from dsp_tools.commands.xmlupload.check_consistency_with_ontology import do_xml_consistency_check_with_ontology
from dsp_tools.commands.xmlupload.iri_resolver import IriResolver
from dsp_tools.commands.xmlupload.list_client import ListClient, ListClientLive
from dsp_tools.commands.xmlupload.models.permission import Permissions
Expand Down Expand Up @@ -94,7 +94,7 @@ def xmlupload(
default_ontology=default_ontology,
save_location=config.diagnostics.save_location,
)
do_xml_consistency_check(onto_client=ontology_client, root=root)
do_xml_consistency_check_with_ontology(onto_client=ontology_client, root=root)

resources, permissions_lookup, stash = _prepare_upload(
root=root,
Expand All @@ -104,11 +104,7 @@ def xmlupload(
)

project_client: ProjectClient = ProjectClientLive(con, config.shortcode)
if default_ontology not in project_client.get_ontology_name_dict():
raise UserError(
f"The default ontology '{default_ontology}' "
"specified in the XML file is not part of the project on the DSP server."
)

list_client: ListClient = ListClientLive(con, project_client.get_project_iri())

iri_resolver, failed_uploads = _upload(
Expand Down
Loading
Loading