feat: Optimize tests for DAG and Terraform generation (#395)
* feat: Use Airflow 2.2.5 and Python 3.8.12

* feat: use Airflow 2.2.5 and Python 3.8.12

* bumped black formatter version for pre-commits

* feat: fix DAG schedules

* bump python version

* feat: Optimize tests for DAG and Terraform generation

* fixed formatting errors
adlersantos committed Jun 29, 2022
1 parent f3a9447 commit ffcd18c
Showing 12 changed files with 951 additions and 544 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yaml
@@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
python-version: [3.8.12]
steps:
- uses: actions/checkout@v2
- uses: hashicorp/setup-terraform@v2
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -22,7 +22,7 @@ repos:
hooks:
- id: check-yaml
- repo: https://github.com/psf/black
rev: 20.8b1
rev: '22.3.0'
hooks:
- id: black
name: black
@@ -27,7 +27,7 @@
dag_id="cloud_storage_geo_index.cloud_storage_geo_index",
default_args=default_args,
max_active_runs=1,
schedule_interval="0 1 0 0 6",
schedule_interval="0 6 * * 1",
catchup=False,
default_view="graph",
) as dag:
@@ -89,9 +89,9 @@
)

# Run CSV transform within kubernetes pod
sentinel_2 = kubernetes_engine.GKEStartPodOperator(
task_id="sentinel_2",
name="sentinel_2",
sentinel_2_index = kubernetes_engine.GKEStartPodOperator(
task_id="sentinel_2_index",
name="sentinel_2_index",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="cloud-storage-geo-index",
@@ -109,7 +109,7 @@
"DATASET_ID": "cloud_storage_geo_index",
"TABLE_ID": "sentinel_2_index",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cloud_storage_geo_index/sentinel_2/data_output.csv",
"TARGET_GCS_PATH": "data/cloud_storage_geo_index/sentinel_2_index/data_output.csv",
"SCHEMA_PATH": "data/cloud_storage_geo_index/schema/cloud_storage_geo_index_sentinel_2_schema.json",
"DROP_DEST_TABLE": "Y",
"INPUT_FIELD_DELIMITER": ",",
@@ -133,4 +133,4 @@
name="cloud-storage-geo-index",
)

create_cluster >> [landsat_index, sentinel_2] >> delete_cluster
create_cluster >> [landsat_index, sentinel_2_index] >> delete_cluster
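
The schedule change above is as much a correctness fix as a cleanup: "0 1 0 0 6" is not a valid cron expression (the day-of-month and month fields cannot be 0), whereas "0 6 * * 1" fires at 06:00 every Monday. A minimal sketch for sanity-checking the new value with croniter, the library Airflow uses under the hood for cron schedules; the start date below is only illustrative:

from datetime import datetime

from croniter import croniter

# "0 6 * * 1" -> minute 0, hour 6, any day of month, any month, Monday.
schedule = "0 6 * * 1"
assert croniter.is_valid(schedule)

# Preview the next three run times from an arbitrary starting point.
it = croniter(schedule, datetime(2022, 6, 29))
for _ in range(3):
    print(it.get_next(datetime))  # Mondays at 06:00
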
@@ -30,7 +30,7 @@ dag:
depends_on_past: False
start_date: '2021-03-01'
max_active_runs: 1
schedule_interval: "0 1 0 0 6"
schedule_interval: "0 6 * * 1" # 06:00 on Monday
catchup: False
default_view: graph

2 changes: 1 addition & 1 deletion datasets/noaa/pipelines/noaa/noaa_dag.py
@@ -27,7 +27,7 @@
dag_id="noaa.noaa",
default_args=default_args,
max_active_runs=1,
schedule_interval="0 1 0 0 6",
schedule_interval="0 6 * * 1",
catchup=False,
default_view="graph",
) as dag:
2 changes: 1 addition & 1 deletion datasets/noaa/pipelines/noaa/pipeline.yaml
@@ -48,7 +48,7 @@ dag:
depends_on_past: False
start_date: '2021-03-01'
max_active_runs: 1
schedule_interval: "0 1 0 0 6"
schedule_interval: "0 6 * * 1" # 06:00 on Monday
catchup: False
default_view: graph

1,382 changes: 880 additions & 502 deletions poetry.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions pyproject.toml
@@ -8,21 +8,22 @@ authors = ["Adler Santos <adlersantos@users.noreply.github.com>",
packages = []

[tool.poetry.dependencies]
python = "~3.8"
python = "3.8.12"
pre-commit = "*"

[tool.poetry.group.pipelines.dependencies]
apache-airflow = "==2.1.4"
apache-airflow-providers-amazon = "2.4.0"
apache-airflow-providers-apache-beam = "3.0.0"
apache-airflow-providers-google = "5.0.0"
apache-airflow-providers-cncf-kubernetes = "2.0.2"
apache-airflow = "==2.2.5"
apache-airflow-providers-amazon = "*"
apache-airflow-providers-apache-beam = ">=2.38.0"
apache-airflow-providers-cncf-kubernetes = "*"
apache-airflow-providers-google = ">=8.0.0"
apache-beam = "2.37.0"
beautifulsoup4 = "==4.9.3"
black = "==22.3.0"
click = ">=8.0.0"
flask-openid = "==1.3.0"
flake8 = "==3.9.2"
google-cloud-orchestration-airflow = "1.3.0"
google-cloud-orchestration-airflow = "*"
isort = "*"
Jinja2 = "==2.11.3"
kubernetes = "*"
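
The dependency changes above loosen most provider pins so Poetry can resolve them against Airflow 2.2.5, which is what produces the large poetry.lock refresh. A small sanity check that a re-locked environment matches the new constraints; the package names and version floors are copied from the pyproject.toml diff above, and the snippet assumes it runs inside the project's Poetry environment with the packaging library available:

from importlib.metadata import version

from packaging.version import Version

# Exact pins and floors taken from the updated pyproject.toml.
assert version("apache-airflow") == "2.2.5"
assert version("black") == "22.3.0"
assert Version(version("apache-airflow-providers-google")) >= Version("8.0.0")
assert Version(version("apache-airflow-providers-apache-beam")) >= Version("2.38.0")
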
13 changes: 9 additions & 4 deletions scripts/generate_dag.py
@@ -51,18 +51,21 @@ def main(
env: str,
all_pipelines: bool = False,
skip_builds: bool = False,
format_code: bool = True,
):
if not skip_builds:
build_images(dataset_id, env)

if all_pipelines:
for pipeline_dir in list_subdirs(DATASETS_PATH / dataset_id / "pipelines"):
generate_pipeline_dag(dataset_id, pipeline_dir.name, env)
generate_pipeline_dag(dataset_id, pipeline_dir.name, env, format_code)
else:
generate_pipeline_dag(dataset_id, pipeline_id, env)
generate_pipeline_dag(dataset_id, pipeline_id, env, format_code)


def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
def generate_pipeline_dag(
dataset_id: str, pipeline_id: str, env: str, format_code: bool
):
pipeline_dir = DATASETS_PATH / dataset_id / "pipelines" / pipeline_id
config = yaml.load((pipeline_dir / "pipeline.yaml").read_text())

@@ -73,7 +76,9 @@ def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
dag_path = pipeline_dir / f"{pipeline_id}_dag.py"
dag_path.touch()
write_to_file(dag_contents, dag_path)
format_python_code(dag_path)

if format_code:
format_python_code(dag_path)

copy_files_to_dot_dir(
dataset_id,
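
The new format_code flag is the heart of the test speed-up: generate_pipeline_dag only runs the Black formatting pass when the flag is set, and main keeps it defaulted to True so normal DAG generation is unchanged. A short sketch of the two call styles, using the signatures shown in this diff; the dataset and pipeline names come from the dataset touched in this commit, and the "dev" environment name is an assumption:

from scripts import generate_dag

# Normal use: generate the DAG files and format them with Black (default).
generate_dag.main("cloud_storage_geo_index", "cloud_storage_geo_index", "dev")

# Test use: skip the formatting pass, the step this commit makes optional
# so that generating many pipelines in one test session stays fast.
generate_dag.main(
    "cloud_storage_geo_index",
    "cloud_storage_geo_index",
    "dev",
    format_code=False,
)
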
9 changes: 7 additions & 2 deletions scripts/generate_terraform.py
@@ -50,6 +50,7 @@ def main(
tf_state_bucket: str,
tf_state_prefix: str,
tf_apply: bool = False,
format_code: bool = True,
):
validate_bucket_name(bucket_name_prefix)

@@ -78,6 +79,12 @@
infra_vars,
)

if format_code and (env_path / "datasets" / dataset_id / "infra").exists():
terraform_fmt(env_path / "datasets" / dataset_id / "infra")

if format_code and (DATASETS_PATH / dataset_id / "infra").exists():
terraform_fmt(DATASETS_PATH / dataset_id / "infra")

if tf_apply:
actuate_terraform_resources(dataset_id, env_path)

@@ -215,7 +222,6 @@ def generate_tfvars_file(

target_path = env_path / "datasets" / dataset_id / "infra" / "terraform.tfvars"
write_to_file(contents + "\n", target_path)
terraform_fmt(target_path)
print_created_files([target_path])


@@ -305,7 +311,6 @@ def create_file_in_dir_tree(

target_path = prefix / filename
write_to_file(contents + "\n", target_path)
terraform_fmt(target_path)
filepaths.append(target_path)

print_created_files(filepaths)
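
With the per-file terraform_fmt calls removed above, formatting now happens at most once per infra directory and only when format_code is left on, so tests can turn it off entirely. The terraform_fmt helper itself is not shown in this diff; as an assumption, a directory-level version could look roughly like this sketch, shelling out to the Terraform CLI:

import pathlib
import subprocess


def terraform_fmt(target_dir: pathlib.Path) -> None:
    # Hypothetical sketch: running `terraform fmt` inside a directory rewrites
    # every .tf and .tfvars file in it, so one call replaces many per-file calls.
    subprocess.check_call(["terraform", "fmt"], cwd=str(target_dir))
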
38 changes: 21 additions & 17 deletions tests/scripts/test_generate_dag.py
@@ -111,7 +111,7 @@ def test_main_generates_dag_files(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -125,7 +125,7 @@ def test_main_copies_pipeline_yaml_file(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -141,7 +141,7 @@ def test_main_copies_custom_dir_if_it_exists(
custom_path = dataset_path / "pipelines" / pipeline_path.name / "custom"
custom_path.mkdir(parents=True, exist_ok=True)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -163,7 +163,7 @@ def test_main_raises_an_error_when_airflow_version_is_not_specified(
yaml.dump(config, file)

with pytest.raises(KeyError):
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)


def test_main_raises_an_error_when_airflow_version_is_incorrect(
@@ -178,7 +178,7 @@ def test_main_raises_an_error_when_airflow_version_is_incorrect(
yaml.dump(config, file)

with pytest.raises(ValueError):
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)


def test_main_uses_airflow_operators_based_on_airflow_version_specified_in_the_config(
@@ -189,7 +189,7 @@ def test_main_uses_airflow_operators_based_on_airflow_version_specified_in_the_c
config = yaml.load(open(pipeline_path / "pipeline.yaml"))
airflow_version = config["dag"]["airflow_version"]

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -224,7 +224,7 @@ def test_main_only_depends_on_pipeline_yaml(

assert not (dataset_path / "pipelines" / "dataset.yaml").exists()

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for path_prefix in (
pipeline_path,
@@ -235,7 +235,7 @@

def test_main_errors_out_on_nonexisting_pipeline_path(dataset_path, env: str):
with pytest.raises(FileNotFoundError):
generate_dag.main(dataset_path.name, "non_existing_pipeline", env)
generate_dag.main(
dataset_path.name, "non_existing_pipeline", env, format_code=False
)


def test_main_errors_out_on_nonexisting_pipeline_yaml(
@@ -244,7 +246,7 @@ def test_main_errors_out_on_nonexisting_pipeline_yaml(
assert not (pipeline_path / "pipeline.yaml").exists()

with pytest.raises(FileNotFoundError):
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)


def test_checks_for_task_operator_and_id():
@@ -275,7 +277,7 @@ def test_generated_dag_file_loads_properly_in_python(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

dag_filename = f"{pipeline_path.name}_dag.py"

@@ -291,7 +293,7 @@ def test_generated_dag_files_contain_license_headers(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

license_header = pathlib.Path(generate_dag.TEMPLATE_PATHS["license"]).read_text()

@@ -337,7 +339,7 @@ def test_dag_id_in_py_file_is_prepended_with_dataset_id(
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

dagpy_contents = (pipeline_path / f"{pipeline_path.name}_dag.py").read_text()
expected_dag_id = generate_dag.namespaced_dag_id(
@@ -353,7 +355,7 @@ def test_build_images_copies_image_files_to_env_dir(
generate_image_files(dataset_path, num_containers=random.randint(1, 3))

mocker.patch("scripts.generate_dag.build_and_push_image")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

for image_dir in (dataset_path / "pipelines" / "_images").iterdir():
copied_image_dir = (
@@ -375,7 +377,7 @@ def test_build_images_called_when_dataset_has_images_dir(
generate_image_files(dataset_path, num_containers=random.randint(1, 3))

mocker.patch("scripts.generate_dag.build_images")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)
generate_dag.build_images.assert_called_once_with(dataset_path.name, env)


@@ -385,7 +387,9 @@ def test_build_images_not_called_given_skip_builds_argument(
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

mocker.patch("scripts.generate_dag.build_images")
generate_dag.main(dataset_path.name, pipeline_path.name, env, skip_builds=True)
generate_dag.main(
dataset_path.name, pipeline_path.name, env, skip_builds=True, format_code=False
)
assert not generate_dag.build_images.called


@@ -398,7 +402,7 @@ def test_build_and_push_image_called_as_many_as_num_containers(
generate_image_files(dataset_path, num_containers=num_containers)

mocker.patch("scripts.generate_dag.build_and_push_image")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)
assert generate_dag.build_and_push_image.call_count == num_containers


@@ -408,5 +412,5 @@ def test_build_and_push_image_not_called_when_no_image_dirs_exist(
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

mocker.patch("scripts.generate_dag.build_and_push_image")
generate_dag.main(dataset_path.name, pipeline_path.name, env)
generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)
assert not generate_dag.build_and_push_image.called
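
Every call above now passes format_code=False, so the test suite never shells out to Black. If a regression test for the flag itself is wanted, a sketch in the style of the existing mocker-based tests could look like the following; the fixtures mirror the ones already used in this file, and format_python_code is patched on the assumption that it is the only formatting entry point in scripts/generate_dag.py:

def test_format_python_code_not_called_when_format_code_is_false(
    dataset_path, pipeline_path, env, mocker
):
    copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)

    mocker.patch("scripts.generate_dag.format_python_code")
    generate_dag.main(dataset_path.name, pipeline_path.name, env, format_code=False)

    assert not generate_dag.format_python_code.called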
