From 22cc27c01f6b2bc2d456733813ed047bf44b1294 Mon Sep 17 00:00:00 2001 From: vijay-google <106806401+vijay-google@users.noreply.github.com> Date: Mon, 29 Aug 2022 15:29:37 +0000 Subject: [PATCH] Feat: Onboard Multilingual Spoken Words Corpus - MLCommons Association dataset (#461) --- ...ultilingual_spoken_words_corpus_dataset.tf | 26 +++ ...ltilingual_spoken_words_corpus_pipeline.tf | 34 ++++ .../infra/provider.tf | 28 +++ .../infra/variables.tf | 26 +++ .../_images/run_csv_transform_kub/Dockerfile | 37 ++++ .../run_csv_transform_kub/csv_transform.py | 166 ++++++++++++++++++ .../run_csv_transform_kub/requirements.txt | 2 + .../pipelines/dataset.yaml | 25 +++ .../multilingual_spoken_words_corpus_dag.py | 138 +++++++++++++++ .../pipeline.yaml | 120 +++++++++++++ 10 files changed, 602 insertions(+) create mode 100644 datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_dataset.tf create mode 100644 datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_pipeline.tf create mode 100644 datasets/multilingual_spoken_words_corpus/infra/provider.tf create mode 100644 datasets/multilingual_spoken_words_corpus/infra/variables.tf create mode 100644 datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/multilingual_spoken_words_corpus/pipelines/dataset.yaml create mode 100644 datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py create mode 100644 datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml diff --git a/datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_dataset.tf b/datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_dataset.tf new file mode 100644 index 000000000..2248656a3 --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_dataset" "multilingual_spoken_words_corpus" { + dataset_id = "multilingual_spoken_words_corpus" + project = var.project_id + description = "The Multilingual Spoken Words Corpus is a large and growing audio dataset of spoken words in 50 languages for academic research and commercial applications in keyword spotting and spoken term search. The dataset contains more than 340,000 keywords, totaling 23.4 million 1-second spoken examples (over 6,000 hours). The dataset has many use cases, ranging from voice-enabled consumer devices to call center automation. It was generated by applying forced alignment on crowd-sourced sentence-level audio to produce per-word timing estimates for extraction. All alignments are included in the dataset. Please see the paper(https://datasets-benchmarks-proceedings.neurips.cc/paper/2021/file/fe131d7f5a6b38b23cc967316c13dae2-Paper-round2.pdf) for a detailed analysis of the contents of the data and methods for detecting potential outliers, along with baseline accuracy metrics on keyword spotting models trained from the dataset compared to models trained on a manually-recorded keyword dataset." +} + +output "bigquery_dataset-multilingual_spoken_words_corpus-dataset_id" { + value = google_bigquery_dataset.multilingual_spoken_words_corpus.dataset_id +} diff --git a/datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_pipeline.tf b/datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_pipeline.tf new file mode 100644 index 000000000..f50fbe66b --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/infra/multilingual_spoken_words_corpus_pipeline.tf @@ -0,0 +1,34 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "multilingual_spoken_words_corpus_metadata" { + project = var.project_id + dataset_id = "multilingual_spoken_words_corpus" + table_id = "metadata" + description = "It contains metadata of all existing audio files in tabular format." + depends_on = [ + google_bigquery_dataset.multilingual_spoken_words_corpus + ] +} + +output "bigquery_table-multilingual_spoken_words_corpus_metadata-table_id" { + value = google_bigquery_table.multilingual_spoken_words_corpus_metadata.table_id +} + +output "bigquery_table-multilingual_spoken_words_corpus_metadata-id" { + value = google_bigquery_table.multilingual_spoken_words_corpus_metadata.id +} diff --git a/datasets/multilingual_spoken_words_corpus/infra/provider.tf b/datasets/multilingual_spoken_words_corpus/infra/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/infra/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/multilingual_spoken_words_corpus/infra/variables.tf b/datasets/multilingual_spoken_words_corpus/infra/variables.tf new file mode 100644 index 000000000..53f483735 --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/infra/variables.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} +variable "iam_policies" { + default = {} +} + diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/Dockerfile b/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..15669773d --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,37 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..f4e6da1b0 --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,166 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import logging +import os +import pathlib +import typing + +import pandas as pd +from google.cloud import storage + + +def main( + source_gcs_bucket: str, + source_gcs_object: str, + source_file: pathlib.Path, + columns: typing.List[str], + target_csv_file: pathlib.Path, + target_gcs_bucket: str, + target_gcs_path: str, +) -> None: + logging.info( + "Multilingual Spoken Words Corpus - MLCommons Association Dataset process started " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + logging.info("Creating './files/' folder.") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + download_blob(source_gcs_bucket, source_gcs_object, source_file) + logging.info("Reading json file") + meta_data = json.load(open(source_file)) + logging.info("Getting all existed languages") + lang_abbr = get_lang_abbr(meta_data) + logging.info("Creating empty dataframe") + df = pd.DataFrame(columns=columns) + write_to_file(df, target_csv_file, "w") + logging.info("Creating dataframe ") + create_dataframe(lang_abbr, meta_data, columns, target_csv_file) + upload_file_to_gcs(target_csv_file, target_gcs_bucket, target_gcs_path) + logging.info( + "Multilingual Spoken Words Corpus - MLCommons Association Dataset process completed " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + +def download_blob( + source_gcs_bucket: str, source_gcs_object: str, target_file: pathlib.Path +) -> None: + """Downloads a blob from the bucket.""" + logging.info( + f"Downloading data from gs://{source_gcs_bucket}/{source_gcs_object} to {target_file} ..." + ) + storage_client = storage.Client() + bucket = storage_client.bucket(source_gcs_bucket) + blob = bucket.blob(source_gcs_object) + blob.download_to_filename(str(target_file)) + logging.info("Downloading Completed.") + + +def create_dataframe( + lang_abbr: str, + meta_data: dict, + columns: typing.List[str], + target_csv_file: pathlib.Path, +) -> None: + for idx, kv_pair in enumerate(lang_abbr.items()): + abbr, language = kv_pair + logging.info(f"\t\t\t{idx + 1} out of {len(lang_abbr)} languages.") + logging.info( + f"Process started for creating dataframe for {abbr} - {language} language." + ) + num_of_words = get_num_of_words(meta_data, abbr) + logging.info(f"\tCreating temporary datafame for all {num_of_words} words\n") + temp_dataframe( + meta_data, abbr, columns, num_of_words, language, target_csv_file + ) + + +def temp_dataframe( + meta_data: dict, + abbr: str, + columns: typing.List[str], + num_of_words: int, + language: str, + target_csv_file: pathlib.Path, +) -> None: + for word, count in get_lang_words_count(meta_data, abbr).items(): + temp = pd.DataFrame(columns=columns) + lang_word_filenames = get_lang_word_filenames(meta_data, abbr, word) + temp["filenames"] = lang_word_filenames + temp["lang_abbr"] = [abbr] * count + temp["word"] = [word] * count + temp["word_count"] = [count] * count + temp["number_of_words"] = [num_of_words] * count + temp["language"] = [language] * count + write_to_file(temp, str(target_csv_file), mode="a") + + +def get_lang_abbr(meta_data: dict, key: str = "language") -> dict: + lang_abbr = {} + for abbr in meta_data.keys(): + if isinstance(meta_data[abbr], dict): + lang_abbr[abbr] = meta_data[abbr].get(key, {}) + return lang_abbr + + +def get_num_of_words(meta_data: dict, abbr: str, key: str = "number_of_words") -> int: + return meta_data[abbr].get(key, 0) + + +def get_lang_words_count(meta_data: dict, abbr: str, key: str = "wordcounts") -> int: + return meta_data[abbr].get(key, 0) + + +def get_lang_word_filenames( + meta_data: dict, abbr: str, word: str, key: str = "filenames" +) -> typing.List[str]: + return meta_data[abbr][key].get(word, []) + + +def write_to_file( + df: pd.DataFrame, target_csv_file: pathlib.Path, mode: str = "w" +) -> None: + if mode == "w": + logging.info("Writing data to csv...") + df.to_csv(str(target_csv_file), index=False) + else: + df.to_csv(str(target_csv_file), mode=mode, index=False, header=False) + + +def upload_file_to_gcs( + target_csv_file: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str +) -> None: + logging.info(f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}") + storage_client = storage.Client() + bucket = storage_client.bucket(target_gcs_bucket) + blob = bucket.blob(target_gcs_path) + blob.upload_from_filename(target_csv_file) + logging.info("Successfully uploaded file to gcs bucket.") + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + main( + source_gcs_bucket=os.environ.get("SOURCE_GCS_BUCKET", ""), + source_gcs_object=os.environ.get("SOURCE_GCS_OBJECT", ""), + source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(), + columns=json.loads(os.environ.get("COLUMNS", "[]")), + target_csv_file=pathlib.Path( + os.environ.get("TARGET_CSV_FILE", "") + ).expanduser(), + target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""), + target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""), + ) diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/requirements.txt b/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..e2fabcc34 --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/pipelines/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,2 @@ +google-cloud-storage +pandas diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/dataset.yaml b/datasets/multilingual_spoken_words_corpus/pipelines/dataset.yaml new file mode 100644 index 000000000..01946d5bc --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/pipelines/dataset.yaml @@ -0,0 +1,25 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: multilingual_spoken_words_corpus + friendly_name: multilingual_spoken_words_corpus + description: This is a Multilingual Spoken Words Corpus - MLCommons Association Dataset. + dataset_sources: ~ + terms_of_use: ~ + +resources: + - type: bigquery_dataset + dataset_id: multilingual_spoken_words_corpus + description: The Multilingual Spoken Words Corpus is a large and growing audio dataset of spoken words in 50 languages for academic research and commercial applications in keyword spotting and spoken term search. The dataset contains more than 340,000 keywords, totaling 23.4 million 1-second spoken examples (over 6,000 hours). The dataset has many use cases, ranging from voice-enabled consumer devices to call center automation. It was generated by applying forced alignment on crowd-sourced sentence-level audio to produce per-word timing estimates for extraction. All alignments are included in the dataset. Please see the paper(https://datasets-benchmarks-proceedings.neurips.cc/paper/2021/file/fe131d7f5a6b38b23cc967316c13dae2-Paper-round2.pdf) for a detailed analysis of the contents of the data and methods for detecting potential outliers, along with baseline accuracy metrics on keyword spotting models trained from the dataset compared to models trained on a manually-recorded keyword dataset. diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py new file mode 100644 index 000000000..63504ef83 --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py @@ -0,0 +1,138 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.operators import bash +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery, gcs_to_gcs + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="multilingual_spoken_words_corpus.multilingual_spoken_words_corpus", + default_args=default_args, + max_active_runs=1, + schedule_interval="@quarterly", + catchup=False, + default_view="graph", +) as dag: + + # Task to run a GCS to GCS + copy_metadata_file_to_gcs = gcs_to_gcs.GCSToGCSOperator( + task_id="copy_metadata_file_to_gcs", + source_bucket="{{ var.json.multilingual_spoken_words_corpus.source_bucket }}", + source_object="metadata.json.gz", + destination_bucket="{{ var.value.composer_bucket }}", + destination_object="data/multilingual_spoken_words_corpus/metadata.json.gz", + move_object=False, + ) + + # Task to unzip tar file + unzip_metadata_gz = bash.BashOperator( + task_id="unzip_metadata_gz", + bash_command="echo Unzipping $data_dir/$source_file...\ngunzip -c $data_dir/$source_file \u003e $data_dir/metadata.json\necho Successfully unzipped .gz file to specified file metadata.json\n", + env={ + "data_dir": "/home/airflow/gcs/data/multilingual_spoken_words_corpus", + "source_file": "metadata.json.gz", + }, + ) + + # Run CSV transform within kubernetes pod + metadata_csv_transform = kubernetes_pod.KubernetesPodOperator( + task_id="metadata_csv_transform", + startup_timeout_seconds=600, + name="metadata_csv_transform", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.multilingual_spoken_words_corpus.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "SOURCE_GCS_OBJECT": "data/multilingual_spoken_words_corpus/metadata.json", + "SOURCE_FILE": "./files/metadata.json", + "TARGET_CSV_FILE": "./files/metadata_data_output.csv", + "COLUMNS": '["lang_abbr", "language", "number_of_words","word", "word_count", "filename"]', + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/multilingual_spoken_words_corpus/metadata_data_output.csv", + }, + resources={ + "request_memory": "4G", + "request_cpu": "1", + "request_ephemeral_storage": "4G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_metadata_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_metadata_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=[ + "data/multilingual_spoken_words_corpus/metadata_data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="multilingual_spoken_words_corpus.metadata", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "lang_abbr", + "type": "string", + "description": "It consists of language abbrevation.", + "mode": "nullable", + }, + { + "name": "language", + "type": "string", + "description": "It consists of language name.", + "mode": "nullable", + }, + { + "name": "number_of_words", + "type": "integer", + "description": "It consists count of total number of words for corresponding language.", + "mode": "nullable", + }, + { + "name": "word", + "type": "string", + "description": "It contains words of each corresponding language.", + "mode": "nullable", + }, + { + "name": "word_count", + "type": "integer", + "description": "It consists word count for each word in corresponding language", + "mode": "nullable", + }, + { + "name": "filename", + "type": "string", + "description": "It consists filenames for corresponding word in specific language.", + "mode": "nullable", + }, + ], + ) + + ( + copy_metadata_file_to_gcs + >> unzip_metadata_gz + >> metadata_csv_transform + >> load_metadata_to_bq + ) diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml new file mode 100644 index 000000000..30f9bbb6c --- /dev/null +++ b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml @@ -0,0 +1,120 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: metadata + description: "It contains metadata of all existing audio files in tabular format." + +dag: + airflow_version: 2 + initialize: + dag_id: multilingual_spoken_words_corpus + default_args: + owner: "Google" + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@quarterly" + catchup: False + default_view: graph + + tasks: + + - operator: "GoogleCloudStorageToGoogleCloudStorageOperator" + description: "Task to run a GCS to GCS" + args: + task_id: "copy_metadata_file_to_gcs" + source_bucket: "{{ var.json.multilingual_spoken_words_corpus.source_bucket }}" + source_object: "metadata.json.gz" + destination_bucket: "{{ var.value.composer_bucket }}" + destination_object: "data/multilingual_spoken_words_corpus/metadata.json.gz" + move_object: False + + - operator: "BashOperator" + description: "Task to unzip tar file" + args: + task_id: "unzip_metadata_gz" + bash_command: | + echo Unzipping $data_dir/$source_file... + gunzip -c $data_dir/$source_file > $data_dir/metadata.json + echo Successfully unzipped .gz file to specified file metadata.json + env: + data_dir: "/home/airflow/gcs/data/multilingual_spoken_words_corpus" + source_file: "metadata.json.gz" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "metadata_csv_transform" + startup_timeout_seconds: 600 + name: "metadata_csv_transform" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.multilingual_spoken_words_corpus.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" + SOURCE_GCS_OBJECT: "data/multilingual_spoken_words_corpus/metadata.json" + SOURCE_FILE: "./files/metadata.json" + TARGET_CSV_FILE: "./files/metadata_data_output.csv" + COLUMNS: >- + ["lang_abbr", "language", "number_of_words","word", "word_count", "filename"] + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/multilingual_spoken_words_corpus/metadata_data_output.csv" + resources: + request_memory: "4G" + request_cpu: "1" + request_ephemeral_storage: "4G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_metadata_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/multilingual_spoken_words_corpus/metadata_data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "multilingual_spoken_words_corpus.metadata" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "lang_abbr" + type: "string" + description: "It consists of language abbrevation." + mode: "nullable" + - name: "language" + type: "string" + description: "It consists of language name." + mode: "nullable" + - name: "number_of_words" + type: "integer" + description: "It consists count of total number of words for corresponding language." + mode: "nullable" + - name: "word" + type: "string" + description: "It contains words of each corresponding language." + mode: "nullable" + - name: "word_count" + type: "integer" + description: "It consists word count for each word in corresponding language" + mode: "nullable" + - name: "filename" + type: "string" + description: "It consists filenames for corresponding word in specific language." + mode: "nullable" + + graph_paths: + - "copy_metadata_file_to_gcs >> unzip_metadata_gz >> metadata_csv_transform >> load_metadata_to_bq"