Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ assists users migrating to a new version.

## Airflow Master

### Additional arguments passed to BaseOperator cause an exception

Previous versions of Airflow took additional arguments and displayed a message on the console. When the
message was not noticed by users, it caused very difficult to detect errors.

In order to restore the previous behavior, you must set an ``True`` in the ``allow_illegal_arguments``
option of section ``[operators]`` in the ``airflow.cfg`` file. In the future it is possible to completely
delete this option.

### Simplification of the TriggerDagRunOperator

The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun.
Expand Down
4 changes: 3 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
allow_illegal_arguments = True
# Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator.
# If set to False, an exception will be thrown, otherwise only the console message will be displayed.
allow_illegal_arguments = False

[hive]
# Default mapreduce queue for HiveOperator tasks
Expand Down
10 changes: 8 additions & 2 deletions airflow/contrib/operators/qubole_check_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class QuboleValueCheckOperator(ValueCheckOperator, QuboleOperator):
ui_fgcolor = '#000'

@apply_defaults
def __init__(self, pass_value, tolerance=None,
def __init__(self, pass_value, tolerance=None, results_parser_callable=None,
qubole_conn_id="qubole_default", *args, **kwargs):

sql = get_sql_from_qbol_cmd(kwargs)
Expand All @@ -168,6 +168,7 @@ def __init__(self, pass_value, tolerance=None,
sql=sql, pass_value=pass_value, tolerance=tolerance,
*args, **kwargs)

self.results_parser_callable = results_parser_callable
self.on_failure_callback = QuboleCheckHook.handle_failure_retry
self.on_retry_callback = QuboleCheckHook.handle_failure_retry

Expand All @@ -185,7 +186,12 @@ def get_hook(self, context=None):
if hasattr(self, 'hook') and (self.hook is not None):
return self.hook
else:
return QuboleCheckHook(context=context, *self.args, **self.kwargs)
return QuboleCheckHook(
context=context,
*self.args,
results_parser_callable=self.results_parser_callable,
**self.kwargs
)

def __getattribute__(self, name):
if name in QuboleValueCheckOperator.template_fields:
Expand Down
3 changes: 1 addition & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ def __init__(
):

if args or kwargs:
# TODO remove *args and **kwargs in Airflow 2.0
if not conf.getboolean('operators', 'ALLOW_ILLEGAL_ARGUMENTS'):
raise AirflowException(
"Invalid arguments were passed to {c} (task_id: {t}). Invalid "
Expand All @@ -333,7 +332,7 @@ def __init__(
warnings.warn(
'Invalid arguments were passed to {c} (task_id: {t}). '
'Support for passing such arguments will be dropped in '
'Airflow 2.0. Invalid arguments were:'
'future. Invalid arguments were:'
'\n*args: {a}\n**kwargs: {k}'.format(
c=self.__class__.__name__, a=args, k=kwargs, t=task_id),
category=PendingDeprecationWarning,
Expand Down
2 changes: 1 addition & 1 deletion tests/contrib/sensors/test_celery_queue_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ def test_poke_fail(self, mock_inspect):
def test_poke_success_with_taskid(self, mock_inspect):
test_sensor = self.sensor(celery_queue='test_queue',
task_id='test-task',
af_task_id='target-task')
target_task_id='target-task')
self.assertTrue(test_sensor.poke(None))
8 changes: 0 additions & 8 deletions tests/contrib/sensors/test_emr_base_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ def failure_message_from_response(response):
operator = EmrBaseSensorSubclass(
task_id='test_task',
poke_interval=2,
job_flow_id='j-8989898989',
aws_conn_id='aws_test'
)

operator.execute(None)
Expand All @@ -76,8 +74,6 @@ def state_from_response(response):
operator = EmrBaseSensorSubclass(
task_id='test_task',
poke_interval=2,
job_flow_id='j-8989898989',
aws_conn_id='aws_test'
)

self.assertEqual(operator.poke(None), False)
Expand All @@ -101,8 +97,6 @@ def state_from_response(response):
operator = EmrBaseSensorSubclass(
task_id='test_task',
poke_interval=2,
job_flow_id='j-8989898989',
aws_conn_id='aws_test'
)

self.assertEqual(operator.poke(None), False)
Expand Down Expand Up @@ -137,8 +131,6 @@ def failure_message_from_response(response):
operator = EmrBaseSensorSubclass(
task_id='test_task',
poke_interval=2,
job_flow_id='j-8989898989',
aws_conn_id='aws_test'
)

with self.assertRaises(AirflowException) as context:
Expand Down
36 changes: 18 additions & 18 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,30 +443,30 @@ def test_illegal_args(self):
"""
msg = 'Invalid arguments were passed to BashOperator '
'(task_id: test_illegal_args).'
with self.assertWarns(PendingDeprecationWarning) as warning:
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
illegal_argument_1234='hello?')
assert any(msg in str(w) for w in warning.warnings)
with conf_vars({('operators', 'allow_illegal_arguments'): 'True'}):
with self.assertWarns(PendingDeprecationWarning) as warning:
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
illegal_argument_1234='hello?')
assert any(msg in str(w) for w in warning.warnings)

def test_illegal_args_forbidden(self):
"""
Tests that operators raise exceptions on illegal arguments when
illegal arguments are not allowed.
"""
with conf_vars({('operators', 'allow_illegal_arguments'): 'False'}):
with self.assertRaises(AirflowException) as ctx:
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
illegal_argument_1234='hello?')
self.assertIn(
('Invalid arguments were passed to BashOperator '
'(task_id: test_illegal_args).'),
str(ctx.exception))
with self.assertRaises(AirflowException) as ctx:
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
illegal_argument_1234='hello?')
self.assertIn(
('Invalid arguments were passed to BashOperator '
'(task_id: test_illegal_args).'),
str(ctx.exception))

def test_bash_operator(self):
t = BashOperator(
Expand Down
2 changes: 0 additions & 2 deletions tests/operators/test_slack_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def setUp(self):
]
self.test_attachments_in_json = json.dumps(self.test_attachments)
self.test_api_params = {'key': 'value'}
self.test_kwarg = 'test_kwarg'

self.expected_method = 'chat.postMessage'
self.expected_api_params = {
Expand All @@ -80,7 +79,6 @@ def __construct_operator(self, test_token, test_slack_conn_id, test_api_params=N
icon_url=self.test_icon_url,
attachments=self.test_attachments,
api_params=test_api_params,
kwarg=self.test_kwarg
)

@mock.patch('airflow.operators.slack_operator.SlackHook')
Expand Down