Skip to content

Commit

Permalink
Add ImportChangesRule for upgrade check (#11056)
Browse files Browse the repository at this point in the history
Adds new rule that will be run during airflow upgrade-check
and will look for old clasess and imports incompatible with
Airflow 2.0.
  • Loading branch information
turbaszek authored and potiuk committed Nov 16, 2020
1 parent 9ea83e6 commit 7d24c76
Show file tree
Hide file tree
Showing 4 changed files with 1,895 additions and 2 deletions.
8 changes: 6 additions & 2 deletions airflow/upgrade/rules/conn_type_is_not_nullable.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ class ConnTypeIsNotNullableRule(BaseRule):

@provide_session
def check(self, session=None):
invalid_connections = session.query(Connection).filter(Connection.conn_type.is_(None))
invalid_connections = session.query(Connection).filter(
Connection.conn_type.is_(None)
)
return (
'Connection<id={}", conn_id={}> have empty conn_type field.'.format(conn.id, conn.conn_id)
'Connection<id={}", conn_id={}> have empty conn_type field.'.format(
conn.id, conn.conn_id
)
for conn in invalid_connections
)
89 changes: 89 additions & 0 deletions airflow/upgrade/rules/import_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import NamedTuple, Optional, List

from cached_property import cached_property

from airflow import conf
from airflow.upgrade.rules.base_rule import BaseRule
from airflow.upgrade.rules.renamed_classes import ALL
from airflow.utils.dag_processing import list_py_file_paths


class ImportChange(
NamedTuple(
"ImportChange",
[("old_path", str), ("new_path", str), ("providers_package", Optional[None])],
)
):
def info(self, file_path=None):
msg = "Using `{}` will be replaced by `{}`".format(self.old_path, self.new_path)
if self.providers_package:
msg += " and requires `{}` providers package".format(
self.providers_package
)
if file_path:
msg += ". Affected file: {}".format(file_path)
return msg

@cached_property
def old_class(self):
return self.old_path.split(".")[-1]

@cached_property
def new_class(self):
return self.new_path.split(".")[-1]

@classmethod
def from_new_old_paths(cls, new_path, old_path):
providers_package = new_path.split(".")[2] if "providers" in new_path else None
return cls(
old_path=old_path, new_path=new_path, providers_package=providers_package
)


class ImportChangesRule(BaseRule):
title = "Changes in import paths of hooks, operators, sensors and others"
description = (
"Many hooks, operators and other classes has been renamed and moved. Those changes were part of "
"unifying names and imports paths as described in AIP-21.\nThe `contrib` folder has been replaced "
"by `providers` directory and packages:\n"
"https://github.com/apache/airflow#backport-packages"
)

ALL_CHANGES = [
ImportChange.from_new_old_paths(*args) for args in ALL
] # type: List[ImportChange]

@staticmethod
def _check_file(file_path):
problems = []
with open(file_path, "r") as file:
content = file.read()
for change in ImportChangesRule.ALL_CHANGES:
if change.old_class in content:
problems.append(change.info(file_path))
return problems

def check(self):
dag_folder = conf.get("core", "dags_folder")
files = list_py_file_paths(directory=dag_folder, include_examples=False)
problems = []
for file in files:
problems.extend(self._check_file(file))
return problems
Loading

0 comments on commit 7d24c76

Please sign in to comment.