fix: Allow DAG deploys without variables.json #91

Merged 5 commits on Jun 17, 2021
6 changes: 3 additions & 3 deletions README.md
@@ -147,15 +147,15 @@ Docker images will be built and pushed to GCR by default whenever the command ab

## 5. Declare and set your pipeline variables

-Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run.
+Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run, if any.

-All variables used by a dataset must have their values set in
+If your pipeline doesn't use any Airflow variables, you can skip this step. Otherwise, create the following file

```
[.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json
```

-Airflow variables use JSON dot notation to access the variable's value. For example, if you're using the following variables in your pipeline config:
+Pipelines use the JSON dot notation to access Airflow variables. Make sure to nest your variables in the JSON file under some namespace, typically your dataset's name. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config:

- `{{ var.json.shared.composer_bucket }}`
- `{{ var.json.parent.nested }}`
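
(Editor's illustration, not part of this diff.) The two references above assume a nested variables file. A minimal sketch, using a made-up dataset name `parent` and made-up values, showing the structure such a file would need; in practice you author the JSON by hand rather than generating it:

```python
# Hypothetical values and dataset name ("parent"), chosen to mirror the
# templated references above; replace them with your own.
import json
import pathlib

variables = {
    "shared": {"composer_bucket": "my-composer-bucket"},  # {{ var.json.shared.composer_bucket }}
    "parent": {"nested": "some-value"},                    # {{ var.json.parent.nested }}
}

# Write the nested variables to the dataset's variables file (here under .test/).
target = pathlib.Path(".test/datasets/parent/parent_variables.json")
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(json.dumps(variables, indent=2) + "\n")
```

Nesting everything under the dataset's namespace (`parent` here) is what keeps these globally scoped Airflow variables from colliding with other datasets' variables.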
56 changes: 37 additions & 19 deletions scripts/deploy_dag.py
@@ -17,6 +17,7 @@
import pathlib
import subprocess
import typing
+import warnings

CURRENT_PATH = pathlib.Path(__file__).resolve().parent
PROJECT_ROOT = CURRENT_PATH.parent
@@ -67,6 +68,10 @@ def main(
    )


+def run_gsutil_cmd(args: typing.List[str], cwd: pathlib.Path):
+    subprocess.check_call(["gsutil"] + args, cwd=cwd)
+
+
def copy_variables_to_airflow_data_folder(
    local: bool,
    env_path: pathlib.Path,
@@ -80,7 +85,9 @@ def copy_variables_to_airflow_data_folder(
    cwd = env_path / "datasets" / dataset_id
    filename = f"{dataset_id}_variables.json"

-    check_existence_of_variables_file(cwd / filename)
+    if not (cwd / filename).exists():
+        warnings.warn(f"Airflow variables file {filename} does not exist.")
+        return

    if local:
        """
@@ -106,7 +113,31 @@
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {gcs_uri}\n"
)
subprocess.check_call(["gsutil", "cp", filename, gcs_uri], cwd=cwd)
run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)


+def run_cloud_composer_vars_import(
+    composer_env: str,
+    composer_region: str,
+    airflow_path: pathlib.Path,
+    cwd: pathlib.Path,
+):
+    subprocess.check_call(
+        [
+            "gcloud",
+            "composer",
+            "environments",
+            "run",
+            str(composer_env),
+            "--location",
+            str(composer_region),
+            "variables",
+            "--",
+            "--import",
+            str(airflow_path),
+        ],
+        cwd=cwd,
+    )
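
# Editor's note (not part of this change): one practical effect of wrapping the
# gsutil and gcloud invocations in run_gsutil_cmd() and
# run_cloud_composer_vars_import() is that tests can stub out the external
# processes in one place, e.g. with pytest-mock:
#
#     mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
#     mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")
#
# which is exactly what the new test in tests/scripts/test_deploy_dag.py does.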


def import_variables_to_airflow_env(
@@ -136,21 +167,8 @@ def import_variables_to_airflow_env(
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
subprocess.check_call(
[
"gcloud",
"composer",
"environments",
"run",
str(composer_env),
"--location",
str(composer_region),
"variables",
"--",
"--import",
str(airflow_path),
],
cwd=cwd,
run_cloud_composer_vars_import(
composer_env, composer_region, airflow_path, cwd=cwd
)


@@ -189,7 +207,7 @@ def copy_generated_dag_to_airflow_dags_folder(
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {target}\n"
)
subprocess.check_call(["gsutil", "cp", filename, target], cwd=cwd)
run_gsutil_cmd(["cp", filename, target], cwd=cwd)


def copy_custom_callables_to_airflow_dags_folder(
@@ -231,7 +249,7 @@ def copy_custom_callables_to_airflow_dags_folder(
f" Source:\n {cwd / 'custom'}\n\n"
f" Destination:\n {target}\n"
)
subprocess.check_call(["gsutil", "cp", "-r", "custom", target], cwd=cwd)
run_gsutil_cmd(["cp", "-r", "custom", target], cwd=cwd)


def check_existence_of_variables_file(file_path: pathlib.Path):
35 changes: 35 additions & 0 deletions tests/scripts/test_deploy_dag.py
@@ -175,6 +175,41 @@ def test_script_always_requires_dataset_arg(
    pipeline_path_2 = pipeline_path


+def test_script_can_deploy_without_variables_file(
+    dataset_path: pathlib.Path,
+    pipeline_path: pathlib.Path,
+    airflow_home: pathlib.Path,
+    env: str,
+    mocker,
+):
+    setup_dag_and_variables(
+        dataset_path,
+        pipeline_path,
+        airflow_home,
+        env,
+        f"{dataset_path.name}_variables.json",
+    )
+
+    # Delete the variables file
+    vars_file = f"{dataset_path.name}_variables.json"
+    (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()
+    assert not (ENV_DATASETS_PATH / dataset_path.name / vars_file).exists()
+
+    mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
+    mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")
+
+    deploy_dag.main(
+        local=False,
+        env_path=ENV_PATH,
+        dataset_id=dataset_path.name,
+        pipeline=pipeline_path.name,
+        airflow_home=airflow_home,
+        composer_env="test-env",
+        composer_bucket="test-bucket",
+        composer_region="test-region",
+    )
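
# Editor's sketch (not part of this PR): a possible companion test asserting that
# the missing-variables warning is actually raised. It reuses the fixtures and
# patches above and assumes `pytest` is imported at the top of this module.
def test_missing_variables_file_emits_warning(
    dataset_path: pathlib.Path,
    pipeline_path: pathlib.Path,
    airflow_home: pathlib.Path,
    env: str,
    mocker,
):
    setup_dag_and_variables(
        dataset_path,
        pipeline_path,
        airflow_home,
        env,
        f"{dataset_path.name}_variables.json",
    )

    vars_file = f"{dataset_path.name}_variables.json"
    (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()

    mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
    mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")

    # copy_variables_to_airflow_data_folder warns via warnings.warn, which
    # defaults to UserWarning.
    with pytest.warns(UserWarning):
        deploy_dag.main(
            local=False,
            env_path=ENV_PATH,
            dataset_id=dataset_path.name,
            pipeline=pipeline_path.name,
            airflow_home=airflow_home,
            composer_env="test-env",
            composer_bucket="test-bucket",
            composer_region="test-region",
        )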


def test_script_with_pipeline_arg_deploys_only_that_pipeline(
    dataset_path: pathlib.Path,
    pipeline_path: pathlib.Path,