Refactor Opensearch log formatter to use timezone.from_timestamp and export it in Task SDK#66856
Refactor Opensearch log formatter to use timezone.from_timestamp and export it in Task SDK#6685623tae wants to merge 5 commits into
Conversation
SameerMesiah97
left a comment
There was a problem hiding this comment.
There's a contradiction between the #TODO and what you are doing in this PR. I have left a comment.
| def formatTime(self, record, datefmt=None): | ||
| """Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.""" | ||
| # TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local") | ||
| # as soon as min Airflow 2.9.0 |
There was a problem hiding this comment.
I am not sure why you are importing timezone from task sdk when the intention was to use timezone from airflow.utils.
There was a problem hiding this comment.
Thanks for the review.
I initially tried to follow the TODO and use airflow.utils.timezone, but the linter (Ruff) gave an error (TID251) and recommended airflow.sdk.timezone instead. I also saw that other files in this provider were already using the Task SDK, so I followed that pattern.
I’ve also moved the import inside the block to fix the ImportError in CI. Let me know if you’d still like me to revert it to utils.
There was a problem hiding this comment.
Thanks for the review. I initially tried to follow the TODO and use
airflow.utils.timezone, but the linter (Ruff) gave an error (TID251) and recommendedairflow.sdk.timezoneinstead. I also saw that other files in this provider were already using the Task SDK, so I followed that pattern. I’ve also moved the import inside the block to fix theImportErrorin CI. Let me know if you’d still like me to revert it toutils.
Okay. That makes sense. Thanks for clarifying that.
| coerce_datetime, | ||
| convert_to_utc, | ||
| datetime, | ||
| from_timestamp, |
There was a problem hiding this comment.
Is there no way to avoid expanding the API surface of the task SDK? It seems overkill for a simple provider PR. Maybe you can try using datetime?
There was a problem hiding this comment.
Thanks for the suggestion.
I tried to use existing SDK utilities like coerce_datetime to avoid the expansion, but I noticed there's already a linter rule in pyproject.toml (line 886) that explicitly recommends airflow.sdk.timezone.from_timestamp over pendulum.from_timestamp.
It seems the intention was for providers to use this function, but it was just missing from the SDK's public __all__. So I think adding it there is the right fix rather than working around it.
|
cc @Owen-CH-Leung can you help review this PR? @23tae can you confirm if the issue affect only Opensearch and not also Elasticsearch? |
|
@eladkal Yes, I confirmed that the exact same legacy |
We can have seperate PR for elastic that we will review in parallel to this one |
b3e5103 to
374f24c
Compare
…earch log formatter
…lity with older Airflow 3.x
374f24c to
4efb3fe
Compare
Description
This PR refactors the
OpensearchJSONFormatterto use the standard Airflowtimezone.from_timestamputility, addressing a TODO in the codebase.Key changes
from_timestampinairflow.sdk.timezoneto make it part of the public Task SDK API, ensuring consistency across the project.timezone.from_timestampon Airflow 3.3.0+ and maintains backward compatibility by falling back to the originalpendulum/datetimelogic on older Airflow 3.x releases (e.g., 3.0.6, 3.2.1).Verification Results
I have verified the changes using both
prek(ruff) andbreeze(standardized Docker environment).Was generative AI tooling used to co-author this PR?
Generated-by: Antigravity following the guidelines
Please review this refactor.
cc. @choo121600 @noeunkim
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.