Skip to content

Fix type of "moment" when running an e2e example for deferred TI#45030

Merged
kaxil merged 4 commits intoapache:mainfrom
astronomer:AIP72-fix-e2e-example-deferrable
Dec 18, 2024
Merged

Fix type of "moment" when running an e2e example for deferred TI#45030
kaxil merged 4 commits intoapache:mainfrom
astronomer:AIP72-fix-e2e-example-deferrable

Conversation

@amoghrajesh
Copy link
Copy Markdown
Contributor

While trying to run an e2e example of a task that defers and then launches a trigger:

from airflow import DAG

from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync
from airflow.utils import timezone
import datetime

with DAG(
    dag_id="demo_deferred",
    schedule=None,
    catchup=False,
) as dag:
    DateTimeSensorAsync(
            task_id="async",
            target_time=str(timezone.utcnow() + datetime.timedelta(seconds=3)),
            poke_interval=60,
            timeout=600,
        )

I realised that the "moment" inside "trigger_kwargs" is of pendulum.DateTime type, and since we have a "dict[str, ANY]`, defined here: https://github.com/apache/airflow/blob/main/airflow/api_fastapi/execution_api/datamodels/taskinstance.py#L82

on its datamodel (we cant really have a UtcDateTime for one specific field, like we do here), it fails to match the type defined in the Trigger table which is datetime.

So, I have added a "before" validator that checks for the type being string and if it is a string, translates it to a datetime object.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh requested review from ashb and kaxil December 18, 2024 13:02
@amoghrajesh
Copy link
Copy Markdown
Contributor Author

Simplifies it better now. This is a working example:

class A(BaseModel):
    """Schema for updating TaskInstance to a deferred state."""
    trigger_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
    @field_validator("trigger_kwargs")
    def validate_moment(cls, v):
        if "moment" in v:
            v["moment"] = AwareDatetimeAdapter.validate_strings(v["moment"])
        return v

A(trigger_kwargs={"key": "value", "moment": "2024-12-18T00:00:00Z"})
Out[27]: A(trigger_kwargs={'key': 'value', 'moment': datetime.datetime(2024, 12, 18, 0, 0, tzinfo=TzInfo(UTC))})
A(trigger_kwargs={"key": "value", "moment": "2024-12-18T00:00:00"})
Traceback (most recent call last):
  File "/Users/amoghdesai/Documents/OSS/airflow/.venv/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3550, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-28-ec6c9f9d0fce>", line 1, in <module>
    A(trigger_kwargs={"key": "value", "moment": "2024-12-18T00:00:00"})
  File "/Users/amoghdesai/Documents/OSS/airflow/.venv/lib/python3.9/site-packages/pydantic/main.py", line 214, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for A
trigger_kwargs
  Input should have timezone info [type=timezone_aware, input_value='2024-12-18T00:00:00', input_type=str]
    For further information visit https://errors.pydantic.dev/2.10/v/timezone_aware

@kaxil kaxil merged commit fc7d983 into apache:main Dec 18, 2024
@kaxil kaxil deleted the AIP72-fix-e2e-example-deferrable branch December 18, 2024 17:40
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
…che#45030)

While trying to run an e2e example of a task that defers and then launches a trigger:
```
from airflow import DAG

from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync
from airflow.utils import timezone
import datetime

with DAG(
    dag_id="demo_deferred",
    schedule=None,
    catchup=False,
) as dag:
    DateTimeSensorAsync(
            task_id="async",
            target_time=str(timezone.utcnow() + datetime.timedelta(seconds=3)),
            poke_interval=60,
            timeout=600,
        )
```

I realised that the "moment" inside "trigger_kwargs" is of `pendulum.DateTime` type, and since we have a "dict[str, ANY]`, defined here: https://github.com/apache/airflow/blob/main/airflow/api_fastapi/execution_api/datamodels/taskinstance.py#L82
 
on its datamodel (we cant really have a `UtcDateTime` for one specific field, like we do [here](https://github.com/apache/airflow/blob/main/airflow/api_fastapi/execution_api/datamodels/taskinstance.py#L57C15-L57C26)), it fails to match the type defined in the `Trigger` table which is datetime. 

So, I have added a "before" validator that checks for the type being string and if it is a string, translates it to a datetime object.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants