diff --git a/datasets/scalable_open_source/infra/provider.tf b/datasets/scalable_open_source/infra/provider.tf
new file mode 100644
index 000000000..23ab87dcd
--- /dev/null
+++ b/datasets/scalable_open_source/infra/provider.tf
@@ -0,0 +1,28 @@
/**
 * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


# Google provider for this dataset's infra. All API calls are made as the
# impersonated service account (var.impersonating_acct) rather than the
# caller's own credentials.
provider "google" {
  project = var.project_id
  impersonate_service_account = var.impersonating_acct
  region = var.region
}

# Identity the provider is actually calling APIs as.
data "google_client_openid_userinfo" "me" {}

# Echo the impersonated account's email in `terraform apply` output so the
# operator can confirm which identity was used.
output "impersonating-account" {
  value = data.google_client_openid_userinfo.me.email
}
diff --git a/datasets/scalable_open_source/infra/scalable_open_source_dataset.tf b/datasets/scalable_open_source/infra/scalable_open_source_dataset.tf
new file mode 100644
index 000000000..4bdd1df26
--- /dev/null
+++ b/datasets/scalable_open_source/infra/scalable_open_source_dataset.tf
@@ -0,0 +1,39 @@
/**
 * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


# BigQuery dataset that receives the deps.dev data copied in by the pipeline.
resource "google_bigquery_dataset" "deps_dev_v1" {
  dataset_id = "deps_dev_v1"
  project = var.project_id
}

# Render the dataset's IAM policy from the role/member bindings supplied via
# the `iam_policies` variable (keyed by resource type, then dataset id).
data "google_iam_policy" "bq_ds__deps_dev_v1" {
  dynamic "binding" {
    for_each = var.iam_policies["bigquery_datasets"]["deps_dev_v1"]
    content {
      role = binding.value["role"]
      members = binding.value["members"]
    }
  }
}

# Apply the rendered policy. NOTE: *_iam_policy resources are authoritative —
# they replace any bindings set outside Terraform for this dataset.
resource "google_bigquery_dataset_iam_policy" "deps_dev_v1" {
  dataset_id = google_bigquery_dataset.deps_dev_v1.dataset_id
  policy_data = data.google_iam_policy.bq_ds__deps_dev_v1.policy_data
}
output "bigquery_dataset-deps_dev_v1-dataset_id" {
  value = google_bigquery_dataset.deps_dev_v1.dataset_id
}
diff --git a/datasets/scalable_open_source/infra/variables.tf b/datasets/scalable_open_source/infra/variables.tf
new file mode 100644
index 000000000..53f483735
--- /dev/null
+++ b/datasets/scalable_open_source/infra/variables.tf
@@ -0,0 +1,26 @@
/**
 * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


# Inputs shared by all dataset infra modules in this repo; values are
# injected by the deployment tooling, so no defaults except iam_policies.
variable "project_id" {}
variable "bucket_name_prefix" {}
variable "impersonating_acct" {}
variable "region" {}
variable "env" {}
# Map of IAM bindings keyed by resource type and resource id; defaults to
# empty so environments without explicit policies still plan cleanly.
variable "iam_policies" {
  default = {}
}

diff --git a/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/Dockerfile b/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/Dockerfile
new file mode 100644
index 000000000..de6dfa5ad
--- /dev/null
+++ b/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/Dockerfile
@@ -0,0 +1,21 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Container image for the one-shot BigQuery dataset-copy job run by the
# Airflow KubernetesPodOperator.
FROM python:3.8
# Unbuffered stdout/stderr so pod logs stream in real time.
ENV PYTHONUNBUFFERED True
# Install deps before copying the script so the pip layer caches.
COPY requirements.txt ./
RUN python3 -m pip install --no-cache-dir -r requirements.txt
WORKDIR /custom
COPY ./script.py .
CMD ["python3", "script.py"]
diff --git a/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/requirements.txt b/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/requirements.txt
new file mode 100644
index 000000000..b927663c5
--- /dev/null
+++ b/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/requirements.txt
@@ -0,0 +1,4 @@
google-api-core
google-cloud-bigquery
google-cloud-bigquery-datatransfer
protobuf
diff --git a/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/script.py b/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/script.py
new file mode 100644
index 000000000..7ca058d8c
--- /dev/null
+++ b/datasets/scalable_open_source/pipelines/_images/bq_data_transfer/script.py
@@ -0,0 +1,180 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+ +import logging +import operator +import os +import time + +from google.api_core.exceptions import ResourceExhausted +from google.cloud import bigquery_datatransfer_v1 +from google.protobuf.timestamp_pb2 import Timestamp + +RETRY_DELAY = 10 + + +class TimeoutError(Exception): + """Raised when the BQ transfer jobs haven't all finished within the allotted time""" + + pass + + +def main( + source_project_id: str, + source_bq_dataset: str, + target_project_id: str, + target_bq_dataset: str, + service_account: str, + timeout: int, +): + client = bigquery_datatransfer_v1.DataTransferServiceClient() + transfer_config_name = f"{source_project_id}-{source_bq_dataset}-copy" + existing_config = find_existing_config( + client, target_project_id, transfer_config_name + ) + + if not existing_config: + existing_config = create_transfer_config( + client, + source_project_id, + source_bq_dataset, + target_project_id, + target_bq_dataset, + transfer_config_name, + service_account, + ) + + trigger_config(client, existing_config) + wait_for_completion(client, existing_config, timeout) + + +def find_existing_config( + client: bigquery_datatransfer_v1.DataTransferServiceClient, + gcp_project: str, + transfer_config_name: str, +) -> bigquery_datatransfer_v1.types.TransferConfig: + all_transfer_configs = client.list_transfer_configs( + request=bigquery_datatransfer_v1.types.ListTransferConfigsRequest( + parent=f"projects/{gcp_project}" + ) + ) + + return next( + ( + config + for config in all_transfer_configs + if config.display_name == transfer_config_name + ), + None, + ) + + +def wait_for_completion( + client: bigquery_datatransfer_v1.DataTransferServiceClient, + running_config: bigquery_datatransfer_v1.types.TransferConfig, + timeout: int, +) -> None: + _start = int(time.time()) + + while True: + latest_runs = [] + latest_runs.append(latest_transfer_run(client, running_config)) + + logging.info(f"States: {[str(run.state) for run in latest_runs]}") + + # Mark as complete when all 
runs have succeeded + if all([str(run.state) == "TransferState.SUCCEEDED" for run in latest_runs]): + return + + # Stop the process when it's longer than the allotted time + if int(time.time()) - _start > timeout: + raise TimeoutError + + time.sleep(RETRY_DELAY) + + +def latest_transfer_run( + client: bigquery_datatransfer_v1.DataTransferServiceClient, + config: bigquery_datatransfer_v1.types.TransferConfig, +) -> bigquery_datatransfer_v1.types.TransferRun: + transfer_runs = client.list_transfer_runs(parent=config.name) + return max(transfer_runs, key=operator.attrgetter("run_time")) + + +def create_transfer_config( + client: bigquery_datatransfer_v1.DataTransferServiceClient, + source_project_id: str, + source_dataset_id: str, + target_project_id: str, + target_dataset_id: str, + display_name: str, + service_account: str, +) -> bigquery_datatransfer_v1.types.TransferConfig: + transfer_config = bigquery_datatransfer_v1.TransferConfig( + destination_dataset_id=target_dataset_id, + display_name=display_name, + data_source_id="cross_region_copy", + dataset_region="US", + params={ + "overwrite_destination_table": True, + "source_project_id": source_project_id, + "source_dataset_id": source_dataset_id, + }, + schedule_options=bigquery_datatransfer_v1.ScheduleOptions( + disable_auto_scheduling=True + ), + ) + + request = bigquery_datatransfer_v1.types.CreateTransferConfigRequest( + parent=client.common_project_path(target_project_id), + transfer_config=transfer_config, + service_account_name=service_account, + ) + + return client.create_transfer_config(request=request) + + +def trigger_config( + client: bigquery_datatransfer_v1.DataTransferServiceClient, + config: bigquery_datatransfer_v1.types.TransferConfig, +) -> None: + now = time.time() + seconds = int(now) + nanos = int((now - seconds) * pow(10, 9)) + + try: + client.start_manual_transfer_runs( + request=bigquery_datatransfer_v1.types.StartManualTransferRunsRequest( + parent=config.name, + 
requested_run_time=Timestamp(seconds=seconds, nanos=nanos), + ) + ) + except ResourceExhausted: + logging.info( + f"Transfer job is currently running for config ({config.display_name}) {config.name}." + ) + return + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_project_id=os.environ["SOURCE_PROJECT_ID"], + source_bq_dataset=os.environ["SOURCE_BQ_DATASET"], + target_project_id=os.environ["TARGET_PROJECT_ID"], + target_bq_dataset=os.environ["TARGET_BQ_DATASET"], + service_account=os.environ["SERVICE_ACCOUNT"], + timeout=int(os.getenv("TIMEOUT", 1200)), + ) diff --git a/datasets/scalable_open_source/pipelines/dataset.yaml b/datasets/scalable_open_source/pipelines/dataset.yaml new file mode 100644 index 000000000..1c70ee63b --- /dev/null +++ b/datasets/scalable_open_source/pipelines/dataset.yaml @@ -0,0 +1,25 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

# Dataset descriptor consumed by the pipeline tooling; optional metadata
# fields are deliberately left null (~).
dataset:
  name: scalable_open_source
  friendly_name: ~
  description: ~
  dataset_sources: ~
  terms_of_use: ~

resources:
  # BigQuery dataset the pipeline copies deps.dev data into (matches the
  # Terraform-managed dataset in infra/).
  - type: bigquery_dataset
    dataset_id: deps_dev_v1
    description: ~
diff --git a/datasets/scalable_open_source/pipelines/deps_dev/deps_dev_dag.py b/datasets/scalable_open_source/pipelines/deps_dev/deps_dev_dag.py
new file mode 100644
index 000000000..a9ee1b0d3
--- /dev/null
+++ b/datasets/scalable_open_source/pipelines/deps_dev/deps_dev_dag.py
@@ -0,0 +1,53 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2022-05-01",
}


# NOTE: this DAG mirrors pipelines/deps_dev/pipeline.yaml field-for-field —
# keep the two in sync.
with DAG(
    dag_id="scalable_open_source.deps_dev",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@weekly",
    catchup=False,
    default_view="graph",
) as dag:

    # Copy deps.dev dataset
    # Runs the bq_data_transfer container image; all settings come from the
    # `scalable_open_source` Airflow variable except the target dataset,
    # which is pinned to deps_dev_v1. The env vars match the names read by
    # the image's script.py.
    copy_bq_datasets = kubernetes_pod.KubernetesPodOperator(
        task_id="copy_bq_datasets",
        name="copy_bq_datasets",
        namespace="composer",
        service_account_name="datasets",
        image_pull_policy="Always",
        image="{{ var.json.scalable_open_source.container_registry.copy_bq_datasets }}",
        env_vars={
            "SOURCE_PROJECT_ID": "{{ var.json.scalable_open_source.source_project_id }}",
            "SOURCE_BQ_DATASET": "{{ var.json.scalable_open_source.source_bq_dataset }}",
            "TARGET_PROJECT_ID": "{{ var.json.scalable_open_source.target_project_id }}",
            "TARGET_BQ_DATASET": "deps_dev_v1",
            "SERVICE_ACCOUNT": "{{ var.json.scalable_open_source.service_account }}",
        },
        resources={"request_memory": "128M", "request_cpu": "200m"},
    )

    # Single-task graph; the bare expression just registers the task order.
    copy_bq_datasets
diff --git a/datasets/scalable_open_source/pipelines/deps_dev/pipeline.yaml b/datasets/scalable_open_source/pipelines/deps_dev/pipeline.yaml
new file mode 100644
index 000000000..157b81cd8
--- /dev/null
+++ b/datasets/scalable_open_source/pipelines/deps_dev/pipeline.yaml
@@ -0,0 +1,52 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
# Pipeline spec from which deps_dev_dag.py is kept in sync; no extra
# resources are provisioned by this pipeline.
resources: ~

dag:
  airflow_version: 2
  initialize:
    dag_id: deps_dev
    default_args:
      owner: "Google"
      depends_on_past: False
      start_date: '2022-05-01'
    max_active_runs: 1
    schedule_interval: "@weekly"
    catchup: False
    default_view: graph

  tasks:
    - operator: "KubernetesPodOperator"
      description: "Copy deps.dev dataset"
      args:
        task_id: "copy_bq_datasets"
        name: "copy_bq_datasets"
        namespace: "composer"
        service_account_name: "datasets"
        image_pull_policy: "Always"
        image: "{{ var.json.scalable_open_source.container_registry.copy_bq_datasets }}"
        # Environment for the bq_data_transfer image; names match what the
        # image's script.py reads from os.environ.
        env_vars:
          SOURCE_PROJECT_ID: "{{ var.json.scalable_open_source.source_project_id }}"
          SOURCE_BQ_DATASET: "{{ var.json.scalable_open_source.source_bq_dataset }}"
          TARGET_PROJECT_ID: "{{ var.json.scalable_open_source.target_project_id }}"
          TARGET_BQ_DATASET: deps_dev_v1
          SERVICE_ACCOUNT: "{{ var.json.scalable_open_source.service_account }}"
        resources:
          request_memory: "128M"
          request_cpu: "200m"

  graph_paths:
    - "copy_bq_datasets"