Skip to content

Commit

Permalink
[dagster-airflow] pass airflow dag timezone to dagster schedule defin…
Browse files Browse the repository at this point in the history
…ition (#11663)
  • Loading branch information
Ramshackle-Jamathon committed Jan 12, 2023
1 parent 9adccb5 commit 5482236
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,10 @@ def make_dagster_schedule_from_airflow_dag(dag, job_def):

if isinstance(dag.normalized_schedule_interval, str) and is_valid_cron_schedule(cron_schedule):
return ScheduleDefinition(
job=job_def, cron_schedule=cron_schedule, description=schedule_description
job=job_def,
cron_schedule=cron_schedule,
description=schedule_description,
execution_timezone=dag.timezone.name,
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import pendulum
import pytest
from airflow import __version__ as airflow_version
from airflow.models.dag import DAG
from dagster import job
from dagster_airflow.dagster_pipeline_factory import make_dagster_schedule_from_airflow_dag


@pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2")
def test_schedule_timezone():
args = {
"owner": "airflow",
"start_date": pendulum.today("Europe/London").add(days=-2),
}
dag = DAG(
dag_id="test_schedules",
default_args=args,
schedule="0 0 * * *",
)

@job
def job_def():
return

schedule = make_dagster_schedule_from_airflow_dag(dag=dag, job_def=job_def)
assert schedule.cron_schedule == "0 0 * * *"
assert schedule.execution_timezone == "Europe/London"

0 comments on commit 5482236

Please sign in to comment.