From 1596bee2a4d217a93464370ba0c65d3a70904e8b Mon Sep 17 00:00:00 2001 From: SatishChGit Date: Wed, 1 May 2024 23:07:25 -0700 Subject: [PATCH 1/7] Updates to Teradata Provider with release 2.2.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implemented enhancements to the Teradata Provider for Teradata. This release incorporates the following features: • Introduction of Stored Procedure Support in Teradata Hook • Inclusion of the TeradataStoredProcedureOperator for executing stored procedures • Integration of Azure Blob Storage to Teradata Transfer Operator • Integration of Amazon S3 to Teradata Transfer Operator • Provision of necessary documentation, along with unit and system tests, for the Teradata Provider modifications. --- airflow/providers/teradata/hooks/teradata.py | 67 +++++++ .../providers/teradata/operators/teradata.py | 43 ++++- airflow/providers/teradata/provider.yaml | 10 ++ .../transfers/azure_blob_to_teradata.py | 105 +++++++++++ .../teradata/transfers/s3_to_teradata.py | 110 ++++++++++++ dev/breeze/tests/test_selective_checks.py | 8 +- .../operators/azure_blob_to_teradata.rst | 73 ++++++++ .../operators/s3_to_teradata.rst | 70 ++++++++ .../operators/teradata.rst | 70 ++++++++ docs/spelling_wordlist.txt | 1 + generated/provider_dependencies.json | 8 +- .../providers/teradata/hooks/test_teradata.py | 11 ++ .../teradata/operators/test_teradata.py | 36 +++- .../transfers/test_azure_blob_to_teradata.py | 60 +++++++ .../teradata/transfers/test_s3_to_teradata.py | 79 +++++++++ ...example_azure_blob_to_teradata_transfer.py | 163 ++++++++++++++++++ .../example_s3_to_teradata_transfer.py | 163 ++++++++++++++++++ .../teradata/example_ssl_teradata.py | 8 +- .../providers/teradata/example_teradata.py | 6 +- .../teradata/example_teradata_call_sp.py | 114 ++++++++++++ .../example_teradata_to_teradata_transfer.py | 2 +- 21 files changed, 1190 insertions(+), 17 deletions(-) create mode 100644 airflow/providers/teradata/transfers/azure_blob_to_teradata.py create mode 100644 airflow/providers/teradata/transfers/s3_to_teradata.py create mode 100644 docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst create mode 100644 docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst create mode 100644 tests/providers/teradata/transfers/test_azure_blob_to_teradata.py create mode 100644 tests/providers/teradata/transfers/test_s3_to_teradata.py create mode 100644 tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py create mode 100644 tests/system/providers/teradata/example_s3_to_teradata_transfer.py create mode 100644 tests/system/providers/teradata/example_teradata_call_sp.py diff --git a/airflow/providers/teradata/hooks/teradata.py b/airflow/providers/teradata/hooks/teradata.py index 3afc32bc74746..6f9028759172b 100644 --- a/airflow/providers/teradata/hooks/teradata.py +++ b/airflow/providers/teradata/hooks/teradata.py @@ -32,6 +32,17 @@ if TYPE_CHECKING: from airflow.models.connection import Connection +PARAM_TYPES = {bool, float, int, str} + + +def _map_param(value): + if value in PARAM_TYPES: + # In this branch, value is a Python type; calling it produces + # an instance of the type which is understood by the Teradata driver + # in the out parameter mapping mechanism. + value = value() + return value + class TeradataHook(DbApiHook): """General hook for interacting with Teradata SQL Database. 
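
A minimal illustrative sketch (not part of the patch diff): it assumes a reachable `teradata_default` connection and shows how the PARAM_TYPES/_map_param convention above is meant to be used with the `callproc` method added in the next hunk. IN parameters are passed as plain values; an OUT parameter is passed as the expected Python type, which `_map_param` instantiates into a placeholder the driver can bind.

    from airflow.providers.teradata.hooks.teradata import TeradataHook

    hook = TeradataHook(teradata_conn_id="teradata_default")
    # 3 is an IN value; `int` marks an OUT placeholder that _map_param turns into int().
    result = hook.callproc("TEST_PROCEDURE", autocommit=True, parameters=[3, int])
    # Per the callproc docstring, the result reflects parameters in both
    # directions, so the doubled OUT value is returned alongside the input.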
@@ -187,3 +198,59 @@ def get_ui_field_behaviour() -> dict: "password": "dbc", }, } + + def callproc( + self, + identifier: str, + autocommit: bool = False, + parameters: list | dict | None = None, + ) -> list | dict | tuple | None: + """ + Call the stored procedure identified by the provided string. + + Any OUT parameters must be provided with a value of either the + expected Python type (e.g., `int`) or an instance of that type. + + :param identifier: stored procedure name + :param autocommit: What to set the connection's autocommit setting to + before executing the query. + :param parameters: The `IN`, `OUT` and `INOUT` parameters for Teradata + stored procedure + + The return value is a list or mapping that includes parameters in + both directions; the actual return type depends on the type of the + provided `parameters` argument. + + """ + if parameters is None: + parameters = [] + + args = ",".join("?" for name in parameters) + + sql = f"{{CALL {identifier}({(args)})}}" + + def handler(cursor): + records = cursor.fetchall() + + if records is None: + return + + if isinstance(records, list): + return [row for row in records] + + if isinstance(records, dict): + return {n: v for (n, v) in records.items()} + raise TypeError(f"Unexpected results: {records}") + + result = self.run( + sql, + autocommit=autocommit, + parameters=( + {name: _map_param(value) for (name, value) in parameters.items()} + if isinstance(parameters, dict) + else [_map_param(value) for value in parameters] + ), + handler=handler, + ) + + return result diff --git a/airflow/providers/teradata/operators/teradata.py b/airflow/providers/teradata/operators/teradata.py index ae87fd785487f..00fd6062fe4d7 100644 --- a/airflow/providers/teradata/operators/teradata.py +++ b/airflow/providers/teradata/operators/teradata.py @@ -17,11 +17,15 @@ # under the License. from __future__ import annotations -from typing import Sequence +from typing import TYPE_CHECKING, Sequence +from airflow.models import BaseOperator from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.teradata.hooks.teradata import TeradataHook +if TYPE_CHECKING: + from airflow.utils.context import Context + class TeradataOperator(SQLExecuteQueryOperator): """ @@ -41,8 +45,8 @@ class TeradataOperator(SQLExecuteQueryOperator): """ template_fields: Sequence[str] = ( - "parameters", "sql", + "parameters", ) template_ext: Sequence[str] = (".sql",) template_fields_renderers = {"sql": "sql"} @@ -62,3 +66,38 @@ def __init__( } super().__init__(**kwargs) self.conn_id = conn_id + + +class TeradataStoredProcedureOperator(BaseOperator): + """ + Executes stored procedure in a specific Teradata database. + + :param procedure: name of stored procedure to call (templated) + :param conn_id: The :ref:`Teradata connection id ` + reference to a specific Teradata database. 
+ :param parameters: (optional, templated) the parameters provided in the call + + """ + + template_fields: Sequence[str] = ( + "procedure", + "parameters", + ) + ui_color = "#ededed" + + def __init__( + self, + *, + procedure: str, + conn_id: str = TeradataHook.default_conn_name, + parameters: dict | list | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.conn_id = conn_id + self.procedure = procedure + self.parameters = parameters + + def execute(self, context: Context): + hook = TeradataHook(teradata_conn_id=self.conn_id) + return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters) diff --git a/airflow/providers/teradata/provider.yaml b/airflow/providers/teradata/provider.yaml index a3d1d34e61e02..a212b9acc0222 100644 --- a/airflow/providers/teradata/provider.yaml +++ b/airflow/providers/teradata/provider.yaml @@ -33,6 +33,8 @@ dependencies: - apache-airflow-providers-common-sql>=1.3.1 - teradatasqlalchemy>=17.20.0.0 - teradatasql>=17.20.0.28 + - apache-airflow-providers-microsoft-azure + - apache-airflow-providers-amazon integrations: - integration-name: Teradata @@ -57,6 +59,14 @@ transfers: target-integration-name: Teradata python-module: airflow.providers.teradata.transfers.teradata_to_teradata how-to-guide: /docs/apache-airflow-providers-teradata/operators/teradata_to_teradata.rst + - source-integration-name: Microsoft Azure Blob Storage + target-integration-name: Teradata + python-module: airflow.providers.teradata.transfers.azure_blob_to_teradata + how-to-guide: /docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst + - source-integration-name: Amazon Simple Storage Service (S3) + target-integration-name: Teradata + python-module: airflow.providers.teradata.transfers.s3_to_teradata + how-to-guide: /docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst connection-types: - hook-class-name: airflow.providers.teradata.hooks.teradata.TeradataHook diff --git a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py new file mode 100644 index 0000000000000..45fc5d56daefa --- /dev/null +++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.providers.teradata.hooks.teradata import TeradataHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureBlobStorageToTeradataOperator(BaseOperator): + """ + + Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata. + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AzureBlobStorageToTeradataOperator` + + :param blob_source_key: The URI format specifying the location of the Azure blob object store.(templated) + The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`. + Refer to + https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param azure_conn_id: The Airflow WASB connection used for azure blob credentials. + :param teradata_table: The name of the teradata table to which the data is transferred.(templated) + :param teradata_conn_id: The connection ID used to connect to Teradata + :ref:`Teradata connection ` + + Note that ``blob_source_key`` and ``teradata_table`` are + templated, so you can use variables in them if you wish. + """ + + template_fields: Sequence[str] = ("blob_source_key", "teradata_table") + ui_color = "#e07c24" + + def __init__( + self, + *, + blob_source_key: str, + azure_conn_id: str = "azure_default", + teradata_table: str, + teradata_conn_id: str = "teradata_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.blob_source_key = blob_source_key + self.azure_conn_id = azure_conn_id + self.teradata_table = teradata_table + self.teradata_conn_id = teradata_conn_id + + def execute(self, context: Context) -> None: + self.log.info( + "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table + ) + azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id) + conn = azure_hook.get_connection(self.azure_conn_id) + # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container + access_id = conn.login if conn.login is not None else "" + access_secret = conn.password if conn.password is not None else "" + teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + sql = f""" + CREATE MULTISET TABLE {self.teradata_table} AS + ( + SELECT * FROM ( + LOCATION = '{self.blob_source_key}' + ACCESS_ID= '{access_id}' + ACCESS_KEY= '{access_secret}' + ) AS d + ) WITH DATA + """ + try: + teradata_hook.run(sql, True) + except Exception as ex: + # Handling permission issue errors + if "Error 3524" in str(ex): + self.log.error("The user does not have CREATE TABLE access in teradata") + raise + if "Error 9134" in str(ex): + self.log.error( + "There is an issue with the transfer operation. Please validate azure and " + "teradata connection details." + ) + raise + self.log.error("Issue occurred at Teradata: %s", str(ex)) + raise + self.log.info("The transfer of data from Azure Blob to Teradata was successful") diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py new file mode 100644 index 0000000000000..4c01084ad2b58 --- /dev/null +++ b/airflow/providers/teradata/transfers/s3_to_teradata.py @@ -0,0 +1,110 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.teradata.hooks.teradata import TeradataHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class S3ToTeradataOperator(BaseOperator): + """ + Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3ToTeradataOperator` + + :param s3_source_key: The URI format specifying the location of the S3 object store.(templated) + The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME. + Refer to + https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param teradata_table: The name of the teradata table to which the data is transferred.(templated) + :param aws_conn_id: The Airflow AWS connection used for AWS credentials. + :param teradata_conn_id: The connection ID used to connect to Teradata + :ref:`Teradata connection `. + + Note that ``s3_source_key`` and ``teradata_table`` are + templated, so you can use variables in them if you wish. + """ + + template_fields: Sequence[str] = ("s3_source_key", "teradata_table") + ui_color = "#e07c24" + + def __init__( + self, + *, + s3_source_key: str, + teradata_table: str, + aws_conn_id: str = "aws_default", + teradata_conn_id: str = "teradata_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.s3_source_key = s3_source_key + self.teradata_table = teradata_table + self.aws_conn_id = aws_conn_id + self.teradata_conn_id = teradata_conn_id + + def execute(self, context: Context) -> None: + self.log.info( + "transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table + ) + + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + access_key = ( + s3_hook.conn_config.aws_access_key_id if s3_hook.conn_config.aws_access_key_id is not None else "" + ) + access_secret = ( + s3_hook.conn_config.aws_secret_access_key + if s3_hook.conn_config.aws_secret_access_key is not None + else "" + ) + + teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + sql = f""" + CREATE MULTISET TABLE {self.teradata_table} AS + ( + SELECT * FROM ( + LOCATION = '{self.s3_source_key}' + ACCESS_ID= '{access_key}' + ACCESS_KEY= '{access_secret}' + ) AS d + ) WITH DATA + """ + try: + teradata_hook.run(sql, True) + except Exception as ex: + # Handling permission issue errors + if "Error 3524" in str(ex): + self.log.error("The user does not have CREATE TABLE access in teradata") + raise + if "Error 9134" in str(ex): + self.log.error( + "There is an issue with the transfer operation. Please validate s3 and " + "teradata connection details." 
+ ) + raise + self.log.error("Issue occurred at Teradata: %s", str(ex)) + raise + self.log.info("The transfer of data from S3 to Teradata was successful") diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index e2ee1f0a6b05f..8437b215d8633 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -483,7 +483,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): { "affected-providers-list-as-string": "amazon apache.hive cncf.kubernetes " "common.sql exasol ftp google http imap microsoft.azure " - "mongo mysql openlineage postgres salesforce ssh", + "mongo mysql openlineage postgres salesforce ssh teradata", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", "python-versions": "['3.8']", @@ -499,7 +499,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-amazon-tests": "true", "parallel-test-types-list-as-string": "Always Providers[amazon] " "Providers[apache.hive,cncf.kubernetes,common.sql,exasol,ftp,http," - "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh] Providers[google]", + "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -533,7 +533,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): { "affected-providers-list-as-string": "amazon apache.hive cncf.kubernetes " "common.sql exasol ftp google http imap microsoft.azure " - "mongo mysql openlineage postgres salesforce ssh", + "mongo mysql openlineage postgres salesforce ssh teradata", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", "python-versions": "['3.8']", @@ -549,7 +549,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Providers[amazon] " "Providers[apache.hive,cncf.kubernetes,common.sql,exasol,ftp,http," - "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh] Providers[google]", + "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", "needs-mypy": "true", "mypy-folders": "['providers']", }, diff --git a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst new file mode 100644 index 0000000000000..0ee9a7bb32595 --- /dev/null +++ b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst @@ -0,0 +1,73 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. 
_howto/operator:AzureBlobStorageToTeradataOperator: + + +================================== +AzureBlobStorageToTeradataOperator +================================== + +The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet +format data transfer from an Azure Blob Storage to Teradata table. +Use the :class:`AzureBlobStorageToTeradataOperator ` +to transfer data from an Azure Blob Storage to Teradata. + + +Transferring data in CSV format from Azure Blob Storage to Teradata +------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from Azure Blob Storage +to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + +Transferring data in JSON format from Azure Blob Storage to Teradata +-------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer JSON data format from Azure Blob Storage +to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + +Transferring data in PARQUET format from Azure Blob Storage to Teradata +----------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer PARQUET data format from Azure Blob Storage +to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + +The complete ``AzureBlobStorageToTeradataOperator`` Operator DAG +---------------------------------------------------------------- + +When we put everything together, our DAG should look like this: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide] diff --git a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst new file mode 100644 index 0000000000000..d5e862beb9c0a --- /dev/null +++ b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst @@ -0,0 +1,70 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _howto/operator:S3ToTeradataOperator: + + +============================ +S3ToTeradataOperator +============================ + +The purpose of ``S3ToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet +format data transfer from an AWS Simple Storage Service (S3) to Teradata table. +Use the :class:`S3ToTeradataOperator ` +to transfer data from S3 to Teradata. + + +Transferring data in CSV format from S3 to Teradata +--------------------------------------------------- + +An example usage of the S3ToTeradataOperator to transfer CSV data format from S3 to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + +Transferring data in JSON format from S3 to Teradata +---------------------------------------------------- + +An example usage of the S3ToTeradataOperator to transfer JSON data format from S3 to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + +Transferring data in PARQUET format from S3 to Teradata +------------------------------------------------------- + +An example usage of the S3ToTeradataOperator to transfer PARQUET data format from S3 to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + +The complete ``S3ToTeradataOperator`` Operator DAG +-------------------------------------------------- + +When we put everything together, our DAG should look like this: + +.. 
exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
+    :language: python
+    :start-after: [START s3_to_teradata_transfer_operator_howto_guide]
+    :end-before: [END s3_to_teradata_transfer_operator_howto_guide]
diff --git a/docs/apache-airflow-providers-teradata/operators/teradata.rst b/docs/apache-airflow-providers-teradata/operators/teradata.rst
index e894d37ade66c..e255a986afcec 100644
--- a/docs/apache-airflow-providers-teradata/operators/teradata.rst
+++ b/docs/apache-airflow-providers-teradata/operators/teradata.rst
@@ -113,3 +113,73 @@ When we put everything together, our DAG should look like this:
     :language: python
     :start-after: [START teradata_operator_howto_guide]
     :end-before: [END teradata_operator_howto_guide]
+
+TeradataStoredProcedureOperator
+===============================
+
+The purpose of ``TeradataStoredProcedureOperator`` is to define tasks that execute Teradata
+stored procedures.
+
+Execute a Stored Procedure in a Teradata database
+-------------------------------------------------
+
+To execute a stored procedure in a Teradata database, use the
+:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator`.
+
+Assume a stored procedure exists in the database that looks like this:
+
+    .. code-block:: sql
+
+        REPLACE PROCEDURE
+        TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER)
+        BEGIN
+            set val_out = val_in * 2;
+        END;
+        /
+
+This stored procedure accepts a single integer argument, val_in, and outputs
+a single integer argument, val_out. This can be represented with the following
+call using :class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator`
+with parameters passed positionally as a list:
+
+.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py
+    :language: python
+    :start-after: [START howto_teradata_stored_procedure_operator_with_in_inout]
+    :end-before: [END howto_teradata_stored_procedure_operator_with_in_inout]
+
+
+Assume a stored procedure exists in the database that looks like this:
+
+    .. code-block:: sql
+
+        REPLACE PROCEDURE
+        TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) DYNAMIC RESULT SETS 2
+        BEGIN
+            DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ;
+            DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ;
+            open cur1 ;
+            open cur2 ;
+            set val_out = val_in * 2;
+        END;
+        /
+
+This stored procedure accepts a single integer argument, val_in, outputs
+a single integer argument, val_out, and returns two cursors representing the output of select queries.
+This can be represented with the following call using
+:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator`
+with parameters passed positionally as a list:
+
+.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py
+    :language: python
+    :start-after: [START howto_teradata_stored_procedure_operator_with_in_out_dynamic_result]
+    :end-before: [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result]
+
+The complete TeradataStoredProcedureOperator DAG
+------------------------------------------------
+
+When we put everything together, our DAG should look like this:
+
+.. 
exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_teradata_operator_for_sp] + :end-before: [END howto_teradata_operator_for_sp] diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index da04944231852..99e7aaeb7475d 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -806,6 +806,7 @@ initcontainers initdb initialisation initialising +inout InsecureClient InspectContentResponse InspectTemplate diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 4d06b547e84f4..9c0c77db7fe92 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1130,14 +1130,18 @@ }, "teradata": { "deps": [ + "apache-airflow-providers-amazon", "apache-airflow-providers-common-sql>=1.3.1", - "apache-airflow>=2.7.0", + "apache-airflow-providers-microsoft-azure", + "apache-airflow>=2.6.0", "teradatasql>=17.20.0.28", "teradatasqlalchemy>=17.20.0.0" ], "devel-deps": [], "cross-providers-deps": [ - "common.sql" + "amazon", + "common.sql", + "microsoft.azure" ], "excluded-python-versions": [], "state": "ready" diff --git a/tests/providers/teradata/hooks/test_teradata.py b/tests/providers/teradata/hooks/test_teradata.py index af77d66a636fe..689ab871d9f81 100644 --- a/tests/providers/teradata/hooks/test_teradata.py +++ b/tests/providers/teradata/hooks/test_teradata.py @@ -264,3 +264,14 @@ def test_bulk_insert_rows_no_rows(self): rows = [] with pytest.raises(ValueError): self.test_db_hook.bulk_insert_rows("table", rows) + + def test_callproc_dict(self): + parameters = {"a": 1, "b": 2, "c": 3} + + class bindvar(int): + def getvalue(self): + return self + + self.cur.fetchall.return_value = {k: bindvar(v) for k, v in parameters.items()} + result = self.test_db_hook.callproc("proc", True, parameters) + assert result == parameters diff --git a/tests/providers/teradata/operators/test_teradata.py b/tests/providers/teradata/operators/test_teradata.py index 3d491a66758b4..de7fcad1073d4 100644 --- a/tests/providers/teradata/operators/test_teradata.py +++ b/tests/providers/teradata/operators/test_teradata.py @@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException from airflow.providers.common.sql.hooks.sql import fetch_all_handler from airflow.providers.teradata.hooks.teradata import TeradataHook -from airflow.providers.teradata.operators.teradata import TeradataOperator +from airflow.providers.teradata.operators.teradata import TeradataOperator, TeradataStoredProcedureOperator class TestTeradataOperator: @@ -59,13 +59,13 @@ def test_get_hook_default(self, mock_get_db_hook): @mock.patch("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator.get_db_hook") def test_execute(self, mock_get_db_hook): sql = "SELECT * FROM test_table" - teradata_conn_id = "teradata_default" + conn_id = "teradata_default" parameters = {"parameter": "value"} autocommit = False context = "test_context" task_id = "test_task_id" - operator = TeradataOperator(sql=sql, conn_id=teradata_conn_id, parameters=parameters, task_id=task_id) + operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id) operator.execute(context=context) mock_get_db_hook.return_value.run.assert_called_once_with( sql=sql, @@ -82,13 +82,13 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook): "TRUNCATE TABLE test_airflow", "INSERT INTO test_airflow VALUES ('X')", ] - teradata_conn_id = "teradata_default" + conn_id = 
"teradata_default" parameters = {"parameter": "value"} autocommit = False context = "test_context" task_id = "test_task_id" - operator = TeradataOperator(sql=sql, conn_id=teradata_conn_id, parameters=parameters, task_id=task_id) + operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id) operator.execute(context=context) mock_get_db_hook.return_value.run.assert_called_once_with( sql=sql, @@ -97,3 +97,29 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook): handler=fetch_all_handler, return_last=True, ) + + +class TestTeradataStoredProcedureOperator: + @mock.patch.object(TeradataHook, "run", autospec=TeradataHook.run) + def test_execute(self, mock_run): + procedure = "test" + conn_id = "teradata_default" + parameters = {"parameter": "value"} + context = "test_context" + task_id = "test_task_id" + + operator = TeradataStoredProcedureOperator( + procedure=procedure, + conn_id=conn_id, + parameters=parameters, + task_id=task_id, + ) + result = operator.execute(context=context) + assert result is mock_run.return_value + mock_run.assert_called_once_with( + mock.ANY, + "{CALL test(?)}", + autocommit=True, + parameters=parameters, + handler=mock.ANY, + ) diff --git a/tests/providers/teradata/transfers/test_azure_blob_to_teradata.py b/tests/providers/teradata/transfers/test_azure_blob_to_teradata.py new file mode 100644 index 0000000000000..a9f0fd7f46e02 --- /dev/null +++ b/tests/providers/teradata/transfers/test_azure_blob_to_teradata.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest import mock + +from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator + +AZURE_CONN_ID = "wasb_default" +TERADATA_CONN_ID = "teradata_default" +BLOB_SOURCE_KEY = "az/test" +TERADATA_TABLE = "test" +TASK_ID = "transfer_file" + + +class TestAzureBlobStorageToTeradataOperator: + def test_init(self): + operator = AzureBlobStorageToTeradataOperator( + azure_conn_id=AZURE_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + teradata_table=TERADATA_TABLE, + blob_source_key=BLOB_SOURCE_KEY, + task_id=TASK_ID, + ) + + assert operator.azure_conn_id == AZURE_CONN_ID + assert operator.blob_source_key == BLOB_SOURCE_KEY + assert operator.teradata_conn_id == TERADATA_CONN_ID + assert operator.teradata_table == TERADATA_TABLE + assert operator.task_id == TASK_ID + + @mock.patch("airflow.providers.teradata.transfers.azure_blob_to_teradata.TeradataHook") + @mock.patch("airflow.providers.teradata.transfers.azure_blob_to_teradata.WasbHook") + def test_execute(self, mock_hook_wasb, mock_hook_teradata): + op = AzureBlobStorageToTeradataOperator( + azure_conn_id=AZURE_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + teradata_table=TERADATA_TABLE, + blob_source_key=BLOB_SOURCE_KEY, + task_id=TASK_ID, + ) + op.execute(context=None) + mock_hook_wasb.assert_called_once_with(wasb_conn_id=AZURE_CONN_ID) + mock_hook_teradata.assert_called_once_with(teradata_conn_id=TERADATA_CONN_ID) + sql = "SQL" + mock_hook_teradata.run(sql) diff --git a/tests/providers/teradata/transfers/test_s3_to_teradata.py b/tests/providers/teradata/transfers/test_s3_to_teradata.py new file mode 100644 index 0000000000000..f88cacfb084f8 --- /dev/null +++ b/tests/providers/teradata/transfers/test_s3_to_teradata.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime +from unittest import mock + +from boto3.session import Session + +from airflow.models.connection import Connection +from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator + +DEFAULT_DATE = datetime(2024, 1, 1) + +AWS_CONN_ID = "aws_default" +TERADATA_CONN_ID = "teradata_default" +S3_SOURCE_KEY = "aws/test" +TERADATA_TABLE = "test" +TASK_ID = "transfer_file" + + +class TestS3ToTeradataTransfer: + def test_init(self): + operator = S3ToTeradataOperator( + s3_source_key=S3_SOURCE_KEY, + teradata_table=TERADATA_TABLE, + aws_conn_id=AWS_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + task_id=TASK_ID, + dag=None, + ) + + assert operator.aws_conn_id == AWS_CONN_ID + assert operator.s3_source_key == S3_SOURCE_KEY + assert operator.teradata_conn_id == TERADATA_CONN_ID + assert operator.teradata_table == TERADATA_TABLE + assert operator.task_id == TASK_ID + + @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection") + @mock.patch("airflow.models.connection.Connection") + @mock.patch("boto3.session.Session") + @mock.patch("airflow.providers.teradata.hooks.teradata.TeradataHook.run") + def test_execute(self, mock_run, mock_session, mock_connection, mock_hook): + access_key = "aws_access_key_id" + access_secret = "aws_secret_access_key" + mock_session.return_value = Session(access_key, access_secret) + mock_session.return_value.access_key = access_key + mock_session.return_value.secret_key = access_secret + mock_session.return_value.token = None + + mock_connection.return_value = Connection() + mock_hook.return_value = Connection() + + op = S3ToTeradataOperator( + s3_source_key=S3_SOURCE_KEY, + teradata_table=TERADATA_TABLE, + aws_conn_id=AWS_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + task_id=TASK_ID, + dag=None, + ) + op.execute(None) + + assert mock_run.call_count == 1 diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py new file mode 100644 index 0000000000000..9ae75bc0c29db --- /dev/null +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG to show usage of AzureBlobStorageToTeradataOperator + +The transfer operator moves files in CSV, JSON, and PARQUET formats from Azure Blob storage +to Teradata tables. In the example Directed Acyclic Graph (DAG) below, it assumes Airflow +Connections with the IDs `teradata_default` and `wasb_default` already exist. The DAG creates +tables using data from the Azure Blob location, reports the number of rows inserted into +the table, and subsequently drops the table. 
+""" + +from __future__ import annotations + +import datetime +import os + +import pytest + +from airflow import DAG + +try: + from airflow.providers.teradata.operators.teradata import TeradataOperator + from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator +except ImportError: + pytest.skip("Teradata provider apache-airflow-provider-teradata not available", allow_module_level=True) + +# [START azure_blob_to_teradata_transfer_operator_howto_guide] + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_azure_blob_to_teradata_transfer_operator" +CONN_ID = "teradata_default" + +with DAG( + dag_id=DAG_ID, + start_date=datetime.datetime(2020, 2, 2), + schedule="@once", + catchup=False, + default_args={"conn_id": "teradata_default"}, +) as dag: + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + transfer_data_csv = AzureBlobStorageToTeradataOperator( + task_id="transfer_data_blob_to_teradata_csv", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/", + teradata_table="example_blob_teradata_csv", + azure_conn_id="wasb_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_data_table_csv = TeradataOperator( + task_id="read_data_table_csv", + conn_id=CONN_ID, + sql=""" + SELECT count(1) from example_blob_teradata_csv; + """, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_table_csv = TeradataOperator( + task_id="drop_table_csv", + conn_id=CONN_ID, + sql=""" + DROP TABLE example_blob_teradata_csv; + """, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + transfer_data_json = AzureBlobStorageToTeradataOperator( + task_id="transfer_data_blob_to_teradata_json", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/", + teradata_table="example_blob_teradata_json", + azure_conn_id="wasb_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] + read_data_table_json = TeradataOperator( + task_id="read_data_table_json", + conn_id=CONN_ID, + sql=""" + SELECT count(1) from example_blob_teradata_json; + """, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] + drop_table_json = TeradataOperator( + task_id="drop_table_json", + conn_id=CONN_ID, + sql=""" + DROP TABLE example_blob_teradata_json; + """, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + transfer_data_parquet = AzureBlobStorageToTeradataOperator( + task_id="transfer_data_blob_to_teradata_parquet", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/", + teradata_table="example_blob_teradata_parquet", + 
azure_conn_id="wasb_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + read_data_table_parquet = TeradataOperator( + task_id="read_data_table_parquet", + conn_id=CONN_ID, + sql=""" + SELECT count(1) from example_blob_teradata_parquet; + """, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + drop_table_parquet = TeradataOperator( + task_id="drop_table_parquet", + conn_id=CONN_ID, + sql=""" + DROP TABLE example_blob_teradata_parquet; + """, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + + ( + transfer_data_csv, + transfer_data_json, + transfer_data_parquet, + read_data_table_csv, + read_data_table_json, + read_data_table_parquet, + drop_table_csv, + drop_table_json, + drop_table_parquet, + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py new file mode 100644 index 0000000000000..ee84d550ea242 --- /dev/null +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG to show usage of S3StorageToTeradataOperator. + +The transfer operator moves files in CSV, JSON, and PARQUET formats from S3 +to Teradata tables. In the example Directed Acyclic Graph (DAG) below, it assumes Airflow +Connections with the IDs `teradata_default` and `aws_default` already exist. The DAG creates +tables using data from the S3, reports the number of rows inserted into +the table, and subsequently drops the table. 
+""" + +from __future__ import annotations + +import datetime +import os + +import pytest + +from airflow import DAG +from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator + +try: + from airflow.providers.teradata.operators.teradata import TeradataOperator +except ImportError: + pytest.skip("Teradata provider apache-airflow-provider-teradata not available", allow_module_level=True) + +# [START s3_to_teradata_transfer_operator_howto_guide] + + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_s3_to_teradata_transfer_operator" +CONN_ID = "teradata_default" + +with DAG( + dag_id=DAG_ID, + start_date=datetime.datetime(2020, 2, 2), + schedule="@once", + catchup=False, + default_args={"conn_id": "teradata_default"}, +) as dag: + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + transfer_data_csv = S3ToTeradataOperator( + task_id="transfer_data_s3_to_teradata_csv", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/", + teradata_table="example_s3_teradata_csv", + aws_conn_id="aws_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_data_table_csv = TeradataOperator( + task_id="read_data_table_csv", + conn_id=CONN_ID, + sql=""" + SELECT * from example_s3_teradata_csv; + """, + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_table_csv = TeradataOperator( + task_id="drop_table_csv", + conn_id=CONN_ID, + sql=""" + DROP TABLE example_s3_teradata_csv; + """, + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + transfer_data_json = S3ToTeradataOperator( + task_id="transfer_data_s3_to_teradata_json", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/", + teradata_table="example_s3_teradata_json", + aws_conn_id="aws_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] + read_data_table_json = TeradataOperator( + task_id="read_data_table_json", + conn_id=CONN_ID, + sql=""" + SELECT * from example_s3_teradata_json; + """, + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json] + drop_table_json = TeradataOperator( + task_id="drop_table_json", + conn_id=CONN_ID, + sql=""" + DROP TABLE example_s3_teradata_json; + """, + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + transfer_data_parquet = S3ToTeradataOperator( + task_id="transfer_data_s3_to_teradata_parquet", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/", + teradata_table="example_s3_teradata_parquet", + aws_conn_id="aws_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + read_data_table_parquet = 
TeradataOperator( + task_id="read_data_table_parquet", + conn_id=CONN_ID, + sql=""" + SELECT * from example_s3_teradata_parquet; + """, + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + drop_table_parquet = TeradataOperator( + task_id="drop_table_parquet", + conn_id=CONN_ID, + sql=""" + DROP TABLE example_s3_teradata_parquet; + """, + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + ( + transfer_data_csv, + transfer_data_json, + transfer_data_parquet, + read_data_table_csv, + read_data_table_json, + read_data_table_parquet, + drop_table_csv, + drop_table_json, + drop_table_parquet, + ) + # [END s3_to_teradata_transfer_operator_howto_guide] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/teradata/example_ssl_teradata.py b/tests/system/providers/teradata/example_ssl_teradata.py index 600f1d6b89685..c005d3017f96b 100644 --- a/tests/system/providers/teradata/example_ssl_teradata.py +++ b/tests/system/providers/teradata/example_ssl_teradata.py @@ -16,7 +16,11 @@ # specific language governing permissions and limitations # under the License. """ -Example use of Teradata related operators. +Example Airflow DAG to show usage of TeradataOperator with SSL teradata connection. + +This DAG assumes Airflow Connection with connection id `teradata_ssl_default` already exists in locally. It +shows how to use create, update, delete and select teradata statements with TeradataOperator as tasks in +airflow dags using TeradataStoredProcedureOperator. """ from __future__ import annotations @@ -42,7 +46,7 @@ # the Teradata Operator ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") -DAG_ID = "example_teradata" +DAG_ID = "example_ssl_teradata" with DAG( dag_id=DAG_ID, diff --git a/tests/system/providers/teradata/example_teradata.py b/tests/system/providers/teradata/example_teradata.py index 2e470a877a0da..5f1569ae9d840 100644 --- a/tests/system/providers/teradata/example_teradata.py +++ b/tests/system/providers/teradata/example_teradata.py @@ -16,7 +16,11 @@ # specific language governing permissions and limitations # under the License. """ -Example use of Teradata related operators. +Example Airflow DAG to show usage of TeradataOperator. + +This DAG assumes Airflow Connection with connection id `teradata_default` already exists in locally. It +shows how to use create, update, delete and select teradata statements with TeradataOperator as tasks in +airflow dags using TeradataStoredProcedureOperator. """ from __future__ import annotations diff --git a/tests/system/providers/teradata/example_teradata_call_sp.py b/tests/system/providers/teradata/example_teradata_call_sp.py new file mode 100644 index 0000000000000..bf43eb475b4fe --- /dev/null +++ b/tests/system/providers/teradata/example_teradata_call_sp.py @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Example Airflow DAG to show Stored Procedure creation and execution on teradata database using +TeradataStoredProcedureOperator. + +This DAG assumes Airflow Connection with connection id `teradata_sp_call` already exists in locally. It +shows how to create and execute Stored Procedure as tasks in airflow dags using +TeradataStoredProcedureOperator.""" + +from __future__ import annotations + +from datetime import datetime + +import pytest + +from airflow import DAG + +try: + from airflow.providers.teradata.operators.teradata import ( + TeradataOperator, + TeradataStoredProcedureOperator, + ) +except ImportError: + pytest.skip("Teradata provider apache-airflow-provider-teradata not available", allow_module_level=True) + +# [START howto_teradata_operator_for_sp] +CONN_ID = "teradata_sp_call" +DAG_ID = "example_teradata_call_sp" + +with DAG( + dag_id=DAG_ID, + max_active_runs=1, + max_active_tasks=3, + catchup=False, + default_args={"conn_id": CONN_ID}, + schedule="@once", + start_date=datetime(2023, 1, 1), +) as dag: + # [START howto_teradata_stored_procedure_operator_with_in_inout] + create_sp_in_inout = TeradataOperator( + task_id="create_sp_in_inout", + sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) + BEGIN + set val_out = val_in * 2; + END; + """, + ) + + opr_sp_in_inout = TeradataStoredProcedureOperator( + task_id="opr_sp_in_inout", + procedure="TEST_PROCEDURE", + parameters=[3, int], + ) + + # [END howto_teradata_stored_procedure_operator_with_in_inout] + + # [START howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] + create_sp_param_dr = TeradataOperator( + task_id="create_sp_param_dr", + sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer) + dynamic result sets 2 + begin + declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ; + declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ; + open cur1 ; + open cur2 ; + set p2 = p1 + p2 ; + set p3 = p1 * p2 ; + end ; + """, + ) + opr_sp_param_dr = TeradataStoredProcedureOperator( + task_id="opr_sp_param_dr", + procedure="examplestoredproc", + parameters=[3, 2, int], + ) + # [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] + + # [START howto_teradata_stored_procedure_operator_drop] + drop_sp = TeradataOperator( + task_id="drop_sp", + sql=r"""drop procedure examplestoredproc; + """, + ) + # [END howto_teradata_stored_procedure_operator_drop] + (create_sp_in_inout >> opr_sp_in_inout >> create_sp_param_dr >> opr_sp_param_dr >> drop_sp) + + # [END howto_teradata_operator_for_sp] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py index d4d4014ff4ff3..054bae7b85342 100644 --- a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """ -Example Airflow DAG to show usage of teradata to teradata transfer operator +Example Airflow DAG to show usage of teradata to teradata transfer operator. The transfer operator connects to source teradata server, runs query to fetch data from source and inserts that data into destination teradata database server. It assumes tables already exists. From 73ca455d108dcfb53d8ab5720ba6f7ec1fa23f2d Mon Sep 17 00:00:00 2001 From: SatishChGit Date: Thu, 2 May 2024 08:04:56 -0700 Subject: [PATCH 2/7] PR initial review comments addressed (#36) --- .../transfers/azure_blob_to_teradata.py | 12 +--------- .../teradata/transfers/s3_to_teradata.py | 12 +--------- ...example_azure_blob_to_teradata_transfer.py | 24 +++++++++---------- .../example_s3_to_teradata_transfer.py | 24 +++++++++---------- 4 files changed, 26 insertions(+), 46 deletions(-) diff --git a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py index 45fc5d56daefa..d7595a3fa7048 100644 --- a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py +++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py @@ -90,16 +90,6 @@ def execute(self, context: Context) -> None: try: teradata_hook.run(sql, True) except Exception as ex: - # Handling permission issue errors - if "Error 3524" in str(ex): - self.log.error("The user does not have CREATE TABLE access in teradata") - raise - if "Error 9134" in str(ex): - self.log.error( - "There is an issue with the transfer operation. Please validate azure and " - "teradata connection details." - ) - raise - self.log.error("Issue occurred at Teradata: %s", str(ex)) + self.log.error(str(ex)) raise self.log.info("The transfer of data from Azure Blob to Teradata was successful") diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py index 4c01084ad2b58..1d6f24a5bc8ed 100644 --- a/airflow/providers/teradata/transfers/s3_to_teradata.py +++ b/airflow/providers/teradata/transfers/s3_to_teradata.py @@ -95,16 +95,6 @@ def execute(self, context: Context) -> None: try: teradata_hook.run(sql, True) except Exception as ex: - # Handling permission issue errors - if "Error 3524" in str(ex): - self.log.error("The user does not have CREATE TABLE access in teradata") - raise - if "Error 9134" in str(ex): - self.log.error( - "There is an issue with the transfer operation. Please validate s3 and " - "teradata connection details." 
- ) - raise - self.log.error("Issue occurred at Teradata: %s", str(ex)) + self.log.error(str(ex)) raise self.log.info("The transfer of data from S3 to Teradata was successful") diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py index 9ae75bc0c29db..5b1f5c2c60635 100644 --- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -56,7 +56,7 @@ # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] transfer_data_csv = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_csv", - blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/", teradata_table="example_blob_teradata_csv", azure_conn_id="wasb_default", teradata_conn_id="teradata_default", @@ -84,7 +84,7 @@ # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] transfer_data_json = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_json", - blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/", teradata_table="example_blob_teradata_json", azure_conn_id="wasb_default", teradata_conn_id="teradata_default", @@ -112,7 +112,7 @@ # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] transfer_data_parquet = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_parquet", - blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/", teradata_table="example_blob_teradata_parquet", azure_conn_id="wasb_default", teradata_conn_id="teradata_default", @@ -139,15 +139,15 @@ # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] ( - transfer_data_csv, - transfer_data_json, - transfer_data_parquet, - read_data_table_csv, - read_data_table_json, - read_data_table_parquet, - drop_table_csv, - drop_table_json, - drop_table_parquet, + transfer_data_csv >> + transfer_data_json >> + transfer_data_parquet >> + read_data_table_csv >> + read_data_table_json >> + read_data_table_parquet >> + drop_table_csv >> + drop_table_json >> + drop_table_parquet ) # [END azure_blob_to_teradata_transfer_operator_howto_guide] diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py index ee84d550ea242..2b81d1048f039 100644 --- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -57,7 +57,7 @@ # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] transfer_data_csv = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_csv", - s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/", teradata_table="example_s3_teradata_csv", aws_conn_id="aws_default", teradata_conn_id="teradata_default", @@ -85,7 +85,7 @@ # [START 
s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] transfer_data_json = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_json", - s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/", teradata_table="example_s3_teradata_json", aws_conn_id="aws_default", teradata_conn_id="teradata_default", @@ -113,7 +113,7 @@ # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] transfer_data_parquet = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_parquet", - s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/", teradata_table="example_s3_teradata_parquet", aws_conn_id="aws_default", teradata_conn_id="teradata_default", @@ -139,15 +139,15 @@ ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] ( - transfer_data_csv, - transfer_data_json, - transfer_data_parquet, - read_data_table_csv, - read_data_table_json, - read_data_table_parquet, - drop_table_csv, - drop_table_json, - drop_table_parquet, + transfer_data_csv >> + transfer_data_json >> + transfer_data_parquet >> + read_data_table_csv >> + read_data_table_json >> + read_data_table_parquet >> + drop_table_csv >> + drop_table_json >> + drop_table_parquet ) # [END s3_to_teradata_transfer_operator_howto_guide] From ba0e5d9f182cae685bb63d4665a2846554fbc33f Mon Sep 17 00:00:00 2001 From: Satish Ch Date: Thu, 2 May 2024 08:35:30 -0700 Subject: [PATCH 3/7] Generated provider dependenices --- generated/provider_dependencies.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 9c0c77db7fe92..87f5e6d0e32a1 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1133,7 +1133,7 @@ "apache-airflow-providers-amazon", "apache-airflow-providers-common-sql>=1.3.1", "apache-airflow-providers-microsoft-azure", - "apache-airflow>=2.6.0", + "apache-airflow>=2.7.0", "teradatasql>=17.20.0.28", "teradatasqlalchemy>=17.20.0.0" ], From 0a9c4afa48772f6a807bacf4d31876b26bc6ebe7 Mon Sep 17 00:00:00 2001 From: Satish Ch Date: Thu, 2 May 2024 09:55:41 -0700 Subject: [PATCH 4/7] pre-commit format applied --- .../example_azure_blob_to_teradata_transfer.py | 18 +++++++++--------- .../example_s3_to_teradata_transfer.py | 18 +++++++++--------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py index 5b1f5c2c60635..1de354ec59188 100644 --- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -139,15 +139,15 @@ # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] ( - transfer_data_csv >> - transfer_data_json >> - transfer_data_parquet >> - read_data_table_csv >> - read_data_table_json >> - read_data_table_parquet >> - drop_table_csv >> - drop_table_json >> - drop_table_parquet + transfer_data_csv + >> transfer_data_json + >> transfer_data_parquet + >> read_data_table_csv + >> read_data_table_json + >> read_data_table_parquet + >> drop_table_csv + >> drop_table_json + >> drop_table_parquet ) # [END azure_blob_to_teradata_transfer_operator_howto_guide] diff --git 
a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py index 2b81d1048f039..a17f34941050d 100644 --- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -139,15 +139,15 @@ ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] ( - transfer_data_csv >> - transfer_data_json >> - transfer_data_parquet >> - read_data_table_csv >> - read_data_table_json >> - read_data_table_parquet >> - drop_table_csv >> - drop_table_json >> - drop_table_parquet + transfer_data_csv + >> transfer_data_json + >> transfer_data_parquet + >> read_data_table_csv + >> read_data_table_json + >> read_data_table_parquet + >> drop_table_csv + >> drop_table_json + >> drop_table_parquet ) # [END s3_to_teradata_transfer_operator_howto_guide] From 1747e64f51f53a50a62ed31550be9ecf0c5e4ac7 Mon Sep 17 00:00:00 2001 From: SatishChGit Date: Fri, 3 May 2024 10:39:12 -0700 Subject: [PATCH 5/7] Addressed PR review comments Addressed following PR review comments Azure and Amazon optional dependency in teradata provider.yaml Added another parameter to s3 transfer operator to specify given bucket is public or not Added more examples of stored procedure operator in stored procedure system DAG Changed """ to " for single line SQL statements in system tests DAGs Applied dedent for SQL statements in cloud transfer operators --- airflow/providers/teradata/hooks/teradata.py | 3 +- airflow/providers/teradata/provider.yaml | 10 ++- .../transfers/azure_blob_to_teradata.py | 14 ++- .../teradata/transfers/s3_to_teradata.py | 35 +++++--- .../operators/teradata.rst | 66 +++++++++++--- generated/provider_dependencies.json | 2 - ...example_azure_blob_to_teradata_transfer.py | 24 ++--- .../example_s3_to_teradata_transfer.py | 27 ++---- .../teradata/example_ssl_teradata.py | 8 +- .../providers/teradata/example_teradata.py | 28 ++---- .../teradata/example_teradata_call_sp.py | 88 ++++++++++++++++--- .../example_teradata_to_teradata_transfer.py | 16 +--- 12 files changed, 201 insertions(+), 120 deletions(-) diff --git a/airflow/providers/teradata/hooks/teradata.py b/airflow/providers/teradata/hooks/teradata.py index 6f9028759172b..c5070d1daff39 100644 --- a/airflow/providers/teradata/hooks/teradata.py +++ b/airflow/providers/teradata/hooks/teradata.py @@ -234,7 +234,6 @@ def handler(cursor): if records is None: return - if isinstance(records, list): return [row for row in records] @@ -246,7 +245,7 @@ def handler(cursor): sql, autocommit=autocommit, parameters=( - {name: _map_param(value) for (name, value) in parameters.items()} + [_map_param(value) for (name, value) in parameters.items()] if isinstance(parameters, dict) else [_map_param(value) for value in parameters] ), diff --git a/airflow/providers/teradata/provider.yaml b/airflow/providers/teradata/provider.yaml index a212b9acc0222..66cea83513c41 100644 --- a/airflow/providers/teradata/provider.yaml +++ b/airflow/providers/teradata/provider.yaml @@ -33,8 +33,14 @@ dependencies: - apache-airflow-providers-common-sql>=1.3.1 - teradatasqlalchemy>=17.20.0.0 - teradatasql>=17.20.0.28 - - apache-airflow-providers-microsoft-azure - - apache-airflow-providers-amazon + +additional-extras: + - name: microsoft.azure + dependencies: + - apache-airflow-providers-microsoft-azure + - name: amazon + dependencies: + - apache-airflow-providers-amazon integrations: - integration-name: Teradata diff --git 
a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py index d7595a3fa7048..416b4e7136cb0 100644 --- a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py +++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py @@ -17,10 +17,18 @@ # under the License. from __future__ import annotations +from textwrap import dedent from typing import TYPE_CHECKING, Sequence from airflow.models import BaseOperator -from airflow.providers.microsoft.azure.hooks.wasb import WasbHook + +try: + from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +except ModuleNotFoundError as e: + from airflow.exceptions import AirflowOptionalProviderFeatureException + + raise AirflowOptionalProviderFeatureException(e) + from airflow.providers.teradata.hooks.teradata import TeradataHook if TYPE_CHECKING: @@ -77,7 +85,7 @@ def execute(self, context: Context) -> None: access_id = conn.login if conn.login is not None else "" access_secret = conn.password if conn.password is not None else "" teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) - sql = f""" + sql = dedent(f""" CREATE MULTISET TABLE {self.teradata_table} AS ( SELECT * FROM ( @@ -86,7 +94,7 @@ def execute(self, context: Context) -> None: ACCESS_KEY= '{access_secret}' ) AS d ) WITH DATA - """ + """).rstrip() try: teradata_hook.run(sql, True) except Exception as ex: diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py index 1d6f24a5bc8ed..f7998ea861135 100644 --- a/airflow/providers/teradata/transfers/s3_to_teradata.py +++ b/airflow/providers/teradata/transfers/s3_to_teradata.py @@ -17,10 +17,17 @@ # under the License. from __future__ import annotations +from textwrap import dedent from typing import TYPE_CHECKING, Sequence from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook + +try: + from airflow.providers.amazon.aws.hooks.s3 import S3Hook +except ModuleNotFoundError as e: + from airflow.exceptions import AirflowOptionalProviderFeatureException + + raise AirflowOptionalProviderFeatureException(e) from airflow.providers.teradata.hooks.teradata import TeradataHook if TYPE_CHECKING: @@ -35,10 +42,13 @@ class S3ToTeradataOperator(BaseOperator): For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:S3ToTeradataOperator` - :param s3_source_key: The URI format specifying the location of the S3 object store.(templated) + :param s3_source_key: The URI format specifying the location of the S3 bucket.(templated) The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME. Refer to https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param public_bucket: Specifies whether the provided S3 bucket is public. If the bucket is public, + it means that anyone can access the objects within it via a URL without requiring authentication. + If the bucket is private and authentication is not provided, the operator will throw an exception. :param teradata_table: The name of the teradata table to which the data is transferred.(templated) :param aws_conn_id: The Airflow AWS connection used for AWS credentials. 
:param teradata_conn_id: The connection ID used to connect to Teradata @@ -55,6 +65,7 @@ def __init__( self, *, s3_source_key: str, + public_bucket: bool = False, teradata_table: str, aws_conn_id: str = "aws_default", teradata_conn_id: str = "teradata_default", @@ -62,6 +73,7 @@ def __init__( ) -> None: super().__init__(**kwargs) self.s3_source_key = s3_source_key + self.public_bucket = public_bucket self.teradata_table = teradata_table self.aws_conn_id = aws_conn_id self.teradata_conn_id = teradata_conn_id @@ -72,17 +84,14 @@ def execute(self, context: Context) -> None: ) s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - access_key = ( - s3_hook.conn_config.aws_access_key_id if s3_hook.conn_config.aws_access_key_id is not None else "" - ) - access_secret = ( - s3_hook.conn_config.aws_secret_access_key - if s3_hook.conn_config.aws_secret_access_key is not None - else "" - ) - + access_key = "" + access_secret = "" + if not self.public_bucket: + credentials = s3_hook.get_credentials() + access_key = credentials.access_key + access_secret = credentials.secret_key teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) - sql = f""" + sql = dedent(f""" CREATE MULTISET TABLE {self.teradata_table} AS ( SELECT * FROM ( @@ -91,7 +100,7 @@ def execute(self, context: Context) -> None: ACCESS_KEY= '{access_secret}' ) AS d ) WITH DATA - """ + """).rstrip() try: teradata_hook.run(sql, True) except Exception as ex: diff --git a/docs/apache-airflow-providers-teradata/operators/teradata.rst b/docs/apache-airflow-providers-teradata/operators/teradata.rst index e255a986afcec..6fd7d371a7b09 100644 --- a/docs/apache-airflow-providers-teradata/operators/teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/teradata.rst @@ -130,22 +130,66 @@ Assume a stored procedure exists in the database that looks like this: .. code-block:: sql - REPLACE PROCEDURE - TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) + REPLACE PROCEDURE TEST_PROCEDURE ( + IN val_in INTEGER, + INOUT val_in_out INTEGER, + OUT val_out INTEGER, + OUT value_str_out varchar(100) + ) + BEGIN + set val_out = val_in * 2; + set val_in_out = val_in_out * 4; + set value_str_out = 'string output'; + END; + / + +This stored procedure takes an integer argument, val_in, as input. +It operates with a single inout argument, val_in_out, which serves as both input and output. +Additionally, it returns an integer argument, val_out, and a string argument, value_str_out. + +This stored procedure can be invoked using +:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` in various manners. + +One approach involves passing parameters positionally as a list, with output parameters specified as Python data types: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_with_types] + :end-before: [END howto_call_teradata_stored_procedure_operator_with_types] + +Alternatively, parameters can be passed positionally as a list, with output parameters designated as placeholders: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_with_place_holder] + :end-before: [END howto_call_teradata_stored_procedure_operator_with_place_holder] + +Another method entails passing parameters positionally as a dictionary: + +.. 
exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_with_dict_input] + :end-before: [END howto_call_teradata_stored_procedure_operator_with_dict_input] + +Assume a stored procedure exists in the database that looks like this: + + .. code-block:: sql + + REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP) BEGIN - set val_out = val_in * 2; + -- Assign current timestamp to the OUT parameter + SET out_timestamp = CURRENT_TIMESTAMP; END; / -This stored procedure accepts a single integer argument, val_in, and outputs -a single integer argument, val_out. This can be represented with the following -call using :class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` +This stored procedure yields a singular timestamp argument, out_timestamp, and is callable through +:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` with parameters passed positionally as a list: .. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py :language: python - :start-after: [START howto_teradata_stored_procedure_operator_with_in_inout] - :end-before: [END howto_teradata_stored_procedure_operator_with_in_inout] + :start-after: [START howto_call_teradata_stored_procedure_operator_timestamp] + :end-before: [END howto_call_teradata_stored_procedure_operator_timestamp] Assume a stored procedure exists in the database that looks like this: @@ -163,9 +207,9 @@ Assume a stored procedure exists in the database that looks like this: END; / -This stored procedure accepts a single integer argument, val_in, and outputs -a single integer argument, val_out and returns two cursors representing output of select queries. -This can be represented with the following call using +This stored procedure takes a single integer argument, val_in, as input and produces a single integer argument, val_out. +Additionally, it yields two cursors representing the outputs of select queries. 
+This stored procedure can be invoked using :class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` with parameters passed positionally as a list: diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 87f5e6d0e32a1..cf65fdba52950 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1130,9 +1130,7 @@ }, "teradata": { "deps": [ - "apache-airflow-providers-amazon", "apache-airflow-providers-common-sql>=1.3.1", - "apache-airflow-providers-microsoft-azure", "apache-airflow>=2.7.0", "teradatasql>=17.20.0.28", "teradatasqlalchemy>=17.20.0.0" diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py index 1de354ec59188..551f86024dc74 100644 --- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -67,18 +67,14 @@ read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", conn_id=CONN_ID, - sql=""" - SELECT count(1) from example_blob_teradata_csv; - """, + sql="SELECT count(1) from example_blob_teradata_csv;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] drop_table_csv = TeradataOperator( task_id="drop_table_csv", conn_id=CONN_ID, - sql=""" - DROP TABLE example_blob_teradata_csv; - """, + sql="DROP TABLE example_blob_teradata_csv;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] @@ -95,18 +91,14 @@ read_data_table_json = TeradataOperator( task_id="read_data_table_json", conn_id=CONN_ID, - sql=""" - SELECT count(1) from example_blob_teradata_json; - """, + sql="SELECT count(1) from example_blob_teradata_json;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] drop_table_json = TeradataOperator( task_id="drop_table_json", conn_id=CONN_ID, - sql=""" - DROP TABLE example_blob_teradata_json; - """, + sql="DROP TABLE example_blob_teradata_json;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] @@ -123,18 +115,14 @@ read_data_table_parquet = TeradataOperator( task_id="read_data_table_parquet", conn_id=CONN_ID, - sql=""" - SELECT count(1) from example_blob_teradata_parquet; - """, + sql="SELECT count(1) from example_blob_teradata_parquet;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] drop_table_parquet = TeradataOperator( task_id="drop_table_parquet", conn_id=CONN_ID, - sql=""" - DROP TABLE example_blob_teradata_parquet; - """, + sql="DROP TABLE example_blob_teradata_parquet;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py index a17f34941050d..594085dedafd2 100644 --- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py +++ 
b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -58,6 +58,7 @@ transfer_data_csv = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_csv", s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/", + public_bucket=True, teradata_table="example_s3_teradata_csv", aws_conn_id="aws_default", teradata_conn_id="teradata_default", @@ -68,24 +69,21 @@ read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", conn_id=CONN_ID, - sql=""" - SELECT * from example_s3_teradata_csv; - """, + sql="SELECT * from example_s3_teradata_csv;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] drop_table_csv = TeradataOperator( task_id="drop_table_csv", conn_id=CONN_ID, - sql=""" - DROP TABLE example_s3_teradata_csv; - """, + sql="DROP TABLE example_s3_teradata_csv;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] transfer_data_json = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_json", s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/", + public_bucket=True, teradata_table="example_s3_teradata_json", aws_conn_id="aws_default", teradata_conn_id="teradata_default", @@ -96,24 +94,21 @@ read_data_table_json = TeradataOperator( task_id="read_data_table_json", conn_id=CONN_ID, - sql=""" - SELECT * from example_s3_teradata_json; - """, + sql="SELECT * from example_s3_teradata_json;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json] drop_table_json = TeradataOperator( task_id="drop_table_json", conn_id=CONN_ID, - sql=""" - DROP TABLE example_s3_teradata_json; - """, + sql="DROP TABLE example_s3_teradata_json;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json] # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] transfer_data_parquet = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_parquet", s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/", + public_bucket=True, teradata_table="example_s3_teradata_parquet", aws_conn_id="aws_default", teradata_conn_id="teradata_default", @@ -124,18 +119,14 @@ read_data_table_parquet = TeradataOperator( task_id="read_data_table_parquet", conn_id=CONN_ID, - sql=""" - SELECT * from example_s3_teradata_parquet; - """, + sql="SELECT * from example_s3_teradata_parquet;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] drop_table_parquet = TeradataOperator( task_id="drop_table_parquet", conn_id=CONN_ID, - sql=""" - DROP TABLE example_s3_teradata_parquet; - """, + sql="DROP TABLE example_s3_teradata_parquet;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] ( diff --git a/tests/system/providers/teradata/example_ssl_teradata.py b/tests/system/providers/teradata/example_ssl_teradata.py index c005d3017f96b..18015059bc880 100644 --- a/tests/system/providers/teradata/example_ssl_teradata.py +++ b/tests/system/providers/teradata/example_ssl_teradata.py @@ -89,23 +89,23 @@ # [START teradata_operator_howto_guide_get_all_countries] get_all_countries = TeradataOperator( task_id="get_all_countries", - sql=r"""SELECT * FROM SSL_Country;""", + sql=r"SELECT * FROM 
SSL_Country;", ) # [END teradata_operator_howto_guide_get_all_countries] # [START teradata_operator_howto_guide_params_passing_get_query] get_countries_from_continent = TeradataOperator( task_id="get_countries_from_continent", - sql=r"""SELECT * FROM SSL_Country where {{ params.column }}='{{ params.value }}';""", + sql=r"SELECT * FROM SSL_Country where {{ params.column }}='{{ params.value }}';", params={"column": "continent", "value": "Asia"}, ) # [END teradata_operator_howto_guide_params_passing_get_query] # [START teradata_operator_howto_guide_drop_country_table] drop_country_table = TeradataOperator( - task_id="drop_country_table", sql=r"""DROP TABLE SSL_Country;""", dag=dag + task_id="drop_country_table", sql=r"DROP TABLE SSL_Country;", dag=dag ) # [END teradata_operator_howto_guide_drop_country_table] # [START teradata_operator_howto_guide_drop_users_table] - drop_users_table = TeradataOperator(task_id="drop_users_table", sql=r"""DROP TABLE SSL_Users;""", dag=dag) + drop_users_table = TeradataOperator(task_id="drop_users_table", sql=r"DROP TABLE SSL_Users;", dag=dag) # [END teradata_operator_howto_guide_drop_users_table] ( diff --git a/tests/system/providers/teradata/example_teradata.py b/tests/system/providers/teradata/example_teradata.py index 5f1569ae9d840..6d4661b0462df 100644 --- a/tests/system/providers/teradata/example_teradata.py +++ b/tests/system/providers/teradata/example_teradata.py @@ -88,44 +88,34 @@ # [START teradata_operator_howto_guide_get_all_countries] get_all_countries = TeradataOperator( task_id="get_all_countries", - sql=r""" - SELECT * FROM Country; - """, + sql=r"SELECT * FROM Country;", ) # [END teradata_operator_howto_guide_get_all_countries] # [START teradata_operator_howto_guide_params_passing_get_query] get_countries_from_continent = TeradataOperator( task_id="get_countries_from_continent", - sql=r""" - SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}'; - """, + sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';", params={"column": "continent", "value": "Asia"}, ) # [END teradata_operator_howto_guide_params_passing_get_query] # [START teradata_operator_howto_guide_drop_country_table] drop_country_table = TeradataOperator( task_id="drop_country_table", - sql=r""" - DROP TABLE Country; - """, + sql=r"DROP TABLE Country;", dag=dag, ) # [END teradata_operator_howto_guide_drop_country_table] # [START teradata_operator_howto_guide_drop_users_table] drop_users_table = TeradataOperator( task_id="drop_users_table", - sql=r""" - DROP TABLE Users; - """, + sql=r"DROP TABLE Users;", dag=dag, ) # [END teradata_operator_howto_guide_drop_users_table] # [START teradata_operator_howto_guide_create_schema] create_schema = TeradataOperator( task_id="create_schema", - sql=r""" - CREATE DATABASE airflow_temp AS PERM=10e6; - """, + sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;", ) # [END teradata_operator_howto_guide_create_schema] # [START teradata_operator_howto_guide_create_table_with_schema] @@ -144,9 +134,7 @@ # [START teradata_operator_howto_guide_drop_schema_table] drop_schema_table = TeradataOperator( task_id="drop_schema_table", - sql=r""" - DROP TABLE schema_table; - """, + sql=r"DROP TABLE schema_table;", dag=dag, schema="airflow_temp", ) @@ -154,9 +142,7 @@ # [START teradata_operator_howto_guide_drop_schema] drop_schema = TeradataOperator( task_id="drop_schema", - sql=r""" - DROP DATABASE airflow_temp; - """, + sql=r"DROP DATABASE airflow_temp;", dag=dag, ) diff --git 
a/tests/system/providers/teradata/example_teradata_call_sp.py b/tests/system/providers/teradata/example_teradata_call_sp.py index bf43eb475b4fe..911fb5cf82155 100644 --- a/tests/system/providers/teradata/example_teradata_call_sp.py +++ b/tests/system/providers/teradata/example_teradata_call_sp.py @@ -51,24 +51,64 @@ schedule="@once", start_date=datetime(2023, 1, 1), ) as dag: - # [START howto_teradata_stored_procedure_operator_with_in_inout] + # [START howto_teradata_stored_procedure_operator_with_in_inout_out] + # [START howto_create_teradata_stored_procedure_operator_with_in_inout] create_sp_in_inout = TeradataOperator( task_id="create_sp_in_inout", - sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) + sql=r"""REPLACE PROCEDURE TEST_PROCEDURE ( + IN val_in INTEGER, + INOUT val_in_out INTEGER, + OUT val_out INTEGER, + OUT value_str_out varchar(100) + ) BEGIN set val_out = val_in * 2; + set val_in_out = val_in_out * 4; + set value_str_out = 'string output'; END; """, ) - - opr_sp_in_inout = TeradataStoredProcedureOperator( - task_id="opr_sp_in_inout", + # [END howto_create_teradata_stored_procedure_operator_with_in_inout] + # [START howto_call_teradata_stored_procedure_operator_with_types] + opr_sp_types = TeradataStoredProcedureOperator( + task_id="opr_sp_types", procedure="TEST_PROCEDURE", - parameters=[3, int], + parameters=[3, 1, int, str], ) - - # [END howto_teradata_stored_procedure_operator_with_in_inout] - + # [END howto_call_teradata_stored_procedure_operator_with_types] + # [START howto_call_teradata_stored_procedure_operator_with_place_holder] + opr_sp_place_holder = TeradataStoredProcedureOperator( + task_id="opr_sp_place_holder", + procedure="TEST_PROCEDURE", + parameters=[3, 1, "?", "?"], + ) + # [END howto_call_teradata_stored_procedure_operator_with_place_holder] + # [START howto_call_teradata_stored_procedure_operator_with_dict_input] + opr_sp_dict = TeradataStoredProcedureOperator( + task_id="opr_sp_dict", + procedure="TEST_PROCEDURE", + parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str}, + ) + # [END howto_call_teradata_stored_procedure_operator_with_dict_input] + # [END howto_teradata_stored_procedure_operator_with_in_inout_out] + # [START howto_create_teradata_stored_procedure_operator_timestamp] + create_sp_timestamp = TeradataOperator( + task_id="create_sp_timestamp", + sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP) + BEGIN + -- Assign current timestamp to the OUT parameter + SET out_timestamp = CURRENT_TIMESTAMP; + END; + """, + ) + # [END howto_create_teradata_stored_procedure_operator_timestamp] + # [START howto_call_teradata_stored_procedure_operator_timestamp] + opr_sp_timestamp = TeradataStoredProcedureOperator( + task_id="opr_sp_timestamp", + procedure="GetTimestampOutParameter", + parameters=["?"], + ) + # [END howto_call_teradata_stored_procedure_operator_timestamp] # [START howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] create_sp_param_dr = TeradataOperator( task_id="create_sp_param_dr", @@ -84,21 +124,41 @@ end ; """, ) + # [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] + # [START howto_call_teradata_stored_procedure_operator_with_in_out_dynamic_result] opr_sp_param_dr = TeradataStoredProcedureOperator( task_id="opr_sp_param_dr", procedure="examplestoredproc", parameters=[3, 2, int], ) - # [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] - + # [END 
howto_call_teradata_stored_procedure_operator_with_in_out_dynamic_result] # [START howto_teradata_stored_procedure_operator_drop] drop_sp = TeradataOperator( task_id="drop_sp", - sql=r"""drop procedure examplestoredproc; - """, + sql=r"drop procedure examplestoredproc;", + ) + drop_sp_test = TeradataOperator( + task_id="drop_sp_test", + sql=r"drop procedure TEST_PROCEDURE;", + ) + drop_sp_timestamp = TeradataOperator( + task_id="drop_sp_timestamp", + sql=r"drop procedure GetTimestampOutParameter;", ) # [END howto_teradata_stored_procedure_operator_drop] - (create_sp_in_inout >> opr_sp_in_inout >> create_sp_param_dr >> opr_sp_param_dr >> drop_sp) + ( + create_sp_in_inout + >> opr_sp_types + >> opr_sp_dict + >> opr_sp_place_holder + >> create_sp_param_dr + >> opr_sp_param_dr + >> drop_sp + >> drop_sp_test + >> create_sp_timestamp + >> opr_sp_timestamp + >> drop_sp_timestamp + ) # [END howto_teradata_operator_for_sp] diff --git a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py index 054bae7b85342..441922c4ce180 100644 --- a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py @@ -108,9 +108,7 @@ read_data_src = TeradataOperator( task_id="read_data_src", conn_id=CONN_ID, - sql=""" - SELECT TOP 10 * from my_users_src order by user_id desc; - """, + sql="SELECT TOP 10 * from my_users_src order by user_id desc;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_src] # [START teradata_to_teradata_transfer_operator_howto_guide_transfer_data] @@ -128,27 +126,21 @@ read_data_dest = TeradataOperator( task_id="read_data_dest", conn_id=CONN_ID, - sql=""" - SELECT TOP 10 * from my_users_dest order by user_id desc; - """, + sql="SELECT TOP 10 * from my_users_dest order by user_id desc;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_dest] # [START teradata_to_teradata_transfer_operator_howto_guide_drop_src_table] drop_src_table = TeradataOperator( task_id="drop_src_table", conn_id=CONN_ID, - sql=""" - DROP TABLE my_users_src; - """, + sql=" DROP TABLE my_users_src;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_drop_src_table] # [START teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table] drop_dest_table = TeradataOperator( task_id="drop_dest_table", conn_id=CONN_ID, - sql=""" - DROP TABLE my_users_dest; - """, + sql="DROP TABLE my_users_dest;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table] ( From 4baf9abc07d6da4c7dfcd3bc187399f2b440ff5a Mon Sep 17 00:00:00 2001 From: Satish Ch Date: Fri, 3 May 2024 11:17:10 -0700 Subject: [PATCH 6/7] unit tests failure issue fixed --- tests/providers/teradata/operators/test_teradata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/teradata/operators/test_teradata.py b/tests/providers/teradata/operators/test_teradata.py index de7fcad1073d4..a476ea58a0f9d 100644 --- a/tests/providers/teradata/operators/test_teradata.py +++ b/tests/providers/teradata/operators/test_teradata.py @@ -120,6 +120,6 @@ def test_execute(self, mock_run): mock.ANY, "{CALL test(?)}", autocommit=True, - parameters=parameters, + parameters=["value"], handler=mock.ANY, ) From dd2dce4585d4cfb3c68ec6df974ca926bd57f4b5 Mon Sep 17 00:00:00 2001 From: Satish Chinthanippu Date: Wed, 22 May 2024 16:14:15 +0530 Subject: [PATCH 7/7] Address PR comments (#42) * Modified conn_id 
to teradata_conn_id for Teradata Operator * static check issue fixed --- airflow/providers/teradata/operators/teradata.py | 14 +++++++------- .../operators/s3_to_teradata.rst | 9 ++++++++- .../teradata/operators/test_teradata.py | 16 ++++++++++------ .../example_azure_blob_to_teradata_transfer.py | 10 +--------- .../teradata/example_s3_to_teradata_transfer.py | 9 +-------- .../providers/teradata/example_ssl_teradata.py | 3 ++- .../providers/teradata/example_teradata.py | 3 ++- .../teradata/example_teradata_call_sp.py | 2 +- .../example_teradata_to_teradata_transfer.py | 9 +-------- 9 files changed, 33 insertions(+), 42 deletions(-) diff --git a/airflow/providers/teradata/operators/teradata.py b/airflow/providers/teradata/operators/teradata.py index 00fd6062fe4d7..00cd7a86c7d0b 100644 --- a/airflow/providers/teradata/operators/teradata.py +++ b/airflow/providers/teradata/operators/teradata.py @@ -38,7 +38,7 @@ class TeradataOperator(SQLExecuteQueryOperator): :ref:`howto/operator:TeradataOperator` :param sql: the SQL query to be executed as a single string, or a list of str (sql statements) - :param conn_id: reference to a predefined database + :param teradata_conn_id: reference to a predefined database :param autocommit: if True, each command is automatically committed.(default value: False) :param parameters: (optional) the parameters to render the SQL query with. :param schema: The Teradata database to connect to. @@ -54,7 +54,7 @@ class TeradataOperator(SQLExecuteQueryOperator): def __init__( self, - conn_id: str = TeradataHook.default_conn_name, + teradata_conn_id: str = TeradataHook.default_conn_name, schema: str | None = None, **kwargs, ) -> None: @@ -65,7 +65,7 @@ def __init__( **hook_params, } super().__init__(**kwargs) - self.conn_id = conn_id + self.conn_id = teradata_conn_id class TeradataStoredProcedureOperator(BaseOperator): @@ -73,7 +73,7 @@ class TeradataStoredProcedureOperator(BaseOperator): Executes stored procedure in a specific Teradata database. :param procedure: name of stored procedure to call (templated) - :param conn_id: The :ref:`Teradata connection id ` + :param teradata_conn_id: The :ref:`Teradata connection id ` reference to a specific Teradata database. :param parameters: (optional, templated) the parameters provided in the call @@ -89,15 +89,15 @@ def __init__( self, *, procedure: str, - conn_id: str = TeradataHook.default_conn_name, + teradata_conn_id: str = TeradataHook.default_conn_name, parameters: dict | list | None = None, **kwargs, ) -> None: super().__init__(**kwargs) - self.conn_id = conn_id + self.teradata_conn_id = teradata_conn_id self.procedure = procedure self.parameters = parameters def execute(self, context: Context): - hook = TeradataHook(teradata_conn_id=self.conn_id) + hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters) diff --git a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst index d5e862beb9c0a..a6ecbc6f146b8 100644 --- a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst @@ -24,10 +24,17 @@ S3ToTeradataOperator ============================ The purpose of ``S3ToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet -format data transfer from an AWS Simple Storage Service (S3) to Teradata table. 
+format data transfer from an AWS Simple Storage Service (S3) to Teradata table. This operator uses +Teradata READ_NOS feature to transfer data from an AWS Simple Storage Service (S3) to Teradata table. +READ_NOS is a table operator in Teradata Vantage that allows users to list external files at a specified location. +For more details, see `READ_NOS Functionality `_ + Use the :class:`S3ToTeradataOperator ` to transfer data from S3 to Teradata. + .. note:: + The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. Instead, it exclusively supports accessing with long-term credentials. + Transferring data in CSV format from S3 to Teradata --------------------------------------------------- diff --git a/tests/providers/teradata/operators/test_teradata.py b/tests/providers/teradata/operators/test_teradata.py index a476ea58a0f9d..0c614b42b262d 100644 --- a/tests/providers/teradata/operators/test_teradata.py +++ b/tests/providers/teradata/operators/test_teradata.py @@ -59,13 +59,15 @@ def test_get_hook_default(self, mock_get_db_hook): @mock.patch("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator.get_db_hook") def test_execute(self, mock_get_db_hook): sql = "SELECT * FROM test_table" - conn_id = "teradata_default" + teradata_conn_id = "teradata_default" parameters = {"parameter": "value"} autocommit = False context = "test_context" task_id = "test_task_id" - operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id) + operator = TeradataOperator( + sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id + ) operator.execute(context=context) mock_get_db_hook.return_value.run.assert_called_once_with( sql=sql, @@ -82,13 +84,15 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook): "TRUNCATE TABLE test_airflow", "INSERT INTO test_airflow VALUES ('X')", ] - conn_id = "teradata_default" + teradata_conn_id = "teradata_default" parameters = {"parameter": "value"} autocommit = False context = "test_context" task_id = "test_task_id" - operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id) + operator = TeradataOperator( + sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id + ) operator.execute(context=context) mock_get_db_hook.return_value.run.assert_called_once_with( sql=sql, @@ -103,14 +107,14 @@ class TestTeradataStoredProcedureOperator: @mock.patch.object(TeradataHook, "run", autospec=TeradataHook.run) def test_execute(self, mock_run): procedure = "test" - conn_id = "teradata_default" + teradata_conn_id = "teradata_default" parameters = {"parameter": "value"} context = "test_context" task_id = "test_task_id" operator = TeradataStoredProcedureOperator( procedure=procedure, - conn_id=conn_id, + teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id, ) diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py index 551f86024dc74..5d961550de59d 100644 --- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -51,7 +51,7 @@ start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START 
azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] transfer_data_csv = AzureBlobStorageToTeradataOperator( @@ -59,21 +59,18 @@ blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/", teradata_table="example_blob_teradata_csv", azure_conn_id="wasb_default", - teradata_conn_id="teradata_default", trigger_rule="all_done", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", - conn_id=CONN_ID, sql="SELECT count(1) from example_blob_teradata_csv;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] drop_table_csv = TeradataOperator( task_id="drop_table_csv", - conn_id=CONN_ID, sql="DROP TABLE example_blob_teradata_csv;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] @@ -83,21 +80,18 @@ blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/", teradata_table="example_blob_teradata_json", azure_conn_id="wasb_default", - teradata_conn_id="teradata_default", trigger_rule="all_done", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] read_data_table_json = TeradataOperator( task_id="read_data_table_json", - conn_id=CONN_ID, sql="SELECT count(1) from example_blob_teradata_json;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] drop_table_json = TeradataOperator( task_id="drop_table_json", - conn_id=CONN_ID, sql="DROP TABLE example_blob_teradata_json;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] @@ -114,14 +108,12 @@ # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] read_data_table_parquet = TeradataOperator( task_id="read_data_table_parquet", - conn_id=CONN_ID, sql="SELECT count(1) from example_blob_teradata_parquet;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] drop_table_parquet = TeradataOperator( task_id="drop_table_parquet", - conn_id=CONN_ID, sql="DROP TABLE example_blob_teradata_parquet;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py index 594085dedafd2..fc5e2627393f8 100644 --- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -52,7 +52,7 @@ start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] transfer_data_csv = S3ToTeradataOperator( @@ -61,7 +61,6 @@ public_bucket=True, teradata_table="example_s3_teradata_csv", aws_conn_id="aws_default", - teradata_conn_id="teradata_default", trigger_rule="all_done", ) # [END 
s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] @@ -86,21 +85,18 @@ public_bucket=True, teradata_table="example_s3_teradata_json", aws_conn_id="aws_default", - teradata_conn_id="teradata_default", trigger_rule="all_done", ) # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] read_data_table_json = TeradataOperator( task_id="read_data_table_json", - conn_id=CONN_ID, sql="SELECT * from example_s3_teradata_json;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json] drop_table_json = TeradataOperator( task_id="drop_table_json", - conn_id=CONN_ID, sql="DROP TABLE example_s3_teradata_json;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json] @@ -111,21 +107,18 @@ public_bucket=True, teradata_table="example_s3_teradata_parquet", aws_conn_id="aws_default", - teradata_conn_id="teradata_default", trigger_rule="all_done", ) # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] read_data_table_parquet = TeradataOperator( task_id="read_data_table_parquet", - conn_id=CONN_ID, sql="SELECT * from example_s3_teradata_parquet;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] drop_table_parquet = TeradataOperator( task_id="drop_table_parquet", - conn_id=CONN_ID, sql="DROP TABLE example_s3_teradata_parquet;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] diff --git a/tests/system/providers/teradata/example_ssl_teradata.py b/tests/system/providers/teradata/example_ssl_teradata.py index 18015059bc880..1673bd791a79b 100644 --- a/tests/system/providers/teradata/example_ssl_teradata.py +++ b/tests/system/providers/teradata/example_ssl_teradata.py @@ -47,13 +47,14 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_ssl_teradata" +CONN_ID = "teradata_ssl_default" with DAG( dag_id=DAG_ID, start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_ssl_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START teradata_operator_howto_guide_create_country_table] create_country_table = TeradataOperator( diff --git a/tests/system/providers/teradata/example_teradata.py b/tests/system/providers/teradata/example_teradata.py index 6d4661b0462df..1fd587cdf8f71 100644 --- a/tests/system/providers/teradata/example_teradata.py +++ b/tests/system/providers/teradata/example_teradata.py @@ -42,13 +42,14 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_teradata" +CONN_ID = "teradata_default" with DAG( dag_id=DAG_ID, start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START teradata_operator_howto_guide_create_table] create_table = TeradataOperator( diff --git a/tests/system/providers/teradata/example_teradata_call_sp.py b/tests/system/providers/teradata/example_teradata_call_sp.py index 911fb5cf82155..98ce85fffdfdc 100644 --- a/tests/system/providers/teradata/example_teradata_call_sp.py +++ b/tests/system/providers/teradata/example_teradata_call_sp.py @@ -47,7 +47,7 @@ max_active_runs=1, max_active_tasks=3, 
catchup=False, - default_args={"conn_id": CONN_ID}, + default_args={"teradata_conn_id": CONN_ID}, schedule="@once", start_date=datetime(2023, 1, 1), ) as dag: diff --git a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py index 441922c4ce180..ac2517a33f519 100644 --- a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py @@ -51,12 +51,11 @@ start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START teradata_to_teradata_transfer_operator_howto_guide_create_src_table] create_src_table = TeradataOperator( task_id="create_src_table", - conn_id=CONN_ID, sql=""" CREATE TABLE my_users_src, FALLBACK ( @@ -76,7 +75,6 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_create_dest_table] create_dest_table = TeradataOperator( task_id="create_dest_table", - conn_id=CONN_ID, sql=""" CREATE TABLE my_users_dest, FALLBACK ( @@ -96,7 +94,6 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_insert_data_src] insert_data_src = TeradataOperator( task_id="insert_data_src", - conn_id=CONN_ID, sql=""" INSERT INTO my_users_src(user_name) VALUES ('User1'); INSERT INTO my_users_src(user_name) VALUES ('User2'); @@ -107,7 +104,6 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_read_data_src] read_data_src = TeradataOperator( task_id="read_data_src", - conn_id=CONN_ID, sql="SELECT TOP 10 * from my_users_src order by user_id desc;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_src] @@ -125,21 +121,18 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_read_data_dest] read_data_dest = TeradataOperator( task_id="read_data_dest", - conn_id=CONN_ID, sql="SELECT TOP 10 * from my_users_dest order by user_id desc;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_dest] # [START teradata_to_teradata_transfer_operator_howto_guide_drop_src_table] drop_src_table = TeradataOperator( task_id="drop_src_table", - conn_id=CONN_ID, sql=" DROP TABLE my_users_src;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_drop_src_table] # [START teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table] drop_dest_table = TeradataOperator( task_id="drop_dest_table", - conn_id=CONN_ID, sql="DROP TABLE my_users_dest;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table]
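
Two short sketches for reviewers who want to try the behavior this series introduces.

First, the parameter handling in the new TeradataHook.callproc. The snippet below is a standalone illustration of the SQL-building and OUT-parameter mapping logic from the hook diff above (patch 1, with the dict-parameter fix from patch 5). TEST_PROCEDURE and the [3, 1, int, str] parameter list are borrowed from the system-test DAG; the snippet only mirrors what the hook hands to the teradatasql driver and does not talk to a database.

    # Mirrors the module-level PARAM_TYPES/_map_param helpers added to
    # airflow/providers/teradata/hooks/teradata.py in this series.
    PARAM_TYPES = {bool, float, int, str}

    def _map_param(value):
        if value in PARAM_TYPES:
            # A bare type marks an OUT slot; instantiating it (int() -> 0,
            # str() -> "") gives the driver a value of the expected type.
            value = value()
        return value

    parameters = [3, 1, int, str]                        # IN, INOUT, OUT, OUT
    args = ",".join("?" for _ in parameters)             # "?,?,?,?"
    sql = f"{{CALL TEST_PROCEDURE({args})}}"             # "{CALL TEST_PROCEDURE(?,?,?,?)}"
    bound = [_map_param(value) for value in parameters]  # [3, 1, 0, '']
    print(sql, bound)

Second, a minimal DAG wiring the new transfer and stored-procedure operators together. This is a sketch against the API as it stands after patch 7 (so the operator keyword is teradata_conn_id, not conn_id). The teradata_default connection, the public USGS bucket path, and the TEST_PROCEDURE definition from the system tests above are assumed to exist; the teradata_220_smoke_test DAG id is made up for the example.

    from __future__ import annotations

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.teradata.operators.teradata import TeradataStoredProcedureOperator
    from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

    with DAG(
        dag_id="teradata_220_smoke_test",
        schedule="@once",
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        # READ_NOS load from a public bucket; public_bucket=True (patch 5)
        # skips the AWS credential lookup entirely.
        load_csv = S3ToTeradataOperator(
            task_id="load_csv",
            s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
            public_bucket=True,
            teradata_table="example_s3_teradata_csv",
            teradata_conn_id="teradata_default",
        )

        # Call the procedure created by the system-test DAG; the bare int/str
        # entries mark the OUT parameters, as illustrated above.
        call_sp = TeradataStoredProcedureOperator(
            task_id="call_sp",
            procedure="TEST_PROCEDURE",
            teradata_conn_id="teradata_default",
            parameters=[3, 1, int, str],
        )

        load_csv >> call_sp

Note that with the extras split in patch 5 the Azure and Amazon hooks become optional: importing the transfer operators without the corresponding provider installed raises AirflowOptionalProviderFeatureException, so an environment running the sketch above presumably needs something like `pip install apache-airflow-providers-teradata[amazon]` (extra name per the additional-extras section of provider.yaml).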