pre-commit and breeze doc issues fixed
satish-chinthanippu committed Apr 22, 2024
1 parent 6a8ee3c commit b4345a9
Showing 20 changed files with 266 additions and 186 deletions.
7 changes: 5 additions & 2 deletions airflow/providers/teradata/hooks/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,16 @@ def callproc(
def handler(cursor):
records = cursor.fetchall()

if records is None:
return

if isinstance(records, list):
return [row for row in records]

if isinstance(records, dict):
return {n: v for (n, v) in records.items()}

raise TypeError(f"Unexpected results: {cursor.fetchall()!r}")
self.log.info("records - %s", records)
raise TypeError(f"Unexpected results: {records}")

result = self.run(
sql,
Expand Down
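The handler in this hunk normalizes whatever the cursor returns into a plain Python value before the hook hands it back to the caller. A standalone sketch of that branching, with `records` standing in for the output of `cursor.fetchall()` (`normalize_records` is an illustrative name, not part of the provider's API):

```python
def normalize_records(records):
    """Mirror the callproc handler's branches: pass through None,
    copy lists and dicts, and reject anything else."""
    if records is None:
        return None
    if isinstance(records, list):
        # Copy row sequences into a fresh list.
        return [row for row in records]
    if isinstance(records, dict):
        # Copy named OUT-parameter results into a fresh dict.
        return {n: v for (n, v) in records.items()}
    raise TypeError(f"Unexpected results: {records}")
```

Note the fix in the diff: the old code called `cursor.fetchall()` a second time inside the error message, which would report the wrong (already-consumed) result; the new code reuses the `records` it already fetched.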
4 changes: 2 additions & 2 deletions airflow/providers/teradata/operators/teradata.py
Expand Up @@ -45,8 +45,8 @@ class TeradataOperator(SQLExecuteQueryOperator):
"""

template_fields: Sequence[str] = (
"parameters",
"sql",
"parameters",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}
Expand Down Expand Up @@ -80,8 +80,8 @@ class TeradataStoredProcedureOperator(BaseOperator):
"""

template_fields: Sequence[str] = (
"parameters",
"procedure",
"parameters",
)
ui_color = "#ededed"

Expand Down
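For context, `template_fields` names the operator attributes that Airflow renders through Jinja against the task context before `execute()` runs; the tuple's order does not change behavior, so this hunk is a cosmetic reordering. A toy illustration of the rendering idea, using plain string substitution instead of Jinja (`ToyOperator` and `render_templates` are hypothetical names, not Airflow's implementation):

```python
class ToyOperator:
    # Attributes listed here get rendered against the context.
    template_fields = ("sql", "parameters")

    def __init__(self, sql, parameters=None):
        self.sql = sql
        self.parameters = parameters


def render_templates(op, context):
    """Substitute {{ key }} markers in each templated string attribute,
    roughly as Airflow does with Jinja before calling execute()."""
    for field in op.template_fields:
        value = getattr(op, field)
        if isinstance(value, str):
            for key, repl in context.items():
                value = value.replace("{{ " + key + " }}", str(repl))
            setattr(op, field, value)
```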
33 changes: 14 additions & 19 deletions airflow/providers/teradata/transfers/azure_blob_to_teradata.py
Expand Up @@ -33,21 +33,23 @@ class AzureBlobStorageToTeradataOperator(BaseOperator):
Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureBlobStorageToTeradataOperator`
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureBlobStorageToTeradataOperator`
:param blob_source_key: The object store URI with blob location. The URI format is /az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION. Refer to
:param blob_source_key: The URI specifying the location of the Azure blob object store. (templated)
The URI format is /az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION.
Refer to
https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
:param azure_conn_id: The :ref:`Azure connection id<howto/connection:azure>`
which refers to the information to connect to Azure service.
:param teradata_table: destination table to insert rows.
:param teradata_conn_id: :ref:`Teradata connection <howto/connection:Teradata>`
which refers to the information to connect to Teradata
:param azure_conn_id: The Airflow WASB connection used for Azure Blob Storage credentials.
:param teradata_table: The name of the Teradata table to which the data is transferred. (templated)
:param teradata_conn_id: The connection ID used to connect to Teradata
:ref:`Teradata connection <howto/connection:Teradata>`
Note that ``blob_source_key`` and ``teradata_table`` are
templated, so you can use variables in them if you wish.
"""

template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
template_fields_renderers = {"blob_source_key": "sql", "teradata_table": "py"}
ui_color = "#e07c24"

def __init__(
Expand All @@ -66,13 +68,8 @@ def __init__(
self.teradata_conn_id = teradata_conn_id

def execute(self, context: Context) -> None:
"""
Execute the transfer operation from Azure Blob Storage to Teradata.
:param context: The context that is being provided when executing.
"""
self.log.info("Transferring data from %s to Teradata table %s...", self.blob_source_key, self.teradata_table)
azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
conn = azure_hook.get_connection(self.azure_conn_id)
# Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
Expand All @@ -94,8 +91,6 @@ def execute(self, context: Context) -> None:
) AS d
) WITH DATA
"""

self.log.info("COPYING using READ_NOS and CREATE TABLE AS feature of teradata....")
try:
teradata_hook.run(sql, True)
except Exception as ex:
Expand All @@ -111,4 +106,4 @@ def execute(self, context: Context) -> None:
raise
self.log.error("Issue occurred at Teradata: %s", str(ex))
raise
self.log.info("COPYING is completed")
self.log.info("The transfer of data from Azure Blob to Teradata was successful")
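The transfer itself is a single `CREATE TABLE AS ... WITH DATA` statement over Teradata's Native Object Store (READ_NOS-style) syntax, so the data moves inside Teradata rather than through the Airflow worker. A hedged sketch of how such a statement might be assembled, based on the SQL fragment visible in this hunk and Teradata's Native Object Store documentation (`build_nos_transfer_sql` and the exact clause layout are illustrative assumptions, not the provider's code):

```python
def build_nos_transfer_sql(table: str, location: str, access_id: str, access_key: str) -> str:
    """Assemble a CREATE TABLE AS ... WITH DATA statement that selects
    from an object-store location, mirroring the fragment in the diff."""
    return f"""
    CREATE MULTISET TABLE {table} AS
    (
        SELECT * FROM (
            LOCATION = '{location}'
            ACCESS_ID = '{access_id}'
            ACCESS_KEY = '{access_key}'
        ) AS d
    ) WITH DATA
    """
```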
76 changes: 42 additions & 34 deletions airflow/providers/teradata/transfers/s3_to_teradata.py
Expand Up @@ -32,19 +32,23 @@ class S3ToTeradataOperator(BaseOperator):
Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:S3ToTeradataOperator`
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:S3ToTeradataOperator`
:param s3_source_key: The path to the file (S3 key) that will be loaded into Teradata.
:param teradata_table: destination table to insert rows.
:param aws_conn_id: reference to a specific S3 connection.
:param teradata_conn_id: :ref:`Teradata connection <howto/connection:Teradata>`.
:param aws_access_key: S3 bucket access key.
:param aws_access_secret: S3 bucket access secret.
:param s3_source_key: The URI specifying the location of the S3 object store. (templated)
The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME.
Refer to
https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
:param teradata_table: The name of the Teradata table to which the data is transferred. (templated)
:param aws_conn_id: The Airflow AWS connection used for AWS credentials.
:param teradata_conn_id: The connection ID used to connect to Teradata
:ref:`Teradata connection <howto/connection:Teradata>`.
Note that ``s3_source_key`` and ``teradata_table`` are
templated, so you can use variables in them if you wish.
"""

template_fields: Sequence[str] = ("s3_source_key", "teradata_table")
template_fields_renderers = {"s3_source_key": "sql", "teradata_table": "py"}
ui_color = "#e07c24"

def __init__(
Expand All @@ -54,36 +58,28 @@ def __init__(
teradata_table: str,
aws_conn_id: str = "aws_default",
teradata_conn_id: str = "teradata_default",
aws_access_key: str = "",
aws_access_secret: str = "",
**kwargs,
**kwargs
) -> None:
super().__init__(**kwargs)
self.s3_source_key = s3_source_key
self.teradata_table = teradata_table
self.aws_conn_id = aws_conn_id
self.teradata_conn_id = teradata_conn_id
self.aws_access_key = aws_access_key
self.aws_access_secret = aws_access_secret

def execute(self, context: Context) -> None:
self.log.info("Loading %s to Teradata table %s...", self.s3_source_key, self.teradata_table)

access_key = self.aws_access_key
access_secret = self.aws_access_secret
self.log.info("Transferring data from %s to Teradata table %s...", self.s3_source_key, self.teradata_table)

if not access_key or not access_secret:
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
access_key = (
s3_hook.conn_config.aws_access_key_id
if s3_hook.conn_config.aws_access_key_id is not None
else ""
)
access_secret = (
s3_hook.conn_config.aws_secret_access_key
if s3_hook.conn_config.aws_secret_access_key is not None
else ""
)
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
access_key = (
s3_hook.conn_config.aws_access_key_id
if s3_hook.conn_config.aws_access_key_id is not None
else ""
)
access_secret = (
s3_hook.conn_config.aws_secret_access_key
if s3_hook.conn_config.aws_secret_access_key is not None
else ""
)

teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
sql = f"""
Expand All @@ -96,7 +92,19 @@ def execute(self, context: Context) -> None:
) AS d
) WITH DATA
"""
self.log.info("COPYING using READ_NOS and CREATE TABLE AS feature of teradata....")
self.log.info("sql : %s", sql)
teradata_hook.run(sql)
self.log.info("COPYING is completed")
try:
teradata_hook.run(sql, True)
except Exception as ex:
# Handling permission issue errors
if "Error 3524" in str(ex):
self.log.error("The user does not have CREATE TABLE access in Teradata")
raise
if "Error 9134" in str(ex):
self.log.error(
"There is an issue with the transfer operation. Please validate the S3 and "
"Teradata connection details."
)
raise
self.log.error("Issue occurred at Teradata: %s", str(ex))
raise
self.log.info("The transfer of data from S3 to Teradata was successful")
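The new try/except keys off Teradata error numbers embedded in the exception text to log a targeted message before re-raising. That classification step can be sketched on its own (the two error-code strings come from this diff; `classify_teradata_error` is an illustrative helper, not part of the provider):

```python
def classify_teradata_error(message: str) -> str:
    """Map a Teradata error message onto the cases handled in execute()."""
    if "Error 3524" in message:
        # The user lacks CREATE TABLE access in Teradata.
        return "missing CREATE TABLE permission"
    if "Error 9134" in message:
        # Likely invalid object-store or Teradata connection details.
        return "transfer/connection issue"
    return "unhandled Teradata error"
```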
16 changes: 0 additions & 16 deletions airflow/providers/teradata/triggers/__init__.py

This file was deleted.

16 changes: 0 additions & 16 deletions airflow/providers/teradata/utils/__init__.py

This file was deleted.

Expand Up @@ -16,32 +16,58 @@
under the License.
.. _howto/operator:AzureBlobStorageToTeradataOperator:

============================
Azure Blob to Teradata
============================

Use the ``AzureBlobStorageToTeradataOperator`` transfer operator to copy CSV, JSON and Parquet format data from Azure Blob Storage to Teradata table.
==================================
AzureBlobStorageToTeradataOperator
==================================

Operators
---------
The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet
format data transfer from Azure Blob Storage to a Teradata table.
Use the :class:`AzureBlobStorageToTeradataOperator <airflow.providers.teradata.transfers.azure_blob_to_teradata>`
to transfer data from Azure Blob Storage to Teradata.

.. _howto/operator:AzureBlobStorageToTeradataOperator:

Azure Blob To Teradata transfer operator
==============================================
Transferring data in CSV format from Azure Blob Storage to Teradata
-------------------------------------------------------------------

An example usage of the AzureBlobStorageToTeradataOperator to transfer data in CSV format from Azure Blob
Storage to a Teradata table is as follows:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]

Transferring data in JSON format from Azure Blob Storage to Teradata
--------------------------------------------------------------------

This operator loads CSV, JSON and Parquet format data from Amazon S3 to Teradata table.
An example usage of the AzureBlobStorageToTeradataOperator to transfer data in JSON format from Azure Blob
Storage to a Teradata table is as follows:

Using the Operator
^^^^^^^^^^^^^^^^^^
.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json]

Transferring data in PARQUET format from Azure Blob Storage to Teradata
-----------------------------------------------------------------------

An example usage of the AzureBlobStorageToTeradataOperator to transfer data in PARQUET format from Azure Blob
Storage to a Teradata table is as follows:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet]

To transfer CSV, JSON and Parquet data from Azure Blob to Teradata, use the
:class:`~airflow.providers.teradata.transfers.azure_blob_to_teradata.AzureBlobStorageToTeradataOperator`.
The complete ``AzureBlobStorageToTeradataOperator`` Operator DAG
----------------------------------------------------------------

An example usage of the AzureBlobStorageToTeradataOperator is as follows:
When we put everything together, our DAG should look like this:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer_operator.py
.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START howto_transfer_operator_azure_blob_to_teradata]
:end-before: [END howto_transfer_operator_azure_blob_to_teradata]
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide]
55 changes: 39 additions & 16 deletions docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst
Expand Up @@ -16,32 +16,55 @@
under the License.
.. _howto/operator:S3ToTeradataOperator:


============================
Amazon S3 to Teradata
S3ToTeradataOperator
============================

Use the ``S3ToTeradataOperator`` transfer operator to copy CSV, JSON and Parquet format data from an Amazon Simple Storage Service (S3) to Teradata table.
The purpose of ``S3ToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet
format data transfer from the AWS Simple Storage Service (S3) to a Teradata table.
Use the :class:`S3ToTeradataOperator <airflow.providers.teradata.transfers.s3_to_teradata>`
to transfer data from S3 to Teradata.

Operators
---------

.. _howto/operator:S3ToTeradataOperator:
Transferring data in CSV format from S3 to Teradata
---------------------------------------------------

An example usage of the S3ToTeradataOperator to transfer data in CSV format from S3 to a Teradata table is as follows:

.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
:language: python
:start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
:end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]

Amazon S3 To Teradata transfer operator
==============================================
Transferring data in JSON format from S3 to Teradata
----------------------------------------------------

This operator loads CSV, JSON and Parquet format data from Amazon S3 to Teradata table.
An example usage of the S3ToTeradataOperator to transfer data in JSON format from S3 to a Teradata table is as follows:

Using the Operator
^^^^^^^^^^^^^^^^^^
.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
:language: python
:start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
:end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]

Transferring data in PARQUET format from S3 to Teradata
-------------------------------------------------------

An example usage of the S3ToTeradataOperator to transfer data in PARQUET format from S3 to a Teradata table is as follows:

.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
:language: python
:start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
:end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]

To transfer CSV, JSON and Parquet data from Amazon S3 to Teradata, use the
:class:`~airflow.providers.teradata.transfers.s3_to_teradata.S3ToTeradataOperator`.
The complete ``S3ToTeradataOperator`` Operator DAG
--------------------------------------------------

An example usage of the S3ToTeradataOperator is as follows:
When we put everything together, our DAG should look like this:

.. exampleinclude:: /../../airflow/providers/teradata/example_dags/example_s3_to_teradata_transfer_operator.py
.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
:language: python
:start-after: [START howto_transfer_operator_s3_to_teradata]
:end-before: [END howto_transfer_operator_s3_to_teradata]
:start-after: [START s3_to_teradata_transfer_operator_howto_guide]
:end-before: [END s3_to_teradata_transfer_operator_howto_guide]
