Skip to content

Commit

Permalink
Address PR comments (#42)
Browse files Browse the repository at this point in the history
* Modified conn_id to teradata_conn_id for Teradata Operator

* static check issue fixed
  • Loading branch information
sc250072 committed May 22, 2024
1 parent 4baf9ab commit dd2dce4
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 42 deletions.
14 changes: 7 additions & 7 deletions airflow/providers/teradata/operators/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TeradataOperator(SQLExecuteQueryOperator):
:ref:`howto/operator:TeradataOperator`
:param sql: the SQL query to be executed as a single string, or a list of str (sql statements)
:param conn_id: reference to a predefined database
:param teradata_conn_id: reference to a predefined database
:param autocommit: if True, each command is automatically committed.(default value: False)
:param parameters: (optional) the parameters to render the SQL query with.
:param schema: The Teradata database to connect to.
Expand All @@ -54,7 +54,7 @@ class TeradataOperator(SQLExecuteQueryOperator):

def __init__(
self,
conn_id: str = TeradataHook.default_conn_name,
teradata_conn_id: str = TeradataHook.default_conn_name,
schema: str | None = None,
**kwargs,
) -> None:
Expand All @@ -65,15 +65,15 @@ def __init__(
**hook_params,
}
super().__init__(**kwargs)
self.conn_id = conn_id
self.conn_id = teradata_conn_id


class TeradataStoredProcedureOperator(BaseOperator):
"""
Executes stored procedure in a specific Teradata database.
:param procedure: name of stored procedure to call (templated)
:param conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
:param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
reference to a specific Teradata database.
:param parameters: (optional, templated) the parameters provided in the call
Expand All @@ -89,15 +89,15 @@ def __init__(
self,
*,
procedure: str,
conn_id: str = TeradataHook.default_conn_name,
teradata_conn_id: str = TeradataHook.default_conn_name,
parameters: dict | list | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.conn_id = conn_id
self.teradata_conn_id = teradata_conn_id
self.procedure = procedure
self.parameters = parameters

def execute(self, context: Context):
hook = TeradataHook(teradata_conn_id=self.conn_id)
hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters)
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ S3ToTeradataOperator
============================

The purpose of ``S3ToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet
format data transfer from an AWS Simple Storage Service (S3) to Teradata table.
format data transfer from an AWS Simple Storage Service (S3) to a Teradata table. This operator uses
the Teradata READ_NOS feature to transfer data from AWS Simple Storage Service (S3) to a Teradata table.
READ_NOS is a table operator in Teradata Vantage that allows users to list external files at a specified location.
For more details, see `READ_NOS Functionality <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Reading-Data/Examples-For-DBAs-and-Advanced-Users/Loading-External-Data-into-the-Database/Loading-External-Data-into-the-Database-Using-READ_NOS-and-CREATE-TABLE-AS>`_

Use the :class:`S3ToTeradataOperator <airflow.providers.teradata.transfers.s3_to_teradata>`
to transfer data from S3 to Teradata.

.. note::
    The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. It supports access only with long-term credentials.


Transferring data in CSV format from S3 to Teradata
---------------------------------------------------
Expand Down
16 changes: 10 additions & 6 deletions tests/providers/teradata/operators/test_teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ def test_get_hook_default(self, mock_get_db_hook):
@mock.patch("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator.get_db_hook")
def test_execute(self, mock_get_db_hook):
sql = "SELECT * FROM test_table"
conn_id = "teradata_default"
teradata_conn_id = "teradata_default"
parameters = {"parameter": "value"}
autocommit = False
context = "test_context"
task_id = "test_task_id"

operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id)
operator = TeradataOperator(
sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id
)
operator.execute(context=context)
mock_get_db_hook.return_value.run.assert_called_once_with(
sql=sql,
Expand All @@ -82,13 +84,15 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook):
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
conn_id = "teradata_default"
teradata_conn_id = "teradata_default"
parameters = {"parameter": "value"}
autocommit = False
context = "test_context"
task_id = "test_task_id"

operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id)
operator = TeradataOperator(
sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id
)
operator.execute(context=context)
mock_get_db_hook.return_value.run.assert_called_once_with(
sql=sql,
Expand All @@ -103,14 +107,14 @@ class TestTeradataStoredProcedureOperator:
@mock.patch.object(TeradataHook, "run", autospec=TeradataHook.run)
def test_execute(self, mock_run):
procedure = "test"
conn_id = "teradata_default"
teradata_conn_id = "teradata_default"
parameters = {"parameter": "value"}
context = "test_context"
task_id = "test_task_id"

operator = TeradataStoredProcedureOperator(
procedure=procedure,
conn_id=conn_id,
teradata_conn_id=teradata_conn_id,
parameters=parameters,
task_id=task_id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,26 @@
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"conn_id": "teradata_default"},
default_args={"teradata_conn_id": CONN_ID},
) as dag:
# [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
transfer_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_csv",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
teradata_table="example_blob_teradata_csv",
azure_conn_id="wasb_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
# [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
read_data_table_csv = TeradataOperator(
task_id="read_data_table_csv",
conn_id=CONN_ID,
sql="SELECT count(1) from example_blob_teradata_csv;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
# [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
drop_table_csv = TeradataOperator(
task_id="drop_table_csv",
conn_id=CONN_ID,
sql="DROP TABLE example_blob_teradata_csv;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
Expand All @@ -83,21 +80,18 @@
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
teradata_table="example_blob_teradata_json",
azure_conn_id="wasb_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json]
# [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json]
read_data_table_json = TeradataOperator(
task_id="read_data_table_json",
conn_id=CONN_ID,
sql="SELECT count(1) from example_blob_teradata_json;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json]
# [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json]
drop_table_json = TeradataOperator(
task_id="drop_table_json",
conn_id=CONN_ID,
sql="DROP TABLE example_blob_teradata_json;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json]
Expand All @@ -114,14 +108,12 @@
# [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
read_data_table_parquet = TeradataOperator(
task_id="read_data_table_parquet",
conn_id=CONN_ID,
sql="SELECT count(1) from example_blob_teradata_parquet;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
# [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
drop_table_parquet = TeradataOperator(
task_id="drop_table_parquet",
conn_id=CONN_ID,
sql="DROP TABLE example_blob_teradata_parquet;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"conn_id": "teradata_default"},
default_args={"teradata_conn_id": CONN_ID},
) as dag:
# [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
transfer_data_csv = S3ToTeradataOperator(
Expand All @@ -61,7 +61,6 @@
public_bucket=True,
teradata_table="example_s3_teradata_csv",
aws_conn_id="aws_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
# [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
Expand All @@ -86,21 +85,18 @@
public_bucket=True,
teradata_table="example_s3_teradata_json",
aws_conn_id="aws_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
# [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
# [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
read_data_table_json = TeradataOperator(
task_id="read_data_table_json",
conn_id=CONN_ID,
sql="SELECT * from example_s3_teradata_json;",
)
# [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
# [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
drop_table_json = TeradataOperator(
task_id="drop_table_json",
conn_id=CONN_ID,
sql="DROP TABLE example_s3_teradata_json;",
)
# [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
Expand All @@ -111,21 +107,18 @@
public_bucket=True,
teradata_table="example_s3_teradata_parquet",
aws_conn_id="aws_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
# [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
# [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
read_data_table_parquet = TeradataOperator(
task_id="read_data_table_parquet",
conn_id=CONN_ID,
sql="SELECT * from example_s3_teradata_parquet;",
)
# [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
# [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
drop_table_parquet = TeradataOperator(
task_id="drop_table_parquet",
conn_id=CONN_ID,
sql="DROP TABLE example_s3_teradata_parquet;",
)
# [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
Expand Down
3 changes: 2 additions & 1 deletion tests/system/providers/teradata/example_ssl_teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_ssl_teradata"
CONN_ID = "teradata_ssl_default"

with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"conn_id": "teradata_ssl_default"},
default_args={"teradata_conn_id": CONN_ID},
) as dag:
# [START teradata_operator_howto_guide_create_country_table]
create_country_table = TeradataOperator(
Expand Down
3 changes: 2 additions & 1 deletion tests/system/providers/teradata/example_teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"conn_id": "teradata_default"},
default_args={"teradata_conn_id": CONN_ID},
) as dag:
# [START teradata_operator_howto_guide_create_table]
create_table = TeradataOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
max_active_runs=1,
max_active_tasks=3,
catchup=False,
default_args={"conn_id": CONN_ID},
default_args={"teradata_conn_id": CONN_ID},
schedule="@once",
start_date=datetime(2023, 1, 1),
) as dag:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"conn_id": "teradata_default"},
default_args={"teradata_conn_id": CONN_ID},
) as dag:
# [START teradata_to_teradata_transfer_operator_howto_guide_create_src_table]
create_src_table = TeradataOperator(
task_id="create_src_table",
conn_id=CONN_ID,
sql="""
CREATE TABLE my_users_src,
FALLBACK (
Expand All @@ -76,7 +75,6 @@
# [START teradata_to_teradata_transfer_operator_howto_guide_create_dest_table]
create_dest_table = TeradataOperator(
task_id="create_dest_table",
conn_id=CONN_ID,
sql="""
CREATE TABLE my_users_dest,
FALLBACK (
Expand All @@ -96,7 +94,6 @@
# [START teradata_to_teradata_transfer_operator_howto_guide_insert_data_src]
insert_data_src = TeradataOperator(
task_id="insert_data_src",
conn_id=CONN_ID,
sql="""
INSERT INTO my_users_src(user_name) VALUES ('User1');
INSERT INTO my_users_src(user_name) VALUES ('User2');
Expand All @@ -107,7 +104,6 @@
# [START teradata_to_teradata_transfer_operator_howto_guide_read_data_src]
read_data_src = TeradataOperator(
task_id="read_data_src",
conn_id=CONN_ID,
sql="SELECT TOP 10 * from my_users_src order by user_id desc;",
)
# [END teradata_to_teradata_transfer_operator_howto_guide_read_data_src]
Expand All @@ -125,21 +121,18 @@
# [START teradata_to_teradata_transfer_operator_howto_guide_read_data_dest]
read_data_dest = TeradataOperator(
task_id="read_data_dest",
conn_id=CONN_ID,
sql="SELECT TOP 10 * from my_users_dest order by user_id desc;",
)
# [END teradata_to_teradata_transfer_operator_howto_guide_read_data_dest]
# [START teradata_to_teradata_transfer_operator_howto_guide_drop_src_table]
drop_src_table = TeradataOperator(
task_id="drop_src_table",
conn_id=CONN_ID,
sql=" DROP TABLE my_users_src;",
)
# [END teradata_to_teradata_transfer_operator_howto_guide_drop_src_table]
# [START teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table]
drop_dest_table = TeradataOperator(
task_id="drop_dest_table",
conn_id=CONN_ID,
sql="DROP TABLE my_users_dest;",
)
# [END teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table]
Expand Down

0 comments on commit dd2dce4

Please sign in to comment.