Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 140 additions & 9 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import ast
import glob
import itertools
import mmap
import os
import unittest
from typing import List

from parameterized import parameterized

Expand Down Expand Up @@ -114,6 +115,42 @@ def test_providers_modules_should_have_tests(self):
)


def get_imports_from_file(filepath: str) -> List[str]:
    """
    Collect the names imported by a Python source file.

    The file is parsed with :mod:`ast` and never executed. ``ast.walk`` visits the whole
    tree, so imports nested inside functions or classes are reported as well.

    :param filepath: path of the Python file to inspect
    :return: one entry per imported alias, ``module.name`` for ``from module import name``
        and the plain name for ``import name`` or ``from . import name``
    :raises SyntaxError: if the file content is not valid Python
    """
    # Python sources are UTF-8 by default; do not depend on the locale's preferred encoding.
    with open(filepath, encoding="utf-8") as py_file:
        content = py_file.read()
    doc_node = ast.parse(content, filepath)
    import_names: List[str] = []
    for current_node in ast.walk(doc_node):
        if not isinstance(current_node, (ast.Import, ast.ImportFrom)):
            continue
        # ``ImportFrom.module`` is None for ``from . import name``; formatting it blindly
        # would produce the bogus entry "None.name".
        module = current_node.module if isinstance(current_node, ast.ImportFrom) else None
        for alias in current_node.names:
            import_names.append(f"{module}.{alias.name}" if module else alias.name)
    return import_names


def filepath_to_module(filepath: str) -> str:
    """
    Convert the path of a Python file into a dotted module path relative to ``ROOT_FOLDER``.

    :param filepath: path of a ``.py`` file; it is made absolute with ``os.path.abspath``
        before being related to ``ROOT_FOLDER``
    :return: dotted path, e.g. ``pkg.sub.module`` for ``<ROOT_FOLDER>/pkg/sub/module.py``
    """
    relative_path = os.path.relpath(os.path.abspath(filepath), ROOT_FOLDER)
    # Strip the extension explicitly rather than slicing off three characters, which
    # silently corrupts any path that does not end with ".py".
    module_path, _ = os.path.splitext(relative_path)
    # ``os.path.relpath`` returns OS-native separators, so split on ``os.sep``, not "/".
    return module_path.replace(os.sep, ".")


def get_classes_from_file(filepath: str) -> List[str]:
    """
    List the operator and sensor classes defined in a Python source file.

    The file is parsed with :mod:`ast` and never imported. ``ast.walk`` visits the whole
    tree, so nested classes are reported too, qualified by the module path only.

    :param filepath: path of the Python file to inspect
    :return: ``<module>.<ClassName>`` for every class whose name ends with ``Operator``
        or ``Sensor``, where ``<module>`` comes from ``filepath_to_module(filepath)``
    :raises SyntaxError: if the file content is not valid Python
    """
    # Python sources are UTF-8 by default; do not depend on the locale's preferred encoding.
    with open(filepath, encoding="utf-8") as py_file:
        content = py_file.read()
    doc_node = ast.parse(content, filepath)
    module = filepath_to_module(filepath)
    # ``str.endswith`` accepts a tuple of suffixes, replacing the chained checks
    # (which tested "Operator" twice).
    return [
        f"{module}.{node.name}"
        for node in ast.walk(doc_node)
        if isinstance(node, ast.ClassDef) and node.name.endswith(("Operator", "Sensor"))
    ]


class TestGoogleProviderProjectStructure(unittest.TestCase):
MISSING_EXAMPLE_DAGS = {
('cloud', 'adls_to_gcs'),
Expand All @@ -124,6 +161,79 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
('ads', 'ads_to_gcs'),
}

# Fully-qualified names of the operator and sensor classes that
# ``test_missing_example_for_operator`` expects NOT to be imported by the matching
# example DAGs. The test compares its findings with this set using ``assertEqual``,
# so the two must match exactly: drop an entry once an example imports the class,
# and add an entry (or an example) when the test reports a new class.
MISSING_EXAMPLES_FOR_OPERATORS = {
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueDeleteOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueResumeOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePauseOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePurgeOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskGetOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksTasksListOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskDeleteOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueGetOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueUpdateOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuesListOperator',
# Deprecated operator. Ignore it.
# (The two adjacent string literals below are concatenated into a single entry.)
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
'.CloudDataTransferServiceS3ToGCSOperator',
# Deprecated operator. Ignore it.
# (The two adjacent string literals below are concatenated into a single entry.)
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
'.CloudDataTransferServiceGCSToGCSOperator',
# Base operator. Ignore it.
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
# Base operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkJobOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkSqlJobOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHiveJobOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPigJobOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPySparkJobOperator',
'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDeidentifyTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDLPJobOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateDeidentifyTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobTriggerOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDeidentifyTemplatesOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDeidentifyTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListInspectTemplatesOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListStoredInfoTypesOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateInspectTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDLPJobOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListJobTriggersOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPCancelDLPJobOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetInspectTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListInfoTypesOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDLPJobsOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator',
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator',
# Base operator. Ignore it
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
'airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor',
'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor',
# NOTE(review): the "Wtih" spelling presumably mirrors the real class name; verify
# against airflow/providers/google/cloud/sensors/gcs.py before "correcting" it.
'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor',
'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor',
}

def test_example_dags(self):
operators_modules = itertools.chain(
*[self.find_resource_files(resource_type=d) for d in ["operators", "sensors", "transfers"]]
Expand Down Expand Up @@ -174,15 +284,36 @@ def has_example_dag(operator_set):
"Can you remove it from the list of missing example, please?"
)

@parameterized.expand(
[
(
resource_type,
suffix,
def test_missing_example_for_operator(self):
    """
    Check that example DAGs import every operator/sensor class of the module they cover.

    For each resource file that has at least one ``example_<service_name>*.py`` example
    DAG, the classes returned by ``get_classes_from_file`` are compared with the imports
    collected from those example DAGs by ``get_imports_from_file``. Classes that are
    never imported are accumulated, and the resulting set must be equal to
    ``MISSING_EXAMPLES_FOR_OPERATORS``.
    """
    missing_operators = set()

    # NOTE(review): "tranfers" looks like a typo of "transfers" (``test_example_dags``
    # spells it "transfers"), so that directory is presumably never scanned here.
    # Correcting it will likely surface additional classes and therefore requires
    # updating MISSING_EXAMPLES_FOR_OPERATORS in the same change; the literal is
    # deliberately left untouched.
    for resource_type in ["operators", "sensors", "tranfers"]:
        operator_files = set(
            self.find_resource_files(top_level_directory="airflow", resource_type=resource_type)
        )
        for filepath in operator_files:
            service_name = os.path.basename(filepath)[: -(len(".py"))]
            example_dags = glob.glob(
                f"{ROOT_FOLDER}/airflow/providers/google/*/example_dags/example_{service_name}*.py"
            )
            if not example_dags:
                # Ignore. We have separate tests that detect this.
                continue
            # Keep only the imports that point into the module currently being checked.
            example_paths = {
                path
                for example_dag in example_dags
                for path in get_imports_from_file(example_dag)
                if f'.{resource_type}.{service_name}.' in path
            }
            operators_paths = set(get_classes_from_file(f"{ROOT_FOLDER}/{filepath}"))
            missing_operators.update(operators_paths - example_paths)
    self.assertEqual(missing_operators, self.MISSING_EXAMPLES_FOR_OPERATORS)

@parameterized.expand(
itertools.product(["_system.py", "_system_helper.py"], ["operators", "sensors", "tranfers"])
)
def test_detect_invalid_system_tests(self, resource_type, filename_suffix):
operators_tests = self.find_resource_files(top_level_directory="tests", resource_type=resource_type)
Expand Down