diff --git a/UPDATING.md b/UPDATING.md index fd7d4fc9b8df1..6e990e07b0730 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -41,6 +41,15 @@ assists users migrating to a new version. ## Airflow Master +### Additional arguments passed to BaseOperator cause an exception + +Previous versions of Airflow took additional arguments and displayed a message on the console. When the +message was not noticed by users, it caused very difficult to detect errors. + +In order to restore the previous behavior, you must set an ``True`` in the ``allow_illegal_arguments`` +option of section ``[operators]`` in the ``airflow.cfg`` file. In the future it is possible to completely +delete this option. + ### Simplification of the TriggerDagRunOperator The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a7d7d97a4ee10..824ea18b9f515 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -250,7 +250,9 @@ default_cpus = 1 default_ram = 512 default_disk = 512 default_gpus = 0 -allow_illegal_arguments = True +# Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator. +# If set to False, an exception will be thrown, otherwise only the console message will be displayed. +allow_illegal_arguments = False [hive] # Default mapreduce queue for HiveOperator tasks diff --git a/airflow/contrib/operators/qubole_check_operator.py b/airflow/contrib/operators/qubole_check_operator.py index 75c5fd164103d..73e8cc47f336c 100644 --- a/airflow/contrib/operators/qubole_check_operator.py +++ b/airflow/contrib/operators/qubole_check_operator.py @@ -159,7 +159,7 @@ class QuboleValueCheckOperator(ValueCheckOperator, QuboleOperator): ui_fgcolor = '#000' @apply_defaults - def __init__(self, pass_value, tolerance=None, + def __init__(self, pass_value, tolerance=None, results_parser_callable=None, qubole_conn_id="qubole_default", *args, **kwargs): sql = get_sql_from_qbol_cmd(kwargs) @@ -168,6 +168,7 @@ def __init__(self, pass_value, tolerance=None, sql=sql, pass_value=pass_value, tolerance=tolerance, *args, **kwargs) + self.results_parser_callable = results_parser_callable self.on_failure_callback = QuboleCheckHook.handle_failure_retry self.on_retry_callback = QuboleCheckHook.handle_failure_retry @@ -185,7 +186,12 @@ def get_hook(self, context=None): if hasattr(self, 'hook') and (self.hook is not None): return self.hook else: - return QuboleCheckHook(context=context, *self.args, **self.kwargs) + return QuboleCheckHook( + context=context, + *self.args, + results_parser_callable=self.results_parser_callable, + **self.kwargs + ) def __getattribute__(self, name): if name in QuboleValueCheckOperator.template_fields: diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 4bf12bec3694f..6cc8a2d0612ca 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -323,7 +323,6 @@ def __init__( ): if args or kwargs: - # TODO remove *args and **kwargs in Airflow 2.0 if not conf.getboolean('operators', 'ALLOW_ILLEGAL_ARGUMENTS'): raise AirflowException( "Invalid arguments were passed to {c} (task_id: {t}). Invalid " @@ -333,7 +332,7 @@ def __init__( warnings.warn( 'Invalid arguments were passed to {c} (task_id: {t}). ' 'Support for passing such arguments will be dropped in ' - 'Airflow 2.0. Invalid arguments were:' + 'future. Invalid arguments were:' '\n*args: {a}\n**kwargs: {k}'.format( c=self.__class__.__name__, a=args, k=kwargs, t=task_id), category=PendingDeprecationWarning, diff --git a/tests/contrib/sensors/test_celery_queue_sensor.py b/tests/contrib/sensors/test_celery_queue_sensor.py index 437507412cab2..ed949b450a1a5 100644 --- a/tests/contrib/sensors/test_celery_queue_sensor.py +++ b/tests/contrib/sensors/test_celery_queue_sensor.py @@ -75,5 +75,5 @@ def test_poke_fail(self, mock_inspect): def test_poke_success_with_taskid(self, mock_inspect): test_sensor = self.sensor(celery_queue='test_queue', task_id='test-task', - af_task_id='target-task') + target_task_id='target-task') self.assertTrue(test_sensor.poke(None)) diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py index b4e0c1e24da88..b4b2d124a5293 100644 --- a/tests/contrib/sensors/test_emr_base_sensor.py +++ b/tests/contrib/sensors/test_emr_base_sensor.py @@ -51,8 +51,6 @@ def failure_message_from_response(response): operator = EmrBaseSensorSubclass( task_id='test_task', poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' ) operator.execute(None) @@ -76,8 +74,6 @@ def state_from_response(response): operator = EmrBaseSensorSubclass( task_id='test_task', poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' ) self.assertEqual(operator.poke(None), False) @@ -101,8 +97,6 @@ def state_from_response(response): operator = EmrBaseSensorSubclass( task_id='test_task', poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' ) self.assertEqual(operator.poke(None), False) @@ -137,8 +131,6 @@ def failure_message_from_response(response): operator = EmrBaseSensorSubclass( task_id='test_task', poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' ) with self.assertRaises(AirflowException) as context: diff --git a/tests/core.py b/tests/core.py index 02bbb5fb00847..91645cefee757 100644 --- a/tests/core.py +++ b/tests/core.py @@ -443,30 +443,30 @@ def test_illegal_args(self): """ msg = 'Invalid arguments were passed to BashOperator ' '(task_id: test_illegal_args).' - with self.assertWarns(PendingDeprecationWarning) as warning: - BashOperator( - task_id='test_illegal_args', - bash_command='echo success', - dag=self.dag, - illegal_argument_1234='hello?') - assert any(msg in str(w) for w in warning.warnings) + with conf_vars({('operators', 'allow_illegal_arguments'): 'True'}): + with self.assertWarns(PendingDeprecationWarning) as warning: + BashOperator( + task_id='test_illegal_args', + bash_command='echo success', + dag=self.dag, + illegal_argument_1234='hello?') + assert any(msg in str(w) for w in warning.warnings) def test_illegal_args_forbidden(self): """ Tests that operators raise exceptions on illegal arguments when illegal arguments are not allowed. """ - with conf_vars({('operators', 'allow_illegal_arguments'): 'False'}): - with self.assertRaises(AirflowException) as ctx: - BashOperator( - task_id='test_illegal_args', - bash_command='echo success', - dag=self.dag, - illegal_argument_1234='hello?') - self.assertIn( - ('Invalid arguments were passed to BashOperator ' - '(task_id: test_illegal_args).'), - str(ctx.exception)) + with self.assertRaises(AirflowException) as ctx: + BashOperator( + task_id='test_illegal_args', + bash_command='echo success', + dag=self.dag, + illegal_argument_1234='hello?') + self.assertIn( + ('Invalid arguments were passed to BashOperator ' + '(task_id: test_illegal_args).'), + str(ctx.exception)) def test_bash_operator(self): t = BashOperator( diff --git a/tests/operators/test_slack_operator.py b/tests/operators/test_slack_operator.py index 617bb9531ace3..3948a6847ef60 100644 --- a/tests/operators/test_slack_operator.py +++ b/tests/operators/test_slack_operator.py @@ -58,7 +58,6 @@ def setUp(self): ] self.test_attachments_in_json = json.dumps(self.test_attachments) self.test_api_params = {'key': 'value'} - self.test_kwarg = 'test_kwarg' self.expected_method = 'chat.postMessage' self.expected_api_params = { @@ -80,7 +79,6 @@ def __construct_operator(self, test_token, test_slack_conn_id, test_api_params=N icon_url=self.test_icon_url, attachments=self.test_attachments, api_params=test_api_params, - kwarg=self.test_kwarg ) @mock.patch('airflow.operators.slack_operator.SlackHook')