Address PR comments #42

Merged (2 commits) on May 22, 2024
14 changes: 7 additions & 7 deletions airflow/providers/teradata/operators/teradata.py
@@ -38,7 +38,7 @@ class TeradataOperator(SQLExecuteQueryOperator):
         :ref:`howto/operator:TeradataOperator`

     :param sql: the SQL query to be executed as a single string, or a list of str (sql statements)
-    :param conn_id: reference to a predefined database
+    :param teradata_conn_id: reference to a predefined database
     :param autocommit: if True, each command is automatically committed.(default value: False)
     :param parameters: (optional) the parameters to render the SQL query with.
     :param schema: The Teradata database to connect to.
@@ -54,7 +54,7 @@ class TeradataOperator(SQLExecuteQueryOperator):

     def __init__(
         self,
-        conn_id: str = TeradataHook.default_conn_name,
+        teradata_conn_id: str = TeradataHook.default_conn_name,
         schema: str | None = None,
         **kwargs,
     ) -> None:
@@ -65,15 +65,15 @@ def __init__(
             **hook_params,
         }
         super().__init__(**kwargs)
-        self.conn_id = conn_id
+        self.conn_id = teradata_conn_id


 class TeradataStoredProcedureOperator(BaseOperator):
     """
     Executes stored procedure in a specific Teradata database.

     :param procedure: name of stored procedure to call (templated)
-    :param conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
+    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
         reference to a specific Teradata database.
     :param parameters: (optional, templated) the parameters provided in the call
@@ -89,15 +89,15 @@ def __init__(
         self,
         *,
         procedure: str,
-        conn_id: str = TeradataHook.default_conn_name,
+        teradata_conn_id: str = TeradataHook.default_conn_name,
         parameters: dict | list | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
-        self.conn_id = conn_id
+        self.teradata_conn_id = teradata_conn_id
         self.procedure = procedure
         self.parameters = parameters

     def execute(self, context: Context):
-        hook = TeradataHook(teradata_conn_id=self.conn_id)
+        hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
         return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters)
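
Taken together, the changes above rename the operator-level argument from `conn_id` to `teradata_conn_id`. A minimal sketch of DAG code using the renamed argument, assuming a `teradata_default` connection exists and with hypothetical table and procedure names (`my_table`, `my_procedure`):

```python
import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)

with DAG(
    dag_id="teradata_conn_id_rename_sketch",
    start_date=datetime.datetime(2024, 1, 1),
    schedule="@once",
    catchup=False,
) as dag:
    # Plain SQL through the renamed argument; everything else is
    # forwarded to SQLExecuteQueryOperator.
    read_rows = TeradataOperator(
        task_id="read_rows",
        teradata_conn_id="teradata_default",
        sql="SELECT COUNT(1) FROM my_table;",  # my_table is hypothetical
    )
    # Stored procedure call; executed via TeradataHook.callproc with autocommit=True.
    call_procedure = TeradataStoredProcedureOperator(
        task_id="call_procedure",
        teradata_conn_id="teradata_default",
        procedure="my_procedure",  # hypothetical procedure name
        parameters=["value"],
    )
    read_rows >> call_procedure
```

Note the asymmetry visible in the diff: `TeradataOperator` still stores the value on `self.conn_id` (so the common SQL machinery keeps working), while `TeradataStoredProcedureOperator` now stores it on `self.teradata_conn_id`.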
@@ -24,10 +24,17 @@ S3ToTeradataOperator
 ============================

 The purpose of ``S3ToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet
-format data transfer from an AWS Simple Storage Service (S3) to Teradata table.
+format data transfer from AWS Simple Storage Service (S3) to a Teradata table. This operator uses
+Teradata's READ_NOS feature to transfer data from S3 to a Teradata table.
+READ_NOS is a table operator in Teradata Vantage that allows users to list external files at a specified location.
+For more details, see `READ_NOS Functionality <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Reading-Data/Examples-For-DBAs-and-Advanced-Users/Loading-External-Data-into-the-Database/Loading-External-Data-into-the-Database-Using-READ_NOS-and-CREATE-TABLE-AS>`_.

 Use the :class:`S3ToTeradataOperator <airflow.providers.teradata.transfers.s3_to_teradata>`
 to transfer data from S3 to Teradata.

+.. note::
+    The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. Instead, it exclusively supports access with long-term credentials.
+

Transferring data in CSV format from S3 to Teradata
---------------------------------------------------
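The doc's CSV example itself is collapsed in this diff. As a rough sketch under the stated constraints (long-term credentials only), with a hypothetical `s3_source_key` and the table and connection names used elsewhere in this PR:

```python
import datetime

from airflow import DAG
from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

with DAG(
    dag_id="s3_to_teradata_csv_sketch",
    start_date=datetime.datetime(2024, 1, 1),
    schedule="@once",
    catchup=False,
) as dag:
    # Loads CSV files from a public S3 location into a Teradata table via READ_NOS.
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/your-bucket.s3.amazonaws.com/csv-data/",  # hypothetical location
        public_bucket=True,  # long-term or no credentials; STS is not supported
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        teradata_conn_id="teradata_default",
    )
```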
16 changes: 10 additions & 6 deletions tests/providers/teradata/operators/test_teradata.py
@@ -59,13 +59,15 @@ def test_get_hook_default(self, mock_get_db_hook):
     @mock.patch("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator.get_db_hook")
     def test_execute(self, mock_get_db_hook):
         sql = "SELECT * FROM test_table"
-        conn_id = "teradata_default"
+        teradata_conn_id = "teradata_default"
         parameters = {"parameter": "value"}
         autocommit = False
         context = "test_context"
         task_id = "test_task_id"

-        operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id)
+        operator = TeradataOperator(
+            sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id
+        )
         operator.execute(context=context)
         mock_get_db_hook.return_value.run.assert_called_once_with(
             sql=sql,
@@ -82,13 +84,15 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook):
             "TRUNCATE TABLE test_airflow",
             "INSERT INTO test_airflow VALUES ('X')",
         ]
-        conn_id = "teradata_default"
+        teradata_conn_id = "teradata_default"
         parameters = {"parameter": "value"}
         autocommit = False
         context = "test_context"
         task_id = "test_task_id"

-        operator = TeradataOperator(sql=sql, conn_id=conn_id, parameters=parameters, task_id=task_id)
+        operator = TeradataOperator(
+            sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id
+        )
         operator.execute(context=context)
         mock_get_db_hook.return_value.run.assert_called_once_with(
             sql=sql,
@@ -103,14 +107,14 @@ class TestTeradataStoredProcedureOperator:
     @mock.patch.object(TeradataHook, "run", autospec=TeradataHook.run)
     def test_execute(self, mock_run):
         procedure = "test"
-        conn_id = "teradata_default"
+        teradata_conn_id = "teradata_default"
         parameters = {"parameter": "value"}
         context = "test_context"
         task_id = "test_task_id"

         operator = TeradataStoredProcedureOperator(
             procedure=procedure,
-            conn_id=conn_id,
+            teradata_conn_id=teradata_conn_id,
             parameters=parameters,
             task_id=task_id,
         )
@@ -51,29 +51,26 @@
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
     catchup=False,
-    default_args={"conn_id": "teradata_default"},
+    default_args={"teradata_conn_id": CONN_ID},
 ) as dag:
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
     transfer_data_csv = AzureBlobStorageToTeradataOperator(
         task_id="transfer_data_blob_to_teradata_csv",
         blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
         teradata_table="example_blob_teradata_csv",
         azure_conn_id="wasb_default",
-        teradata_conn_id="teradata_default",
         trigger_rule="all_done",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
     read_data_table_csv = TeradataOperator(
         task_id="read_data_table_csv",
-        conn_id=CONN_ID,
         sql="SELECT count(1) from example_blob_teradata_csv;",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
     drop_table_csv = TeradataOperator(
         task_id="drop_table_csv",
-        conn_id=CONN_ID,
         sql="DROP TABLE example_blob_teradata_csv;",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
@@ -83,21 +80,18 @@
         blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
         teradata_table="example_blob_teradata_json",
         azure_conn_id="wasb_default",
-        teradata_conn_id="teradata_default",
         trigger_rule="all_done",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json]
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json]
     read_data_table_json = TeradataOperator(
         task_id="read_data_table_json",
-        conn_id=CONN_ID,
         sql="SELECT count(1) from example_blob_teradata_json;",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json]
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json]
     drop_table_json = TeradataOperator(
         task_id="drop_table_json",
-        conn_id=CONN_ID,
         sql="DROP TABLE example_blob_teradata_json;",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json]
@@ -114,14 +108,12 @@
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
     read_data_table_parquet = TeradataOperator(
         task_id="read_data_table_parquet",
-        conn_id=CONN_ID,
         sql="SELECT count(1) from example_blob_teradata_parquet;",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
     # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
     drop_table_parquet = TeradataOperator(
         task_id="drop_table_parquet",
-        conn_id=CONN_ID,
         sql="DROP TABLE example_blob_teradata_parquet;",
     )
     # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
@@ -52,7 +52,7 @@
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
     catchup=False,
-    default_args={"conn_id": "teradata_default"},
+    default_args={"teradata_conn_id": CONN_ID},
 ) as dag:
     # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
     transfer_data_csv = S3ToTeradataOperator(
@@ -61,7 +61,6 @@
         public_bucket=True,
         teradata_table="example_s3_teradata_csv",
         aws_conn_id="aws_default",
-        teradata_conn_id="teradata_default",
         trigger_rule="all_done",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
@@ -86,21 +85,18 @@
         public_bucket=True,
         teradata_table="example_s3_teradata_json",
         aws_conn_id="aws_default",
-        teradata_conn_id="teradata_default",
         trigger_rule="all_done",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
     # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
     read_data_table_json = TeradataOperator(
         task_id="read_data_table_json",
-        conn_id=CONN_ID,
         sql="SELECT * from example_s3_teradata_json;",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
     # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
     drop_table_json = TeradataOperator(
         task_id="drop_table_json",
-        conn_id=CONN_ID,
         sql="DROP TABLE example_s3_teradata_json;",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
@@ -111,21 +107,18 @@
         public_bucket=True,
         teradata_table="example_s3_teradata_parquet",
         aws_conn_id="aws_default",
-        teradata_conn_id="teradata_default",
         trigger_rule="all_done",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
     # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
     read_data_table_parquet = TeradataOperator(
         task_id="read_data_table_parquet",
-        conn_id=CONN_ID,
         sql="SELECT * from example_s3_teradata_parquet;",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
     # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
     drop_table_parquet = TeradataOperator(
         task_id="drop_table_parquet",
-        conn_id=CONN_ID,
         sql="DROP TABLE example_s3_teradata_parquet;",
     )
     # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
3 changes: 2 additions & 1 deletion tests/system/providers/teradata/example_ssl_teradata.py
@@ -47,13 +47,14 @@

 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "example_ssl_teradata"
+CONN_ID = "teradata_ssl_default"

 with DAG(
     dag_id=DAG_ID,
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
     catchup=False,
-    default_args={"conn_id": "teradata_ssl_default"},
+    default_args={"teradata_conn_id": CONN_ID},
 ) as dag:
     # [START teradata_operator_howto_guide_create_country_table]
     create_country_table = TeradataOperator(
3 changes: 2 additions & 1 deletion tests/system/providers/teradata/example_teradata.py
@@ -42,13 +42,14 @@

 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "example_teradata"
+CONN_ID = "teradata_default"

 with DAG(
     dag_id=DAG_ID,
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
     catchup=False,
-    default_args={"conn_id": "teradata_default"},
+    default_args={"teradata_conn_id": CONN_ID},
 ) as dag:
     # [START teradata_operator_howto_guide_create_table]
     create_table = TeradataOperator(
@@ -47,7 +47,7 @@
     max_active_runs=1,
     max_active_tasks=3,
     catchup=False,
-    default_args={"conn_id": CONN_ID},
+    default_args={"teradata_conn_id": CONN_ID},
     schedule="@once",
     start_date=datetime(2023, 1, 1),
 ) as dag:
@@ -51,12 +51,11 @@
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
     catchup=False,
-    default_args={"conn_id": "teradata_default"},
+    default_args={"teradata_conn_id": CONN_ID},
 ) as dag:
     # [START teradata_to_teradata_transfer_operator_howto_guide_create_src_table]
     create_src_table = TeradataOperator(
         task_id="create_src_table",
-        conn_id=CONN_ID,
         sql="""
             CREATE TABLE my_users_src,
             FALLBACK (
@@ -76,7 +75,6 @@
     # [START teradata_to_teradata_transfer_operator_howto_guide_create_dest_table]
     create_dest_table = TeradataOperator(
         task_id="create_dest_table",
-        conn_id=CONN_ID,
         sql="""
             CREATE TABLE my_users_dest,
             FALLBACK (
@@ -96,7 +94,6 @@
     # [START teradata_to_teradata_transfer_operator_howto_guide_insert_data_src]
     insert_data_src = TeradataOperator(
         task_id="insert_data_src",
-        conn_id=CONN_ID,
         sql="""
             INSERT INTO my_users_src(user_name) VALUES ('User1');
             INSERT INTO my_users_src(user_name) VALUES ('User2');
@@ -107,7 +104,6 @@
     # [START teradata_to_teradata_transfer_operator_howto_guide_read_data_src]
     read_data_src = TeradataOperator(
         task_id="read_data_src",
-        conn_id=CONN_ID,
         sql="SELECT TOP 10 * from my_users_src order by user_id desc;",
     )
     # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_src]
@@ -125,21 +121,18 @@
     # [START teradata_to_teradata_transfer_operator_howto_guide_read_data_dest]
     read_data_dest = TeradataOperator(
         task_id="read_data_dest",
-        conn_id=CONN_ID,
         sql="SELECT TOP 10 * from my_users_dest order by user_id desc;",
     )
     # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_dest]
     # [START teradata_to_teradata_transfer_operator_howto_guide_drop_src_table]
     drop_src_table = TeradataOperator(
         task_id="drop_src_table",
-        conn_id=CONN_ID,
         sql=" DROP TABLE my_users_src;",
     )
     # [END teradata_to_teradata_transfer_operator_howto_guide_drop_src_table]
     # [START teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table]
     drop_dest_table = TeradataOperator(
         task_id="drop_dest_table",
-        conn_id=CONN_ID,
         sql="DROP TABLE my_users_dest;",
     )
     # [END teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table]
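
The actual source-to-destination transfer task sits in a collapsed region of this file (between the read and drop hunks above). For orientation, a hedged sketch of that step; the `TeradataToTeradataOperator` parameter names (`source_teradata_conn_id`, `dest_teradata_conn_id`, `destination_table`, `rows_chunk`) do not appear in this diff and are assumptions:

```python
from airflow.providers.teradata.transfers.teradata_to_teradata import TeradataToTeradataOperator

# Inside the same `with DAG(...)` block as the tasks above.
# Parameter names below are assumptions, not taken from this diff.
transfer_data = TeradataToTeradataOperator(
    task_id="transfer_data",
    source_teradata_conn_id="teradata_default",
    dest_teradata_conn_id="teradata_default",
    destination_table="my_users_dest",
    sql="SELECT * FROM my_users_src;",
    rows_chunk=2,  # commit every 2 rows; small value for demo purposes
)
```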