Airbyte + dbt + Dagster: dbt upstream dependency issue #20961
Hello everyone, I'm relatively new to Dagster and currently following the tutorial here: Airbyte + dbt + Dagster Quickstart. I've successfully set everything up as described, and everything works fine when I manually trigger materializations from the Dagster UI: all my models build correctly, and the Airbyte connections are triggered first, followed by the dbt models. However, I run into an issue when I try to use the scheduling function. My goal is to automate the workflow so that the Airbyte syncs run first, followed by the dbt model materializations. Here's the code I'm using for scheduling:

schedules.py

```python
from dagster_dbt import build_schedule_from_dbt_selection

from .assets import dbt_project_dbt_assets

schedules = [
    build_schedule_from_dbt_selection(
        [dbt_project_dbt_assets],
        job_name="materialize_dbt_models",
        cron_schedule="0 0 * * *",
        dbt_select="fqn:*",
    ),
]
```

With this setup, a schedule is correctly created in the UI and runs as expected. However, it only triggers the dbt materialization, not the upstream Airbyte sync, even though the UI shows the assets correctly connected with lineage. From my understanding, Dagster should orchestrate each asset's materialization along with all of its upstream dependencies.

Here are the other parts of my script:

assets.py

```python
import os

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

from .constants import dbt_manifest_path

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
    # If using basic auth, include username and password:
    username="airbyte",
    password=os.getenv("AIRBYTE_PASSWORD"),
)

airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

@dbt_assets(manifest=dbt_manifest_path)
def dbt_project_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```

constants.py

```python
import os
from pathlib import Path

from dagster_dbt import DbtCliResource

dbt_project_dir = Path(__file__).joinpath("..", "..", "..", "dbt_project").resolve()
dbt = DbtCliResource(project_dir=os.fspath(dbt_project_dir))

# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at run time.
# Otherwise, we expect a manifest to be present in the project's target directory.
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
    dbt_manifest_path = (
        dbt.cli(
            ["--quiet", "parse"],
            target_path=Path("target"),
        )
        .wait()
        .target_path.joinpath("manifest.json")
    )
else:
    dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")
```

definitions.py

```python
import os

from dagster import Definitions
from dagster_dbt import DbtCliResource

from .assets import dbt_project_dbt_assets, airbyte_assets
from .constants import dbt_project_dir
from .schedules import schedules

defs = Definitions(
    assets=[dbt_project_dbt_assets, airbyte_assets],
    schedules=schedules,
    resources={
        "dbt": DbtCliResource(project_dir=os.fspath(dbt_project_dir)),
    },
)
```

Am I required to schedule my Airbyte flows separately?
I suspect that you need to update your schedule.py to include the Airbyte flows specifically. In assets.py, the code creates both the dbt assets via `dbt_project_dbt_assets` and the upstream Airbyte assets via `load_assets_from_airbyte_instance`. However, the schedule definition only invokes `dbt_project_dbt_assets`.

Our documentation shows example schedules for Airbyte syncs. It doesn't use dbt, but it does specify the use of `airbyte_assets` in the schedule definition:

```python
from dagster import define_asset_job, ScheduleDefinition, Definitions

# Materialize all assets
run_everything_job = define_asset_job("run_everything", selection="*")

# Define the schedule for the job
schedule = ScheduleDefinition(
    job=run_everything_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)

# Combine assets and schedules into a Definitions object
defs = Definitions(
    assets=[airbyte_assets, dbt_project_dbt_assets],
    schedules=[schedule],
)
```

Update your schedule.py similarly and give that a try!