feat!: Unified variables and adds support for IAM policies (#341)
* support infra vars and IAM policies for TF generation

* support IAM policies in Terraform templates

* wait for black formatting to finish

* use pipeline vars and remove shared vars
adlersantos committed Apr 12, 2022
1 parent 6721490 commit c4a45a0
Showing 11 changed files with 358 additions and 101 deletions.
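
The commit revolves around a per-dataset, per-environment variables file, `datasets/[DATASET]/.vars.[ENV].yaml`: its `pipelines` key feeds Airflow variables via deploy_dag.py, and its `infra` key (including `iam_policies`) feeds Terraform generation via generate_terraform.py. A rough sketch of the structure the changed scripts appear to expect, shown as the Python dict that `yaml.load` would return — the dataset name, bucket name, roles, and members are hypothetical; only the `pipelines`, `infra`, and `iam_policies` keys and the role/members binding shape come from the code in this commit:

# Hypothetical parsed contents of datasets/my_dataset/.vars.dev.yaml.
# deploy_dag.py dumps env_vars["pipelines"] into my_dataset_variables.json,
# and generate_terraform.py passes env_vars["infra"] straight to the templates.
env_vars = {
    "pipelines": {  # becomes the dataset's Airflow variables file
        "my_dataset": {"source_url": "https://example.com/data.csv"},
    },
    "infra": {  # replaces the default Terraform substitutions and tfvars
        "project_id": "my-gcp-project",
        "dataset_id": "my_dataset",
        "env": "dev",
        "region": "us-central1",
        "bucket_name_prefix": "my-prefix",
        "impersonating_acct": "deployer@my-gcp-project.iam.gserviceaccount.com",
        "iam_policies": {
            "bigquery_datasets": {
                "my_dataset": [
                    {
                        "role": "roles/bigquery.dataViewer",
                        "members": ["group:readers@example.com"],
                    },
                ],
            },
            "storage_buckets": {
                "my-prefix-my-dataset": [
                    {
                        "role": "roles/storage.objectViewer",
                        "members": ["group:readers@example.com"],
                    },
                ],
            },
        },
    },
}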
5 changes: 4 additions & 1 deletion .gitignore
@@ -16,10 +16,13 @@ terraform.tfstate
# airflow cache
tmp

# env or editor config
# python env or editor config
.python-version
.vscode

# env variables files
.vars.*.yaml

# generated files and folders
.dev
.test
52 changes: 28 additions & 24 deletions scripts/deploy_dag.py
@@ -16,7 +16,6 @@
import argparse
import json
import pathlib
import re
import subprocess
import typing

@@ -39,9 +38,9 @@ def main(
env_path: pathlib.Path,
dataset_id: str,
composer_env: str,
composer_bucket: None,
composer_bucket: typing.Union[str, None],
composer_region: str,
pipeline: str = None,
pipeline: typing.Union[str, None],
):
if composer_bucket is None:
composer_bucket = get_composer_bucket(composer_env, composer_region)
@@ -81,7 +80,7 @@ def main(
def get_composer_bucket(
composer_env: str,
composer_region: str,
):
) -> str:
project_sub = subprocess.check_output(
[
"gcloud",
@@ -106,11 +105,10 @@ def get_composer_bucket(
# Make the request
response = client.get_environment(request=request)

gcs_pattern = re.compile(r"^gs:\/\/(.*)\/")

composer_bucket = gcs_pattern.match(response.config.dag_gcs_prefix)[1]

# Handle the response
composer_bucket = response.config.dag_gcs_prefix.replace("/dags", "").replace(
"gs://", ""
)
return composer_bucket


@@ -121,22 +119,33 @@ def run_gsutil_cmd(args: typing.List[str], cwd: pathlib.Path):
def copy_variables_to_airflow_data_folder(
env_path: pathlib.Path,
dataset_id: str,
composer_bucket: str = None,
composer_bucket: str,
):
"""
[remote]
gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{filename}...
cd .{ENV}/datasets or .{ENV}/datasets/{dataset_id}
"""First checks if a `.vars.[ENV].yaml` file exists in the dataset folder and if the `pipelines` key exists in that file.
If so, copy the JSON object equivalent of `pipelines` into the variables file at `.[ENV]/datasets/pipelines/[DATASET]_variables.json`.
Finally, upload the pipeline variables file to the Composer bucket.
"""
cwd = env_path / "datasets" / dataset_id / "pipelines"
filename = f"{dataset_id}_variables.json"
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
pipeline_vars_file = f"{dataset_id}_variables.json"
env_vars_file = DATASETS_PATH / dataset_id / f".vars{env_path.name}.yaml"
env_vars = yaml.load(open(env_vars_file)) if env_vars_file.exists() else {}

if "pipelines" in env_vars:
print(
f"Pipeline variables found in {env_vars_file}:\n"
f"{json.dumps(env_vars['pipelines'], indent=2)}"
)
with open(cwd / pipeline_vars_file, "w") as file_:
file_.write(json.dumps(env_vars["pipelines"]))

gcs_uri = f"gs://{composer_bucket}/data/variables/{pipeline_vars_file}"
print(
"\nCopying variables JSON file into Cloud Composer data folder\n\n"
f" Source:\n {cwd / filename}\n\n"
f" Source:\n {cwd / pipeline_vars_file}\n\n"
f" Destination:\n {gcs_uri}\n"
)
run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)
run_gsutil_cmd(["cp", pipeline_vars_file, gcs_uri], cwd=cwd)


def run_cloud_composer_vars_import(
@@ -187,7 +196,7 @@ def copy_generated_dag_to_airflow_dags_folder(
env_path: pathlib.Path,
dataset_id: str,
pipeline_id: str,
composer_bucket: str = None,
composer_bucket: str,
):
"""
Runs the command
@@ -212,7 +221,7 @@ def copy_custom_callables_to_airflow_dags_folder(
env_path: pathlib.Path,
dataset_id: str,
pipeline_id: str,
composer_bucket: str = None,
composer_bucket: str,
):
"""
Runs the command
@@ -354,11 +363,6 @@ def check_airflow_version_compatibility(
"Argument `-n|--composer-env` (Composer environment name) not specified"
)

if not args.composer_bucket:
raise ValueError(
"Argument `-b|--composer-bucket` (Composer bucket name) not specified"
)

if not args.composer_region:
raise ValueError(
"Argument `-r|--composer-region` (Composer environment region) not specified"
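
For context on the `get_composer_bucket` change above: the regex match was replaced with two plain string replacements. A minimal sketch of that derivation, assuming the `dag_gcs_prefix` returned by the Composer API has the usual `gs://[BUCKET]/dags` shape (the bucket name below is made up):

# dag_gcs_prefix is assumed to look like "gs://[BUCKET]/dags", so stripping the
# scheme and the "/dags" suffix leaves the bare bucket name.
dag_gcs_prefix = "gs://us-east4-example-env-1a2b3c4d-bucket/dags"  # hypothetical value
composer_bucket = dag_gcs_prefix.replace("/dags", "").replace("gs://", "")
assert composer_bucket == "us-east4-example-env-1a2b3c4d-bucket"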
15 changes: 3 additions & 12 deletions scripts/generate_dag.py
@@ -61,8 +61,6 @@ def main(
else:
generate_pipeline_dag(dataset_id, pipeline_id, env)

generate_shared_variables_file(env)


def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
pipeline_dir = DATASETS_PATH / dataset_id / "pipelines" / pipeline_id
@@ -134,15 +132,6 @@ def generate_task_contents(task: dict, airflow_version: str) -> str:
)


def generate_shared_variables_file(env: str) -> None:
shared_variables_file = pathlib.Path(
PROJECT_ROOT / f".{env}" / "datasets" / "shared_variables.json"
)
if not shared_variables_file.exists():
shared_variables_file.touch()
shared_variables_file.write_text("{}", encoding="utf-8")


def dag_init(config: dict) -> dict:
return config["dag"].get("initialize") or config["dag"].get("init")

@@ -201,7 +190,9 @@ def write_to_file(contents: str, filepath: pathlib.Path):


def format_python_code(target_file: pathlib.Path):
subprocess.Popen(f"black -q {target_file}", stdout=subprocess.PIPE, shell=True)
subprocess.Popen(
f"black -q {target_file}", stdout=subprocess.PIPE, shell=True
).wait()
subprocess.check_call(["isort", "--profile", "black", "."], cwd=PROJECT_ROOT)


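
The `format_python_code` change above matches the "wait for black formatting to finish" item in the commit message. A small sketch of why the added `.wait()` matters, using a hypothetical file path (the real function runs isort over the project root rather than a single file):

import subprocess

# Without .wait(), black runs in the background and may still be rewriting the
# file when isort starts, so the two formatters can race on the same file.
# Blocking on the black process first makes the formatting step deterministic.
target_file = "generated_dag.py"  # hypothetical path
proc = subprocess.Popen(f"black -q {target_file}", stdout=subprocess.PIPE, shell=True)
proc.wait()
subprocess.check_call(["isort", "--profile", "black", target_file])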
56 changes: 40 additions & 16 deletions scripts/generate_terraform.py
@@ -62,19 +62,31 @@ def main(
dataset_config = yaml.load(
open(DATASETS_PATH / dataset_id / "pipelines" / "dataset.yaml")
)
generate_dataset_tf(dataset_id, project_id, dataset_config, env)
infra_vars = load_env_vars(dataset_id, env).get("infra")
generate_dataset_tf(dataset_id, project_id, dataset_config, infra_vars, env)

generate_all_pipelines_tf(dataset_id, project_id, env_path)

generate_variables_tf(dataset_id, env_path)
generate_tfvars_file(
project_id, bucket_name_prefix, dataset_id, region, impersonating_acct, env_path
project_id,
bucket_name_prefix,
dataset_id,
region,
impersonating_acct,
env_path,
infra_vars,
)

if tf_apply:
actuate_terraform_resources(dataset_id, env_path)


def load_env_vars(dataset_id: str, env: str) -> dict:
env_vars_file = PROJECT_ROOT / "datasets" / dataset_id / f".vars.{env}.yaml"
return yaml.load(open(env_vars_file)) if env_vars_file.exists() else {}


def generate_provider_tf(
project_id: str,
dataset_id: str,
@@ -113,8 +125,17 @@ def generate_backend_tf(
)


def generate_dataset_tf(dataset_id: str, project_id: str, config: dict, env: str):
subs = {"project_id": project_id, "dataset_id": dataset_id, "env": env}
def generate_dataset_tf(
dataset_id: str,
project_id: str,
config: dict,
infra_vars: typing.Union[dict, None],
env: str,
):
if infra_vars:
subs = infra_vars
else:
subs = {"project_id": project_id, "dataset_id": dataset_id, "env": env}

if not config["resources"]:
return
@@ -175,14 +196,18 @@ def generate_tfvars_file(
region: str,
impersonating_acct: str,
env_path: pathlib.Path,
infra_vars: typing.Union[dict, None],
):
tf_vars = {
"project_id": project_id,
"bucket_name_prefix": bucket_name_prefix,
"impersonating_acct": impersonating_acct,
"region": region,
"env": env_path.name.replace(".", ""),
}
if infra_vars:
tf_vars = infra_vars
else:
tf_vars = {
"project_id": project_id,
"bucket_name_prefix": bucket_name_prefix,
"impersonating_acct": impersonating_acct,
"region": region,
"env": env_path.name.replace(".", ""),
}

contents = apply_substitutions_to_template(
TEMPLATE_PATHS["tfvars"], {"tf_vars": tf_vars}
@@ -309,7 +334,7 @@ def terraform_fmt(target_file: pathlib.Path):
f"terraform fmt -write=true {target_file}",
stdout=subprocess.DEVNULL,
shell=True,
)
).wait()


def actuate_terraform_resources(dataset_id: str, env_path: pathlib.Path):
@@ -325,27 +350,26 @@ def apply_substitutions_to_template(template: pathlib.Path, subs: dict) -> str:

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Generate Terraform infra code for BigQuery datasets"
description="Generate Terraform code for infrastructure required by every dataset"
)
parser.add_argument(
"-d",
"--dataset",
required=True,
type=str,
dest="dataset",
help="The directory name of the dataset.",
help="The directory name of the dataset",
)
parser.add_argument(
"--gcp-project-id",
required=True,
type=str,
default="",
dest="project_id",
help="The Google Cloud project ID",
)
parser.add_argument(
"-b",
"--bucket-name-prefix",
required=True,
type=str,
default="",
dest="bucket_name_prefix",
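
One behavior worth noting in the generate_terraform.py changes above: when present, `infra_vars` replaces the default substitution and tfvars dictionaries outright rather than merging into them. A minimal sketch of that precedence, with hypothetical values:

# Defaults built from the CLI arguments, as in generate_tfvars_file() above.
defaults = {
    "project_id": "my-gcp-project",
    "bucket_name_prefix": "my-prefix",
    "impersonating_acct": "deployer@my-gcp-project.iam.gserviceaccount.com",
    "region": "us-central1",
    "env": "dev",
}
# A deliberately incomplete "infra" block from a hypothetical .vars.dev.yaml.
infra_vars = {"project_id": "my-gcp-project", "env": "dev"}
tf_vars = infra_vars if infra_vars else defaults
# Nothing is merged in from the defaults, so region, bucket_name_prefix, and
# impersonating_acct are simply absent from the generated tfvars.
assert "region" not in tf_vars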
17 changes: 17 additions & 0 deletions templates/terraform/google_bigquery_dataset.tf.jinja2
@@ -29,6 +29,23 @@ resource "google_bigquery_dataset" "{{ dataset_id }}" {
{% endif -%}
}

{% if iam_policies -%}
data "google_iam_policy" "bq_ds__{{ dataset_id }}" {
dynamic "binding" {
for_each = var.iam_policies["bigquery_datasets"]["{{ dataset_id }}"]
content {
role = binding.value["role"]
members = binding.value["members"]
}
}
}

resource "google_bigquery_dataset_iam_policy" "{{ dataset_id }}" {
dataset_id = google_bigquery_dataset.{{ dataset_id }}.dataset_id
policy_data = data.google_iam_policy.bq_ds__{{ dataset_id }}.policy_data
}
{% endif -%}

output "bigquery_dataset-{{ dataset_id }}-dataset_id" {
value = google_bigquery_dataset.{{ dataset_id }}.dataset_id
}
17 changes: 17 additions & 0 deletions templates/terraform/google_storage_bucket.tf.jinja2
@@ -31,6 +31,23 @@ resource "google_storage_bucket" "{{ name }}" {
}
}

{% if iam_policies -%}
data "google_iam_policy" "storage_bucket__{{ name }}" {
dynamic "binding" {
for_each = var.iam_policies["storage_buckets"]["{{ name }}"]
content {
role = binding.value["role"]
members = binding.value["members"]
}
}
}

resource "google_storage_bucket_iam_policy" "{{ name }}" {
bucket = google_storage_bucket.{{ name }}.name
policy_data = data.google_iam_policy.storage_bucket__{{ name }}.policy_data
}
{% endif -%}

output "storage_bucket-{{ name }}-name" {
value = google_storage_bucket.{{ name }}.name
}
6 changes: 5 additions & 1 deletion templates/terraform/terraform.tfvars.jinja2
@@ -16,5 +16,9 @@


{% for name, val in tf_vars.items() -%}
{{name}}="{{ val }}"
{% if name == "iam_policies" -%}
{{name}}={{ val|tojson }}
{% else -%}
{{name}}="{{ val }}"
{% endif -%}
{% endfor -%}
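
The template change above renders most variables as quoted strings but serializes `iam_policies` with the `tojson` filter so the nested map keeps its structure. A rough Python emulation of that loop, with hypothetical values:

import json

# Plain variables keep the quoted-string rendering; iam_policies is emitted as
# JSON so Terraform receives a map of lists of role/members bindings.
tf_vars = {
    "project_id": "my-gcp-project",
    "iam_policies": {
        "bigquery_datasets": {
            "my_dataset": [
                {"role": "roles/bigquery.dataViewer", "members": ["group:readers@example.com"]}
            ]
        }
    },
}
for name, val in tf_vars.items():
    if name == "iam_policies":
        print(f"{name}={json.dumps(val)}")
    else:
        print(f'{name}="{val}"')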
3 changes: 3 additions & 0 deletions templates/terraform/variables.tf.jinja2
@@ -20,3 +20,6 @@ variable "bucket_name_prefix" {}
variable "impersonating_acct" {}
variable "region" {}
variable "env" {}
variable "iam_policies" {
default = {}
}
