Skip to content

Commit

Permalink
feat(ingest): add a transformer for adding ownership (#2532)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed May 12, 2021
1 parent 95782b1 commit 2811d23
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 7 deletions.
23 changes: 21 additions & 2 deletions metadata-ingestion/README.md
Expand Up @@ -586,8 +586,27 @@ transformers:
some_property: "some.value"
```

A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py)
At the moment there are no built-in transformers.
A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py).

### `simple_add_dataset_ownership`

Adds a set of owners to every dataset.

```yml
transformers:
- type: "simple_add_dataset_ownership"
config:
owner_urns:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
```

:::tip

If you'd like to add more complex logic for assigning ownership, you can use the more generic [`AddDatasetOwnership` transformer](./src/datahub/ingestion/transformer/add_dataset_ownership.py), which calls a user-provided function to determine the ownership of each dataset.

:::

## Using as a library

Expand Down
35 changes: 30 additions & 5 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -1,7 +1,7 @@
"""Convenience functions for creating MCEs"""

import time
from typing import List, Union
from typing import List, Optional, Type, TypeVar

from datahub.metadata.schema_classes import (
AuditStampClass,
Expand All @@ -15,6 +15,12 @@
DEFAULT_ENV = "PROD"
DEFAULT_FLOW_CLUSTER = "prod"

T = TypeVar("T")


def get_sys_time() -> int:
return int(time.time() * 1000)


def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
Expand Down Expand Up @@ -47,14 +53,12 @@ def make_data_job_urn(


def make_lineage_mce(
upstream_urns: Union[str, List[str]],
upstream_urns: List[str],
downstream_urn: str,
actor: str = make_user_urn("datahub"),
lineage_type: str = DatasetLineageTypeClass.TRANSFORMED,
) -> MetadataChangeEventClass:
sys_time = int(time.time() * 1000)
if not isinstance(upstream_urns, list):
upstream_urns = [upstream_urns]
sys_time = get_sys_time()

mce = MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
Expand All @@ -77,3 +81,24 @@ def make_lineage_mce(
)
)
return mce


def get_aspect_if_available(
mce: MetadataChangeEventClass, type: Type[T]
) -> Optional[T]:
all_aspects = mce.proposedSnapshot.aspects
aspects: List[T] = [aspect for aspect in all_aspects if isinstance(aspect, type)]

if len(aspects) > 1:
raise ValueError(f"MCE contains multiple aspects of type {type}: {aspects}")
if aspects:
return aspects[0]
return None


def get_or_add_aspect(mce: MetadataChangeEventClass, default: T) -> T:
existing = get_aspect_if_available(mce, type(default))
if existing is not None:
return existing
mce.proposedSnapshot.aspects.append(default)
return default
@@ -0,0 +1,96 @@
from typing import Callable, Iterable, List, Union

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetSnapshotClass,
MetadataChangeEventClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)


class AddDatasetOwnershipConfig(ConfigModel):
# Workaround for https://github.com/python/mypy/issues/708.
# Suggested by https://stackoverflow.com/a/64528725/5004662.
get_owners_to_add: Union[
Callable[[DatasetSnapshotClass], List[OwnerClass]],
Callable[[DatasetSnapshotClass], List[OwnerClass]],
]
default_actor: str = builder.make_user_urn("etl")


class AddDatasetOwnership(Transformer):
"""Transformer that adds owners to datasets according to a callback function."""

ctx: PipelineContext
config: AddDatasetOwnershipConfig

def __init__(self, config: AddDatasetOwnershipConfig, ctx: PipelineContext):
self.ctx = ctx
self.config = config

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetOwnership":
config = AddDatasetOwnershipConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform(
self, record_envelopes: Iterable[RecordEnvelope]
) -> Iterable[RecordEnvelope]:
for envelope in record_envelopes:
if isinstance(envelope.record, MetadataChangeEventClass):
envelope.record = self.transform_one(envelope.record)
yield envelope

def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
return mce

owners_to_add = self.config.get_owners_to_add(mce.proposedSnapshot)
if owners_to_add:
ownership = builder.get_or_add_aspect(
mce,
OwnershipClass(
owners=[],
lastModified=AuditStampClass(
time=builder.get_sys_time(),
actor=self.config.default_actor,
),
),
)
ownership.owners.extend(owners_to_add)

return mce


class SimpleDatasetOwnershipConfig(ConfigModel):
owner_urns: List[str]
default_actor: str = builder.make_user_urn("etl")


class SimpleAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""

def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext):
owners = [
OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
for owner in config.owner_urns
]

generic_config = AddDatasetOwnershipConfig(
get_owners_to_add=lambda _: owners,
default_actor=config.default_actor,
)
super().__init__(generic_config, ctx)

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "SimpleAddDatasetOwnership":
config = SimpleDatasetOwnershipConfig.parse_obj(config_dict)
return cls(config, ctx)
@@ -1,4 +1,9 @@
from datahub.ingestion.api.registry import Registry
from datahub.ingestion.api.transform import Transformer

from .add_dataset_ownership import AddDatasetOwnership, SimpleAddDatasetOwnership

transform_registry = Registry[Transformer]()

transform_registry.register("add_dataset_ownership", AddDatasetOwnership)
transform_registry.register("simple_add_dataset_ownership", SimpleAddDatasetOwnership)
87 changes: 87 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset_ownership.py
@@ -0,0 +1,87 @@
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.add_dataset_ownership import (
SimpleAddDatasetOwnership,
)


def test_simple_dataset_ownership_tranformation(mock_time):
no_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
],
),
)
with_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=builder.get_sys_time(), actor="urn:li:corpuser:datahub"
),
)
],
),
)

not_a_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)

inputs = [
no_owner_aspect,
with_owner_aspect,
not_a_dataset,
]

transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
builder.make_user_urn("person2"),
]
},
PipelineContext(run_id="test"),
)

outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs)

# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
outputs[0].record, models.OwnershipClass
)
assert first_ownership_aspect
assert len(first_ownership_aspect.owners) == 2

# Check the second entry.
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 3

# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record

0 comments on commit 2811d23

Please sign in to comment.