In [None]:
import os

from dotenv import load_dotenv
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from dataclasses import dataclass

from cognite.client.data_classes.data_modeling import (
    Node,
    NodeId,
    NodeList,
    NodeApplyList,
    ViewId,
    NodeApply,
    NodeOrEdgeData,
)

In [None]:
def create_client(cdf_project, cdf_cluster, client_id, client_secret, tenant_id, debug: bool = False):
    """Build a CogniteClient that authenticates with OAuth client credentials.

    The token endpoint is the Microsoft identity platform endpoint for the
    given tenant, and the API base URL is derived from the cluster name.

    Args:
        cdf_project: Name of the CDF project to connect to.
        cdf_cluster: Cluster name, used in both the OAuth scope and the base URL.
        client_id: Client (application) id used for the client-credentials flow.
        client_secret: Secret belonging to ``client_id``.
        tenant_id: Tenant id used to build the token URL.
        debug: Forwarded to ``ClientConfig(debug=...)``.

    Returns:
        A configured ``CogniteClient``.

    Raises:
        ValueError: If any required setting is ``None`` or empty. Without this
            check an unset environment variable would be formatted into the
            URLs as the literal text "None" and only fail later, at request time.
    """
    required_settings = {
        "cdf_project": cdf_project,
        "cdf_cluster": cdf_cluster,
        "client_id": client_id,
        "client_secret": client_secret,
        "tenant_id": tenant_id,
    }
    missing = [name for name, value in required_settings.items() if not value]
    if missing:
        raise ValueError(
            "Missing required client setting(s): " + ", ".join(missing)
        )

    SCOPES = [f"https://{cdf_cluster}.cognitedata.com/.default"]
    TOKEN_URL = (
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    )
    creds = OAuthClientCredentials(
        token_url=TOKEN_URL,
        client_id=client_id,
        client_secret=client_secret,
        scopes=SCOPES,
    )
    cnf = ClientConfig(
        client_name="DEV_Working",
        project=cdf_project,
        base_url=f"https://{cdf_cluster}.cognitedata.com",  # NOTE: base_url might need to be adjusted if on PSAAS or Private Link
        credentials=creds,
        debug=debug,
    )
    return CogniteClient(cnf)

In [None]:
# Configuration Classes
@dataclass
class ViewPropertyConfig:
    """Coordinates of a data-modeling view plus an optional instance space.

    ``schema_space``, ``external_id`` and ``version`` identify the view;
    ``instance_space`` is optional and defaults to ``None``.
    """

    schema_space: str
    external_id: str
    version: str
    instance_space: str | None = None

    def as_view_id(self) -> ViewId:
        """Return the ``ViewId`` built from this configuration's view coordinates."""
        view_id = ViewId(
            space=self.schema_space,
            external_id=self.external_id,
            version=self.version,
        )
        return view_id

    def as_property_ref(self, property) -> list[str]:
        """Return ``[schema_space, "<external_id>/<version>", property]``."""
        versioned_view = "{}/{}".format(self.external_id, self.version)
        return [self.schema_space, versioned_view, property]

In [None]:
# Read the connection settings from the environment (a local .env file is
# loaded first) and create the client for the Cognite project.
load_dotenv()

cdf_project = os.environ.get("CDF_PROJECT")
cdf_cluster = os.environ.get("CDF_CLUSTER")
client_id = os.environ.get("IDP_CLIENT_ID")
client_secret = os.environ.get("IDP_CLIENT_SECRET")
tenant_id = os.environ.get("IDP_TENANT_ID")

cdf_client = create_client(
    cdf_project=cdf_project,
    cdf_cluster=cdf_cluster,
    client_id=client_id,
    client_secret=client_secret,
    tenant_id=tenant_id,
)

In [None]:
# Replace the placeholder below with the organization used in config.<env>.yaml
organization: str = "<insert>"

# Fail with a clear message (instead of a syntax error) while the placeholder
# has not been replaced; the view names below are derived from this value.
if organization == "<insert>":
    raise ValueError(
        "Set `organization` to the value used in config.<env>.yaml before running this notebook."
    )

file_view_name: str = f"{organization}File"

# Describe the file view: where its schema lives and where its instances are stored
file_view: ViewPropertyConfig = ViewPropertyConfig(
    schema_space="sp_enterprise_process_industry",
    external_id=file_view_name,
    version="v1",
    instance_space="springfield_instances",
)

In [None]:
# Retrieve the node instances of the file view (e.g. txFile).
# limit=-1 asks the SDK to return all instances rather than the default page.
files: NodeList[Node] = cdf_client.data_modeling.instances.list(instance_type="node", sources=file_view.as_view_id(), limit=-1)

# Show the first instance as a sanity check; guard against an empty result so
# the cell does not fail with an IndexError.
if len(files) > 0:
    print(files[0])
else:
    print(f"No instances found for view {file_view_name}")

In [None]:
# Enrich the file nodes: add the ToAnnotate and DetectInDiagrams tags and
# derive an alias from the file name.
file_node_apply_list: NodeApplyList = files.as_write()

tags_to_add = ["ToAnnotate", "DetectInDiagrams"]

for file in file_node_apply_list:
    file_properties = file.sources[0].properties

    # Add the tags without discarding tags that are already on the node
    # (and without duplicating them when the notebook is re-run).
    merged_tags = list(file_properties.get("tags") or [])
    merged_tags.extend(tag for tag in tags_to_add if tag not in merged_tags)
    file_properties["tags"] = merged_tags

    # The alias is the file name without its ".pdf" extension. Only a trailing
    # extension is stripped, so a ".pdf" in the middle of a name is left alone.
    name = file_properties.get("name")
    if name:
        file_properties["aliases"] = [name.removesuffix(".pdf")]

# Show the first enriched node as a sanity check (nothing to show when empty)
if len(file_node_apply_list) > 0:
    print(file_node_apply_list[0])

In [None]:
# The file nodes now carry the updated properties; write them back to CDF.
cdf_client.data_modeling.instances.apply(nodes=file_node_apply_list)

In [None]:
# Now let's do the same with the equipment nodes in the project so that we have entities to match against
equipment_view_name: str = f"{organization}Equipment"
equipment_view: ViewPropertyConfig = ViewPropertyConfig(
    schema_space="sp_enterprise_process_industry",
    external_id=equipment_view_name,
    version="v1",
    instance_space="springfield_instances",
)

# Retrieve the node instances of the equipment view (e.g. txEquipment)
equipments: NodeList[Node] = cdf_client.data_modeling.instances.list(instance_type="node", sources=equipment_view.as_view_id(), limit=-1)

# The asset view is the target: one asset node is created per equipment node
asset_view_name: str = f"{organization}Asset"
asset_view: ViewPropertyConfig = ViewPropertyConfig(
    schema_space="sp_enterprise_process_industry",
    external_id=asset_view_name,
    version="v1",
    instance_space="springfield_instances",
)


def _equipment_aliases(equipment_name: str) -> list[str]:
    """Return the aliases for an equipment name.

    The full name is always an alias. When the name contains a "-", the part
    after the first "-" is added as a second alias. Names without a "-" (or
    with nothing after it) get no second alias, so no empty-string alias is
    ever produced.
    """
    aliases = [equipment_name]
    _, _, name_without_prefix = equipment_name.partition("-")
    if name_without_prefix:
        aliases.append(name_without_prefix)
    return aliases


# The view ids do not change between iterations, so build them once
equipment_view_id = equipment_view.as_view_id()
asset_view_id = asset_view.as_view_id()

asset_node_apply_list = []
for equipment in equipments:
    equipment_properties = equipment.properties[equipment_view_id]
    equipment_name = equipment_properties["name"]

    properties: dict = {
        "tags": ["DetectInDiagrams"],
        "name": equipment_name,
        "description": equipment_properties["description"],
        "sourceId": equipment_properties["sourceId"],
        "sourceUpdatedUser": equipment_properties["sourceUpdatedUser"],
        "aliases": _equipment_aliases(equipment_name),
    }

    asset_node_apply_list.append(
        NodeApply(
            space=equipment.space,
            # Prefix keeps the asset node's id distinct from the equipment node's id
            external_id="asset:"+equipment.external_id,
            sources=[
                NodeOrEdgeData(
                    source=asset_view_id,
                    properties=properties,
                )
            ],
        )
    )

print(len(asset_node_apply_list))
# Show the first generated node as a sanity check (nothing to show when empty)
if asset_node_apply_list:
    print(asset_node_apply_list[0])


In [None]:
# Write the generated asset nodes to CDF.
# replace=True ensures we reset the properties of the node.
update_results = cdf_client.data_modeling.instances.apply(
    nodes=asset_node_apply_list, auto_create_direct_relations=True, replace=True
)
print(update_results)