feat: Optimize tests for DAG and Terraform generation #395

Merged: 8 commits, Jun 29, 2022

2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yaml
@@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
python-version: [3.8.12]
steps:
- uses: actions/checkout@v2
- uses: hashicorp/setup-terraform@v2
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -22,7 +22,7 @@ repos:
hooks:
- id: check-yaml
- repo: https://github.com/psf/black
rev: 20.8b1
rev: '22.3.0'
hooks:
- id: black
name: black
datasets/cloud_storage_geo_index/pipelines/cloud_storage_geo_index/cloud_storage_geo_index_dag.py
@@ -27,7 +27,7 @@
dag_id="cloud_storage_geo_index.cloud_storage_geo_index",
default_args=default_args,
max_active_runs=1,
schedule_interval="0 1 0 0 6",
schedule_interval="0 6 * * 1",
catchup=False,
default_view="graph",
) as dag:
@@ -89,9 +89,9 @@
)

# Run CSV transform within kubernetes pod
sentinel_2 = kubernetes_engine.GKEStartPodOperator(
task_id="sentinel_2",
name="sentinel_2",
sentinel_2_index = kubernetes_engine.GKEStartPodOperator(
task_id="sentinel_2_index",
name="sentinel_2_index",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="cloud-storage-geo-index",
@@ -109,7 +109,7 @@
"DATASET_ID": "cloud_storage_geo_index",
"TABLE_ID": "sentinel_2_index",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cloud_storage_geo_index/sentinel_2/data_output.csv",
"TARGET_GCS_PATH": "data/cloud_storage_geo_index/sentinel_2_index/data_output.csv",
"SCHEMA_PATH": "data/cloud_storage_geo_index/schema/cloud_storage_geo_index_sentinel_2_schema.json",
"DROP_DEST_TABLE": "Y",
"INPUT_FIELD_DELIMITER": ",",
@@ -133,4 +133,4 @@
name="cloud-storage-geo-index",
)

create_cluster >> [landsat_index, sentinel_2] >> delete_cluster
create_cluster >> [landsat_index, sentinel_2_index] >> delete_cluster
datasets/cloud_storage_geo_index/pipelines/cloud_storage_geo_index/pipeline.yaml
@@ -30,7 +30,7 @@ dag:
depends_on_past: False
start_date: '2021-03-01'
max_active_runs: 1
schedule_interval: "0 1 0 0 6"
schedule_interval: "0 6 * * 1" # 06:00 on Monday
catchup: False
default_view: graph

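Note on the schedule fix above (the same change is applied to the noaa pipeline below): the old expression "0 1 0 0 6" is not a valid cron schedule, since its day-of-month and month fields are 0 while cron only accepts 1-31 and 1-12 there. The replacement "0 6 * * 1" fires at 06:00 every Monday, matching the comment added to pipeline.yaml. A minimal sketch for sanity-checking the two expressions, assuming the croniter package (already a transitive Airflow dependency) is available:

import datetime

from croniter import croniter

# Old expression: rejected, because day-of-month and month cannot be 0.
print(croniter.is_valid("0 1 0 0 6"))  # False

# New expression: valid; the first run after the merge date is a Monday at 06:00.
it = croniter("0 6 * * 1", datetime.datetime(2022, 6, 29))
print(it.get_next(datetime.datetime))  # 2022-07-04 06:00:00
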
2 changes: 1 addition & 1 deletion datasets/noaa/pipelines/noaa/noaa_dag.py
@@ -27,7 +27,7 @@
dag_id="noaa.noaa",
default_args=default_args,
max_active_runs=1,
schedule_interval="0 1 0 0 6",
schedule_interval="0 6 * * 1",
catchup=False,
default_view="graph",
) as dag:
2 changes: 1 addition & 1 deletion datasets/noaa/pipelines/noaa/pipeline.yaml
@@ -48,7 +48,7 @@ dag:
depends_on_past: False
start_date: '2021-03-01'
max_active_runs: 1
schedule_interval: "0 1 0 0 6"
schedule_interval: "0 6 * * 1" # 06:00 on Monday
catchup: False
default_view: graph

1,382 changes: 880 additions & 502 deletions poetry.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions pyproject.toml
@@ -8,21 +8,22 @@ authors = ["Adler Santos <adlersantos@users.noreply.github.com>",
packages = []

[tool.poetry.dependencies]
python = "~3.8"
python = "3.8.12"
pre-commit = "*"

[tool.poetry.group.pipelines.dependencies]
apache-airflow = "==2.1.4"
apache-airflow-providers-amazon = "2.4.0"
apache-airflow-providers-apache-beam = "3.0.0"
apache-airflow-providers-google = "5.0.0"
apache-airflow-providers-cncf-kubernetes = "2.0.2"
apache-airflow = "==2.2.5"
apache-airflow-providers-amazon = "*"
apache-airflow-providers-apache-beam = ">=2.38.0"
apache-airflow-providers-cncf-kubernetes = "*"
apache-airflow-providers-google = ">=8.0.0"
apache-beam = "2.37.0"
beautifulsoup4 = "==4.9.3"
black = "==22.3.0"
click = ">=8.0.0"
flask-openid = "==1.3.0"
flake8 = "==3.9.2"
google-cloud-orchestration-airflow = "1.3.0"
google-cloud-orchestration-airflow = "*"
isort = "*"
Jinja2 = "==2.11.3"
kubernetes = "*"
13 changes: 9 additions & 4 deletions scripts/generate_dag.py
@@ -51,18 +51,21 @@ def main(
env: str,
all_pipelines: bool = False,
skip_builds: bool = False,
format_code: bool = True,
):
if not skip_builds:
build_images(dataset_id, env)

if all_pipelines:
for pipeline_dir in list_subdirs(DATASETS_PATH / dataset_id / "pipelines"):
generate_pipeline_dag(dataset_id, pipeline_dir.name, env)
generate_pipeline_dag(dataset_id, pipeline_dir.name, env, format_code)
else:
generate_pipeline_dag(dataset_id, pipeline_id, env)
generate_pipeline_dag(dataset_id, pipeline_id, env, format_code)


def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
def generate_pipeline_dag(
dataset_id: str, pipeline_id: str, env: str, format_code: bool
):
pipeline_dir = DATASETS_PATH / dataset_id / "pipelines" / pipeline_id
config = yaml.load((pipeline_dir / "pipeline.yaml").read_text())

@@ -73,7 +76,9 @@ def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
dag_path = pipeline_dir / f"{pipeline_id}_dag.py"
dag_path.touch()
write_to_file(dag_contents, dag_path)
format_python_code(dag_path)

if format_code:
format_python_code(dag_path)

copy_files_to_dot_dir(
dataset_id,
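An aside on the new format_code flag (not part of the diff itself): with format_code=False the generator skips the black pass over the freshly written *_dag.py file, which is what the test changes further below rely on to keep DAG generation fast; the default of True preserves the existing behaviour for normal runs. A small sketch of both modes, using placeholder dataset and pipeline names:

from scripts import generate_dag

# Test-style call: skip image builds and skip the formatting pass.
generate_dag.main("noaa", "noaa", "dev", skip_builds=True, format_code=False)

# Default call: format_code=True, so the generated noaa_dag.py is still run
# through format_python_code() before the files are copied out.
generate_dag.main("noaa", "noaa", "dev", skip_builds=True)
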
9 changes: 7 additions & 2 deletions scripts/generate_terraform.py
@@ -50,6 +50,7 @@ def main(
tf_state_bucket: str,
tf_state_prefix: str,
tf_apply: bool = False,
format_code: bool = True,
):
validate_bucket_name(bucket_name_prefix)

@@ -78,6 +79,12 @@
infra_vars,
)

if format_code and (env_path / "datasets" / dataset_id / "infra").exists():
terraform_fmt(env_path / "datasets" / dataset_id / "infra")

if format_code and (DATASETS_PATH / dataset_id / "infra").exists():
terraform_fmt(DATASETS_PATH / dataset_id / "infra")

if tf_apply:
actuate_terraform_resources(dataset_id, env_path)

@@ -215,7 +222,6 @@ def generate_tfvars_file(

target_path = env_path / "datasets" / dataset_id / "infra" / "terraform.tfvars"
write_to_file(contents + "\n", target_path)
terraform_fmt(target_path)
print_created_files([target_path])


@@ -305,7 +311,6 @@ def create_file_in_dir_tree(

target_path = prefix / filename
write_to_file(contents + "\n", target_path)
terraform_fmt(target_path)
filepaths.append(target_path)

print_created_files(filepaths)
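The per-file terraform_fmt calls removed above are replaced by the directory-level calls added to main(), guarded by the same format_code flag and executed only when the corresponding infra directory exists. Since terraform fmt accepts a directory and rewrites the Terraform files inside it in place, one call per infra directory covers everything the per-file calls used to format. A rough sketch, assuming terraform_fmt is a thin wrapper around the Terraform CLI along these lines:

import pathlib
import subprocess


def terraform_fmt(target: pathlib.Path) -> None:
    # "terraform fmt DIR" rewrites the .tf and .tfvars files in DIR in place,
    # so one call per infra directory replaces the previous per-file calls.
    subprocess.check_call(["terraform", "fmt", str(target)])
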
38 changes: 21 additions & 17 deletions tests/scripts/test_generate_dag.py
@@ -111,7 +111,7 @@ def test_main_generates_dag_files(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -125,7 +125,7 @@ def test_main_copies_pipeline_yaml_file(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -141,7 +141,7 @@ def test_main_copies_custom_dir_if_it_exists(
custom_path = dataset_path / "pipelines" / pipeline_path.name / "custom"
custom_path.mkdir(parents=True, exist_ok=True)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -163,7 +163,7 @@ def test_main_raises_an_error_when_airflow_version_is_not_specified(
yaml.dump(config, file)

with pytest.raises(KeyError):
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)


def test_main_raises_an_error_when_airflow_version_is_incorrect(
@@ -178,7 +178,7 @@ def test_main_raises_an_error_when_airflow_version_is_incorrect(
yaml.dump(config, file)

with pytest.raises(ValueError):
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)


def test_main_uses_airflow_operators_based_on_airflow_version_specified_in_the_config(
@@ -189,7 +189,7 @@
config = yaml.load(open(pipeline_path / "pipeline.yaml"))
airflow_version = config["dag"]["airflow_version"]

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -224,7 +224,7 @@ def test_main_only_depends_on_pipeline_yaml(

assert not (dataset_path / "pipelines" / "dataset.yaml").exists()

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -235,7 +235,9 @@

def test_main_errors_out_on_nonexisting_pipeline_path(dataset_path, env: str):
with pytest.raises(FileNotFoundError):
generate_dag.main(dataset_path.name, "non_existing_pipeline", env)
generate_dag.main(
dataset_path.name, "non_existing_pipeline", env, format_code=False
)


def test_main_errors_out_on_nonexisting_pipeline_yaml(
@@ -244,7 +246,7 @@
assert not (pipeline_path / "pipeline.yaml").exists()

with pytest.raises(FileNotFoundError):
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)


def test_checks_for_task_operator_and_id():
@@ -275,7 +277,7 @@ def test_generated_dag_file_loads_properly_in_python(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

dag_filename = f"{pipeline_path.name}_dag.py"

@@ -291,7 +293,7 @@ def test_generated_dag_files_contain_license_headers(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

license_header = pathlib.Path(generate_dag.TEMPLATE_PATHS["license"]).read_text()

@@ -337,7 +339,7 @@ def test_dag_id_in_py_file_is_prepended_with_dataset_id(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

dagpy_contents = (pipeline_path / f"{pipeline_path.name}_dag.py").read_text()
expected_dag_id = generate_dag.namespaced_dag_id(
@@ -353,7 +355,7 @@ def test_build_images_copies_image_files_to_env_dir(
generate_image_files(dataset_path, num_containers=random.randint(1, 3))

mocker.patch("scripts.generate_dag.build_and_push_image")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for image_dir in (dataset_path / "pipelines" / "_images").iterdir():
copied_image_dir = (
@@ -375,7 +377,7 @@ def test_build_images_called_when_dataset_has_images_dir(
generate_image_files(dataset_path, num_containers=random.randint(1, 3))

mocker.patch("scripts.generate_dag.build_images")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)
generate_dag.build_images.assert_called_once_with(dataset_path.name, env)


@@ -385,7 +387,9 @@ def test_build_images_not_called_given_skip_builds_argument(
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

mocker.patch("scripts.generate_dag.build_images")
generate_dag.main(dataset_path.name, pipeline_path.name, env, skip_builds=True)
generate_dag.main(
dataset_path.name, pipeline_path.name, env, skip_builds=True, format_code=False
)
assert not generate_dag.build_images.called


@@ -398,7 +402,7 @@ def test_build_and_push_image_called_as_many_as_num_containers(
generate_image_files(dataset_path, num_containers=num_containers)

mocker.patch("scripts.generate_dag.build_and_push_image")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)
assert generate_dag.build_and_push_image.call_count == num_containers


@@ -408,5 +412,5 @@ def test_build_and_push_image_not_called_when_no_image_dirs_exist(
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

mocker.patch("scripts.generate_dag.build_and_push_image")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)
assert not generate_dag.build_and_push_image.called
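
The test edits above are mechanical: every call to generate_dag.main() now passes format_code=False, so the unit tests never shell out to black. If the repetition ever becomes a nuisance, a thin wrapper in the test module could centralize the flag; this is purely a hypothetical follow-up, not something included in this PR:

def generate(dataset_path, pipeline_path, env, **kwargs):
    # Hypothetical helper so individual tests do not repeat format_code=False.
    kwargs.setdefault("format_code", False)
    return generate_dag.main(dataset_path.name, pipeline_path.name, env, **kwargs)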