Skip to content

Commit

Permalink
[dagster-dbt] fix dbt row count behavior on warehouses, test row coun…
Browse files Browse the repository at this point in the history
…t code on snowflake & bigquery (#21952)

## Summary

Adds Snowflake & BQ tests of new row-count functionality and fixes bug
with `Table` return on certain warehouses (incl Snowflake).

Added to the new OSS nightly pipeline introduced in #21956 to avoid
racking up unnecessary spend. We do already run some tests on these
warehouses as part of normal builds for their respective integrations,
so maybe it's fine to do the same here - open to either.

## Test Plan

Nightly build run:

https://buildkite.com/dagster/full-moon-with-face-dagster-nightly/builds/9
  • Loading branch information
benpankow authored May 21, 2024
1 parent 873b0f1 commit e0846dc
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .buildkite/dagster-buildkite/dagster_buildkite/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ def dagster() -> None:
def dagster_nightly() -> None:
PythonPackages.load_from_git(GitInfo(directory=Path(".")))
steps = build_dagster_oss_nightly_steps()
buildkite_yaml = buildkite_yaml_for_steps(steps)
buildkite_yaml = buildkite_yaml_for_steps(steps, custom_slack_channel="eng-buildkite-nightly")
print(buildkite_yaml) # noqa: T201
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
from typing import List

from dagster_buildkite.utils import BuildkiteStep
from ..package_spec import PackageSpec
from ..python_version import AvailablePythonVersion
from ..steps.packages import build_steps_from_package_specs, gcp_creds_extra_cmds
from ..utils import (
BuildkiteStep,
)


def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
steps: List[BuildkiteStep] = []

steps += build_steps_from_package_specs(
[
PackageSpec(
"python_modules/libraries/dagster-dbt",
pytest_tox_factors=["dbt18-snowflake", "dbt18-bigquery"],
env_vars=[
"SNOWFLAKE_ACCOUNT",
"SNOWFLAKE_USER",
"SNOWFLAKE_PASSWORD",
"GCP_PROJECT_ID",
],
pytest_extra_cmds=gcp_creds_extra_cmds,
unsupported_python_versions=[
AvailablePythonVersion.V3_12,
],
),
]
)

return steps
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
]


gcp_extra_cmds = (
gcp_creds_extra_cmds = (
[
rf"aws s3 cp s3://\${{BUILDKITE_SECRETS_BUCKET}}/{GCP_CREDS_FILENAME} "
+ GCP_CREDS_LOCAL_FILE,
Expand Down Expand Up @@ -600,7 +600,7 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
"BUILDKITE_SECRETS_BUCKET",
"GCP_PROJECT_ID",
],
pytest_extra_cmds=gcp_extra_cmds,
pytest_extra_cmds=gcp_creds_extra_cmds,
# Remove once https://github.com/dagster-io/dagster/issues/2511 is resolved
retries=2,
),
Expand All @@ -612,7 +612,7 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
"BUILDKITE_SECRETS_BUCKET",
"GCP_PROJECT_ID",
],
pytest_extra_cmds=gcp_extra_cmds,
pytest_extra_cmds=gcp_creds_extra_cmds,
retries=2,
),
PackageSpec(
Expand All @@ -623,7 +623,7 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
"BUILDKITE_SECRETS_BUCKET",
"GCP_PROJECT_ID",
],
pytest_extra_cmds=gcp_extra_cmds,
pytest_extra_cmds=gcp_creds_extra_cmds,
),
PackageSpec(
"python_modules/libraries/dagster-ge",
Expand Down
14 changes: 12 additions & 2 deletions .buildkite/dagster-buildkite/dagster_buildkite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def safe_getenv(env_var: str) -> str:
return os.environ[env_var]


def buildkite_yaml_for_steps(steps) -> str:
def buildkite_yaml_for_steps(steps, custom_slack_channel: Optional[str] = None) -> str:
return yaml.dump(
{
"env": {
Expand All @@ -101,7 +101,17 @@ def buildkite_yaml_for_steps(steps) -> str:
),
}
for buildkite_email, slack_channel in BUILD_CREATOR_EMAIL_TO_SLACK_CHANNEL_MAP.items()
],
]
+ (
[
{
"slack": f"elementl#{custom_slack_channel}",
"if": "build.state != 'canceled'",
}
]
if custom_slack_channel
else []
),
},
default_flow_style=False,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,10 @@ def _fetch_and_attach_row_count_metadata(
""",
fetch=True,
)
row_count = query_result[1][0]["row_count"]
query_result_table = query_result[1]
# some adapters do not output the column names, so we need
# to index by position
row_count = query_result_table[0][0]
additional_metadata = {**TableMetadataSet(row_count=row_count)}

return event.with_metadata(metadata={**event.metadata, **additional_metadata})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from pathlib import Path
from typing import Any, Dict, Iterator, List
from typing import Any, Dict, Iterator, List, Optional

import pytest
from dagster_dbt import DbtCliResource
Expand Down Expand Up @@ -54,8 +54,12 @@ def disable_openblas_threading_affinity_fixture() -> None:
os.environ["GOTOBLAS_MAIN_FREE"] = "1"


def _create_dbt_invocation(project_dir: Path, build_project: bool = False) -> DbtCliInvocation:
dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])
def _create_dbt_invocation(
project_dir: Path, build_project: bool = False, target: Optional[str] = None
) -> DbtCliInvocation:
dbt = DbtCliResource(
project_dir=os.fspath(project_dir), global_config_flags=["--quiet"], target=target
)

if not project_dir.joinpath("dbt_packages").exists():
dbt.cli(["deps"], raise_on_error=False).wait()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
_check as check,
materialize,
)
from dagster._core.definitions.metadata import IntMetadataValue
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.core.resources_v2 import DbtCliInvocation, DbtCliResource

Expand All @@ -21,7 +20,8 @@ def standalone_duckdb_dbfile_path_fixture(request) -> None:
"""Generate a unique duckdb dbfile path for certain tests which need
it, rather than using the default one-file-per-worker approach.
"""
jaffle_shop_duckdb_db_file_name = f"{request.node.name}_jaffle_shop"
node_name = cast(str, request.node.name).replace("[", "_").replace("]", "_")
jaffle_shop_duckdb_db_file_name = f"{node_name}_jaffle_shop"
jaffle_shop_duckdb_dbfile_path = f"target/{jaffle_shop_duckdb_db_file_name}.duckdb"

os.environ["DAGSTER_DBT_PYTEST_XDIST_DUCKDB_DBFILE_NAME"] = jaffle_shop_duckdb_db_file_name
Expand Down Expand Up @@ -60,39 +60,72 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
)


def test_row_count(
test_jaffle_shop_manifest_standalone_duckdb_dbfile: Dict[str, Any],
) -> None:
@dbt_assets(manifest=test_jaffle_shop_manifest_standalone_duckdb_dbfile)
@pytest.fixture(name="test_jaffle_shop_manifest_snowflake")
def test_jaffle_shop_manifest_snowflake_fixture() -> Dict[str, Any]:
return _create_dbt_invocation(test_jaffle_shop_path, target="snowflake").get_artifact(
"manifest.json"
)


@pytest.fixture(name="test_jaffle_shop_manifest_bigquery")
def test_jaffle_shop_manifest_bigquery_fixture() -> Dict[str, Any]:
return _create_dbt_invocation(test_jaffle_shop_path, target="bigquery").get_artifact(
"manifest.json"
)


@pytest.mark.parametrize(
"target, manifest_fixture_name",
[
pytest.param(None, "test_jaffle_shop_manifest_standalone_duckdb_dbfile", id="duckdb"),
pytest.param(
"snowflake",
"test_jaffle_shop_manifest_snowflake",
marks=pytest.mark.snowflake,
id="snowflake",
),
pytest.param(
"bigquery",
"test_jaffle_shop_manifest_bigquery",
marks=pytest.mark.bigquery,
id="bigquery",
),
],
)
def test_row_count(request: pytest.FixtureRequest, target: str, manifest_fixture_name: str) -> None:
manifest = cast(Dict[str, Any], request.getfixturevalue(manifest_fixture_name))

@dbt_assets(manifest=manifest)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream().fetch_row_counts()

result = materialize(
[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_jaffle_shop_path))},
resources={
"dbt": DbtCliResource(project_dir=os.fspath(test_jaffle_shop_path), target=target)
},
)

assert result.success

metadata_by_asset_key = {
check.not_none(event.asset_key): event.materialization.metadata
for event in result.get_asset_materialization_events()
}

# Validate that we have row counts for all models which are not views
assert all(
"dagster/row_count" not in event.materialization.metadata
for event in result.get_asset_materialization_events()
"dagster/row_count" not in metadata
for asset_key, metadata in metadata_by_asset_key.items()
# staging tables are views, so we don't attempt to get row counts for them
if "stg" in check.not_none(event.asset_key).path[-1]
)
if "stg" in asset_key.path[-1]
), metadata_by_asset_key
assert all(
"dagster/row_count" in event.materialization.metadata
for event in result.get_asset_materialization_events()
if "stg" not in check.not_none(event.asset_key).path[-1]
)

row_counts = [
cast(IntMetadataValue, event.materialization.metadata["dagster/row_count"]).value
for event in result.get_asset_materialization_events()
if "stg" not in check.not_none(event.asset_key).path[-1]
]
assert all(row_count and row_count > 0 for row_count in row_counts), row_counts
"dagster/row_count" in metadata
for asset_key, metadata in metadata_by_asset_key.items()
# staging tables are views, so we don't attempt to get row counts for them
if "stg" not in asset_key.path[-1]
), metadata_by_asset_key


def test_row_count_err(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,18 @@ jaffle_shop:
path: "{{ env_var('DAGSTER_DBT_PYTEST_XDIST_DUCKDB_DBFILE_PATH', 'target/local.duckdb') }}"
schema: "{{ env_var('DAGSTER_DBT_JAFFLE_SCHEMA', 'dev') }}"
threads: 24
snowflake:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
client_session_keep_alive: False
database: TESTDB
schema: TESTSCHEMA
bigquery:
type: bigquery
method: service-account
project: "{{ env_var('GCP_PROJECT_ID') }}"
dataset: BIGQUERY_IO_MANAGER_SCHEMA
threads: 4
keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-dbt/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ markers =
cloud: marks tests that use dbt Cloud APIs
core: marks tests that use `dagster-dbt>=0.20.0` APIs
legacy: marks tests that use `dagster-dbt<0.20.0` APIs
snowflake: marks tests that access a Snowflake warehouse, run nightly
bigquery: marks tests that access a BigQuery warehouse, run nightly
12 changes: 10 additions & 2 deletions python_modules/libraries/dagster-dbt/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ skipsdist = true

[testenv]
download = True
passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE*
passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE* SNOWFLAKE_ACCOUNT SNOWFLAKE_USER SNOWFLAKE_PASSWORD GOOGLE_APPLICATION_CREDENTIALS GCP_PROJECT_ID
install_command = uv pip install {opts} {packages}
deps =
-e ../../dagster[test]
Expand All @@ -12,10 +12,16 @@ deps =
-e ../dagster-duckdb-pandas
dbt16: dbt-core==1.6.*
dbt16: dbt-duckdb==1.6.*
dbt16: dbt-snowflake==1.6.*
dbt16: dbt-bigquery==1.6.*
dbt17: dbt-core==1.7.*
dbt17: dbt-duckdb==1.7.*
dbt17: dbt-snowflake==1.7.*
dbt17: dbt-bigquery==1.7.*
dbt18: dbt-core==1.8.*
dbt18: dbt-duckdb==1.8.*
dbt18: dbt-snowflake==1.8.*
dbt18: dbt-bigquery==1.8.*
pydantic1: pydantic!=1.10.7,<2.0.0
-e .[test]
allowlist_externals =
Expand All @@ -24,5 +30,7 @@ allowlist_externals =
commands =
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
cloud: pytest --numprocesses 6 --durations 10 -c ../../../pyproject.toml -m "cloud" -vv {posargs}
core: pytest --numprocesses 6 --durations 10 -c ../../../pyproject.toml -m "core" -vv {posargs}
core: pytest --numprocesses 6 --durations 10 -c ../../../pyproject.toml -m "core and not snowflake and not bigquery" -vv {posargs}
legacy: pytest --durations 10 -c ../../../pyproject.toml -m "legacy" -vv {posargs}
snowflake: pytest -c ../../../pyproject.toml -vv --durations 10 {posargs} -m 'snowflake'
bigquery: pytest -c ../../../pyproject.toml -vv --durations 10 {posargs} -m 'bigquery'

0 comments on commit e0846dc

Please sign in to comment.