In [1]:
import os
from dotenv import load_dotenv
from typing import List
from datahub.emitter.mcp import MetadataChangeProposalWrapper
import datahub.metadata.schema_classes as models
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter
load_dotenv()

# replace .env_example with a file called .env and add your own environment variables into it.
datashub_gms_server = os.getenv('DATAHUB_GMS_SERVER', '')
datahub_token = os.getenv('DATAHUB_TOKEN', '')
datahub_actor = os.getenv('DATAHUB_ACTOR', 'urn:li:corpuser:admin')

# start by putting things into DEV to keep PROD clean until you know what you are doing
datahub_env = 'DEV'

### Create emitter

In [25]:
# create an emitter
emitter = DatahubRestEmitter(
    gms_server=datashub_gms_server, 
    token=datahub_token, 
    extra_headers={'X-DataHub-Actor': datahub_actor}
)

### Create a tag

In [26]:
# create a tag
tag_name = 'healthy'
tag_description = 'Healthy'
mcpw = MetadataChangeProposalWrapper(
    "tag",
    models.ChangeTypeClass.UPSERT,
    entityUrn=builder.make_tag_urn(tag_name),
    aspectName="tagProperties",
    aspect=models.TagPropertiesClass(tag_name, tag_description)
)
emitter.emit_mcp(mcp=mcpw)

### Create a user

In [31]:
# create a user
user_name = 'Joe Bloggs'
user_email = 'joe.bloggs@fake.com'
mcpw = MetadataChangeProposalWrapper(
    "corpUser",
    models.ChangeTypeClass.UPSERT,
    entityUrn=builder.make_user_urn(username=user_name),
    aspectName="corpUserInfo",
    aspect=models.CorpUserInfoClass(active=True, displayName=user_name, email=user_email)
)
emitter.emit_mcp(mcp=mcpw)

### Create a dataset

In [42]:
# create a dataset
dataset_platform = 'bigquery'
dataset_name = 'project_a.dataset_a.table_a'
dataset_description = 'my great dataset'
dataset_url = 'https://www.google.ie/'
dataset_urn = builder.make_dataset_urn(platform=dataset_platform, name=dataset_name, env=datahub_env)
mcpw = MetadataChangeProposalWrapper(
    "dataset",
    models.ChangeTypeClass.UPSERT,
    entityUrn=dataset_urn,
    aspectName="datasetProperties",
    aspect=models.DatasetPropertiesClass(description=dataset_description, externalUrl=dataset_url)
)
emitter.emit_mcp(mcp=mcpw)

### Tag a dataset

In [None]:
# tag dataset as 'healthy'
mcpw = MetadataChangeProposalWrapper(
    "dataset",
    models.ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn(platform=dataset_platform, name=dataset_name, env=datahub_env),
    aspectName="globalTags",
    aspect=models.GlobalTagsClass(tags=[models.TagAssociationClass(builder.make_tag_urn('healthy'))])
)
emitter.emit_mcp(mcp=mcpw)

### Add owner to a dataset

In [39]:
# set dataset owner as "Joe Blogss"
mcpw = MetadataChangeProposalWrapper(
    "dataset",
    models.ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn(platform=dataset_platform, name=dataset_name, env=datahub_env),
    aspectName="ownership",
    aspect=models.OwnershipClass(owners=[models.OwnerClass(builder.make_user_urn(user_name), type='DATAOWNER')])
)
emitter.emit_mcp(mcp=mcpw)

### Add lineage to dataset

In [43]:
# create upstream dataset
dataset_platform_b = 'bigquery'
dataset_name_b = 'project_a.dataset_a.table_b'
dataset_description_b = 'my great dataset b'
dataset_url_b = 'https://www.google.ie/'
dataset_urn_b = builder.make_dataset_urn(platform=dataset_platform_b, name=dataset_name_b, env=datahub_env)
mcpw = MetadataChangeProposalWrapper(
    "dataset",
    models.ChangeTypeClass.UPSERT,
    entityUrn=dataset_urn_b,
    aspectName="datasetProperties",
    aspect=models.DatasetPropertiesClass(description=dataset_description_b, externalUrl=dataset_url_b)
)
emitter.emit_mcp(mcp=mcpw)

# create lineage
upstream_datasets = [dataset_urn_b]
mcpw = MetadataChangeProposalWrapper(
    "dataset",
    models.ChangeTypeClass.UPSERT,
    entityUrn=dataset_urn,
    aspectName="upstreamLineage",
    aspect=models.UpstreamLineageClass(upstreams=[models.UpstreamClass(dataset=d, type=models.DatasetLineageTypeClass.TRANSFORMED) for d in upstream_datasets])
    )
emitter.emit_mcp(mcp=mcpw)

### Add link to a dataset

In [41]:
# add a link to dataset
#mcpw = MetadataChangeProposalWrapper(
#    "dataset",
#    models.ChangeTypeClass.UPSERT,
#    entityUrn=builder.make_dataset_urn(platform=dataset_platform, name=dataset_name, env=datahub_env),
#    aspectName="addLink",
#    aspect=??? TODO
#)
#emitter.emit_mcp(mcp=mcpw)

OperationalError: ('Unable to emit metadata to DataHub GMS', {'exceptionClass': 'com.linkedin.restli.server.RestLiServiceException', 'stackTrace': 'com.linkedin.restli.server.RestLiServiceException [HTTP Status:500]: java.lang.UnsupportedOperationException: Aspect and aspect name is required for create and update operations\n\tat com.linkedin.metadata.restli.RestliUtil.toTask(RestliUtil.java:42)\n\tat com.linkedin.metadata.restli.RestliUtil.toTask(RestliUtil.java:50)\n\tat com.linkedin.metadata.resources.entity.AspectResource.ingestProposal(AspectResource.java:132)\n\tat sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:172)\n\tat com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:326)\n\tat com.linkedin.restli.internal.server.filter.FilterChainDispatcherImpl.onRequestSuccess(FilterChainDispatcherImpl.java:47)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:86)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.lambda$onRequest$0(RestLiFilterChainIterator.java:73)\n\tat java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)\n\tat java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)\n\tat java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:72)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChain.onRequest(RestLiFilterChain.java:55)\n\tat com.linkedin.restli.server.BaseRestLiServer.handleResourceRequest(BaseRestLiServer.java:218)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequestWithRestLiResponse(RestRestLiServer.java:242)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:211)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:181)\n\tat com.linkedin.restli.server.RestRestLiServer.doHandleRequest(RestRestLiServer.java:164)\n\tat com.linkedin.restli.server.RestRestLiServer.handleRequest(RestRestLiServer.java:120)\n\tat com.linkedin.restli.server.RestLiServer.handleRequest(RestLiServer.java:132)\n\tat com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:70)\n\tat com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:70)\n\tat com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:72)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)\n\tat com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)\n\tat com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)\n\tat com.linkedin.r2.filter.transport.ServerQueryTunnelFilter.onRestRequest(ServerQueryTunnelFilter.java:58)\n\tat com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:72)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)\n\tat com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)\n\tat com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)\n\tat com.linkedin.r2.filter.message.rest.RestFilter.onRestRequest(RestFilter.java:50)\n\tat com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:72)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)\n\tat com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)\n\tat com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:96)\n\tat com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:75)\n\tat com.linkedin.r2.util.finalizer.RequestFinalizerDispatcher.handleRestRequest(RequestFinalizerDispatcher.java:61)\n\tat com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:101)\n\tat com.linkedin.r2.transport.http.server.AbstractR2Servlet.service(AbstractR2Servlet.java:105)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\tat com.linkedin.restli.server.spring.ParallelRestliHttpRequestHandler.handleRequest(ParallelRestliHttpRequestHandler.java:63)\n\tat org.springframework.web.context.support.HttpRequestHandlerServlet.service(HttpRequestHandlerServlet.java:73)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\tat org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:852)\n\tat org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:544)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)\n\tat org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:536)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)\n\tat org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)\n\tat org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)\n\tat org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\tat org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)\n\tat org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat org.eclipse.jetty.server.Server.handle(Server.java:494)\n\tat org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)\n\tat org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)\n\tat org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)\n\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)\n\tat org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)\n\tat org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.UnsupportedOperationException: Aspect and aspect name is required for create and update operations\n\tat com.linkedin.metadata.entity.ebean.EbeanEntityService.ingestProposal(EbeanEntityService.java:376)\n\tat com.linkedin.metadata.resources.entity.AspectResource.lambda$ingestProposal$3(AspectResource.java:135)\n\tat com.linkedin.metadata.restli.RestliUtil.toTask(RestliUtil.java:30)\n\t... 81 more\n', 'message': 'java.lang.UnsupportedOperationException: Aspect and aspect name is required for create and update operations', 'status': 500})