From dbf23007d1033613a3fc0e4026f00cc15eac5e98 Mon Sep 17 00:00:00 2001 From: gkodukula <95613561+gkodukula@users.noreply.github.com> Date: Mon, 29 Aug 2022 20:50:14 +0530 Subject: [PATCH] Feat: Onboard Uniref50 dataset (#443) --- datasets/uniref50/infra/provider.tf | 28 + datasets/uniref50/infra/uniref50_dataset.tf | 42 ++ datasets/uniref50/infra/uniref50_pipeline.tf | 34 + datasets/uniref50/infra/variables.tf | 26 + .../_images/run_csv_transform_kub/Dockerfile | 21 + .../run_csv_transform_kub/csv_transform.py | 378 +++++++++++ .../run_csv_transform_kub/requirements.txt | 6 + .../pipelines/_images/uniref50_schema.json | 44 ++ datasets/uniref50/pipelines/dataset.yaml | 31 + .../uniref50/pipelines/uniref50/pipeline.yaml | 617 +++++++++++++++++ .../pipelines/uniref50/uniref50_dag.py | 641 ++++++++++++++++++ 11 files changed, 1868 insertions(+) create mode 100644 datasets/uniref50/infra/provider.tf create mode 100644 datasets/uniref50/infra/uniref50_dataset.tf create mode 100644 datasets/uniref50/infra/uniref50_pipeline.tf create mode 100644 datasets/uniref50/infra/variables.tf create mode 100644 datasets/uniref50/pipelines/_images/run_csv_transform_kub/Dockerfile create mode 100755 datasets/uniref50/pipelines/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/uniref50/pipelines/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/uniref50/pipelines/_images/uniref50_schema.json create mode 100644 datasets/uniref50/pipelines/dataset.yaml create mode 100644 datasets/uniref50/pipelines/uniref50/pipeline.yaml create mode 100644 datasets/uniref50/pipelines/uniref50/uniref50_dag.py diff --git a/datasets/uniref50/infra/provider.tf b/datasets/uniref50/infra/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/uniref50/infra/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/uniref50/infra/uniref50_dataset.tf b/datasets/uniref50/infra/uniref50_dataset.tf new file mode 100644 index 000000000..ac77b3279 --- /dev/null +++ b/datasets/uniref50/infra/uniref50_dataset.tf @@ -0,0 +1,42 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_storage_bucket" "uniref50" { + name = "${var.bucket_name_prefix}-uniref50" + force_destroy = true + location = "US" + uniform_bucket_level_access = true + lifecycle { + ignore_changes = [ + logging, + ] + } +} + +output "storage_bucket-uniref50-name" { + value = google_storage_bucket.uniref50.name +} + +resource "google_bigquery_dataset" "uniref50" { + dataset_id = "uniref50" + project = var.project_id + location = "US" +} + +output "bigquery_dataset-uniref50-dataset_id" { + value = google_bigquery_dataset.uniref50.dataset_id +} diff --git a/datasets/uniref50/infra/uniref50_pipeline.tf b/datasets/uniref50/infra/uniref50_pipeline.tf new file mode 100644 index 000000000..77f2756e7 --- /dev/null +++ b/datasets/uniref50/infra/uniref50_pipeline.tf @@ -0,0 +1,34 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "uniref50_uniref50" { + project = var.project_id + dataset_id = "uniref50" + table_id = "uniref50" + description = "The Uniref Dataset" + depends_on = [ + google_bigquery_dataset.uniref50 + ] +} + +output "bigquery_table-uniref50_uniref50-table_id" { + value = google_bigquery_table.uniref50_uniref50.table_id +} + +output "bigquery_table-uniref50_uniref50-id" { + value = google_bigquery_table.uniref50_uniref50.id +} diff --git a/datasets/uniref50/infra/variables.tf b/datasets/uniref50/infra/variables.tf new file mode 100644 index 000000000..53f483735 --- /dev/null +++ b/datasets/uniref50/infra/variables.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} +variable "iam_policies" { + default = {} +} + diff --git a/datasets/uniref50/pipelines/_images/run_csv_transform_kub/Dockerfile b/datasets/uniref50/pipelines/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..748bc3bec --- /dev/null +++ b/datasets/uniref50/pipelines/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,21 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8 +ENV PYTHONUNBUFFERED True +COPY requirements.txt ./ +RUN python3 -m pip install --no-cache-dir -r requirements.txt +WORKDIR /custom +COPY ./csv_transform.py . +CMD ["python3", "csv_transform.py"] diff --git a/datasets/uniref50/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/uniref50/pipelines/_images/run_csv_transform_kub/csv_transform.py new file mode 100755 index 000000000..06f6e886a --- /dev/null +++ b/datasets/uniref50/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,378 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import csv +import json +import logging +import os +import pathlib + +from Bio import SeqIO +from google.api_core.exceptions import NotFound +from google.cloud import bigquery, storage + + +def main( + pipeline_name: str, + source_gcs_bucket: str, + source_gcs_object: str, + source_file: pathlib.Path, + batch_file: str, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + table_id: str, + schema_path: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, +) -> None: + logging.info(f"{pipeline_name} process started") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + execute_pipeline( + source_gcs_bucket=source_gcs_bucket, + source_gcs_object=source_gcs_object, + source_file=source_file, + batch_file=batch_file, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=table_id, + schema_path=schema_path, + chunksize=chunksize, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + logging.info(f"{pipeline_name} process completed") + + +def execute_pipeline( + source_gcs_bucket: str, + source_gcs_object: str, + source_file: pathlib.Path, + batch_file: str, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + destination_table: str, + schema_path: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, +) -> None: + download_blob(source_gcs_bucket, source_gcs_object, source_file) + process_source_file( + source_file=source_file, + batch_file=batch_file, + target_file=target_file, + chunksize=chunksize, + ) + if os.path.exists(target_file): + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + 
table_id=destination_table, + file_path=target_file, + truncate_table=False, + field_delimiter=",", + ) + else: + error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." + raise ValueError(error_msg) + else: + logging.info( + f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." + ) + + +def download_blob(source_gcs_bucket: str, source_gcs_object: str, source_file: str): + """Downloads a blob from the bucket.""" + logging.info( + f"Downloading data from gs://{source_gcs_bucket}/{source_gcs_object} to {source_file} ..." + ) + storage_client = storage.Client() + bucket = storage_client.bucket(source_gcs_bucket) + blob = bucket.blob(source_gcs_object) + blob.download_to_filename(source_file) + logging.info("Downloading Completed.") + + +def process_source_file( + source_file: str, + batch_file: str, + target_file: str, + chunksize: str, +) -> None: + logging.info(f"Opening source file {source_file}") + csv.field_size_limit(512 << 10) + csv.register_dialect("TabDialect", quotechar='"', delimiter=",", strict=True) + append_header_data( + batch_file, + headers_list=[ + "ClusterID", + "RepID", + "TaxID", + "Sequence", + "ClusterName", + "Organism", + "Size", + ], + ) + fasta_sequences = SeqIO.parse(open(source_file), "fasta") + logging.info(f"Finished opening source file {source_file}") + row_position = 0 + for fasta in fasta_sequences: + description, sequence = str(fasta.description), str(fasta.seq) + description_list = description.split(" ") + row_list = [] + string = "" + append_row_list(description_list, sequence, row_list) + description_list = [" {0}".format(elem) for elem in description_list] + iteration_list = iter(description_list[1:]) + iteration_string = "" + for item in description_list: + try: + iteration_string = next(iteration_list) + if " n=" in iteration_string: + string = string + item + row_list.append(string.lstrip()) + string = "" + elif " n=" in item: + string = string + item + row_list.append(string.lstrip()[2:]) + string = "" + else: + string = string + item + except StopIteration: + string = string + item + row_list.append(string.lstrip()[4:]) + + write_batch_file(batch_file, row_list) + row_position = row_position + 1 + if row_position % int(chunksize) == 0 and row_position > 0: + process_chunk( + batch_file=batch_file, + target_file=target_file, + ) + row_position = 0 + + if row_position != 0: + process_chunk( + batch_file=batch_file, + target_file=target_file, + ) + + +def append_row_list(description_list: list, sequence: str, row_list: list) -> None: + row_list.append(description_list.pop(0)) + row_list.append(description_list.pop()[6:]) + row_list.append(description_list.pop()[6:]) + row_list.append(str(sequence)) + return row_list + + +def write_batch_file(batch_file: str, row_list: list) -> None: + with open( + batch_file, + "a", + ) as rowobj: + row_append = csv.writer(rowobj) + row_append.writerow(row_list) + + +def process_chunk( + batch_file: str, + target_file: str, +) -> None: + logging.info("Processing batch file") + target_file_batch = batch_file + append_batch_file(target_file_batch, target_file) + logging.info(f"Processing batch file {target_file_batch} completed") + + +def load_data_to_bq( + project_id: str, + dataset_id: str, + table_id: str, + file_path: str, + truncate_table: bool, + field_delimiter: str, +) -> None: + logging.info( + f"Loading data from {file_path} into 
{project_id}.{dataset_id}.{table_id} started"
+    )
+    client = bigquery.Client(project=project_id)
+    table_ref = client.dataset(dataset_id).table(table_id)
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.CSV
+    job_config.field_delimiter = field_delimiter
+    if truncate_table:
+        job_config.write_disposition = "WRITE_TRUNCATE"
+    else:
+        job_config.write_disposition = "WRITE_APPEND"
+        logging.info("Appending data to the existing table")
+    job_config.skip_leading_rows = 1  # ignore the header
+    job_config.autodetect = False
+    with open(file_path, "rb") as source_file:
+        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
+    job.result()
+    logging.info(
+        f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed"
+    )
+
+
+def append_header_data(batch_file: str, headers_list: list) -> None:
+    with open(batch_file, "w") as headerobj:
+        header_write = csv.writer(headerobj)
+        header_write.writerow(headers_list)
+
+
+def create_dest_table(
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    schema_filepath: str,
+    bucket_name: str,
+) -> bool:
+    table_ref = f"{project_id}.{dataset_id}.{table_id}"
+    logging.info(f"Attempting to create table {table_ref} if it doesn't already exist")
+    client = bigquery.Client()
+    table_exists = False
+    try:
+        table = client.get_table(table_ref)
+        table_exists_id = table.table_id
+        logging.info(f"Table {table_exists_id} currently exists.")
+        table = True
+    except NotFound:
+        table = None
+    if not table:
+        logging.info(
+            (
+                f"Table {table_ref} currently does not exist. Attempting to create table."
+            )
+        )
+        if check_gcs_file_exists(schema_filepath, bucket_name):
+            schema = create_table_schema([], bucket_name, schema_filepath)
+            table = bigquery.Table(table_ref, schema=schema)
+            client.create_table(table)
+            logging.info(f"Table {table_ref} was created")
+            table_exists = True
+        else:
+            file_name = os.path.split(schema_filepath)[1]
+            file_path = os.path.split(schema_filepath)[0]
+            logging.info(
+                f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}"
+            )
+            table_exists = False
+    else:
+        table_exists = True
+    return table_exists
+
+
+def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client)
+    return exists
+
+
+def create_table_schema(
+    schema_structure: list, bucket_name: str = "", schema_filepath: str = ""
+) -> list:
+    logging.info(f"Defining table schema... {bucket_name} ... 
{schema_filepath}") + schema = [] + if not (schema_filepath): + schema_struct = schema_structure + else: + storage_client = storage.Client() + bucket = storage_client.get_bucket(bucket_name) + blob = bucket.blob(schema_filepath) + schema_struct = json.loads(blob.download_as_string(client=None)) + for schema_field in schema_struct: + fld_name = schema_field["name"] + fld_type = schema_field["type"] + try: + fld_descr = schema_field["description"] + except KeyError: + fld_descr = "" + fld_mode = schema_field["mode"] + schema.append( + bigquery.SchemaField( + name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr + ) + ) + return schema + + +def append_batch_file(target_file_batch: str, target_file: str) -> None: + + with open(target_file_batch, "r") as data_file: + with open(target_file, "a+") as target_file: + logging.info(f"Appending batch file {target_file_batch} to {target_file}") + target_file.write(data_file.read()) + if os.path.exists(target_file_batch): + os.remove(target_file_batch) + + +def upload_file_to_gcs( + file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str +) -> None: + if os.path.exists(file_path): + logging.info( + f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" + ) + storage_client = storage.Client() + bucket = storage_client.bucket(target_gcs_bucket) + blob = bucket.blob(target_gcs_path) + blob.upload_from_filename(file_path) + + else: + logging.info( + f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." + ) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + pipeline_name=os.environ.get("PIPELINE_NAME"), + source_gcs_bucket=os.environ.get("SOURCE_GCS_BUCKET"), + source_gcs_object=os.environ.get("SOURCE_GCS_OBJECT"), + source_file=pathlib.Path(os.environ.get("SOURCE_FILE")).expanduser(), + batch_file=os.environ.get("BATCH_FILE"), + target_file=pathlib.Path(os.environ.get("TARGET_FILE")).expanduser(), + chunksize=os.environ.get("CHUNKSIZE"), + target_gcs_bucket=os.environ.get( + "TARGET_GCS_BUCKET", + ), + target_gcs_path=os.environ.get("TARGET_GCS_PATH"), + project_id=os.environ.get("PROJECT_ID"), + dataset_id=os.environ.get("DATASET_ID"), + table_id=os.environ["TABLE_ID"], + schema_path=os.environ["SCHEMA_PATH"], + ) diff --git a/datasets/uniref50/pipelines/_images/run_csv_transform_kub/requirements.txt b/datasets/uniref50/pipelines/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..033f3ab5d --- /dev/null +++ b/datasets/uniref50/pipelines/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,6 @@ +google-cloud-bigquery +google-cloud-storage +pandas +Bio +SeqIO +sh diff --git a/datasets/uniref50/pipelines/_images/uniref50_schema.json b/datasets/uniref50/pipelines/_images/uniref50_schema.json new file mode 100644 index 000000000..0fda8efb0 --- /dev/null +++ b/datasets/uniref50/pipelines/_images/uniref50_schema.json @@ -0,0 +1,44 @@ +[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "RepID", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "TaxID", + "type": "String", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "Size", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "Organism", 
+ "type": "STRING", + "description": "", + "mode": "NULLABLE" + } +] diff --git a/datasets/uniref50/pipelines/dataset.yaml b/datasets/uniref50/pipelines/dataset.yaml new file mode 100644 index 000000000..f8ac11426 --- /dev/null +++ b/datasets/uniref50/pipelines/dataset.yaml @@ -0,0 +1,31 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: uniref50 + friendly_name: uniref50 + description: ~ + dataset_sources: ~ + terms_of_use: ~ + +resources: + - type: storage_bucket + name: uniref50 + uniform_bucket_level_access: True + location: US + + - type: bigquery_dataset + dataset_id: uniref50 + uniform_bucket_level_access: True + location: US diff --git a/datasets/uniref50/pipelines/uniref50/pipeline.yaml b/datasets/uniref50/pipelines/uniref50/pipeline.yaml new file mode 100644 index 000000000..4595a8c08 --- /dev/null +++ b/datasets/uniref50/pipelines/uniref50/pipeline.yaml @@ -0,0 +1,617 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
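+
+# Pipeline overview:
+#   1. `download_zip_file` downloads uniref50.fasta.gz from the UniProt FTP
+#      server, unzips it, and uses awk to split the FASTA into shard files
+#      (myseq*.fa) under data/uniref50/uniref in the Composer bucket.
+#   2. Each `uniref50_transform_csv_N` task runs the run_csv_transform_kub
+#      image in a Kubernetes pod to parse one shard into a CSV with the columns
+#      ClusterID, RepID, TaxID, Sequence, ClusterName, Organism, Size.
+#   3. Each `load_uniref50_to_bq_N` task loads that CSV into the
+#      uniref50.uniref50 BigQuery table; the first load truncates the table
+#      and the remaining loads append to it.
+#   The eight transform/load pairs run sequentially (see graph_paths below).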
+ +--- + +resources: + + - type: bigquery_table + table_id: uniref50 + description: "The Uniref Dataset" + +dag: + airflow_version: 2 + initialize: + dag_id: uniref50 + default_args: + owner: "Google" + depends_on_past: False + start_date: '2022-06-10' + max_active_runs: 1 + schedule_interval: "@once" + catchup: False + default_view: graph + + tasks: + - operator: "BashOperator" + description: "Task to copy `uniref50.fasta` to gcs" + args: + task_id: "download_zip_file" + bash_command: | + mkdir -p $data_dir/uniref + curl -o $data_dir/uniref/uniref50.fasta.gz -L $uniref50 + gunzip $data_dir/uniref/uniref50.fasta.gz + awk 'BEGIN {n_seq=0;} /^>/ {if(n_seq%10000000==0){file=sprintf("/home/airflow/gcs/data/uniref50/uniref/myseq%d.fa",n_seq);} + print >> file; n_seq++; next;} { print >> file; }' < $data_dir/uniref/uniref50.fasta + awk 'BEGIN {n_seq=0;} /^>/ {if(n_seq%3500000==0){file=sprintf("/home/airflow/gcs/data/uniref50/uniref/myseq_1%d.fa",n_seq);} + print >> file; n_seq++; next;} { print >> file; }' < $data_dir/uniref/myseq0.fa + rm $data_dir/uniref/uniref50.fasta.gz + rm $data_dir/uniref/uniref50.fasta + rm $data_dir/uniref/myseq0.fa + + env: + data_dir: /home/airflow/gcs/data/uniref50 + uniref50: https://ftp.uniprot.org/pub/databases/uniprot/uniref/uniref50/uniref50.fasta.gz + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_1" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq_10.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_1.csv" + PIPELINE_NAME: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + TABLE_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_1" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_1.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_2" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ 
var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq_13500000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_2.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_2" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_2.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_3" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq_17000000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_3.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_3" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_3.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - 
name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_4" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq10000000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_4.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_4" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_4.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_5" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq20000000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_5.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_5" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_5.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: 
"ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_6" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq30000000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_6.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_6" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_6.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_7" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq40000000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_7.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_7" + bucket: "{{ 
var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_7.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "uniref50_transform_csv_8" + startup_timeout_seconds: 600 + name: "uniref50" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.uniref50.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/uniref50/uniref/myseq50000000.fa" + SOURCE_FILE: "files/uniref50.fa" + BATCH_FILE: "files/batch.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/uniref50/uniref/data_output_8.csv" + PIPELINE_NAME: "uniref50" + TABLE_ID: "uniref50" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "uniref50" + SCHEMA_PATH: "data/uniref50/uniref50_schema.json" + CHUNKSIZE: "100000" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "50G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_uniref50_to_bq_8" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/uniref50/uniref/data_output_8.csv"] + source_format: "CSV" + destination_project_dataset_table: "uniref50.uniref50" + skip_leading_rows: 0 + allow_quoted_newlines: True + write_disposition: "WRITE_APPEND" + schema_fields: + - name: "ClusterID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "ClusterName" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Size" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Organism" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "TaxID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "RepID" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "Sequence" + type: "STRING" + description: "" + mode: "NULLABLE" + graph_paths: + - "download_zip_file >> uniref50_transform_csv_1 >> load_uniref50_to_bq_1 >> uniref50_transform_csv_2 >> load_uniref50_to_bq_2 >> uniref50_transform_csv_3 >> load_uniref50_to_bq_3 >> uniref50_transform_csv_4 >> load_uniref50_to_bq_4 >> uniref50_transform_csv_5 >> load_uniref50_to_bq_5 >> uniref50_transform_csv_6 >> load_uniref50_to_bq_6 >> uniref50_transform_csv_7 >> load_uniref50_to_bq_7 >> uniref50_transform_csv_8 >> load_uniref50_to_bq_8" diff --git a/datasets/uniref50/pipelines/uniref50/uniref50_dag.py b/datasets/uniref50/pipelines/uniref50/uniref50_dag.py new file mode 100644 index 000000000..84ceb9a27 --- /dev/null +++ b/datasets/uniref50/pipelines/uniref50/uniref50_dag.py @@ -0,0 +1,641 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 
2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.operators import bash +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2022-06-10", +} + + +with DAG( + dag_id="uniref50.uniref50", + default_args=default_args, + max_active_runs=1, + schedule_interval="@weekly", + catchup=False, + default_view="graph", +) as dag: + + # Task to copy `uniref50.fasta` to gcs + download_zip_file = bash.BashOperator( + task_id="download_zip_file", + bash_command='mkdir -p $data_dir/uniref\ncurl -o $data_dir/uniref/uniref50.fasta.gz -L $uniref50\ngunzip $data_dir/uniref/uniref50.fasta.gz\nawk \u0027BEGIN {n_seq=0;} /^\u003e/ {if(n_seq%10000000==0){file=sprintf("/home/airflow/gcs/data/uniref50/uniref/myseq%d.fa",n_seq);}\nprint \u003e\u003e file; n_seq++; next;} { print \u003e\u003e file; }\u0027 \u003c $data_dir/uniref/uniref50.fasta\nawk \u0027BEGIN {n_seq=0;} /^\u003e/ {if(n_seq%3500000==0){file=sprintf("/home/airflow/gcs/data/uniref50/uniref/myseq_1%d.fa",n_seq);}\nprint \u003e\u003e file; n_seq++; next;} { print \u003e\u003e file; }\u0027 \u003c $data_dir/uniref/myseq0.fa\nrm $data_dir/uniref/uniref50.fasta.gz\nrm $data_dir/uniref/uniref50.fasta\nrm $data_dir/uniref/myseq0.fa\n', + env={ + "data_dir": "/home/airflow/gcs/data/uniref50", + "uniref50": "https://ftp.uniprot.org/pub/databases/uniprot/uniref/uniref50/uniref50.fasta.gz", + }, + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_1 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_1", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq_10.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_1.csv", + "PIPELINE_NAME": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "TABLE_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_1 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_1", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_1.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "ClusterID", + 
"type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_2 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_2", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq_13500000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_2.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_2 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_2", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_2.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_3 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_3", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq_17000000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_3.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ 
var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_3 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_3", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_3.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_4 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_4", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq10000000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_4.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_4 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_4", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_4.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_5 = kubernetes_pod.KubernetesPodOperator( + 
task_id="uniref50_transform_csv_5", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq20000000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_5.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_5 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_5", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_5.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_6 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_6", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq30000000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_6.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_6 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_6", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_6.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": 
"STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_7 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_7", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq40000000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_7.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_7 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_7", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_7.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + # Run CSV transform within kubernetes pod + uniref50_transform_csv_8 = kubernetes_pod.KubernetesPodOperator( + task_id="uniref50_transform_csv_8", + startup_timeout_seconds=600, + name="uniref50", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.uniref50.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/uniref50/uniref/myseq50000000.fa", + "SOURCE_FILE": "files/uniref50.fa", + "BATCH_FILE": "files/batch.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/uniref50/uniref/data_output_8.csv", + "PIPELINE_NAME": "uniref50", + "TABLE_ID": "uniref50", + "PROJECT_ID": "{{ var.value.gcp_project }}", + 
"DATASET_ID": "uniref50", + "SCHEMA_PATH": "data/uniref50/uniref50_schema.json", + "CHUNKSIZE": "100000", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "50G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_uniref50_to_bq_8 = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_uniref50_to_bq_8", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/uniref50/uniref/data_output_8.csv"], + source_format="CSV", + destination_project_dataset_table="uniref50.uniref50", + skip_leading_rows=0, + allow_quoted_newlines=True, + write_disposition="WRITE_APPEND", + schema_fields=[ + { + "name": "ClusterID", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "ClusterName", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "Size", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Organism", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "TaxID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + {"name": "RepID", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "Sequence", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + ( + download_zip_file + >> uniref50_transform_csv_1 + >> load_uniref50_to_bq_1 + >> uniref50_transform_csv_2 + >> load_uniref50_to_bq_2 + >> uniref50_transform_csv_3 + >> load_uniref50_to_bq_3 + >> uniref50_transform_csv_4 + >> load_uniref50_to_bq_4 + >> uniref50_transform_csv_5 + >> load_uniref50_to_bq_5 + >> uniref50_transform_csv_6 + >> load_uniref50_to_bq_6 + >> uniref50_transform_csv_7 + >> load_uniref50_to_bq_7 + >> uniref50_transform_csv_8 + >> load_uniref50_to_bq_8 + )