diff --git a/airflow/io/path.py b/airflow/io/path.py
index ed5ab664ebe59..bd7c320653aae 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -263,11 +263,11 @@ def read_block(self, offset: int, length: int, delimiter=None):
--------
>>> read_block(0, 13)
b'Alice, 100\\nBo'
- >>> read_block(0, 13, delimiter=b'\\n')
+ >>> read_block(0, 13, delimiter=b"\\n")
b'Alice, 100\\nBob, 200\\n'
Use ``length=None`` to read to the end of the file.
- >>> read_block(0, None, delimiter=b'\\n')
+ >>> read_block(0, None, delimiter=b"\\n")
b'Alice, 100\\nBob, 200\\nCharlie, 300'
See Also
diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py
index 95e84d559a5d2..ee22a47dc631d 100644
--- a/airflow/macros/__init__.py
+++ b/airflow/macros/__init__.py
@@ -49,9 +49,9 @@ def ds_add(ds: str, days: int) -> str:
:param ds: anchor date in ``YYYY-MM-DD`` format to add to
:param days: number of days to add to the ds, you can use negative values
- >>> ds_add('2015-01-01', 5)
+ >>> ds_add("2015-01-01", 5)
'2015-01-06'
- >>> ds_add('2015-01-06', -5)
+ >>> ds_add("2015-01-06", -5)
'2015-01-01'
"""
if not days:
@@ -68,9 +68,9 @@ def ds_format(ds: str, input_format: str, output_format: str) -> str:
:param input_format: input string format. E.g. %Y-%m-%d
:param output_format: output string format E.g. %Y-%m-%d
- >>> ds_format('2015-01-01', "%Y-%m-%d", "%m-%d-%y")
+ >>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
- >>> ds_format('1/5/2015', "%m/%d/%Y", "%Y-%m-%d")
+ >>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d")
'2015-01-05'
"""
return datetime.strptime(str(ds), input_format).strftime(output_format)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2ba7ec8ad1c1e..f7f1d6ccc6f7f 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -628,12 +628,7 @@ class derived from this one results in the creation of a task object,
**Example**: to run this task in a specific docker container through
the KubernetesExecutor ::
- MyOperator(...,
- executor_config={
- "KubernetesExecutor":
- {"image": "myCustomDockerImage"}
- }
- )
+ MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})
:param do_xcom_push: if True, an XCom is pushed containing the Operator's
result
@@ -1152,9 +1147,7 @@ def set_xcomargs_dependencies(self) -> None:
# This is equivalent to
with DAG(...):
generate_content = GenerateContentOperator(task_id="generate_content")
- send_email = EmailOperator(
- ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
- )
+ send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}")
generate_content >> send_email
"""
@@ -1866,12 +1859,7 @@ def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]):
Then you can accomplish like so::
- chain_linear(
- op1,
- [op2, op3],
- [op4, op5, op6],
- op7
- )
+ chain_linear(op1, [op2, op3], [op4, op5, op6], op7)
:param elements: a list of operators / lists of operators
"""
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 654bc85cae8b8..9150adaf6ff4f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -381,11 +381,12 @@ class DAG(LoggingMixin):
**Example**: to avoid Jinja from removing a trailing newline from template strings ::
- DAG(dag_id='my-dag',
+ DAG(
+ dag_id="my-dag",
jinja_environment_kwargs={
- 'keep_trailing_newline': True,
+ "keep_trailing_newline": True,
# some other jinja2 Environment options here
- }
+ },
)
**See**: `Jinja Environment documentation
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f041dcf208949..ae6b1e35c1276 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3319,19 +3319,23 @@ def get_relevant_upstream_map_indexes(
def this_task(v): # This is self.task.
return v * 2
+
@task_group
def tg1(inp):
val = upstream(inp) # This is the upstream task.
this_task(val) # When inp is 1, val here should resolve to 2.
return val
+
# This val is the same object returned by tg1.
val = tg1.expand(inp=[1, 2, 3])
+
@task_group
def tg2(inp):
another_task(inp, val) # val here should resolve to [2, 4, 6].
+
tg2.expand(inp=["a", "b"])
The surrounding mapped task groups of ``upstream`` and ``self.task`` are
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 7dc5577e6055f..60315617e40cd 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -59,8 +59,8 @@ class XComArg(ResolveMixin, DependencyMixin):
xcomarg >> op
xcomarg << op
- op >> xcomarg # By BaseOperator code
- op << xcomarg # By BaseOperator code
+ op >> xcomarg # By BaseOperator code
+ op << xcomarg # By BaseOperator code
**Example**: The moment you get a result from any operator (decorated or regular) you can ::
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 37dbc6c52855d..9ce03736664c2 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -762,11 +762,11 @@ class S3ListOperator(BaseOperator):
``customers/2018/04/`` key in the ``data`` bucket. ::
s3_file = S3ListOperator(
- task_id='list_3s_files',
- bucket='data',
- prefix='customers/2018/04/',
- delimiter='/',
- aws_conn_id='aws_customers_conn'
+ task_id="list_3s_files",
+ bucket="data",
+ prefix="customers/2018/04/",
+ delimiter="/",
+ aws_conn_id="aws_customers_conn",
)
"""
@@ -843,11 +843,11 @@ class S3ListPrefixesOperator(BaseOperator):
from the S3 ``customers/2018/04/`` prefix in the ``data`` bucket. ::
s3_file = S3ListPrefixesOperator(
- task_id='list_s3_prefixes',
- bucket='data',
- prefix='customers/2018/04/',
- delimiter='/',
- aws_conn_id='aws_customers_conn'
+ task_id="list_s3_prefixes",
+ bucket="data",
+ prefix="customers/2018/04/",
+ delimiter="/",
+ aws_conn_id="aws_customers_conn",
)
"""
diff --git a/airflow/providers/amazon/aws/operators/sagemaker.py b/airflow/providers/amazon/aws/operators/sagemaker.py
index 66cc7103b7973..2eba21e73419a 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -404,14 +404,14 @@ class SageMakerEndpointOperator(SageMakerBaseOperator):
If you need to create a SageMaker endpoint based on an existed
SageMaker model and an existed SageMaker endpoint config::
- config = endpoint_configuration;
+ config = endpoint_configuration
If you need to create all of SageMaker model, SageMaker endpoint-config and SageMaker endpoint::
config = {
- 'Model': model_configuration,
- 'EndpointConfig': endpoint_config_configuration,
- 'Endpoint': endpoint_configuration
+ "Model": model_configuration,
+ "EndpointConfig": endpoint_config_configuration,
+ "Endpoint": endpoint_configuration,
}
For details of the configuration parameter of model_configuration see
@@ -579,10 +579,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
If you need to create both SageMaker model and SageMaker Transform job::
- config = {
- 'Model': model_config,
- 'Transform': transform_config
- }
+ config = {"Model": model_config, "Transform": transform_config}
For details of the configuration parameter of transform_config see
:py:meth:`SageMaker.Client.create_transform_job`
diff --git a/airflow/providers/apache/cassandra/hooks/cassandra.py b/airflow/providers/apache/cassandra/hooks/cassandra.py
index 66d3d13aa8c25..ac0e251c90fa8 100644
--- a/airflow/providers/apache/cassandra/hooks/cassandra.py
+++ b/airflow/providers/apache/cassandra/hooks/cassandra.py
@@ -48,11 +48,7 @@ class CassandraHook(BaseHook, LoggingMixin):
If SSL is enabled in Cassandra, pass in a dict in the extra field as kwargs for
``ssl.wrap_socket()``. For example::
- {
- 'ssl_options' : {
- 'ca_certs' : PATH_TO_CA_CERTS
- }
- }
+ {"ssl_options": {"ca_certs": PATH_TO_CA_CERTS}}
Default load balancing policy is RoundRobinPolicy. To specify a different
LB policy::
diff --git a/airflow/providers/apache/cassandra/sensors/record.py b/airflow/providers/apache/cassandra/sensors/record.py
index 9f0a76645609f..bcd417d49c7d3 100644
--- a/airflow/providers/apache/cassandra/sensors/record.py
+++ b/airflow/providers/apache/cassandra/sensors/record.py
@@ -38,10 +38,12 @@ class CassandraRecordSensor(BaseSensorOperator):
primary keys 'p1' and 'p2' to be populated in keyspace 'k' and table 't',
instantiate it as follows:
- >>> cassandra_sensor = CassandraRecordSensor(table="k.t",
- ... keys={"p1": "v1", "p2": "v2"},
- ... cassandra_conn_id="cassandra_default",
- ... task_id="cassandra_sensor")
+ >>> cassandra_sensor = CassandraRecordSensor(
+ ... table="k.t",
+ ... keys={"p1": "v1", "p2": "v2"},
+ ... cassandra_conn_id="cassandra_default",
+ ... task_id="cassandra_sensor",
+ ... )
:param table: Target Cassandra table.
Use dot notation to target a specific keyspace.
diff --git a/airflow/providers/apache/cassandra/sensors/table.py b/airflow/providers/apache/cassandra/sensors/table.py
index 62bfdaf72cb91..1ee99f687693f 100644
--- a/airflow/providers/apache/cassandra/sensors/table.py
+++ b/airflow/providers/apache/cassandra/sensors/table.py
@@ -38,9 +38,9 @@ class CassandraTableSensor(BaseSensorOperator):
For example, if you want to wait for a table called 't' to be created
in a keyspace 'k', instantiate it as follows:
- >>> cassandra_sensor = CassandraTableSensor(table="k.t",
- ... cassandra_conn_id="cassandra_default",
- ... task_id="cassandra_sensor")
+ >>> cassandra_sensor = CassandraTableSensor(
+ ... table="k.t", cassandra_conn_id="cassandra_default", task_id="cassandra_sensor"
+ ... )
:param table: Target Cassandra table.
Use dot notation to target a specific keyspace.
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index cf5ed97291025..00a3a7f14822a 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -582,12 +582,11 @@ def check_for_partition(self, schema: str, table: str, partition: str) -> bool:
:param schema: Name of hive schema (database) @table belongs to
:param table: Name of hive table @partition belongs to
- :param partition: Expression that matches the partitions to check for
- (eg `a = 'b' AND c = 'd'`)
+ :param partition: Expression that matches the partitions to check for (e.g. `a = 'b' AND c = 'd'`)
>>> hh = HiveMetastoreHook()
- >>> t = 'static_babynames_partitioned'
- >>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
+ >>> t = "static_babynames_partitioned"
+ >>> hh.check_for_partition("airflow", t, "ds='2015-01-01'")
True
"""
with self.metastore as client:
@@ -606,10 +605,10 @@ def check_for_named_partition(self, schema: str, table: str, partition_name: str
:param partition_name: Name of the partitions to check for (eg `a=b/c=d`)
>>> hh = HiveMetastoreHook()
- >>> t = 'static_babynames_partitioned'
- >>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
+ >>> t = "static_babynames_partitioned"
+ >>> hh.check_for_named_partition("airflow", t, "ds=2015-01-01")
True
- >>> hh.check_for_named_partition('airflow', t, "ds=xxx")
+ >>> hh.check_for_named_partition("airflow", t, "ds=xxx")
False
"""
with self.metastore as client:
@@ -619,7 +618,7 @@ def get_table(self, table_name: str, db: str = "default") -> Any:
"""Get a metastore table object.
>>> hh = HiveMetastoreHook()
- >>> t = hh.get_table(db='airflow', table_name='static_babynames')
+ >>> t = hh.get_table(db="airflow", table_name="static_babynames")
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
@@ -649,8 +648,8 @@ def get_partitions(self, schema: str, table_name: str, partition_filter: str | N
For subpartitioned table, the number might easily exceed this.
>>> hh = HiveMetastoreHook()
- >>> t = 'static_babynames_partitioned'
- >>> parts = hh.get_partitions(schema='airflow', table_name=t)
+ >>> t = "static_babynames_partitioned"
+ >>> parts = hh.get_partitions(schema="airflow", table_name=t)
>>> len(parts)
1
>>> parts
@@ -765,9 +764,9 @@ def table_exists(self, table_name: str, db: str = "default") -> bool:
Check if table exists.
>>> hh = HiveMetastoreHook()
- >>> hh.table_exists(db='airflow', table_name='static_babynames')
+ >>> hh.table_exists(db="airflow", table_name="static_babynames")
True
- >>> hh.table_exists(db='airflow', table_name='does_not_exist')
+ >>> hh.table_exists(db="airflow", table_name="does_not_exist")
False
"""
try:
diff --git a/airflow/providers/apache/hive/macros/hive.py b/airflow/providers/apache/hive/macros/hive.py
index 24868f28addfc..1d5f6f6a5cead 100644
--- a/airflow/providers/apache/hive/macros/hive.py
+++ b/airflow/providers/apache/hive/macros/hive.py
@@ -39,7 +39,7 @@ def max_partition(
:param field: the field to get the max value from. If there's only
one partition field, this will be inferred
- >>> max_partition('airflow.static_babynames_partitioned')
+ >>> max_partition("airflow.static_babynames_partitioned")
'2015-01-01'
"""
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook
@@ -94,8 +94,8 @@ def closest_ds_partition(
:param metastore_conn_id: which metastore connection to use
:returns: The closest date
- >>> tbl = 'airflow.static_babynames_partitioned'
- >>> closest_ds_partition(tbl, '2015-01-02')
+ >>> tbl = "airflow.static_babynames_partitioned"
+ >>> closest_ds_partition(tbl, "2015-01-02")
'2015-01-01'
"""
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook
diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index 5a54b7e9eea2f..c8e5180f9be8d 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -580,26 +580,20 @@ class DatabricksRunNowOperator(BaseOperator):
For example ::
json = {
- "job_id": 42,
- "notebook_params": {
- "dry-run": "true",
- "oldest-time-to-consider": "1457570074236"
- }
+ "job_id": 42,
+ "notebook_params": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
}
- notebook_run = DatabricksRunNowOperator(task_id='notebook_run', json=json)
+ notebook_run = DatabricksRunNowOperator(task_id="notebook_run", json=json)
Another way to accomplish the same thing is to use the named parameters
of the ``DatabricksRunNowOperator`` directly. Note that there is exactly
one named parameter for each top level parameter in the ``run-now``
endpoint. In this method, your code would look like this: ::
- job_id=42
+ job_id = 42
- notebook_params = {
- "dry-run": "true",
- "oldest-time-to-consider": "1457570074236"
- }
+ notebook_params = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
python_params = ["douglas adams", "42"]
@@ -612,7 +606,7 @@ class DatabricksRunNowOperator(BaseOperator):
notebook_params=notebook_params,
python_params=python_params,
jar_params=jar_params,
- spark_submit_params=spark_submit_params
+ spark_submit_params=spark_submit_params,
)
In the case where both the json parameter **AND** the named parameters
diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py b/airflow/providers/fab/auth_manager/security_manager/override.py
index 58013cd89a6a9..df4d9e950e4ab 100644
--- a/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -2090,9 +2090,9 @@ def oauth_user_info_getter(
@appbuilder.sm.oauth_user_info_getter
def my_oauth_user_info(sm, provider, response=None):
- if provider == 'github':
- me = sm.oauth_remotes[provider].get('user')
- return {'username': me.data.get('login')}
+ if provider == "github":
+ me = sm.oauth_remotes[provider].get("user")
+ return {"username": me.data.get("login")}
return {}
"""
diff --git a/airflow/providers/ftp/operators/ftp.py b/airflow/providers/ftp/operators/ftp.py
index 98ff11bc2151a..654c60bf7e519 100644
--- a/airflow/providers/ftp/operators/ftp.py
+++ b/airflow/providers/ftp/operators/ftp.py
@@ -66,7 +66,7 @@ class FTPFileTransmitOperator(BaseOperator):
remote_filepath="/tmp/tmp1/tmp2/file.txt",
operation="put",
create_intermediate_dirs=True,
- dag=dag
+ dag=dag,
)
"""
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 28acc8f9494ed..c9fdee102e186 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -884,13 +884,13 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
**Example**::
get_data = BigQueryGetDataOperator(
- task_id='get_data_from_bq',
- dataset_id='test_dataset',
- table_id='Transaction_partitions',
- project_id='internal-gcp-project',
+ task_id="get_data_from_bq",
+ dataset_id="test_dataset",
+ table_id="Transaction_partitions",
+ project_id="internal-gcp-project",
max_results=100,
- selected_fields='DATE',
- gcp_conn_id='airflow-conn-id'
+ selected_fields="DATE",
+ gcp_conn_id="airflow-conn-id",
)
:param dataset_id: The dataset ID of the requested table. (templated)
@@ -1331,8 +1331,10 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
**Example**::
- schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
- {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
+ schema_fields = [
+ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+ {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+ ]
:param gcs_schema_object: Full path to the JSON file containing
schema (templated). For
@@ -1351,41 +1353,35 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
**Example (with schema JSON in GCS)**::
CreateTable = BigQueryCreateEmptyTableOperator(
- task_id='BigQueryCreateEmptyTableOperator_task',
- dataset_id='ODS',
- table_id='Employees',
- project_id='internal-gcp-project',
- gcs_schema_object='gs://schema-bucket/employee_schema.json',
- gcp_conn_id='airflow-conn-id',
- google_cloud_storage_conn_id='airflow-conn-id'
+ task_id="BigQueryCreateEmptyTableOperator_task",
+ dataset_id="ODS",
+ table_id="Employees",
+ project_id="internal-gcp-project",
+ gcs_schema_object="gs://schema-bucket/employee_schema.json",
+ gcp_conn_id="airflow-conn-id",
+ google_cloud_storage_conn_id="airflow-conn-id",
)
**Corresponding Schema file** (``employee_schema.json``)::
[
- {
- "mode": "NULLABLE",
- "name": "emp_name",
- "type": "STRING"
- },
- {
- "mode": "REQUIRED",
- "name": "salary",
- "type": "INTEGER"
- }
+ {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"},
+ {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"},
]
**Example (with schema in the DAG)**::
CreateTable = BigQueryCreateEmptyTableOperator(
- task_id='BigQueryCreateEmptyTableOperator_task',
- dataset_id='ODS',
- table_id='Employees',
- project_id='internal-gcp-project',
- schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
- {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
- gcp_conn_id='airflow-conn-id-account',
- google_cloud_storage_conn_id='airflow-conn-id'
+ task_id="BigQueryCreateEmptyTableOperator_task",
+ dataset_id="ODS",
+ table_id="Employees",
+ project_id="internal-gcp-project",
+ schema_fields=[
+ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+ {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+ ],
+ gcp_conn_id="airflow-conn-id-account",
+ google_cloud_storage_conn_id="airflow-conn-id",
)
:param view: [Optional] A dictionary containing definition for the view.
@@ -1582,8 +1578,10 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
**Example**::
- schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
- {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
+ schema_fields = [
+ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+ {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+ ]
Should not be set when source_format is 'DATASTORE_BACKUP'.
:param table_resource: Table resource as described in documentation:
@@ -1878,12 +1876,13 @@ class BigQueryDeleteDatasetOperator(GoogleCloudBaseOperator):
**Example**::
delete_temp_data = BigQueryDeleteDatasetOperator(
- dataset_id='temp-dataset',
- project_id='temp-project',
- delete_contents=True, # Force the deletion of the dataset as well as its tables (if any).
- gcp_conn_id='_my_gcp_conn_',
- task_id='Deletetemp',
- dag=dag)
+ dataset_id="temp-dataset",
+ project_id="temp-project",
+ delete_contents=True, # Force the deletion of the dataset as well as its tables (if any).
+ gcp_conn_id="_my_gcp_conn_",
+ task_id="Deletetemp",
+ dag=dag,
+ )
"""
template_fields: Sequence[str] = (
diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index 449a1fb0d65db..1e5e8f54a3846 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -188,11 +188,11 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator):
folder in ``data`` bucket. ::
GCS_Files = GCSListOperator(
- task_id='GCS_Files',
- bucket='data',
- prefix='sales/sales-2017/',
- match_glob='**/*/.avro',
- gcp_conn_id=google_cloud_conn_id
+ task_id="GCS_Files",
+ bucket="data",
+ prefix="sales/sales-2017/",
+ match_glob="**/*/.avro",
+ gcp_conn_id=google_cloud_conn_id,
)
"""
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index c4eeadd07b2c5..233a9d62a049a 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -192,15 +192,14 @@ class GKECreateClusterOperator(GoogleCloudBaseOperator):
The **minimum** required to define a cluster to create is:
``dict()`` ::
- cluster_def = {'name': 'my-cluster-name',
- 'initial_node_count': 1}
+ cluster_def = {"name": "my-cluster-name", "initial_node_count": 1}
or
``Cluster`` proto ::
from google.cloud.container_v1.types import Cluster
- cluster_def = Cluster(name='my-cluster-name', initial_node_count=1)
+ cluster_def = Cluster(name="my-cluster-name", initial_node_count=1)
**Operator Creation**: ::
diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py
index 3751e9a371f73..0858c427b6408 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -58,24 +58,22 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator):
By default, if the topic already exists, this operator will
not cause the DAG to fail. ::
- with DAG('successful DAG') as dag:
- (
- PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
- >> PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
- )
+ with DAG("successful DAG") as dag:
+ create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic")
+ create_topic_again = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic")
+
+ create_topic >> create_topic_again
The operator can be configured to fail if the topic already exists. ::
- with DAG('failing DAG') as dag:
- (
- PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
- >> PubSubCreateTopicOperator(
- project_id='my-project',
- topic='my_new_topic',
- fail_if_exists=True,
- )
+ with DAG("failing DAG") as dag:
+ create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic")
+ create_topic_again = PubSubCreateTopicOperator(
+ project_id="my-project", topic="my_new_topic", fail_if_exists=True
)
+ create_topic >> create_topic_again
+
Both ``project_id`` and ``topic`` are templated so you can use Jinja templating in their values.
:param project_id: Optional, the Google Cloud project ID where the topic will be created.
@@ -197,43 +195,35 @@ class PubSubCreateSubscriptionOperator(GoogleCloudBaseOperator):
By default, if the subscription already exists, this operator will
not cause the DAG to fail. However, the topic must exist in the project. ::
- with DAG('successful DAG') as dag:
- (
- PubSubCreateSubscriptionOperator(
- project_id='my-project',
- topic='my-topic',
- subscription='my-subscription'
- )
- >> PubSubCreateSubscriptionOperator(
- project_id='my-project',
- topic='my-topic',
- subscription='my-subscription',
- )
+ with DAG("successful DAG") as dag:
+ create_subscription = PubSubCreateSubscriptionOperator(
+ project_id="my-project", topic="my-topic", subscription="my-subscription"
+ )
+ create_subscription_again = PubSubCreateSubscriptionOperator(
+ project_id="my-project", topic="my-topic", subscription="my-subscription"
)
+ create_subscription >> create_subscription_again
+
+
The operator can be configured to fail if the subscription already exists.
::
- with DAG('failing DAG') as dag:
- (
- PubSubCreateSubscriptionOperator(
- project_id='my-project',
- topic='my-topic',
- subscription='my-subscription',
- )
- >> PubSubCreateSubscriptionOperator(
- project_id='my-project',
- topic='my-topic',
- subscription='my-subscription',
- fail_if_exists=True,
- )
+ with DAG("failing DAG") as dag:
+ create_subscription = PubSubCreateSubscriptionOperator(
+ project_id="my-project", topic="my-topic", subscription="my-subscription"
)
+ create_subscription_again = PubSubCreateSubscriptionOperator(
+ project_id="my-project", topic="my-topic", subscription="my-subscription", fail_if_exists=True
+ )
+
+ create_subscription >> create_subscription_again
Finally, subscription is not required. If not passed, the operator will
generated a universally unique identifier for the subscription's name. ::
- with DAG('DAG') as dag:
- PubSubCreateSubscriptionOperator(project_id='my-project', topic='my-topic')
+ with DAG("DAG") as dag:
+ PubSubCreateSubscriptionOperator(project_id="my-project", topic="my-topic")
``project_id``, ``topic``, ``subscription``, ``subscription_project_id`` and
``impersonation_chain`` are templated so you can use Jinja templating in their values.
@@ -410,14 +400,16 @@ class PubSubDeleteTopicOperator(GoogleCloudBaseOperator):
By default, if the topic does not exist, this operator will
not cause the DAG to fail. ::
- with DAG('successful DAG') as dag:
- PubSubDeleteTopicOperator(project_id='my-project', topic='non_existing_topic')
+ with DAG("successful DAG") as dag:
+ PubSubDeleteTopicOperator(project_id="my-project", topic="non_existing_topic")
The operator can be configured to fail if the topic does not exist. ::
- with DAG('failing DAG') as dag:
+ with DAG("failing DAG") as dag:
PubSubDeleteTopicOperator(
- project_id='my-project', topic='non_existing_topic', fail_if_not_exists=True,
+ project_id="my-project",
+ topic="non_existing_topic",
+ fail_if_not_exists=True,
)
Both ``project_id`` and ``topic`` are templated so you can use Jinja templating in their values.
@@ -506,16 +498,18 @@ class PubSubDeleteSubscriptionOperator(GoogleCloudBaseOperator):
By default, if the subscription does not exist, this operator will
not cause the DAG to fail. ::
- with DAG('successful DAG') as dag:
- PubSubDeleteSubscriptionOperator(project_id='my-project', subscription='non-existing')
+ with DAG("successful DAG") as dag:
+ PubSubDeleteSubscriptionOperator(project_id="my-project", subscription="non-existing")
The operator can be configured to fail if the subscription already exists.
::
- with DAG('failing DAG') as dag:
+ with DAG("failing DAG") as dag:
PubSubDeleteSubscriptionOperator(
- project_id='my-project', subscription='non-existing', fail_if_not_exists=True,
+ project_id="my-project",
+ subscription="non-existing",
+ fail_if_not_exists=True,
)
``project_id``, and ``subscription`` are templated so you can use Jinja templating in their values.
@@ -605,15 +599,13 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
in a single Google Cloud project. If the topic does not exist, this
task will fail. ::
- m1 = {'data': b'Hello, World!',
- 'attributes': {'type': 'greeting'}
- }
- m2 = {'data': b'Knock, knock'}
- m3 = {'attributes': {'foo': ''}}
+ m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}}
+ m2 = {"data": b"Knock, knock"}
+ m3 = {"attributes": {"foo": ""}}
t1 = PubSubPublishMessageOperator(
- project_id='my-project',
- topic='my_topic',
+ project_id="my-project",
+ topic="my_topic",
messages=[m1, m2, m3],
create_topic=True,
dag=dag,
diff --git a/airflow/providers/google/cloud/transfers/adls_to_gcs.py b/airflow/providers/google/cloud/transfers/adls_to_gcs.py
index 7782d127dee72..7abbd9a9c3142 100644
--- a/airflow/providers/google/cloud/transfers/adls_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/adls_to_gcs.py
@@ -58,12 +58,12 @@ class ADLSToGCSOperator(ADLSListOperator):
resulting gcs path will be ``gs://mybucket/hello/world.avro`` ::
copy_single_file = AdlsToGoogleCloudStorageOperator(
- task_id='copy_single_file',
- src_adls='hello/world.avro',
- dest_gcs='gs://mybucket',
+ task_id="copy_single_file",
+ src_adls="hello/world.avro",
+ dest_gcs="gs://mybucket",
replace=False,
- azure_data_lake_conn_id='azure_data_lake_default',
- gcp_conn_id='google_cloud_default'
+ azure_data_lake_conn_id="azure_data_lake_default",
+ gcp_conn_id="google_cloud_default",
)
The following Operator would copy all parquet files from ADLS
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index c82faaff01040..856d3153e5bf9 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -101,13 +101,13 @@ class GCSToGCSOperator(BaseOperator):
``copied_sales/2017/january-backup.avro`` in the ``data_backup`` bucket ::
copy_single_file = GCSToGCSOperator(
- task_id='copy_single_file',
- source_bucket='data',
- source_objects=['sales/sales-2017/january.avro'],
- destination_bucket='data_backup',
- destination_object='copied_sales/2017/january-backup.avro',
+ task_id="copy_single_file",
+ source_bucket="data",
+ source_objects=["sales/sales-2017/january.avro"],
+ destination_bucket="data_backup",
+ destination_object="copied_sales/2017/january-backup.avro",
exact_match=True,
- gcp_conn_id=google_cloud_conn_id
+ gcp_conn_id=google_cloud_conn_id,
)
The following Operator would copy all the Avro files from ``sales/sales-2017``
@@ -141,12 +141,12 @@ class GCSToGCSOperator(BaseOperator):
process. ::
move_files = GCSToGCSOperator(
- task_id='move_files',
- source_bucket='data',
- source_object='sales/sales-2017/*.avro',
- destination_bucket='data_backup',
+ task_id="move_files",
+ source_bucket="data",
+ source_object="sales/sales-2017/*.avro",
+ destination_bucket="data_backup",
move_object=True,
- gcp_conn_id=google_cloud_conn_id
+ gcp_conn_id=google_cloud_conn_id,
)
The following Operator would move all the Avro files from ``sales/sales-2019``
@@ -154,13 +154,13 @@ class GCSToGCSOperator(BaseOperator):
``data_backup`` bucket, deleting the original files in the process. ::
move_files = GCSToGCSOperator(
- task_id='move_files',
- source_bucket='data',
- source_objects=['sales/sales-2019/*.avro', 'sales/sales-2020'],
- destination_bucket='data_backup',
- delimiter='.avro',
+ task_id="move_files",
+ source_bucket="data",
+ source_objects=["sales/sales-2019/*.avro", "sales/sales-2020"],
+ destination_bucket="data_backup",
+ delimiter=".avro",
move_object=True,
- gcp_conn_id=google_cloud_conn_id
+ gcp_conn_id=google_cloud_conn_id,
)
"""
@@ -329,12 +329,12 @@ def _copy_source_without_wildcard(self, hook, prefix):
the ``data_backup`` bucket (b/a.csv, b/b.csv, b/c.csv) ::
copy_files = GCSToGCSOperator(
- task_id='copy_files_without_wildcard',
- source_bucket='data',
- source_objects=['a/'],
- destination_bucket='data_backup',
- destination_object='b/',
- gcp_conn_id=google_cloud_conn_id
+ task_id="copy_files_without_wildcard",
+ source_bucket="data",
+ source_objects=["a/"],
+ destination_bucket="data_backup",
+ destination_object="b/",
+ gcp_conn_id=google_cloud_conn_id,
)
Example 2:
@@ -345,13 +345,13 @@ def _copy_source_without_wildcard(self, hook, prefix):
the ``data_backup`` bucket (b/a.avro, b/b.avro, b/c.avro) ::
copy_files = GCSToGCSOperator(
- task_id='copy_files_without_wildcard',
- source_bucket='data',
- source_objects=['a/'],
- destination_bucket='data_backup',
- destination_object='b/',
- delimiter='.avro',
- gcp_conn_id=google_cloud_conn_id
+ task_id="copy_files_without_wildcard",
+ source_bucket="data",
+ source_objects=["a/"],
+ destination_bucket="data_backup",
+ destination_object="b/",
+ delimiter=".avro",
+ gcp_conn_id=google_cloud_conn_id,
)
Example 3:
@@ -362,12 +362,12 @@ def _copy_source_without_wildcard(self, hook, prefix):
the ``data_backup`` bucket (b/file_1.txt, b/file_2.csv, b/file_3.avro) ::
copy_files = GCSToGCSOperator(
- task_id='copy_files_without_wildcard',
- source_bucket='data',
- source_objects=['a/file_1.txt', 'a/file_2.csv', 'a/file_3.avro'],
- destination_bucket='data_backup',
- destination_object='b/',
- gcp_conn_id=google_cloud_conn_id
+ task_id="copy_files_without_wildcard",
+ source_bucket="data",
+ source_objects=["a/file_1.txt", "a/file_2.csv", "a/file_3.avro"],
+ destination_bucket="data_backup",
+ destination_object="b/",
+ gcp_conn_id=google_cloud_conn_id,
)
Example 4:
@@ -378,12 +378,12 @@ def _copy_source_without_wildcard(self, hook, prefix):
(b/foo.txt, b/foo.txt.abc, b/foo.txt/subfolder/file.txt) ::
copy_files = GCSToGCSOperator(
- task_id='copy_files_without_wildcard',
- source_bucket='data',
- source_object='a/foo.txt',
- destination_bucket='data_backup',
- destination_object='b/',
- gcp_conn_id=google_cloud_conn_id
+ task_id="copy_files_without_wildcard",
+ source_bucket="data",
+ source_object="a/foo.txt",
+ destination_bucket="data_backup",
+ destination_object="b/",
+ gcp_conn_id=google_cloud_conn_id,
)
"""
objects = hook.list(
diff --git a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
index d8f78ffd239bf..81e9af6d314ca 100644
--- a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -42,15 +42,15 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
'mssql-export' GCS bucket (along with a schema file). ::
export_customers = MSSQLToGCSOperator(
- task_id='export_customers',
- sql='SELECT * FROM dbo.Customers;',
- bit_fields=['some_bit_field', 'another_bit_field'],
- bucket='mssql-export',
- filename='data/customers/export.json',
- schema_filename='schemas/export.json',
- mssql_conn_id='mssql_default',
- gcp_conn_id='google_cloud_default',
- dag=dag
+ task_id="export_customers",
+ sql="SELECT * FROM dbo.Customers;",
+ bit_fields=["some_bit_field", "another_bit_field"],
+ bucket="mssql-export",
+ filename="data/customers/export.json",
+ schema_filename="schemas/export.json",
+ mssql_conn_id="mssql_default",
+ gcp_conn_id="google_cloud_default",
+ dag=dag,
)
.. seealso::
diff --git a/airflow/providers/microsoft/azure/operators/adls.py b/airflow/providers/microsoft/azure/operators/adls.py
index dd642d4f707d6..345336b2c4cb2 100644
--- a/airflow/providers/microsoft/azure/operators/adls.py
+++ b/airflow/providers/microsoft/azure/operators/adls.py
@@ -78,9 +78,9 @@ class ADLSListOperator(BaseOperator):
folder in the specified ADLS account ::
adls_files = ADLSListOperator(
- task_id='adls_files',
- path='folder/output/*.parquet',
- azure_data_lake_conn_id='azure_data_lake_default'
+ task_id="adls_files",
+ path="folder/output/*.parquet",
+ azure_data_lake_conn_id="azure_data_lake_default",
)
"""
diff --git a/airflow/providers/microsoft/azure/operators/container_instances.py b/airflow/providers/microsoft/azure/operators/container_instances.py
index dde105e022f4a..f00c933487419 100644
--- a/airflow/providers/microsoft/azure/operators/container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -93,29 +93,29 @@ class AzureContainerInstancesOperator(BaseOperator):
**Example**::
- AzureContainerInstancesOperator(
- ci_conn_id = "azure_service_principal",
- registry_conn_id = "azure_registry_user",
- resource_group = "my-resource-group",
- name = "my-container-name-{{ ds }}",
- image = "myprivateregistry.azurecr.io/my_container:latest",
- region = "westeurope",
- environment_variables = {"MODEL_PATH": "my_value",
- "POSTGRES_LOGIN": "{{ macros.connection('postgres_default').login }}",
- "POSTGRES_PASSWORD": "{{ macros.connection('postgres_default').password }}",
- "JOB_GUID": "{{ ti.xcom_pull(task_ids='task1', key='guid') }}" },
- secured_variables = ['POSTGRES_PASSWORD'],
- volumes = [("azure_container_instance_conn_id",
- "my_storage_container",
- "my_fileshare",
- "/input-data",
- True),],
- memory_in_gb=14.0,
- cpu=4.0,
- gpu=GpuResource(count=1, sku='K80'),
- command=["/bin/echo", "world"],
- task_id="start_container"
- )
+ AzureContainerInstancesOperator(
+ ci_conn_id="azure_service_principal",
+ registry_conn_id="azure_registry_user",
+ resource_group="my-resource-group",
+ name="my-container-name-{{ ds }}",
+ image="myprivateregistry.azurecr.io/my_container:latest",
+ region="westeurope",
+ environment_variables={
+ "MODEL_PATH": "my_value",
+ "POSTGRES_LOGIN": "{{ macros.connection('postgres_default').login }}",
+ "POSTGRES_PASSWORD": "{{ macros.connection('postgres_default').password }}",
+ "JOB_GUID": "{{ ti.xcom_pull(task_ids='task1', key='guid') }}",
+ },
+ secured_variables=["POSTGRES_PASSWORD"],
+ volumes=[
+ ("azure_container_instance_conn_id", "my_storage_container", "my_fileshare", "/input-data", True),
+ ],
+ memory_in_gb=14.0,
+ cpu=4.0,
+ gpu=GpuResource(count=1, sku="K80"),
+ command=["/bin/echo", "world"],
+ task_id="start_container",
+ )
"""
template_fields: Sequence[str] = ("name", "image", "command", "environment_variables", "volumes")
diff --git a/airflow/providers/sftp/operators/sftp.py b/airflow/providers/sftp/operators/sftp.py
index c03ff5c5f9a45..466c9fe729921 100644
--- a/airflow/providers/sftp/operators/sftp.py
+++ b/airflow/providers/sftp/operators/sftp.py
@@ -74,7 +74,7 @@ class SFTPOperator(BaseOperator):
remote_filepath="/tmp/tmp1/tmp2/file.txt",
operation="put",
create_intermediate_dirs=True,
- dag=dag
+ dag=dag,
)
"""
diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py
index ba88ee14d672d..f48f0bef1e2dc 100644
--- a/airflow/sensors/weekday.py
+++ b/airflow/sensors/weekday.py
@@ -39,18 +39,14 @@ class DayOfWeekSensor(BaseSensorOperator):
**Example** (with single day): ::
weekend_check = DayOfWeekSensor(
- task_id='weekend_check',
- week_day='Saturday',
- use_task_logical_date=True,
- dag=dag)
+ task_id="weekend_check", week_day="Saturday", use_task_logical_date=True, dag=dag
+ )
**Example** (with multiple day using set): ::
weekend_check = DayOfWeekSensor(
- task_id='weekend_check',
- week_day={'Saturday', 'Sunday'},
- use_task_logical_date=True,
- dag=dag)
+ task_id="weekend_check", week_day={"Saturday", "Sunday"}, use_task_logical_date=True, dag=dag
+ )
**Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
@@ -58,10 +54,11 @@ class DayOfWeekSensor(BaseSensorOperator):
from airflow.utils.weekday import WeekDay
weekend_check = DayOfWeekSensor(
- task_id='weekend_check',
+ task_id="weekend_check",
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
use_task_logical_date=True,
- dag=dag)
+ dag=dag,
+ )
:param week_day: Day of the week to check (full name). Optionally, a set
of days can also be provided using a set.
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 659c3bdb0f192..55dccc97f5450 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -125,7 +125,7 @@ def send_email_smtp(
:param custom_headers: Dictionary of custom headers to include in the email.
:param kwargs: Additional keyword arguments.
- >>> send_email('test@example.com', 'foo', 'Foo bar', ['/dev/null'], dryrun=True)
+ >>> send_email("test@example.com", "foo", "Foo bar", ["/dev/null"], dryrun=True)
"""
smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM")
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index acbe25bd92d21..3ad87bd104035 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -155,7 +155,7 @@ def as_flattened_list(iterable: Iterable[Iterable[T]]) -> list[T]:
"""
Return an iterable with one level flattened.
- >>> as_flattened_list((('blue', 'red'), ('green', 'yellow', 'pink')))
+ >>> as_flattened_list((("blue", "red"), ("green", "yellow", "pink")))
['blue', 'red', 'green', 'yellow', 'pink']
"""
return [e for i in iterable for e in i]
diff --git a/airflow/utils/types.py b/airflow/utils/types.py
index 32a0bda6bbb9e..cf3de511b0d2d 100644
--- a/airflow/utils/types.py
+++ b/airflow/utils/types.py
@@ -35,6 +35,7 @@ def is_arg_passed(arg: Union[ArgNotSet, None] = NOTSET) -> bool:
return False
return True
+
is_arg_passed() # False.
is_arg_passed(None) # True.
"""
diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py
index ddb44200f93b2..bda10d8ddc701 100644
--- a/airflow/www/extensions/init_appbuilder.py
+++ b/airflow/www/extensions/init_appbuilder.py
@@ -411,32 +411,27 @@ def add_view(
# or not instantiated
appbuilder.add_view(MyModelView, "My View")
# Register a view, a submenu "Other View" from "Other" with a phone icon.
- appbuilder.add_view(
- MyOtherModelView,
- "Other View",
- icon='fa-phone',
- category="Others"
- )
+ appbuilder.add_view(MyOtherModelView, "Other View", icon="fa-phone", category="Others")
# Register a view, with category icon and translation.
appbuilder.add_view(
YetOtherModelView,
"Other View",
- icon='fa-phone',
- label=_('Other View'),
+ icon="fa-phone",
+ label=_("Other View"),
category="Others",
- category_icon='fa-envelop',
- category_label=_('Other View')
+ category_icon="fa-envelop",
+ category_label=_("Other View"),
)
# Register a view whose menu item will be conditionally displayed
appbuilder.add_view(
YourFeatureView,
"Your Feature",
- icon='fa-feature',
- label=_('Your Feature'),
+ icon="fa-feature",
+ label=_("Your Feature"),
menu_cond=lambda: is_feature_enabled("your-feature"),
)
# Add a link
- appbuilder.add_link("google", href="www.google.com", icon = "fa-google-plus")
+ appbuilder.add_link("google", href="www.google.com", icon="fa-google-plus")
"""
baseview = self._check_and_init(baseview)
log.info(LOGMSG_INF_FAB_ADD_VIEW, baseview.__class__.__name__, name)
diff --git a/dev/mypy/plugin/outputs.py b/dev/mypy/plugin/outputs.py
index 308ba4aed0ae9..fe1ccd5e7cf24 100644
--- a/dev/mypy/plugin/outputs.py
+++ b/dev/mypy/plugin/outputs.py
@@ -41,6 +41,7 @@ class OperatorOutputPlugin(Plugin):
def f(a: str) -> int:
return len(a)
+
f(op.output) # "op" is an operator instance.
f(g()) # "g" is a taskflow task.
diff --git a/pyproject.toml b/pyproject.toml
index 6187f8ca6f1e4..7a305243a1096 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -75,6 +75,9 @@ extend-ignore = [
namespace-packages = ["airflow/providers"]
+[tool.ruff.format]
+docstring-code-format = true
+
[tool.pytest.ini_options]
# * Disable `flaky` plugin for pytest. This plugin conflicts with `rerunfailures` because provide same marker.
# * Disable `nose` builtin plugin for pytest. This feature deprecated in 7.2 and will be removed in pytest>=8
diff --git a/tests/conftest.py b/tests/conftest.py
index b48c51a0cbb8f..646ca322286c3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -616,8 +616,8 @@ def dag_maker(request):
the same argument as DAG::
with dag_maker(dag_id="mydag") as dag:
- task1 = EmptyOperator(task_id='mytask')
- task2 = EmptyOperator(task_id='mytask2')
+ task1 = EmptyOperator(task_id="mytask")
+ task2 = EmptyOperator(task_id="mytask2")
If the DagModel you want to use needs different parameters than the one
automatically created by the dag_maker, you have to update the DagModel as below::
@@ -854,7 +854,7 @@ def create_dummy_dag(dag_maker):
is not here, please use `default_args` so that the DAG will pass it to the
Task::
- dag, task = create_dummy_dag(default_args={'start_date':timezone.datetime(2016, 1, 1)})
+ dag, task = create_dummy_dag(default_args={"start_date": timezone.datetime(2016, 1, 1)})
You cannot be able to alter the created DagRun or DagModel, use `dag_maker` fixture instead.
"""
diff --git a/tests/test_utils/providers.py b/tests/test_utils/providers.py
index 99f64a6849548..02ca9bed41989 100644
--- a/tests/test_utils/providers.py
+++ b/tests/test_utils/providers.py
@@ -36,7 +36,7 @@ def get_provider_version(provider_name):
Returns provider version given provider package name.
Example::
- if provider_version('apache-airflow-providers-cncf-kubernetes') >= (6, 0):
+ if provider_version("apache-airflow-providers-cncf-kubernetes") >= (6, 0):
raise Exception(
"You must now remove `get_kube_client` from PodManager "
"and make kube_client a required argument."
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 55568d4d8f1a6..8cd257d89e600 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -797,6 +797,7 @@ def _get_appbuilder_pk_string(model_view_cls, instance) -> str:
Example usage::
from airflow.www.views import TaskInstanceModelView
+
ti = session.Query(TaskInstance).filter(...).one()
pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti)
client.post("...", data={"action": "...", "rowid": pk})