Skip to content

Commit

Permalink
feat(sdk): support patches as MCPs in file source (#8220)
Browse files: browse the repository at this point in the history
Co-authored-by: Shirshanka Das <shirshanka@apache.org>
  • Loading branch information
hsheth2 and shirshanka committed Jun 14, 2023
1 parent 9254a1b commit 2d7692a
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 3 deletions.
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ def try_from_mcpc(
Raises:
Exception if the generic aspect is invalid, e.g. contains invalid json.
"""

if mcpc.changeType != ChangeTypeClass.UPSERT:
# We can only generate MCPWs for upserts.
return None

converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect)
if converted:
return cls(
Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/tests/unit/patch/complex_dataset_patch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]",
"contentType": "application/json-patch+json"
}
}
]
66 changes: 66 additions & 0 deletions metadata-ingestion/tests/unit/patch/test_patch_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import json
import pathlib

import pytest

from datahub.emitter.mce_builder import make_dataset_urn, make_tag_urn
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
GenericAspectClass,
MetadataChangeProposalClass,
TagAssociationClass,
UpstreamClass,
)
from datahub.specific.dataset import DatasetPatchBuilder


def test_basic_dataset_patch_builder():
    """Adding one tag via the builder yields exactly one globalTags PATCH proposal."""
    dataset_urn = make_dataset_urn(
        platform="hive", name="fct_users_created", env="PROD"
    )
    tag_association = TagAssociationClass(tag=make_tag_urn("test_tag"))
    builder = DatasetPatchBuilder(dataset_urn).add_tag(tag_association)

    actual_proposals = builder.build()

    # The patch body travels as raw JSON-patch bytes inside a generic aspect.
    expected_aspect = GenericAspectClass(
        value=b'[{"op": "add", "path": "/tags/urn:li:tag:test_tag", "value": {"tag": "urn:li:tag:test_tag"}}]',
        contentType="application/json-patch+json",
    )
    expected_proposal = MetadataChangeProposalClass(
        entityType="dataset",
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
        changeType="PATCH",
        aspectName="globalTags",
        aspect=expected_aspect,
    )

    assert actual_proposals == [expected_proposal]


def test_complex_dataset_patch(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    """Write a multi-aspect dataset patch to a file and compare it with the golden JSON.

    The builder is given a description, two custom properties, a tag, one
    upstream lineage edge, and a field-level tag; the emitted proposals are
    written with ``write_metadata_file`` and the parsed output must equal the
    parsed contents of ``tests/unit/patch/complex_dataset_patch.json``.
    """
    dataset_urn = make_dataset_urn(
        platform="hive", name="fct_users_created", env="PROD"
    )
    upstream_edge = UpstreamClass(
        dataset=make_dataset_urn(
            platform="hive", name="fct_users_created_upstream", env="PROD"
        ),
        type=DatasetLineageTypeClass.TRANSFORMED,
    )

    builder = (
        DatasetPatchBuilder(dataset_urn)
        .set_description("test description")
        .add_custom_property("test_key_1", "test_value_1")
        .add_custom_property("test_key_2", "test_value_2")
        .add_tag(TagAssociationClass(tag=make_tag_urn("test_tag")))
        .add_upstream_lineage(upstream=upstream_edge)
    )
    # Field-level patches are attached through the per-field helper.
    field_tag = TagAssociationClass(tag=make_tag_urn("tag1"))
    builder.for_field("field1").add_tag(field_tag)

    out_path = tmp_path / "patch.json"
    write_metadata_file(out_path, builder.build())

    golden_path = pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json"
    written_payload = json.loads(out_path.read_text())
    golden_payload = json.loads(golden_path.read_text())

    assert written_payload == golden_payload
7 changes: 4 additions & 3 deletions metadata-ingestion/tests/unit/serde/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@
"tests/unit/serde/test_serde_usage.json",
# Profiles with the MetadataChangeProposal format.
"tests/unit/serde/test_serde_profile.json",
# Test one that uses patch.
"tests/unit/serde/test_serde_patch.json",
],
)
def test_serde_to_json(
pytestconfig: PytestConfig, tmp_path: pathlib.Path, json_filename: str
) -> None:
golden_file = pytestconfig.rootpath / json_filename
output_file = tmp_path / "output.json"

output_filename = "output.json"
output_file = tmp_path / output_filename
pipeline = Pipeline.create(
{
"source": {"type": "file", "config": {"filename": str(golden_file)}},
Expand All @@ -57,7 +58,7 @@ def test_serde_to_json(

mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{output_filename}",
output_path=f"{output_file}",
golden_path=golden_file,
)

Expand Down
58 changes: 58 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde_patch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]",
"contentType": "application/json-patch+json"
},
"systemMetadata": {
"lastObserved": 1626980046000,
"runId": "serde_test"
}
}
]

0 comments on commit 2d7692a

Please sign in to comment.