Skip to content

Commit

Permalink
Add BaseOperatorMetaclassRule (#12629)
Browse files Browse the repository at this point in the history
Adds an upgrade check rule that ensures that users are not using custom
metaclasses in their custom operators
  • Loading branch information
dimberman committed Nov 27, 2020
1 parent 5b61c21 commit 847820f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
55 changes: 55 additions & 0 deletions airflow/upgrade/rules/custom_operator_metaclass_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

from airflow.models.dagbag import DagBag
from airflow.upgrade.rules.base_rule import BaseRule
from airflow.utils.db import provide_session


def check_task_for_metaclasses(task):
class_type = type(task.__class__)
if class_type != type:
res = (
"Class {class_name} contained invalid custom metaclass "
"{metaclass_name}. Custom metaclasses for operators are not "
"allowed in Airflow 2.0. Please remove this custom metaclass.".format(
class_name=task.__class__, metaclass_name=class_type
)
)
return res
else:
return None


class BaseOperatorMetaclassRule(BaseRule):
title = "Ensure users are not using custom metaclasses in custom operators"

description = """\
In Airflow 2.0, we require that all custom operators use the BaseOperatorMeta metaclass.\
To ensure this, we can no longer allow custom metaclasses in custom operators.
"""

@provide_session
def check(self, session=None):
dagbag = DagBag(include_examples=False)
for dag_id, dag in dagbag.dags.items():
for task in dag.tasks:
res = check_task_for_metaclasses(task)
if res:
yield res
55 changes: 55 additions & 0 deletions tests/upgrade/rules/test_custom_operator_metaclass_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase

from airflow.models.baseoperator import BaseOperator
from airflow.upgrade.rules.custom_operator_metaclass_rule import (
BaseOperatorMetaclassRule,
check_task_for_metaclasses,
)
from six import with_metaclass


class MyMeta(type):
pass


class MyMetaOperator(with_metaclass(MyMeta, BaseOperator)):
def execute(self, context):
pass


class TestBaseOperatorMetaclassRule(TestCase):
def test_individual_task(self):
task = MyMetaOperator(task_id="foo")
res = check_task_for_metaclasses(task)
expected_error = (
"Class <class 'tests.upgrade.rules.test_custom_operator_metaclass_rule.MyMetaOperator'> "
"contained invalid custom metaclass <class "
"'tests.upgrade.rules.test_custom_operator_metaclass_rule.MyMeta'>. "
"Custom metaclasses for operators are not allowed in Airflow 2.0. "
"Please remove this custom metaclass."
)
self.assertEqual(expected_error, res)

def test_check(self):
rule = BaseOperatorMetaclassRule()

assert isinstance(rule.description, str)
assert isinstance(rule.title, str)
msgs = list(rule.check())
self.assertEqual(msgs, [])

0 comments on commit 847820f

Please sign in to comment.