
Execution_date not rendering after airflow upgrade #19618

Closed
1 of 2 tasks
AnithaG-Oak opened this issue Nov 16, 2021 · 31 comments · Fixed by #24413
Comments

@AnithaG-Oak

Apache Airflow version

2.2.2 (latest released)

Operating System

Debian GNU/Linux

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==2.3.0
apache-airflow-providers-apache-spark==2.0.1
apache-airflow-providers-cncf-kubernetes==2.1.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-sqlite==2.0.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

Hi,
We recently upgraded Airflow from 2.1.0 to 2.2.2 (2.1.0 to 2.2.0 to 2.2.1 to 2.2.2) and DAGs aren't running as expected. All of these DAGs were added before the upgrade and were running fine.
We use the execution_date parameter in SparkSubmitOperator; it rendered fine before the upgrade but now fails, returning None:

"{{ (execution_date if execution_date.microsecond > 0 else dag.following_schedule(execution_date)).isoformat() }}"

The DAG run fails with the error:

jinja2.exceptions.UndefinedError: 'None' has no attribute 'isoformat'

I tried wiping out the database and running it as a fresh DAG, but still got the same error.

Any help would be appreciated

What you expected to happen

No response

How to reproduce

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@AnithaG-Oak AnithaG-Oak added area:core kind:bug This is a clearly a bug labels Nov 16, 2021
@boring-cyborg

boring-cyborg bot commented Nov 16, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@uranusjr
Member

Please provide a DAG for reproduction.

@AnithaG-Oak
Author

AnithaG-Oak commented Nov 16, 2021

properties = SparkProperties.from_config("small")

def execution_date():
    return "{{ (execution_date if execution_date.microsecond > 0 else dag.following_schedule(execution_date)).isoformat() }}"

def spark_job(task_id: str,
    spark: SparkProperties,
    main_class: Optional[str],
    *args,
    py_files: Optional[str] = None,
    jars: Optional[str] = None,
    **kwargs,) -> SparkSubmitOperator:
    return SparkSubmitOperator(
        task_id=task_id,
        application=path,
        conf=spark.conf(),
        py_files=py_files,
        jars=jars,
        conn_id=spark.conn_id,
        java_class=main_class or None,
        total_executor_cores=spark.total_executor_cores,
        executor_cores=spark.executor_cores,
        executor_memory=spark.executor_memory,
        driver_memory=spark.driver_memory,
        name=task_id,
        num_executors=spark.num_executors,
        application_args=spark_args(*args, **kwargs),
        env_vars=spark.env_vars(),
    )

def test1() -> SparkSubmitOperator:
    return spark_job(
        task_id="test_dag",
        spark=properties,
        main_class="TestSparkJob",
        execution_date=execution_date()
    )


def test_dag(this: DAG) -> DAG:
    with this:
        (label("start") >> test1() >> label("finish"))
    return this


with DAG(
    "test_workflow",
    default_args=DEFAULT_ARGS,
    schedule_interval="@once",
    max_active_runs=1,
) as dag:
    test_dag(dag)

@AnithaG-Oak
Author

Any spark job would do to reproduce this issue since the DAG fails before triggering the job

@potiuk
Member

potiuk commented Nov 16, 2021

Any spark job would do to reproduce this issue since the DAG fails before triggering the job

What's spark_job() doing? That must be something internal to your organisation?

@AnithaG-Oak
Author

It creates a SparkSubmitOperator. I have updated the example

@potiuk
Member

potiuk commented Nov 16, 2021

It creates a SparkSubmitOperator. I have updated the example

OK. Now we need to know what spark_args() does as well.

@AnithaG-Oak
Author

AnithaG-Oak commented Nov 16, 2021

It creates a SparkSubmitOperator. I have updated the example

OK. Now we need to know what spark_args() does as well.

Ah, my bad. It creates application_args (List[str]) from the args and kwargs arguments. Something like this:

def spark_args(*args, **kwargs) -> List[str]:
    arr = [a for a in args]
    for key, value in kwargs.items():
        arr += [f"--{key}", str(value)]
    return arr
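
For concreteness, the helper can be exercised standalone. The sketch below restates the function as posted (the "input.csv" argument and keyword names are made up for illustration); the templated execution_date value is just another string here, rendered later by Airflow's templating engine since application_args is a templated field:

```python
from typing import List

def spark_args(*args, **kwargs) -> List[str]:
    # Positional args pass through unchanged; each keyword arg becomes
    # a "--key value" pair appended to the list.
    arr = list(args)
    for key, value in kwargs.items():
        arr += [f"--{key}", str(value)]
    return arr

print(spark_args("input.csv", mode="overwrite", retries=3))
# → ['input.csv', '--mode', 'overwrite', '--retries', '3']
```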

@AnithaG-Oak
Author

On a related note, I noticed from the Airflow docs that {{ execution_date }} is deprecated.
I see that {{ ts }} is the ISO-format equivalent of execution_date, but it is a string. Do we have an equivalent for execution_date as a datetime type?

@potiuk
Member

potiuk commented Nov 16, 2021

OK, seems pretty legit then. Hmm, interesting: we converted execution_date to be a lazy-proxy object, so essentially it should work in a backwards-compatible way (correct, @uranusjr?).

I wonder what happens if you replace execution_date with dag_run.logical_date - see the deprecated values here: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html?highlight=macros#variables - but it should work regardless, so it would be great to track this one down.

@potiuk
Member

potiuk commented Nov 16, 2021

I believe I know what the problem is. Yes, dag_run.logical_date should work. @uranusjr, this is the coerce_datetime call in following_schedule:

    def following_schedule(self, dttm):
        """
        Calculates the following schedule for this dag in UTC.

        :param dttm: utc datetime
        :return: utc datetime
        """
        warnings.warn(
            "`DAG.following_schedule()` is deprecated. Use `DAG.next_dagrun_info(restricted=False)` instead.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        data_interval = self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
        next_info = self.next_dagrun_info(data_interval, restricted=False)
        if next_info is None:
            return None
        return next_info.data_interval.start

Here is coerce_datetime:


def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]:
    """Convert whatever is passed in to an timezone-aware ``pendulum.DateTime``."""
    if v is None:
        return None
    if v.tzinfo is None:
        v = make_aware(v)
    if isinstance(v, DateTime):
        return v
    return pendulum.instance(v)
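
The coercion itself is unlikely to be the culprit: None passes straight through, and anything else comes back timezone-aware. A stdlib-only sketch of that behaviour (the real function uses pendulum and airflow.utils.timezone.make_aware, which honours the configured default timezone; UTC is assumed here purely for simplicity):

```python
from datetime import datetime, timezone
from typing import Optional

def coerce_datetime_sketch(v: Optional[datetime]) -> Optional[datetime]:
    # Mirrors the quoted logic using only the standard library:
    # None passes through, naive datetimes are made aware (UTC assumed
    # here), and already-aware datetimes are returned unchanged.
    if v is None:
        return None
    if v.tzinfo is None:
        return v.replace(tzinfo=timezone.utc)
    return v

assert coerce_datetime_sketch(None) is None
assert coerce_datetime_sketch(datetime(2021, 11, 16)).tzinfo is not None
```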

@uranusjr
Member

uranusjr commented Nov 16, 2021

I can see why the error happens, but not why the error is wrong. A @once schedule does not have a following schedule (because it runs only once), so dag.following_schedule(execution_date) is correct to return None. This should also be the case if you use dag_run.logical_date.

@potiuk
Member

potiuk commented Nov 16, 2021

Ah, I see. For @once you are indeed right.

@potiuk
Member

potiuk commented Nov 16, 2021

Different behaviour of if execution_date.microsecond > 0 ?

@potiuk
Member

potiuk commented Nov 16, 2021

Or maybe this did not return None before for an @once schedule?

        next_info = self.timetable.next_dagrun_info(
            last_automated_dagrun=pendulum.instance(dttm),
            restriction=TimeRestriction(earliest=None, latest=None, catchup=True),
        )

@potiuk
Member

potiuk commented Nov 16, 2021

Or even this, which was before the timetables:

    def following_schedule(self, dttm):
        """
        Calculates the following schedule for this dag in UTC.

        :param dttm: utc datetime
        :return: utc datetime
        """
        if isinstance(self.normalized_schedule_interval, str):
            # we don't want to rely on the transitions created by
            # croniter as they are not always correct
            dttm = pendulum.instance(dttm)
            naive = timezone.make_naive(dttm, self.timezone)
            cron = croniter(self.normalized_schedule_interval, naive)

            # We assume that DST transitions happen on the minute/hour
            if not self.is_fixed_time_schedule():
                # relative offset (eg. every 5 minutes)
                delta = cron.get_next(datetime) - naive
                following = dttm.in_timezone(self.timezone) + delta
            else:
                # absolute (e.g. 3 AM)
                naive = cron.get_next(datetime)
                tz = self.timezone
                following = timezone.make_aware(naive, tz)
            return timezone.convert_to_utc(following)
        elif self.normalized_schedule_interval is not None:
            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval)

@uranusjr
Member

The lazy object proxy handles microsecond correctly:

>>> from lazy_object_proxy import Proxy
>>> from airflow.utils import timezone
>>> from datetime import datetime
>>> p = Proxy(lambda: datetime.now())
>>> p.microsecond
262739

But regardless of that, the template line doesn't really make sense. Whether execution_date.microsecond > 0 holds is entirely context-dependent: if you run the DAG enough times, you'd eventually get a case where execution_date.microsecond == 0, and when that happens, the line would fail unconditionally.

I’m inclined to categorise this as a user error that we can’t help with.

@uranusjr
Member

Before the timetable implementation, schedule_interval=@once was normalised to normalized_schedule_interval=None internally. That matches neither branch, so DAG.following_schedule() returns None regardless of what you pass to it.
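
That fall-through can be sketched in a few lines. This is a simplified model of the pre-timetable code quoted earlier, not Airflow's actual implementation (a fixed one-hour step stands in for the croniter logic):

```python
from datetime import datetime, timedelta
from typing import Optional, Union

def following_schedule_sketch(
    normalized_schedule_interval: Union[str, timedelta, None],
    dttm: datetime,
) -> Optional[datetime]:
    # Simplified model of the two branches in the pre-timetable
    # DAG.following_schedule() quoted above.
    if isinstance(normalized_schedule_interval, str):
        # Cron-string branch: the real code walks the schedule with
        # croniter; a fixed one-hour step stands in for that here.
        return dttm + timedelta(hours=1)
    elif normalized_schedule_interval is not None:
        # timedelta branch.
        return dttm + normalized_schedule_interval
    # "@once" normalises to None, which matches neither branch.
    return None

print(following_schedule_sketch(None, datetime(2021, 11, 16)))
# → None
```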

@uranusjr uranusjr added won't fix and removed kind:bug This is a clearly a bug area:core labels Nov 16, 2021
@uranusjr
Member

Closing since this is not a bug; everything is working as expected. The template accidentally worked previously and unfortunately breaks now. It's still a bug in user code and can only be fixed by the user.

@AnithaG-Oak
Author

@potiuk @uranusjr the @once DAG is just one of the examples here; this error is seen in DAGs with schedule intervals as well.

for example:

with DAG(
    f"workflow_{type}",
    default_args=DEFAULT_ARGS,
    schedule_interval=f"0 {first},{second} * * *",
    description=f"{type} workflow",
    max_active_runs=1,
    catchup=False,
) as dag:

@AnithaG-Oak
Author

If schedule_interval is a valid case, is it a coerce_datetime issue?
Could you confirm whether dag_run.logical_date would work for this?

@AnithaG-Oak
Author

When the DAGs are manually triggered (after a couple of runs), the date renders. But in scheduled runs, it always fails, returning None.

@potiuk potiuk reopened this Nov 16, 2021
@potiuk
Member

potiuk commented Nov 16, 2021

Could you confirm if dag_run.logical_date would work for this ?

Can you try it please? I cannot "confirm" it, but if it works, that will give us a clue where to look further.

@AnithaG-Oak
Author

Could you confirm if dag_run.logical_date would work for this ?

Can you try it please? I cannot "confirm" it, but if it works, that will give us a clue where to look further.

dag_run.logical_date didn't work either
(two screenshots attached)

@AnithaG-Oak
Author

Narrowing down further: dag.following_schedule(execution_date) returns null only for scheduled sub-DAGs. Main DAGs return a value.

@uranusjr
Member

uranusjr commented Nov 17, 2021

Oh, I know what's going on now. We've been looking in entirely the wrong places.

The change that actually broke the logic is #16141. Previously, a manually-triggered run always had its execution date set to timezone.now(), which has an approximately 99.9999% chance of producing a non-zero microsecond, so

(execution_date if execution_date.microsecond > 0 else dag.following_schedule(execution_date)).isoformat()

is (almost) always evaluated to

execution_date.isoformat()

which works.

But the calendar widget introduced in #16141 does not employ sub-second accuracy, so the microseconds become zero, and all manually-triggered runs now evaluate the expression to

dag.following_schedule(execution_date).isoformat()

instead, which always fails because a manually triggered run never has a following schedule (both before and after AIP-39).

Ultimately, using the microsecond field to distinguish between scheduled and manual runs was a poor implementation in the first place: there was still a 0.0001% chance the detection would fail, even if Airflow had never changed this behaviour. It is both more semantically correct and more reliable to use dag_run.run_type instead.

I think the best we can do here is to add an entry to UPDATING.md to describe this behavioural change.
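
A hypothetical rewrite along those lines is sketched below. The template string and the choice of data_interval_end for scheduled runs are assumptions about the user's intent, not a confirmed fix; the variable names follow Airflow's templates reference, and dag_run.run_type takes values such as "scheduled", "manual", and "backfill":

```python
from datetime import datetime, timezone

# Hypothetical template: branch on the run type instead of the fragile
# microsecond heuristic.
EXECUTION_DATE_TEMPLATE = (
    "{{ (data_interval_end if dag_run.run_type == 'scheduled' "
    "else logical_date).isoformat() }}"
)

def pick_date(run_type: str, data_interval_end: datetime,
              logical_date: datetime) -> str:
    # Plain-Python equivalent of the template's conditional, to show
    # which value each run type would select.
    chosen = data_interval_end if run_type == "scheduled" else logical_date
    return chosen.isoformat()

start = datetime(2021, 11, 16, tzinfo=timezone.utc)
end = datetime(2021, 11, 17, tzinfo=timezone.utc)
print(pick_date("manual", end, start))
# → 2021-11-16T00:00:00+00:00
```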

@AnithaG-Oak
Author

AnithaG-Oak commented Nov 17, 2021

One correction: (execution_date if execution_date.microsecond > 0 else dag.following_schedule(execution_date)).isoformat()
fails for scheduled runs as well; dag.following_schedule(execution_date) returns null for scheduled sub-DAGs.

Even if we use dag_run.run_type to figure out the run type, what is the right way to get execution_date for scheduled runs?

@uranusjr
Member

uranusjr commented Nov 17, 2021

execution_date will still work until 3.0; logical_date is the new variable name (the value is the same). But given that you're using following_schedule, I suspect data_interval_end actually provides more suitable semantics for your use case.

@AnithaG-Oak
Author

AnithaG-Oak commented Nov 17, 2021

But the calendar widget introduced in #16141 does not employ sub-second accuracy, so the microseconds become zero, and all manually-triggered runs now evaluate the expression to dag.following_schedule(execution_date).isoformat()
instead, which always fails because a manually triggered run never has a following schedule (both prior and after AIP-39).

Also, this reasoning doesn't hold for this issue, since the error is seen for scheduled sub-DAGs as well.

@eladkal
Contributor

eladkal commented Jun 13, 2022

I think the best we can do here is to add an entry to UPDATING.md to describe this behavioural change.

@uranusjr was this added? It seems like there is no actual task on this issue other than clarification.

@uranusjr
Member

No I don’t think so. I’ll do that.

Also, this reasoning doesn't hold for this issue, since the error is seen for scheduled sub-DAGs as well.

Sorry I missed this. A sub-DAG’s execution date is inherited from its parent DAG, so the behaviour change in the parent DAG creeps down to its sub-DAGs.
