/
test_bash.py
132 lines (102 loc) · 4.93 KB
/
test_bash.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
from datetime import datetime
from unittest.mock import patch
import pytest
from openlineage.client.facet import SourceCodeJobFacet
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage.extractors.bash import BashExtractor
from airflow.providers.openlineage.utils.utils import is_source_enabled
from tests.test_utils.config import conf_vars
# Every test in this module is marked as needing the metadata database.
pytestmark = pytest.mark.db_test

# Module-level DAG holding one BashOperator. The ``test_extract_dag_*`` tests
# below reuse ``bash_task`` so they exercise an operator attached to a DAG,
# as opposed to the free-standing operators built inside the
# ``test_extract_operator_*`` tests.
with DAG(
    dag_id="test_dummy_dag",
    start_date=datetime(2020, 1, 8),
    schedule="*/2 * * * *",
    description="Test dummy DAG",
    max_active_runs=1,
    catchup=False,
) as dag:
    bash_task = BashOperator(
        dag=dag,
        task_id="bash-task",
        bash_command="ls -halt && exit 0",
    )
@pytest.fixture(autouse=True, scope="function")
def clear_cache():
    """Reset the memoized ``is_source_enabled`` result around every test.

    The tests below toggle the source-code switch through environment variables
    and config overrides. Clearing the cache before and after each test keeps a
    value computed under one test's settings from leaking into the next.
    """
    reset = is_source_enabled.cache_clear
    reset()
    try:
        yield
    finally:
        # Runs on teardown regardless of how the test body finished.
        reset()
@patch.dict(os.environ)
def test_extract_operator_bash_command_disables_without_env():
    """A free-standing operator gets no ``sourceCode`` facet when the env toggle is unset.

    ``patch.dict(os.environ)`` snapshots the environment and restores it after the
    test. That makes it safe to drop ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE``
    here, which keeps the test independent of whatever the invoking shell or CI
    exports. This mirrors the DAG-attached variant of this test.
    """
    os.environ.pop("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", None)
    operator = BashOperator(task_id="taskid", bash_command="exit 0")
    extractor = BashExtractor(operator)
    assert "sourceCode" not in extractor.extract().job_facets
@patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "False"})
def test_extract_operator_bash_command_enables_on_true_env():
    """With ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE=False`` the bash command is reported."""
    command = "exit 0"
    extractor = BashExtractor(BashOperator(task_id="taskid", bash_command=command))
    facets = extractor.extract().job_facets
    assert facets["sourceCode"] == SourceCodeJobFacet("bash", command)
@conf_vars({("openlineage", "disable_source_code"): "False"})
def test_extract_operator_bash_command_enables_on_true_conf():
    """With ``[openlineage] disable_source_code = False`` the bash command is reported."""
    command = "exit 0"
    extractor = BashExtractor(BashOperator(task_id="taskid", bash_command=command))
    facets = extractor.extract().job_facets
    assert facets["sourceCode"] == SourceCodeJobFacet("bash", command)
@patch.dict(os.environ)
def test_extract_dag_bash_command_disabled_without_env():
    """A DAG-attached operator gets no ``sourceCode`` facet when the env toggle is unset.

    The previous version rebuilt ``os.environ`` from a copy captured at import
    time (``clear=True``). That silently reverted any env changes made after
    collection. Now ``patch.dict(os.environ)`` snapshots and restores the live
    environment, and only the one relevant key is removed.
    """
    os.environ.pop("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", None)
    extractor = BashExtractor(bash_task)
    assert "sourceCode" not in extractor.extract().job_facets
@patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "False"})
def test_extract_dag_bash_command_enables_on_true_env():
    """With ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE=False`` the DAG task's command is reported.

    A leftover debug ``print`` was removed. It wrote to stdout and also ran
    ``extract()`` a second time before the assertion.
    """
    extractor = BashExtractor(bash_task)
    assert extractor.extract().job_facets["sourceCode"] == SourceCodeJobFacet("bash", "ls -halt && exit 0")
@conf_vars({("openlineage", "disable_source_code"): "False"})
def test_extract_dag_bash_command_enables_on_true_conf():
    """With ``[openlineage] disable_source_code = False`` the DAG task's command is reported."""
    facets = BashExtractor(bash_task).extract().job_facets
    expected = SourceCodeJobFacet("bash", "ls -halt && exit 0")
    assert facets["sourceCode"] == expected
@patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "True"})
def test_extract_dag_bash_command_env_disables_on_true():
    """``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE=True`` suppresses the ``sourceCode`` facet."""
    facets = BashExtractor(bash_task).extract().job_facets
    assert "sourceCode" not in facets
@conf_vars({("openlineage", "disable_source_code"): "true"})
def test_extract_dag_bash_command_conf_disables_on_true():
    """Lower-case ``true`` in ``[openlineage] disable_source_code`` suppresses the facet."""
    facets = BashExtractor(bash_task).extract().job_facets
    assert "sourceCode" not in facets
@patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "asdftgeragdsfgawef"})
def test_extract_dag_bash_command_env_does_not_disable_on_random_string():
    """An unrecognized env value does not disable source code; the command is still reported."""
    facets = BashExtractor(bash_task).extract().job_facets
    expected = SourceCodeJobFacet("bash", "ls -halt && exit 0")
    assert facets["sourceCode"] == expected
@conf_vars({("openlineage", "disable_source_code"): "asdftgeragdsfgawef"})
def test_extract_dag_bash_command_conf_does_not_disable_on_random_string():
    """An unrecognized config value does not disable source code; the command is still reported."""
    facets = BashExtractor(bash_task).extract().job_facets
    expected = SourceCodeJobFacet("bash", "ls -halt && exit 0")
    assert facets["sourceCode"] == expected
@patch.dict(
    os.environ,
    {"AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS": "airflow.operators.bash.BashOperator"},
)
def test_bash_extraction_disabled_operator():
    """Listing BashOperator in ``disabled_for_operators`` turns off extraction entirely.

    Both ``extract`` and ``extract_on_complete`` are expected to return ``None``.
    """
    extractor = BashExtractor(BashOperator(task_id="taskid", bash_command="echo 1;"))
    assert extractor.extract() is None
    assert extractor.extract_on_complete(None) is None