# API - Level 4 PoX - IBM Knowledge Catalog

https://cloud.ibm.com/apidocs/watson-data-api-cpd

## Authorisation

In [1]:
import json
import requests # type: ignore
import time

class credentials :

    file_credentials = ""
    url_server = ""
    username = ""
    apikey = ""
    access_token = ""

    def __init__(self, file_credentials):
        try :
            with open(file_credentials) as f :
                data = json.load(f)
                self.url_server = data["url_server"]
                self.username = data["username"]
                self.apikey = data["api_key"]
                self.file_credentials = file_credentials
        except :
            print("Error with the file ", file_credentials)
    
   
    def urlRequest(self, urlSuffix):
        return self.url_server + urlSuffix

    def get_bearer_token(self):
        
        # Get a bearer token with the API key - Cloud Pak for Data SaaS
        # url = "https://iam.cloud.ibm.com/identity/token"
        # headers = {"Content-Type" : "application/x-www-form-urlencoded"}
        # data = "grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=" + apikey
        # r = requests.post(url, headers=headers, data=data)
        # access_token = r.json()["access_token"]

        # Get a bearer token with the API key - Cloud Pak for Data Software
        urlSuffix = "/icp4d-api/v1/authorize"
        headers = {'Accept': 'application/json', 'Content-type': 'application/json'}
        data = {"username" : self.username, "api_key" : self.apikey}
        r = requests.post(self.urlRequest(urlSuffix), headers=headers, data=json.dumps(data))

        if r.status_code != 200:
            print("Error with the request. Code: ", r.status_code)
            print(r.text)
        else :
            try:
                self.access_token = r.json()["token"]
            except KeyError:
                print("Error with the token. Code: ", r.status_code)
                print("Hint: check the credentials file ", self.file_credentials)
                print(r.text)
                
            return self.access_token
    
FILE_CREDENTIALS = "ikcapikey.json"

myconn = credentials(FILE_CREDENTIALS)
access_token = myconn.get_bearer_token()


## Define the Business Vocabulary


### 1. Create Categories
   

In [None]:
print("---- Import Categories from CSV----")

IMPORT_CSV_FILE = "governance-categories.csv"
urlSuffix='/v3/governance_artifact_types/category/import?merge_option=all'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
files = {'file': (IMPORT_CSV_FILE, open(IMPORT_CSV_FILE, 'rb'), 'text/csv')}

r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

if r.status_code == 200 :
    status = r.json()["status"]
    print("Import finished. Status = ", status)
    print(r.text)
elif r.status_code == 202 :
    process_id = r.json()["process_id"]
    print(f"----- Import process started: {process_id} ----- ")
    print("----- Entering wait loop ------")
    urlSuffix='/v3/governance_artifact_types/import/status/' + process_id
    headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
    while True :
        r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
        if r.status_code != 200 :
            print("Error with the request. Code: ", r.status_code)
            print(r.text)
        status = r.json()["status"]
        if status != "IN_PROGRESS" :
            break
        else :
            print ("Import in progess, please wait")
            time.sleep(5)
else :
    print("Error with the request. Code: ", r.status_code)
    print(r.text)


### 2. Update Classifications

#### 2.a. Change the definitions 

In [2]:
print("---- Update Classifications from CSV----")

IMPORT_CSV_FILE = "governance-classifications.csv" 
urlSuffix='/v3/governance_artifact_types/classification/import?merge_option=specified'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
files = {'file': (IMPORT_CSV_FILE, open(IMPORT_CSV_FILE, 'rb'), 'text/csv')}

r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

if r.status_code == 200 :
    status = r.json()["status"]
    print("Import finished. Status = ", status)
    print(r.text)

elif r.status_code == 202 :
    process_id = r.json()["process_id"]
    print(f"----- Import process started: {process_id} ----- ")
    print("----- Entering wait loop ------")
    urlSuffix='/v3/governance_artifact_types/import/status/' + process_id
    headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}

    while True :
        r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
        if r.status_code != 200 :
            print("Error with the request. Code: ", r.status_code)
            print(r.text)
            break
        status = r.json()["status"]
        if status != "IN_PROGRESS" :
            break
        else :
            print ("Import in progess, please wait")
            time.sleep(5)
else :
    print("Error with the request. Code: ", r.status_code)
    print(r.text)

workflow_id = r.json()["workflow_id"]


---- Update Classifications from CSV----
Import finished. Status =  SUCCEEDED
{
  "process_id": "573630ed-b047-4fba-96e5-884ef8c90a9a",
  "status": "SUCCEEDED",
  "step_number": 16,
  "total_steps": 16,
  "step_message": "Artifacts import finished",
  "creation_time": "2024-04-17T08:01:51.376Z",
  "completion_time": "2024-04-17T08:01:57.949Z",
  "tenant_id": "999",
  "workflow_id": "c09982fd-fc90-11ee-9527-0a580a820487",
  "heartbeat_time": "2024-04-17T08:01:57.949Z",
  "messages": {
    "resources": [],
    "offset": 0,
    "set_uri": false,
    "limit": 200,
    "count": 0
  },
  "operations_count": {
    "relationship": {
      "IMPORT_CREATE": 8
    },
    "classification": {
      "IMPORT_CREATE": 0,
      "IMPORT_MODIFY": 4
    }
  }
}


#### 2.b. Publish the definitions

Before executing this cell, you may want to check the "Task Inbox" in CloudPak for Data if you are not sure about what will be published

In [3]:
urlSuffix='/v3/workflows/' + workflow_id + '?include_user_tasks=true'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

user_tasks = r.json()["entity"]["user_tasks"]
for i in user_tasks :
    if i["metadata"]["workflow_id"] == workflow_id :
        task_id = i["metadata"]["task_id"]

urlSuffix='/v3/workflow_user_tasks/' + task_id + '/actions'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
payload = {'action': 'complete', 'form_properties': [{'id': 'action', 'value': '#publish'}]}
r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, json=payload)

if r.status_code == 202 or r.status_code == 204 :
    print("Publish Successful, Code = ", r.status_code)
else :
    print("Error in publishing artifacts, Code = ", r.status_code)
    print(r.text)

{"metadata":{"name":"{\"id\":\"wkc-governance-workflows.default.workflowName.IMPORT.many\", \"defaultMessage\":\"{draft_mode} {artifactType} {artifactName}\", \"§artifactType\":\"wkc-governance-workflows.default.artifactType.classification\", \"§artifactTypePlural\":\"wkc-governance-workflows.default.artifactType.classification.plural\", \"artifactName\":\"Confidential\",\"§draft_mode\":\"wkc-governance-workflows.default.triggerProperty.event.IMPORT\"}","state":"running","artifact_type":"workflow","workflow_id":"c09982fd-fc90-11ee-9527-0a580a820487","configuration_id":"d6d1d2c9-0a54-4a45-a6ba-ba3daad330ee","created_by":"1000330999","created_at":"2024-04-17T08:01:53.184Z"},"entity":{"artifacts":[{"metadata":{"name":"Confidential","artifact_type":"classification","artifact_id":"d462e63f-b0ff-49f9-887c-6b6cad752404","version_id":"a6ea9ccf-2a22-4d9d-9349-16cb2030c951","published_ancestor_id":"f5042f76-7a74-4d3d-9a5c-1703c7ea89fa_0","primary_category":{"id":"e39ada11-8338-3704-90e3-681a71e7

### 3. Create Data Classes

#### 3.a. Add new Data Classes

In [None]:
print("---- Create Data Classes from CSV----")

IMPORT_CSV_FILE = "governance-data-classes.csv" 
urlSuffix='/v3/governance_artifact_types/data_class/import?merge_option=all'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
files = {'file': (IMPORT_CSV_FILE, open(IMPORT_CSV_FILE, 'rb'), 'text/csv')}

r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

if r.status_code == 200 :
    status = r.json()["status"]
    print("Import finished. Status = ", status)
    print(r.text)

elif r.status_code == 202 :
    process_id = r.json()["process_id"]
    print(f"----- Import process started: {process_id} ----- ")
    print("----- Entering wait loop ------")
    urlSuffix='/v3/governance_artifact_types/import/status/' + process_id
    headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}

    while True :
        r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
        if r.status_code != 200 :
            print("Error with the request. Code: ", r.status_code)
            print(r.text)
            break
        status = r.json()["status"]
        if status != "IN_PROGRESS" :
            break
        else :
            print ("Import in progess, please wait")
            time.sleep(5)
else :
    print("Error with the request. Code: ", r.status_code)
    print(r.text)

workflow_id = r.json()["workflow_id"]

#### 3.b. Publish the changes

In [None]:
urlSuffix='/v3/workflows/' + workflow_id + '?include_user_tasks=true'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

user_tasks = r.json()["entity"]["user_tasks"]
for i in user_tasks :
    if i["metadata"]["workflow_id"] == workflow_id :
        task_id = i["metadata"]["task_id"]

urlSuffix='/v3/workflow_user_tasks/' + task_id + '/actions'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
payload = {'action': 'complete', 'form_properties': [{'id': 'action', 'value': '#publish'}]}
r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, json=payload)

if r.status_code == 202 or r.status_code == 204 :
    print("Publish Successful, Code = ", r.status_code)
else :
    print("Error in publishing artifacts, Code = ", r.status_code)
    print(r.text)


### 4. Create Business Terms

#### 4.a. Add new Business Terms

In [None]:
print("---- Create Business Terms from CSV----")

IMPORT_CSV_FILE = "governance-business-terms.csv"
urlSuffix='/v3/governance_artifact_types/glossary_term/import?merge_option=all'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
files = {'file': (IMPORT_CSV_FILE, open(IMPORT_CSV_FILE, 'rb'), 'text/csv')}

r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

if r.status_code == 200 :
    status = r.json()["status"]
    print("Import finished. Status = ", status)

elif r.status_code == 202 :
    process_id = r.json()["process_id"]
    print(f"----- Import process started: {process_id} ----- ")
    print("----- Entering wait loop ------")
    urlSuffix='/v3/governance_artifact_types/import/status/' + process_id
    headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}

    while True :
        r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
        if r.status_code != 200 :
            print("Error with the request. Code: ", r.status_code)
            print(r.text)
            break
        status = r.json()["status"]
        if status != "IN_PROGRESS" :
            break
        else :
            print ("Import in progess, please wait")
            time.sleep(5)
else :
    print("Error with the request. Code: ", r.status_code)
    print(r.text)

workflow_id = r.json()["workflow_id"]

#### 4.b. Publish the changes


In [None]:
urlSuffix='/v3/workflows/' + workflow_id + '?include_user_tasks=true'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

user_tasks = r.json()["entity"]["user_tasks"]
for i in user_tasks :
    if i["metadata"]["workflow_id"] == workflow_id :
        task_id = i["metadata"]["task_id"]

urlSuffix='/v3/workflow_user_tasks/' + task_id + '/actions'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
payload = {'action': 'complete', 'form_properties': [{'id': 'action', 'value': '#publish'}]}
r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, json=payload)

if r.status_code == 202 or r.status_code == 204 :
    print("Publish Successful, Code = ", r.status_code)
else :
    print("Error in publishing artifacts, Code = ", r.status_code)
    print(r.text)


### 5. Create Reference Data

#### 5.a Add new Reference Data

In [1]:
print("---- Create Reference Data from CSV----")

IMPORT_CSV_FILE = "governance-reference-data.csv"
urlSuffix='/v3/governance_artifact_types/reference_data/import?merge_option=all'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
files = {'file': (IMPORT_CSV_FILE, open(IMPORT_CSV_FILE, 'rb'), 'text/csv')}

r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

if r.status_code == 200 :
    status = r.json()["status"]
    print("Import finished. Status = ", status)

elif r.status_code == 202 :
    process_id = r.json()["process_id"]
    print(f"----- Import process started: {process_id} ----- ")
    print("----- Entering wait loop ------")
    urlSuffix='/v3/governance_artifact_types/import/status/' + process_id
    headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}

    while True :
        r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
        if r.status_code != 200 :
            print("Error with the request. Code: ", r.status_code)
            print(r.text)
            break
        status = r.json()["status"]
        if status != "IN_PROGRESS" :
            break
        else :
            print ("Import in progess, please wait")
            time.sleep(5)
else :
    print("Error with the request. Code: ", r.status_code)
    print(r.text)

workflow_id = r.json()["workflow_id"]

---- Create Reference Data from CSV----


NameError: name 'access_token' is not defined

#### 5.b Publish the changes


In [None]:
urlSuffix='/v3/workflows/' + workflow_id + '?include_user_tasks=true'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

user_tasks = r.json()["entity"]["user_tasks"]
for i in user_tasks :
    if i["metadata"]["workflow_id"] == workflow_id :
        task_id = i["metadata"]["task_id"]

urlSuffix='/v3/workflow_user_tasks/' + task_id + '/actions'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
payload = {'action': 'complete', 'form_properties': [{'id': 'action', 'value': '#publish'}]}
r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, json=payload)

if r.status_code == 202 or r.status_code == 204 :
    print("Publish Successful, Code = ", r.status_code)
else :
    print("Error in publishing artifacts, Code = ", r.status_code)
    print(r.text)

### 6. Load Department Lookup Data


#### 6.a Add the lookup data

In [None]:
print("---- Load Department Lookup Data from CSV----")

IMPORT_CSV_FILE = "governance-reference-department.csv"

artifact_id = None
version_id = None
urlSuffix='/v3/governance_artifact_types/reference_data?workflow_status=published&limit=200'
headers = {"accept": "application/json" ,"Authorization" : "Bearer " + access_token}

r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

if r.status_code == 200 :
    for i in r.json()["resources"] :
        if i["name"] == "Department Lookup" :
            artifact_id = i["artifact_id"]
            version_id = i["version_id"]
            print("artifact_id = ", artifact_id, " version_id = " , version_id)
            break
else :
    print("Error in retrieving reference data artifacts, Code = ", r.status_code)
    print(r.text)

if artifact_id is None or version_id is None:
    print("Department Lookup not found")
else :    
    urlSuffix='/v4/reference_data_sets/' + artifact_id + '/versions/' + version_id + '/value_imports'
    headers = {"Authorization" : "Bearer " + access_token }
    import_parameters = {
        "artifact_id_mode": False,
        "code": "DEPARTMENT_CODE",
        "first_row_header": True,
        "import_relationships_only": False,
        "skip_workflow_if_possible": False, 
        "trim_white_spaces": True,
        "value": "DEPARTMENT_EN",
        "value_conflicts": "OVERWRITE" 
    }
    files={
        'import_csv_file'   : ('import_csv_file', open(IMPORT_CSV_FILE,'rb') ),
        'import_parameters' : (None, str(import_parameters))   
    }
    
    r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

    if r.status_code == 202 :
        import_id = r.json()["import_info"]["import_id"]
        print(f"----- Import process started: {import_id} ----- ")
        print("----- Entering wait loop ------")
        urlSuffix='/v4/reference_data_sets/' + artifact_id + '/versions/' + version_id + '/value_imports/' + import_id
        headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}

        workflow_id = r.json()["workflow_id"]

        while True :
            r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
            if r.status_code != 200 :
                print("Error with the request. Code: ", r.status_code)
                print(r.text)
                break
            status = r.json()["import_info"]["import_state"]
            if status != "IN_PROGRESS" :
                break
            else :
                print ("Import in progess, please wait")
                time.sleep(5)
        print("Import finished. Status = ", r.status_code)
    else :
        print("Error with the request. Code: ", r.status_code)
        print(r.text)

#### 6.b. Publish the changes

In [8]:
urlSuffix='/v3/workflows/' + workflow_id + '?include_user_tasks=true'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

user_tasks = r.json()["entity"]["user_tasks"]
for i in user_tasks :
    if i["metadata"]["workflow_id"] == workflow_id :
        task_id = i["metadata"]["task_id"]

urlSuffix='/v3/workflow_user_tasks/' + task_id + '/actions'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
payload = {'action': 'complete', 'form_properties': [{'id': 'action', 'value': '#publish'}]}
r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, json=payload)

if r.status_code == 202 or r.status_code == 204 :
    print("Publish Successful, Code = ", r.status_code)
else :
    print("Error in publishing artifacts, Code = ", r.status_code)
    print(r.text)

Publish Successful, Code =  202


### 7. Load Position Lookup Data

#### 7.a. Add the new data

In [9]:

print("---- Load Position Lookup Data from CSV----")

IMPORT_CSV_FILE = "governance-reference-position.csv"

artifact_id = None
version_id = None
urlSuffix='/v3/governance_artifact_types/reference_data?workflow_status=published&limit=200'
headers = {"accept": "application/json" ,"Authorization" : "Bearer " + access_token}

r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

if r.status_code == 200 :
    for i in r.json()["resources"] :
        if i["name"] == "Position Lookup" :
            artifact_id = i["artifact_id"]
            version_id = i["version_id"]
            print("artifact_id = ", artifact_id, " version_id = " , version_id)
            break
else :
    print("Error in retrieving reference data artifacts, Code = ", r.status_code)
    print(r.text)

if artifact_id is None or version_id is None:
    print("Position Lookup not found")
else :    
    urlSuffix='/v4/reference_data_sets/' + artifact_id + '/versions/' + version_id + '/value_imports'
    headers = {"Authorization" : "Bearer " + access_token }
    import_parameters = {
        "artifact_id_mode": False,
        "code": "POSITION_CODE",
        "first_row_header": True,
        "import_relationships_only": False,
        "skip_workflow_if_possible": False, 
        "trim_white_spaces": True,
        "value": "POSITION_EN",
        "value_conflicts": "OVERWRITE" 
    }
    files={
        'import_csv_file'   : ('import_csv_file', open(IMPORT_CSV_FILE,'rb') ),
        'import_parameters' : (None, str(import_parameters))   
    }
    
    r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, files=files)

    if r.status_code == 202 :
        import_id = r.json()["import_info"]["import_id"]
        print(f"----- Import process started: {import_id} ----- ")
        print("----- Entering wait loop ------")
        urlSuffix='/v4/reference_data_sets/' + artifact_id + '/versions/' + version_id + '/value_imports/' + import_id
        headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}

        workflow_id = r.json()["workflow_id"]

        while True :
            r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)
            if r.status_code != 200 :
                print("Error with the request. Code: ", r.status_code)
                print(r.text)
                break
            status = r.json()["import_info"]["import_state"]
            if status != "IN_PROGRESS" :
                break
            else :
                print ("Import in progess, please wait")
                time.sleep(5)
        print("Import finished. Status = ", r.status_code)
    else :
        print("Error with the request. Code: ", r.status_code)
        print(r.text)

---- Load Position Lookup Data from CSV----
artifact_id =  d5ee35ab-7084-4dd1-861f-b2724ae16eb9  version_id =  5e9dc25d-6719-4057-a4ac-f2c87549b101_0
{
  "import_info": {
    "href": "/v4/reference_data_sets/d5ee35ab-7084-4dd1-861f-b2724ae16eb9/versions/5e9dc25d-6719-4057-a4ac-f2c87549b101_0/value_imports/ab3efcfe-71a0-4897-be4a-7faa2b14012f",
    "import_id": "ab3efcfe-71a0-4897-be4a-7faa2b14012f",
    "import_state": "IN_PROGRESS",
    "import_message": "Import in progress",
    "values_count": 45,
    "values_processed": 45,
    "values_skipped": 0,
    "values_inserted": 0,
    "rels_values_processed": 0,
    "rels_processed": 0,
    "rels_inserted": 0,
    "rels_deleted": 0,
    "rels_skipped": 0,
    "started_by": "1000330999",
    "start_time": "2024-04-17T13:45:07.808Z",
  },
  "href": "/v4/reference_data_sets/d5ee35ab-7084-4dd1-861f-b2724ae16eb9/versions/5e9dc25d-6719-4057-a4ac-f2c87549b101_0",
  "artifact_id": "d5ee35ab-7084-4dd1-861f-b2724ae16eb9",
  "version_id": "eee7e13e-

#### 7.b. Publish the draft

In [10]:
urlSuffix='/v3/workflows/' + workflow_id + '?include_user_tasks=true'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
r = requests.get(myconn.urlRequest(urlSuffix), headers=headers)

user_tasks = r.json()["entity"]["user_tasks"]
for i in user_tasks :
    if i["metadata"]["workflow_id"] == workflow_id :
        task_id = i["metadata"]["task_id"]

urlSuffix='/v3/workflow_user_tasks/' + task_id + '/actions'
headers = {"accept": "application/json", "Authorization" : "Bearer " + access_token}
payload = {'action': 'complete', 'form_properties': [{'id': 'action', 'value': '#publish'}]}
r = requests.post(myconn.urlRequest(urlSuffix), headers=headers, json=payload)

if r.status_code == 202 or r.status_code == 204 :
    print("Publish Successful, Code = ", r.status_code)
else :
    print("Error in publishing artifacts, Code = ", r.status_code)
    print(r.text)

Publish Successful, Code =  202
