# Notebook that lists tables from DataHub and links each one to its mapped glossary term via the SHACL ontology

Requires: `rdflib`, `pandas` and the DataHub Python client (`acryl-datahub`).

In [1]:
from unittest import result
from rdflib import Graph
import logging
import pandas as pd

from datahub.emitter.mce_builder import get_sys_time, make_dataset_urn, make_term_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import datahub.metadata.schema_classes as models

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
    AuditStampClass,
    ChangeTypeClass,
    GlossaryTermAssociationClass,
    GlossaryTermsClass,
)

In [2]:
import os

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Endpoint and optional access token come from the environment so the notebook
# can target another DataHub instance without editing code. The defaults
# reproduce the original local, unauthenticated setup.
gms_endpoint = os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080")
gms_token = os.environ.get("DATAHUB_GMS_TOKEN")  # None -> no token, as before

# `graph` is used below to list datasets; `rest_emitter` to write term links.
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint, token=gms_token))
rest_emitter = DatahubRestEmitter(gms_server=gms_endpoint, token=gms_token)


In [3]:
# Page size for DataHub's URN listing; pages are requested until a short page
# signals the end, so catalogs larger than one page are fully covered.
PAGE_SIZE = 1000
# `add_term` rebuilds every dataset URN with platform "postgres" and name prefix
# "postgres.public.", so only URNs of exactly that shape can be mapped; a looser
# substring match would attach terms to URNs that do not exist.
DATASET_URN_PREFIX = "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public."

all_dataset_urns = []
start = 0
while True:
    page = graph.list_all_entity_urns(entity_type="dataset", start=start, count=PAGE_SIZE)
    # NOTE(review): the client appears to return None when the request fails;
    # fail loudly here instead of with an opaque TypeError further down.
    if page is None:
        raise RuntimeError(f"Listing datasets from DataHub failed at offset {start}")
    all_dataset_urns.extend(page)
    if len(page) < PAGE_SIZE:
        break
    start += PAGE_SIZE

datasets = [urn for urn in all_dataset_urns if urn.startswith(DATASET_URN_PREFIX)]

The SPARQL query traverses the graph built from the three Turtle files. If a SPARQL endpoint is available, you can query it instead.

In [4]:
# Turtle files that together make up the graph queried below.
ONTOLOGY_FILES = [
    "../ontologies/areaaldata.ttl",
    "../ontologies/areaaldata_begrippen.ttl",
    "../ontologies/areaaldata_av.shapes.ttl",
]

g = Graph()
for path in ONTOLOGY_FILES:
    g.parse(path)

# Prefixes are declared explicitly instead of relying on bindings picked up
# from the parsed files, so the query keeps working if a file renames them.
query = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX dcterms: <http://purl.org/dc/terms/>

SELECT ?shape ?target ?concept
WHERE {
    ?shape a sh:NodeShape ;
        sh:target ?target ;
        sh:targetClass ?targetClass .
    ?targetClass dcterms:subject ?concept .
}
"""

result = g.query(query)

# Lower-cased sh:target value -> concept IRI (as string). Keys are lower-cased
# so callers can look tables up case-insensitively; if several shapes share a
# target, the last row returned wins.
cls_terms = {str(row.target).lower(): str(row.concept) for row in result}

len(cls_terms)

In [5]:
def add_term(
    cls,
    term,
    platform="postgres",
    name_prefix="postgres.public.",
    env="PROD",
    glossary_prefix="Areaaldata begrippenkader.",
    run_id="map-datasets-terms",
):
    """Build the metadata change proposal that attaches one glossary term to a dataset.

    Despite its name this function does not contact DataHub; the caller is
    expected to emit the returned event (e.g. with ``rest_emitter.emit``).

    Args:
        cls: Table name; appended to ``name_prefix`` to form the dataset name.
        term: Term name; appended to ``glossary_prefix`` before building the term URN.
        platform: Data platform of the dataset URN.
        name_prefix: Prefix of the dataset name (database and schema).
        env: Environment/fabric of the dataset URN.
        glossary_prefix: Prefix prepended to ``term`` in the term URN.
        run_id: ``runId`` recorded in the event's system metadata.

    Returns:
        Tuple ``(event, term_urn, dataset_urn)``.

    Note:
        The event UPSERTs a ``glossaryTerms`` aspect whose term list contains
        only ``term``. NOTE(review): DataHub writes aspects whole, so this
        likely replaces terms already attached to the dataset rather than
        adding to them; confirm before running against a curated catalog.
    """
    dataset_urn = make_dataset_urn(platform=platform, name=name_prefix + cls, env=env)

    term_to_add = make_term_urn(glossary_prefix + term)
    term_association_to_add = GlossaryTermAssociationClass(urn=term_to_add)
    audit_stamp = AuditStampClass(time=get_sys_time(), actor="urn:li:corpuser:ingestion")

    # A fresh aspect holding just this one term (no read of the current aspect).
    terms_aspect = GlossaryTermsClass(
        terms=[term_association_to_add],
        auditStamp=audit_stamp,
    )

    event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=dataset_urn,
        aspectName="glossaryTerms",
        aspect=terms_aspect,
        systemMetadata=models.SystemMetadataClass(runId=run_id),
    )
    return event, term_to_add, dataset_urn

Loop through the selected datasets and look each table up in the ontology. If a match is found, map the table to its glossary term and use the Python REST emitter to update DataHub.

In [6]:
attached = 0
for ds in datasets:
    # Table name: last dotted part of the dataset name (2nd comma-separated URN field).
    sub = ds.split(",")[1].split(".")[-1]
    # cls_terms keys are lower-cased, so look up case-insensitively; the original
    # spelling is still passed on so add_term rebuilds the exact dataset URN.
    concept = cls_terms.get(sub.lower())
    if concept is None:
        continue
    event, term_to_add, dataset_urn = add_term(sub, concept.split("/")[-1])

    rest_emitter.emit(event)
    log.info(f"Attached term {term_to_add} to dataset {dataset_urn}")
    attached += 1

log.info(f"Attached terms to {attached} of {len(datasets)} datasets")

INFO:__main__:Attached term urn:li:glossaryTerm:Areaaldata begrippenkader.bak to dataset urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.av_bak_p,PROD)
INFO:__main__:Attached term urn:li:glossaryTerm:Areaaldata begrippenkader.begroeidTerreindeel to dataset urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.av_begroeidterreindeel_v,PROD)
INFO:__main__:Attached term urn:li:glossaryTerm:Areaaldata begrippenkader.begroeidTerreindeelBerm to dataset urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.av_begroeidterreindeelberm_v,PROD)
INFO:__main__:Attached term urn:li:glossaryTerm:Areaaldata begrippenkader.begroeidTerreindeelBermKr to dataset urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.av_begroeidterreindeelbermkr_l,PROD)
INFO:__main__:Attached term urn:li:glossaryTerm:Areaaldata begrippenkader.begroeidTerreindeelKruin to dataset urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.av_begroeidterreindeelkruin_l,PROD)
INFO:__main__:Att

In [8]:
# Export a reviewable table -> term mapping (one row per matched table) using the
# same matching rules as the emit loop, without writing anything to DataHub.
MAPPING_COLUMNS = ["table", "concept", "term_urn", "dataset_urn"]

mapping = []
for ds in datasets:
    sub = ds.split(",")[1].split(".")[-1]
    # cls_terms keys are lower-cased; keep the original spelling for the URN.
    concept = cls_terms.get(sub.lower())
    if concept is None:
        continue
    event, term_to_add, dataset_urn = add_term(sub, concept.split("/")[-1])
    mapping.append(
        {"table": sub, "concept": concept, "term_urn": term_to_add, "dataset_urn": dataset_urn}
    )

# Explicit columns keep the CSV header stable even when nothing matched.
mapping_df = pd.DataFrame(mapping, columns=MAPPING_COLUMNS)
mapping_df.to_csv("mappingfile.csv", index=False)
mapping_df.head()
