Skip to content

Commit

Permalink
feat(ingest): support snapshots in dbt and dbt-cloud (datahub-project…
Browse files Browse the repository at this point in the history
…#7062)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
  • Loading branch information
2 people authored and Eric Yomi committed Jan 18, 2023
1 parent 6ede47d commit 63deeba
Show file tree
Hide file tree
Showing 15 changed files with 1,797 additions and 10,510 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/docs/sources/dbt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Ingesting metadata from dbt requires either using the **dbt** module or the **db
| dbt Model - view | [Dataset](../../metamodel/entities/dataset.md) | Subtype `view` |
| dbt Model - incremental | [Dataset](../../metamodel/entities/dataset.md) | Subtype `incremental` |
| dbt Model - ephemeral | [Dataset](../../metamodel/entities/dataset.md) | Subtype `ephemeral` |
| dbt Snapshot | [Dataset](../../metamodel/entities/dataset.md) | Subtype `snapshot` |
| dbt Test | [Assertion](../../metamodel/entities/assertion.md) | |
| dbt Test Result | [Assertion Run Result](../../metamodel/entities/assertion.md) | |

Expand Down
7 changes: 4 additions & 3 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,10 @@ def get_urns_by_filter(
entities_yielded += 1
log.debug(f"yielding {x['entity']}")
yield x["entity"]
log.warning(
f"Discrepancy in entities yielded {entities_yielded} and num entities {num_entities}. This means all entities may not have been deleted."
)
if entities_yielded != num_entities:
log.warning(
f"Discrepancy in entities yielded {entities_yielded} and num entities {num_entities}. This means all entities may not have been deleted."
)
else:
log.error(f"Failed to execute search query with {str(response.content)}")
response.raise_for_status()
Expand Down
34 changes: 29 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class DBTCloudConfig(DBTCommonConfig):
#}
"""

_DBT_GRAPHQL_MODEL_SEED_FIELDS = """
_DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS = """
alias
error
status
Expand All @@ -109,15 +109,15 @@ class DBTCloudConfig(DBTCommonConfig):
models(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
{ _DBT_GRAPHQL_NODE_COMMON_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS }
dependsOn
materializedType
}}
seeds(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
{ _DBT_GRAPHQL_NODE_COMMON_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS }
}}
sources(jobId: $jobId, runId: $runId) {{
Expand All @@ -133,6 +133,18 @@ class DBTCloudConfig(DBTCommonConfig):
loader
}}
snapshots(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
{ _DBT_GRAPHQL_NODE_COMMON_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS }
parentsSources {{
uniqueId
}}
parentsModels {{
uniqueId
}}
}}
tests(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
state
Expand Down Expand Up @@ -224,6 +236,7 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
*data["models"],
*data["seeds"],
*data["sources"],
*data["snapshots"],
*data["tests"],
]

Expand Down Expand Up @@ -253,10 +266,21 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:

if node["resourceType"] == "model":
materialization = node["materializedType"]
elif node["resourceType"] == "snapshot":
materialization = "snapshot"
else:
materialization = None

upstream_nodes = node.get("dependsOn", [])
if node["resourceType"] == "snapshot":
upstream_nodes = [
obj["uniqueId"]
for obj in [
*node.get("parentsModels", []),
*node.get("parentsSources", []),
]
]
else:
upstream_nodes = node.get("dependsOn", [])

catalog_type = node.get("type")

Expand All @@ -269,7 +293,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
tags = node["tags"]
tags = [self.config.tag_prefix + tag for tag in tags]

if node["resourceType"] in {"model", "seed"}:
if node["resourceType"] in {"model", "seed", "snapshot"}:
status = node["status"]
if status is None and materialization != "ephemeral":
self.report.report_warning(
Expand Down
40 changes: 24 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ConfigurationError,
LineageConfig,
)
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
Expand Down Expand Up @@ -137,6 +138,10 @@ class DBTEntitiesEnabled(ConfigModel):
EmitDirective.YES,
description="Emit metadata for dbt seeds when set to Yes or Only",
)
snapshots: EmitDirective = Field(
EmitDirective.YES,
description="Emit metadata for dbt snapshots when set to Yes or Only",
)
test_definitions: EmitDirective = Field(
EmitDirective.YES,
description="Emit metadata for test definitions when enabled when set to Yes or Only",
Expand Down Expand Up @@ -167,17 +172,18 @@ def process_only_directive(cls, values):
def can_emit_node_type(self, node_type: str) -> bool:
# Node type comes from dbt's node types.

field_to_node_type_map = {
"model": "models",
"source": "sources",
"seed": "seeds",
"test": "test_definitions",
node_type_allow_map = {
"model": self.models,
"source": self.sources,
"seed": self.seeds,
"snapshot": self.snapshots,
"test": self.test_definitions,
}
field = field_to_node_type_map.get(node_type)
if not field:
allowed = node_type_allow_map.get(node_type)
if allowed is None:
return False

return self.__getattribute__(field) == EmitDirective.YES
return allowed == EmitDirective.YES

@property
def can_emit_test_results(self) -> bool:
Expand All @@ -200,6 +206,8 @@ class DBTCommonConfig(StatefulIngestionConfigBase, LineageConfig):
default=False,
description="Use model identifier instead of model name if defined (if not, default to model name).",
)
_deprecate_use_identifiers = pydantic_field_deprecated("use_identifiers")

entities_enabled: DBTEntitiesEnabled = Field(
DBTEntitiesEnabled(),
description="Controls for enabling / disabling metadata emission for different dbt entities (models, test definitions, test results, etc.)",
Expand Down Expand Up @@ -252,6 +260,9 @@ class DBTCommonConfig(StatefulIngestionConfigBase, LineageConfig):
False,
description="[deprecated] Prior to version 0.8.41, lineage edges to sources were directed to the target platform node rather than the dbt source node. This contradicted the established pattern for other lineage edges to point to upstream dbt nodes. To revert lineage logic to this legacy approach, set this flag to true.",
)
_deprecate_skip_source_on_lineage_edge = pydantic_field_deprecated(
"backcompat_skip_source_on_lineage_edge"
)

incremental_lineage: bool = Field(
# Copied from LineageConfig, and changed the default.
Expand Down Expand Up @@ -349,7 +360,7 @@ class DBTNode:

node_type: str # source, model
max_loaded_at: Optional[datetime]
materialization: Optional[str] # table, view, ephemeral, incremental
materialization: Optional[str] # table, view, ephemeral, incremental, snapshot
# see https://docs.getdbt.com/reference/artifacts/manifest-json
catalog_type: Optional[str]

Expand Down Expand Up @@ -419,7 +430,6 @@ def get_custom_properties(node: DBTNode) -> Dict[str, str]:
def get_upstreams(
upstreams: List[str],
all_nodes: Dict[str, DBTNode],
use_identifiers: bool,
target_platform: str,
target_platform_instance: Optional[str],
environment: str,
Expand All @@ -428,7 +438,7 @@ def get_upstreams(
) -> List[str]:
upstream_urns = []

for upstream in upstreams:
for upstream in sorted(upstreams):
if upstream not in all_nodes:
logger.debug(
f"Upstream node - {upstream} not found in all manifest entities."
Expand All @@ -444,7 +454,7 @@ def get_upstreams(
materialized = upstream_manifest_node.materialization

resource_type = upstream_manifest_node.node_type
if materialized in {"view", "table", "incremental"} or (
if materialized in {"view", "table", "incremental", "snapshot"} or (
resource_type == "source" and legacy_skip_source_lineage
):
# upstream urns point to the target platform
Expand Down Expand Up @@ -718,7 +728,6 @@ def create_test_entity_mcps(
upstream_urns = get_upstreams(
upstreams=node.upstream_nodes,
all_nodes=all_nodes_map,
use_identifiers=self.config.use_identifiers,
target_platform=self.config.target_platform,
target_platform_instance=self.config.target_platform_instance,
environment=self.config.env,
Expand Down Expand Up @@ -1124,7 +1133,7 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
pass

def _create_view_properties_aspect(self, node: DBTNode) -> ViewPropertiesClass:
materialized = node.materialization in {"table", "incremental"}
materialized = node.materialization in {"table", "incremental", "snapshot"}
# this function is only called when raw sql is present. assert is added to satisfy lint checks
assert node.raw_code is not None
view_properties = ViewPropertiesClass(
Expand Down Expand Up @@ -1319,7 +1328,7 @@ def _create_subType_wu(
if not node.node_type:
return None
subtypes: Optional[List[str]]
if node.node_type == "model":
if node.node_type in {"model", "snapshot"}:
if node.materialization:
subtypes = [node.materialization, "view"]
else:
Expand All @@ -1343,7 +1352,6 @@ def _create_lineage_aspect_for_dbt_node(
upstream_urns = get_upstreams(
node.upstream_nodes,
all_nodes_map,
self.config.use_identifiers,
self.config.target_platform,
self.config.target_platform_instance,
self.config.env,
Expand Down
28 changes: 9 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def extract_dbt_entities(
for key, manifest_node in all_manifest_entities.items():
name = manifest_node["name"]

if "identifier" in manifest_node and use_identifiers:
if use_identifiers and manifest_node.get("identifier"):
name = manifest_node["identifier"]

if (
Expand Down Expand Up @@ -393,11 +393,6 @@ def load_file_as_json(self, uri: str) -> Any:

def loadManifestAndCatalog(
self,
manifest_path: str,
catalog_path: str,
sources_path: Optional[str],
use_identifiers: bool,
tag_prefix: str,
) -> Tuple[
List[DBTNode],
Optional[str],
Expand All @@ -406,12 +401,12 @@ def loadManifestAndCatalog(
Optional[str],
Optional[str],
]:
dbt_manifest_json = self.load_file_as_json(manifest_path)
dbt_manifest_json = self.load_file_as_json(self.config.manifest_path)

dbt_catalog_json = self.load_file_as_json(catalog_path)
dbt_catalog_json = self.load_file_as_json(self.config.catalog_path)

if sources_path is not None:
dbt_sources_json = self.load_file_as_json(sources_path)
if self.config.sources_path is not None:
dbt_sources_json = self.load_file_as_json(self.config.sources_path)
sources_results = dbt_sources_json["results"]
else:
sources_results = {}
Expand All @@ -438,8 +433,8 @@ def loadManifestAndCatalog(
all_catalog_entities,
sources_results,
manifest_adapter,
use_identifiers,
tag_prefix,
self.config.use_identifiers,
self.config.tag_prefix,
self.report,
)

Expand All @@ -460,13 +455,8 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
manifest_adapter,
catalog_schema,
catalog_version,
) = self.loadManifestAndCatalog(
self.config.manifest_path,
self.config.catalog_path,
self.config.sources_path,
self.config.use_identifiers,
self.config.tag_prefix,
)
) = self.loadManifestAndCatalog()

additional_custom_props = {
"manifest_schema": manifest_schema,
"manifest_version": manifest_version,
Expand Down
16 changes: 16 additions & 0 deletions metadata-ingestion/tests/integration/dbt/copy-from-sample-dbt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

set -euxo pipefail

# This arg should point at a local copy of https://github.com/hsheth2/sample-dbt,
# after the generation script has been run.
sample_dbt=$1

cp $sample_dbt/target_processed/dbt_catalog.json sample_dbt_catalog.json
cp $sample_dbt/target_processed/dbt_manifest.json sample_dbt_manifest.json
cp $sample_dbt/target_processed/dbt_sources.json sample_dbt_sources.json

# We don't currently test run_results from sample-dbt.
# cp $sample_dbt/target_processed/dbt_run_results.json sample_dbt_run_results.json


Loading

0 comments on commit 63deeba

Please sign in to comment.