Skip to content

Commit

Permalink
Configurable assets cleanup in GenerateCDFAssetsFromGraph step (#152)
Browse files Browse the repository at this point in the history
* Configurable assets cleanup

* Fix for
https://github.com/cognitedata/neat/issues/139
and version

* Suggested PR changes
  • Loading branch information
alivinco authored Oct 31, 2023
1 parent dcc2c0c commit b0fb624
Show file tree
Hide file tree
Showing 11 changed files with 20,279 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up

version="0.36.0"
version="0.37.0"
run-explorer:
@echo "Running explorer API server..."
# open "http://localhost:8000/static/index.html" || true
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.36.0"
__version__ = "0.37.0"
4 changes: 3 additions & 1 deletion cognite/neat/app/api/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from cognite import neat
from cognite.neat.app.api.data_classes.configuration import Config, configure_logging
from cognite.neat.config import copy_examples_to_directory
from cognite.neat.config import copy_examples_to_directory, create_data_dir_structure
from cognite.neat.constants import PACKAGE_DIRECTORY
from cognite.neat.utils.cdf import ServiceCogniteClient
from cognite.neat.utils.utils import get_cognite_client_from_config, get_cognite_client_from_token
Expand Down Expand Up @@ -95,6 +95,8 @@ def create_neat_app() -> NeatApp:

if config.load_examples:
copy_examples_to_directory(config.data_store_path)
else:
create_data_dir_structure(config.data_store_path)

configure_logging(config.log_level, config.log_format)
logging.info(f" Starting NEAT version {neat.__version__}")
Expand Down
21 changes: 16 additions & 5 deletions cognite/neat/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ def copy_examples_to_directory(target_data_dir: Path):
Copier over all the examples to the target_data_directory,
without overwriting
Parameters
----------
target_data_dir : The target directory
Args:
target_data_dir : The target directory
"""

Expand All @@ -23,6 +20,20 @@ def copy_examples_to_directory(target_data_dir: Path):
_copy_examples(EXAMPLE_WORKFLOWS, target_data_dir / "workflows")


def create_data_dir_structure(target_data_dir: Path):
"""
Create the data directory structure in empty directory
Args:
target_data_dir : The target directory
"""

(target_data_dir / "rules").mkdir(exist_ok=True, parents=True)
(target_data_dir / "source-graphs").mkdir(exist_ok=True, parents=True)
(target_data_dir / "workflows").mkdir(exist_ok=True, parents=True)


def _copy_examples(source_dir: Path, target_dir: Path):
for current in source_dir.rglob("*"):
if current.is_dir():
Expand Down
20,168 changes: 20,168 additions & 0 deletions cognite/neat/graph/examples/Knowledge-Graph-Nordic44-dirty.xml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cognite/neat/graph/transformations/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def check_commit(force_commit: bool = False):
query = build_sparql_query(domain_knowledge_graph, rule.traversal, transformation_rules.prefixes)
else:
raise ValueError(f"Unknown traversal type {type(rule.traversal)}")
logging.info(f"Query: {query}")
logging.debug(f"Query: {query}")

if query_results := list(domain_knowledge_graph.query(query)):
# Generate URI for class and property in target namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description: null
implementation_module: null
name: graph_to_asset_hierarchy
steps:
- configs: null
- configs: {}
description: null
enabled: true
id: step_http_trigger
Expand Down Expand Up @@ -46,7 +46,7 @@ steps:
pos_y: 167
- configs:
add_base_iri: 'True'
file_path: source-graphs/Knowledge-Graph-Nordic44.xml
file_path: source-graphs/Knowledge-Graph-Nordic44-dirty.xml
mime_type: application/rdf+xml
description: null
enabled: true
Expand Down Expand Up @@ -81,7 +81,8 @@ steps:
ui_config:
pos_x: 452
pos_y: 592
- configs: {}
- configs:
assets_cleanup_type: full
description: null
enabled: true
id: step_generate_assets
Expand Down
108 changes: 73 additions & 35 deletions cognite/neat/workflows/steps/lib/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from cognite.neat.graph.loaders.validator import validate_asset_hierarchy
from cognite.neat.utils.utils import generate_exception_report
from cognite.neat.workflows._exceptions import StepFlowContextNotInitialized, StepNotInitialized
from cognite.neat.workflows.model import FlowMessage
from cognite.neat.workflows.model import FlowMessage, StepExecutionStatus
from cognite.neat.workflows.steps.data_contracts import (
CategorizedAssets,
CategorizedRelationships,
Expand Down Expand Up @@ -169,12 +169,25 @@ class GenerateCDFAssetsFromGraph(Step):
)
category = CATEGORY

configurables: ClassVar[list[Configurable]] = [
Configurable(
name="assets_cleanup_type",
value="nothing",
options=["nothing", "orphans", "circular", "full"],
label=(
"Configures asset cleanup process. Supported options: nothing - no cleanup, \
orphans - all oraphan assets will be removed, circular - all circular assets will be removed , \
full - full cleanup , both orphans and circular assets will be removed. "
),
),
]

def run( # type: ignore[override]
self, rules: RulesData, cdf_client: CogniteClient, solution_graph: SolutionGraph
) -> (FlowMessage, CategorizedAssets): # type: ignore[override, syntax]
if self.configs is None:
raise StepNotInitialized(type(self).__name__)

asset_cleanup_type = self.configs.get("assets_cleanup_type", "nothing")
meta_keys = NeatMetadataKeys.load(self.configs)
if self.metrics is None:
raise ValueError(self._not_configured_message)
Expand Down Expand Up @@ -218,48 +231,68 @@ def run( # type: ignore[override]
logging.info(f"Total count of assets in CDF before upload: { total_assets_before }")

orphan_assets, circular_assets = validate_asset_hierarchy(rdf_asset_dicts)

orphan_assets_count = len(orphan_assets)
circular_assets_count = len(circular_assets)
prom_data_issues_stats.labels(resource_type="circular_assets").set(len(circular_assets))
prom_data_issues_stats.labels(resource_type="orphan_assets").set(len(orphan_assets))

if orphan_assets:
logging.error(f"Found orphaned assets: {', '.join(orphan_assets)}")

orphanage_asset_external_id = (
f"{rules.rules.metadata.externalIdPrefix}orphanage-{rules.dataset_id}"
if rules.rules.metadata.externalIdPrefix
else "orphanage"
)

# Kill the process if you dont have orphanage asset in your asset hierarchy
# and inform the user that it is missing !
if orphanage_asset_external_id not in rdf_asset_dicts:
msg = f"You dont have Orphanage asset {orphanage_asset_external_id} in asset hierarchy!"
logging.error(msg)
raise Exception(msg)

logging.error("Orphaned assets will be assigned to 'Orphanage' root asset")

for external_id in orphan_assets:
rdf_asset_dicts[external_id]["parent_external_id"] = orphanage_asset_external_id

orphan_assets, circular_assets = validate_asset_hierarchy(rdf_asset_dicts)

logging.info(orphan_assets)
if asset_cleanup_type in ["orphans", "full"]:
logging.info("Removing orphaned assets")
for external_id in orphan_assets:
del rdf_asset_dicts[external_id]
else:
orphanage_asset_external_id = (
f"{rules.rules.metadata.externalIdPrefix}orphanage-{rules.dataset_id}"
if rules.rules.metadata.externalIdPrefix
else "orphanage"
)

# Kill the process if you dont have orphanage asset in your asset hierarchy
# and inform the user that it is missing !
if orphanage_asset_external_id not in rdf_asset_dicts:
msg = f"You dont have Orphanage asset {orphanage_asset_external_id} in asset hierarchy!"
logging.error(msg)
return FlowMessage(
error_text=msg, step_execution_status=StepExecutionStatus.ABORT_AND_FAIL
), CategorizedAssets(assets={})

logging.error("Orphaned assets will be assigned to 'Orphanage' root asset")

for external_id in orphan_assets:
rdf_asset_dicts[external_id]["parent_external_id"] = orphanage_asset_external_id
else:
logging.info("No orphaned assets found, your assets look healthy !")

if circular_assets:
msg = f"Found circular dependencies: {circular_assets!s}"
logging.error(msg)
raise Exception(msg)
elif orphan_assets:
msg = f"Not able to fix orphans: {', '.join(orphan_assets)}"
logging.error(msg)
raise Exception(msg)
logging.error(f"Found circular dependencies: {circular_assets}")
if asset_cleanup_type in ["circular", "full"]:
logging.info("Removing circular assets")
for circular_path in circular_assets:
circular_external_id = circular_path[-1]
del rdf_asset_dicts[circular_external_id]
else:
logging.info("No circular dependency among assets found, your assets hierarchy look healthy !")

if orphan_assets or circular_assets:
orphan_assets, circular_assets = validate_asset_hierarchy(rdf_asset_dicts)
if circular_assets:
msg = f"Found circular dependencies: {circular_assets!s}"
logging.error(msg)
return FlowMessage(
error_text=msg, step_execution_status=StepExecutionStatus.ABORT_AND_FAIL
), CategorizedAssets(assets={})
elif orphan_assets:
msg = f"Not able to fix orphans: {', '.join(orphan_assets)}"
logging.error(msg)
return FlowMessage(
error_text=msg, step_execution_status=StepExecutionStatus.ABORT_AND_FAIL
), CategorizedAssets(assets={})
else:
logging.info("No circular dependency among assets found, your assets hierarchy look healthy !")

categorized_assets, report = categorize_assets(
cdf_client, rdf_asset_dicts, rules.dataset_id, return_report=True
)
Expand All @@ -279,10 +312,15 @@ def run( # type: ignore[override]
logging.info(f"Total count of assets to be decommission: { count_decommission_assets }")
logging.info(f"Total count of assets to be resurrect: { count_resurrect_assets }")

msg = f"Total count of assets { len(rdf_asset_dicts) } of which: { count_create_assets } to be created"
msg += f", { count_update_assets } to be updated"
msg += f", { count_decommission_assets } to be decommissioned"
msg += f", { count_resurrect_assets } to be resurrected"
msg = f"Total count of assets { len(rdf_asset_dicts) } of which:"
msg += f"<p> { count_create_assets } to be created </p>"
msg += f"<p> { count_update_assets } to be updated </p>"
msg += f"<p> { count_decommission_assets } to be decommissioned </p>"
msg += f"<p> { count_resurrect_assets } to be resurrected </p>"
msg += f"<p> Found { orphan_assets_count } orphan assets and"
msg += f" { circular_assets_count } circular assets </p>"
if asset_cleanup_type != "nothing":
msg += " <p> All circular and orphan assets were removed successfully </p>"
number_of_updates = len(report["decommission"])
logging.info(f"Total number of updates: {number_of_updates}")

Expand Down
9 changes: 9 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ Changes are grouped as follows:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [0.37.0] - 31-10-23

## Added
- Configurable assets cleanup in GenerateCDFAssetsFromGraph step. Now user can specify if he/she wants to delete all ophan or circular assets or keep them.

### Fixed
- https://github.com/cognitedata/neat/issues/146
- https://github.com/cognitedata/neat/issues/139

## [0.36.0] - 30-10-23
### Added
- Added `DMSImporter`
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-neat"
version = "0.36.0"
version = "0.37.0"
readme = "README.md"
description = "Knowledge graph transformation"
authors = [
Expand Down
4 changes: 2 additions & 2 deletions tests/app/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def test_get_datatype_properties(
assert response.status_code == 200
assert {
"id": "http://iec.ch/TC57/2013/CIM-schema-cim16#IdentifiedObject.name",
"count": 2502,
"count": 2503,
"name": "IdentifiedObject.name",
} in content["datatype_properties"]

Expand Down Expand Up @@ -346,5 +346,5 @@ def test_get_classes(

assert response.status_code == 200
assert content["fields"] == ["class", "instances"]
assert {"class": "http://iec.ch/TC57/2013/CIM-schema-cim16#Substation", "instances": "44"} in content["rows"]
assert {"class": "http://iec.ch/TC57/2013/CIM-schema-cim16#Substation", "instances": "45"} in content["rows"]
assert len(content["rows"]) == 59

0 comments on commit b0fb624

Please sign in to comment.