Closed

Changes from all commits

21 commits
f786736
changed macros to correct classes and modules
rustikk Jan 3, 2022
208faa6
Merge branch 'apache:main' into main
rustikk Feb 15, 2022
9329663
Merge branch 'apache:main' into main
rustikk Feb 15, 2022
7867991
Merge branch 'apache:main' into main
rustikk Feb 24, 2022
70d3f70
adding type check for default_args
rustikk Feb 24, 2022
f92ffbf
flake8 checks
rustikk Feb 24, 2022
5b5e356
Add Paxful to INTHEWILD.md (#21766)
ne1r0n Feb 24, 2022
de323a3
Add `2.2.4` to db migrations map (#21777)
jedcunningham Feb 24, 2022
bc34727
Fix max_active_runs=1 not scheduling runs when min_file_process_inter…
ephraimbuddy Feb 24, 2022
736394d
Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
kadai0308 Feb 24, 2022
bfb3991
Rewrite taskflow-mapping argument validation (#21759)
uranusjr Feb 24, 2022
adaec67
Fix bigquery_dts parameter docstring typo (#21786)
totogo Feb 24, 2022
32acd75
Add --platform as parameter of image building (#21695)
potiuk Feb 24, 2022
458a25f
Upgrade and record elasticsearch log_id_template changes (#21734)
ashb Feb 24, 2022
855f9e0
Rename operator mapping map() to apply() (#21754)
uranusjr Feb 24, 2022
73ca733
Restore image rendering in AWS Secrets Manager Backend doc (#21772)
josh-fell Feb 24, 2022
a6f2d7d
Use Pendulum's built-in UTC object (#21732)
malthe Feb 24, 2022
254a56e
Make sure emphasis in UPDATING in .md is consistent (#21804)
potiuk Feb 24, 2022
328aaf5
REST API: add rendered fields in task instance. (#21741)
gmcatsf Feb 24, 2022
5dc0cd5
Use DB where possible for quicker ``airflow dag`` subcommands (#21793)
ashb Feb 24, 2022
e08dd25
checking type for taskgroup default_args and task default_args
rustikk Feb 25, 2022
56 changes: 56 additions & 0 deletions BREEZE.rst
@@ -1272,6 +1272,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

-a, --install-airflow-version INSTALL_AIRFLOW_VERSION
Uses different version of Airflow when building PROD image.

@@ -1472,6 +1479,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

-a, --install-airflow-version INSTALL_AIRFLOW_VERSION
Uses different version of Airflow when building PROD image.

@@ -1532,6 +1546,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

-I, --production-image
Use production image for entering the environment and builds (not for tests).

@@ -1599,6 +1620,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

-v, --verbose
Show verbose information about executed docker, kind, kubectl, helm commands. Useful for
debugging - when you run breeze with --verbose flags you will be able to see the commands
@@ -1635,6 +1663,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64


####################################################################################################

@@ -1830,6 +1865,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

-b, --backend BACKEND
Backend to use for tests - it determines which database is used.
One of:
@@ -1899,6 +1941,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

-F, --force-build-images
Forces building of the local docker images. The images are rebuilt
automatically for the first time or when changes are detected in
@@ -2298,6 +2347,13 @@ This is the current syntax for `./breeze <./breeze>`_:

3.7 3.8 3.9

--platform PLATFORM
Builds image for the platform specified.

One of:

linux/amd64

****************************************************************************************************
Choose backend to run for Airflow

1 change: 1 addition & 0 deletions INTHEWILD.md
@@ -328,6 +328,7 @@ Currently, **officially** using Airflow:
1. [Paraná Banco](https://paranabanco.com.br/) [[@lopesdiego12](https://github.com/lopesdiego12/)]
1. [Parimatch Tech](https://parimatch.tech/) [[@KulykDmytro](https://github.com/KulykDmytro), [@Tonkonozhenko](https://github.com/Tonkonozhenko)]
1. [Pathstream](https://pathstream.com) [[@pJackDanger](https://github.com/JackDanger)]
1. [Paxful](https://paxful.com) [[@ne1r0n](https://github.com/ne1r0n)]
1. [PayFit](https://payfit.com) [[@pcorbel](https://github.com/pcorbel)]
1. [PAYMILL](https://www.paymill.com/) [[@paymill](https://github.com/paymill) & [@matthiashuschle](https://github.com/matthiashuschle)]
1. [PayPal](https://www.paypal.com/) [[@r39132](https://github.com/r39132) & [@jhsenjaliya](https://github.com/jhsenjaliya)]
14 changes: 14 additions & 0 deletions UPDATING.md
@@ -129,6 +129,20 @@ Previously, a task’s log is dynamically rendered from the `[core] log_filename

A new `log_template` table is introduced to solve this problem. This table is synchronised with the aforementioned config values every time Airflow starts, and a new field `log_template_id` is added to every DAG run to point to the format used by tasks (`NULL` indicates the first ever entry for compatibility).

### Default templates for log filenames and elasticsearch log_id changed

In order to support Dynamic Task Mapping, the default templates for per-task-instance logging have changed. If your config contains the old default values, they will be upgraded in place.

If you are happy with the new config values, you should *remove* the setting in `airflow.cfg` and let the default value be used. The old default values were:


- `[core] log_filename_template`: `{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`
- `[elasticsearch] log_id_template`: `{dag_id}-{task_id}-{execution_date}-{try_number}`

`[core] log_filename_template` now uses "hive partition style" of `dag_id=<id>/run_id=<id>` by default, which may cause problems on some older FAT filesystems. If this affects you, you will have to change the log template.

If you have customized the templates, you should ensure that they contain `{{ ti.map_index }}` if you want to use dynamically mapped tasks.
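As a sketch of the kind of change this implies (the path layout below is illustrative, not the exact new default), a customized `airflow.cfg` entry that accounts for mapped tasks might look like:

```ini
[core]
# Illustrative only -- including ti.map_index gives each mapped task
# instance its own log file instead of them overwriting one another:
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/map_index={{ ti.map_index }}/attempt={{ try_number }}.log
```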

### `airflow.models.base.Operator` is removed

Previously, there was an empty class `airflow.models.base.Operator` for “type hinting”. This class was never really useful for anything (everything it did could be done better with `airflow.models.baseoperator.BaseOperator`), and has been removed. If you are relying on the class’s existence, use `BaseOperator` (for concrete operators), `airflow.models.abstractoperator.AbstractOperator` (the base class of both `BaseOperator` and the AIP-42 `MappedOperator`), or `airflow.models.operator.Operator` (a union type `BaseOperator | MappedOperator` for type annotation).
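The union-type option mentioned above can be sketched in isolation; the stub classes below stand in for Airflow's real `BaseOperator` and `MappedOperator` (this is a conceptual illustration, not Airflow's actual implementation):

```python
from typing import Union

# Stand-ins for airflow.models.baseoperator.BaseOperator and the
# AIP-42 MappedOperator; illustrative only.
class BaseOperator:
    pass

class MappedOperator:
    pass

# airflow.models.operator.Operator is, conceptually, this union type,
# intended for annotations that accept either concrete kind of operator:
Operator = Union[BaseOperator, MappedOperator]

def describe(task: Operator) -> str:
    # Either concrete class satisfies the annotation.
    return type(task).__name__
```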
31 changes: 27 additions & 4 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -39,6 +39,7 @@
from airflow.api_connexion.types import APIResponse
from airflow.models import SlaMiss
from airflow.models.dagrun import DagRun as DR
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.security import permissions
from airflow.utils.session import NEW_SESSION, provide_session
@@ -77,6 +78,14 @@ def get_task_instance(
)
.add_entity(SlaMiss)
)
query = query.outerjoin(
RTIF,
and_(
RTIF.dag_id == TI.dag_id,
RTIF.execution_date == DR.execution_date,
RTIF.task_id == TI.task_id,
),
).add_entity(RTIF)
task_instance = query.one_or_none()
if task_instance is None:
raise NotFound("Task instance not found")
@@ -178,8 +187,15 @@ def get_task_instances(
SlaMiss.execution_date == DR.execution_date,
),
isouter=True,
)
ti_query = base_query.add_entity(SlaMiss)
).add_entity(SlaMiss)
ti_query = base_query.outerjoin(
RTIF,
and_(
RTIF.dag_id == TI.dag_id,
RTIF.task_id == TI.task_id,
RTIF.execution_date == DR.execution_date,
),
).add_entity(RTIF)
task_instances = ti_query.offset(offset).limit(limit).all()

return task_instance_collection_schema.dump(
@@ -237,8 +253,15 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
SlaMiss.execution_date == DR.execution_date,
),
isouter=True,
)
ti_query = base_query.add_entity(SlaMiss)
).add_entity(SlaMiss)
ti_query = base_query.outerjoin(
RTIF,
and_(
RTIF.dag_id == TI.dag_id,
RTIF.task_id == TI.task_id,
RTIF.execution_date == DR.execution_date,
),
).add_entity(RTIF)
task_instances = ti_query.all()

return task_instance_collection_schema.dump(
6 changes: 6 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2496,6 +2496,12 @@ components:
sla_miss:
$ref: '#/components/schemas/SLAMiss'
nullable: true
rendered_fields:
description: |
JSON object describing rendered fields.

*New in version 2.3.0*
type: object

TaskInstanceCollection:
type: object
15 changes: 15 additions & 0 deletions airflow/api_connexion/schemas/common_schema.py
@@ -17,6 +17,7 @@

import datetime
import inspect
import json
import typing

import marshmallow
@@ -165,3 +166,17 @@ def _get_class_name(self, obj):
if isinstance(obj, type):
return obj.__name__
return type(obj).__name__


class JsonObjectField(fields.Field):
"""JSON object field."""

def _serialize(self, value, attr, obj, **kwargs):
if not value:
return {}
return json.loads(value) if isinstance(value, str) else value

def _deserialize(self, value, attr, data, **kwargs):
if isinstance(value, str):
return json.loads(value)
return value
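The serialization rule in `JsonObjectField._serialize` above can be exercised on its own; this marshmallow-free sketch reimplements it as a plain function for illustration:

```python
import json

def serialize_json_object(value):
    # Mirrors JsonObjectField._serialize: falsy values become an empty
    # object, JSON strings are parsed, anything else passes through as-is.
    if not value:
        return {}
    return json.loads(value) if isinstance(value, str) else value
```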
4 changes: 4 additions & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
@@ -22,6 +22,7 @@
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.api_connexion.parameters import validate_istimezone
from airflow.api_connexion.schemas.common_schema import JsonObjectField
from airflow.api_connexion.schemas.enum_schemas import TaskInstanceStateField
from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema
from airflow.models import SlaMiss, TaskInstance
@@ -58,6 +59,7 @@ class Meta:
pid = auto_field()
executor_config = auto_field()
sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
rendered_fields = JsonObjectField()

def get_attribute(self, obj, attr, default):
if attr == "sla_miss":
@@ -66,6 +68,8 @@ def get_attribute(self, obj, attr, default):
# corresponding to the attr.
slamiss_instance = {"sla_miss": obj[1]}
return get_value(slamiss_instance, attr, default)
elif attr == "rendered_fields":
return get_value(obj[2], attr, None)
return get_value(obj[0], attr, default)


66 changes: 29 additions & 37 deletions airflow/cli/commands/dag_command.py
@@ -39,15 +39,9 @@
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
from airflow.utils.cli import (
get_dag,
get_dag_by_file_location,
process_subdir,
sigint_handler,
suppress_logs_and_warning,
)
from airflow.utils.cli import get_dag, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.session import create_session, provide_session
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState


@@ -249,23 +243,25 @@ def _save_dot_to_file(dot: Dot, filename: str):


@cli_utils.action_cli
def dag_state(args):
@provide_session
def dag_state(args, session=NEW_SESSION):
"""
Returns the state (and conf if exists) of a DagRun at the command line.
>>> airflow dags state tutorial 2015-01-01T00:00:00.000000
running
>>> airflow dags state a_dag_with_conf_passed 2015-01-01T00:00:00.000000
failed, {"name": "bob", "age": "42"}
"""
if args.subdir:
dag = get_dag(args.subdir, args.dag_id)
else:
dag = get_dag_by_file_location(args.dag_id)
dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
out = dr[0].state if dr else None

dag = DagModel.get_dagmodel(args.dag_id, session=session)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dr = session.query(DagRun).filter_by(dag_id=args.dag_id, execution_date=args.execution_date).one_or_none()
out = dr.state if dr else None
conf_out = ''
if out and dr[0].conf:
conf_out = ', ' + json.dumps(dr[0].conf)
if out and dr.conf:
conf_out = ', ' + json.dumps(dr.conf)
print(str(out) + conf_out)


@@ -351,32 +347,27 @@ def dag_report(args):

@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_jobs(args, dag=None):
@provide_session
def dag_list_jobs(args, dag=None, session=NEW_SESSION):
"""Lists latest n jobs"""
queries = []
if dag:
args.dag_id = dag.dag_id
if args.dag_id:
dagbag = DagBag()
dag = DagModel.get_dagmodel(args.dag_id, session=session)

if args.dag_id not in dagbag.dags:
error_message = f"Dag id {args.dag_id} not found"
raise AirflowException(error_message)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
queries.append(BaseJob.dag_id == args.dag_id)

if args.state:
queries.append(BaseJob.state == args.state)

fields = ['dag_id', 'state', 'job_type', 'start_date', 'end_date']
with create_session() as session:
all_jobs = (
session.query(BaseJob)
.filter(*queries)
.order_by(BaseJob.start_date.desc())
.limit(args.limit)
.all()
)
all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs]
all_jobs = (
session.query(BaseJob).filter(*queries).order_by(BaseJob.start_date.desc()).limit(args.limit).all()
)
all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs]

AirflowConsole().print_as(
data=all_jobs,
@@ -386,16 +377,16 @@ def dag_list_dag_runs(args, dag=None):

@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_dag_runs(args, dag=None):
@provide_session
def dag_list_dag_runs(args, dag=None, session=NEW_SESSION):
"""Lists dag runs for a given DAG"""
if dag:
args.dag_id = dag.dag_id
else:
dag = DagModel.get_dagmodel(args.dag_id, session=session)

dagbag = DagBag()

if args.dag_id is not None and args.dag_id not in dagbag.dags:
error_message = f"Dag id {args.dag_id} not found"
raise AirflowException(error_message)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")

state = args.state.lower() if args.state else None
dag_runs = DagRun.find(
@@ -404,6 +395,7 @@ def dag_list_dag_runs(args, dag=None):
no_backfills=args.no_backfill,
execution_start_date=args.start_date,
execution_end_date=args.end_date,
session=session,
)

dag_runs.sort(key=lambda x: x.execution_date, reverse=True)