
Commit

pre-commit issues resolved
satish-chinthanippu committed Apr 22, 2024
1 parent c9b400f commit 6a8ee3c
Showing 18 changed files with 268 additions and 174 deletions.
7 changes: 2 additions & 5 deletions airflow/providers/teradata/hooks/teradata.py
@@ -147,7 +147,7 @@ def _get_conn_config_teradatasql(self) -> dict[str, Any]:
"dbs_port": conn.port or "1025",
"database": conn.schema or "",
"user": conn.login or "dbc",
"password": conn.password or "dbc"
"password": conn.password or "dbc",
}

if conn.extra_dejson.get("tmode", False):
@@ -219,10 +219,7 @@ def callproc(
if parameters is None:
parameters = []

args = ",".join(
f"?"
for name in parameters
)
args = ",".join("?" for name in parameters)

sql = f"{{CALL {identifier}({(args)})}}"
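For reference, the simplified join emits one bind marker per parameter; a minimal standalone sketch of how the CALL statement is assembled (the procedure name and parameter values below are illustrative, not taken from the commit):

identifier = "example_proc"                 # hypothetical procedure name
parameters = [7, "abc", None]               # hypothetical IN/INOUT/OUT values
args = ",".join("?" for _ in parameters)    # -> "?,?,?"
sql = f"{{CALL {identifier}({args})}}"      # -> "{CALL example_proc(?,?,?)}"
print(sql)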

4 changes: 2 additions & 2 deletions airflow/providers/teradata/operators/teradata.py
@@ -17,10 +17,10 @@
# under the License.
from __future__ import annotations

from typing import Sequence, TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
79 changes: 49 additions & 30 deletions airflow/providers/teradata/transfers/azure_blob_to_teradata.py
@@ -23,22 +23,27 @@
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.teradata.hooks.teradata import TeradataHook

import os

if TYPE_CHECKING:
from airflow.utils.context import Context


class AzureBlobStorageToTeradataOperator(BaseOperator):
"""
Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureBlobStorageToTeradataOperator`
:param blob_source_key: The path to the file (Azure Blob) that will be loaded into Teradata.
:param wasb_conn_id: Reference to the wasb connection.
:param blob_source_key: The object store URI with blob location. The URI format is /az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION. Refer to
https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
:param azure_conn_id: The :ref:`Azure connection id<howto/connection:azure>`
which refers to the information to connect to Azure service.
:param teradata_table: destination table to insert rows.
:param teradata_conn_id: :ref:`Teradata connection <howto/connection:Teradata>`.
:param teradata_conn_id: :ref:`Teradata connection <howto/connection:Teradata>`
which refers to the information to connect to Teradata
"""

template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
@@ -49,47 +54,61 @@ def __init__(
self,
*,
blob_source_key: str,
wasb_conn_id: str = "wasb_default",
azure_conn_id: str = "azure_default",
teradata_table: str,
teradata_conn_id: str = "teradata_default",
wasb_extra_args: dict | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.blob_source_key = blob_source_key
self.wasb_conn_id = wasb_conn_id
self.azure_conn_id = azure_conn_id
self.teradata_table = teradata_table
self.teradata_conn_id = teradata_conn_id
self.wasb_extra_args = wasb_extra_args or {}

def execute(self, context: Context) -> None:
    """
    Executes the transfer operation from Azure Blob Storage to Teradata.
    :param context: The context that is being provided when executing.
    """
    self.log.info("Loading %s to Teradata table %s...", self.blob_source_key, self.teradata_table)
    # list all files in the Azure Blob Storage container
    wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id, **self.wasb_extra_args)
    access_key = wasb_hook.get_conn().token_credential.client_id
    access_secret = wasb_hook.get_conn().token_credential.client_secret
    if access_key is None or access_secret is None:
        access_key = ""
        access_secret = ""

def execute(self, context: Context) -> None:
    """
    Execute the transfer operation from Azure Blob Storage to Teradata.
    :param context: The context that is being provided when executing.
    """
    self.log.info("Loading %s to Teradata table %s...", self.blob_source_key, self.teradata_table)
    azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
    conn = azure_hook.get_connection(self.azure_conn_id)
    # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
    access_id = conn.login
    access_secret = conn.password
    # if no credentials, then accessing blob as public
    if access_id is None or access_secret is None:
        access_id = ""
        access_secret = ""

teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
sql = f"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.blob_source_key}'
ACCESS_ID= '{access_key}'
ACCESS_KEY= '{access_secret}'
) AS d
) WITH DATA
"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.blob_source_key}'
ACCESS_ID= '{access_id}'
ACCESS_KEY= '{access_secret}'
) AS d
) WITH DATA
"""

self.log.info("COPYING using READ_NOS and CREATE TABLE AS feature of teradata....")
self.log.info("sql : %s", sql)
teradata_hook.run(sql)
try:
teradata_hook.run(sql, True)
except Exception as ex:
# Handling permission issue errors
if "Error 3524" in str(ex):
self.log.error("The user does not have CREATE TABLE access in teradata")
raise
if "Error 9134" in str(ex):
self.log.error(
"There is an issue with the transfer operation. Please validate azure and "
"teradata connection details."
)
raise
self.log.error("Issue occurred at Teradata: %s", str(ex))
raise
self.log.info("COPYING is completed")
24 changes: 12 additions & 12 deletions airflow/providers/teradata/transfers/s3_to_teradata.py
@@ -23,18 +23,18 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.teradata.hooks.teradata import TeradataHook

import os

if TYPE_CHECKING:
from airflow.utils.context import Context


class S3ToTeradataOperator(BaseOperator):
"""
Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:S3ToTeradataOperator`
:param s3_source_key: The path to the file (S3 key) that will be loaded into Teradata.
:param teradata_table: destination table to insert rows.
:param aws_conn_id: reference to a specific S3 connection.
@@ -67,23 +67,23 @@ def __init__(
self.aws_access_secret = aws_access_secret

def execute(self, context: Context) -> None:
"""
Executes the transfer operation from S3 to Teradata.
:param context: The context that is being provided when executing.
"""
self.log.info("Loading %s to Teradata table %s...", self.s3_source_key, self.teradata_table)

access_key = self.aws_access_key
access_secret = self.aws_access_secret

if not access_key or not access_secret:
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
access_key = s3_hook.conn_config.aws_access_key_id
access_secret = s3_hook.conn_config.aws_secret_access_key

if access_key is None or access_secret is None:
access_key = ""
access_secret = ""
access_key = (
s3_hook.conn_config.aws_access_key_id
if s3_hook.conn_config.aws_access_key_id is not None
else ""
)
access_secret = (
s3_hook.conn_config.aws_secret_access_key
if s3_hook.conn_config.aws_secret_access_key is not None
else ""
)

teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
sql = f"""
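To make the S3 credential precedence easier to follow, here is a small standalone sketch of the same fallback order (explicit operator arguments first, then the key pair stored on the AWS connection, then empty strings so the object is read as public); the helper name is hypothetical and not part of the provider:

from __future__ import annotations


def resolve_s3_credentials(
    explicit_key: str | None,
    explicit_secret: str | None,
    conn_key: str | None,
    conn_secret: str | None,
) -> tuple[str, str]:
    # Prefer credentials passed directly to the operator.
    access_key, access_secret = explicit_key, explicit_secret
    if not access_key or not access_secret:
        # Fall back to the key pair stored on the AWS connection, mapping None to "".
        access_key = conn_key if conn_key is not None else ""
        access_secret = conn_secret if conn_secret is not None else ""
    return access_key or "", access_secret or ""


# No explicit keys and no stored keys -> empty strings, i.e. anonymous/public access.
print(resolve_s3_credentials(None, None, None, None))  # ('', '')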
16 changes: 16 additions & 0 deletions airflow/providers/teradata/triggers/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/teradata/utils/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -805,6 +805,7 @@ initcontainers
initdb
initialisation
initialising
inout
InsecureClient
InspectContentResponse
InspectTemplate
4 changes: 3 additions & 1 deletion generated/provider_dependencies.json
@@ -1135,7 +1135,9 @@
],
"devel-deps": [],
"cross-providers-deps": [
"common.sql"
"amazon",
"common.sql",
"microsoft.azure"
],
"excluded-python-versions": [],
"state": "ready"
60 changes: 60 additions & 0 deletions tests/providers/teradata/transfers/test_azure_blob_to_teradata.py
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator

AZURE_CONN_ID = "wasb_default"
TERADATA_CONN_ID = "teradata_default"
BLOB_SOURCE_KEY = "az/test"
TERADATA_TABLE = "test"
TASK_ID = "transfer_file"


class TestAzureBlobStorageToTeradataOperator:
def test_init(self):
operator = AzureBlobStorageToTeradataOperator(
azure_conn_id=AZURE_CONN_ID,
teradata_conn_id=TERADATA_CONN_ID,
teradata_table=TERADATA_TABLE,
blob_source_key=BLOB_SOURCE_KEY,
task_id=TASK_ID,
)

assert operator.azure_conn_id == AZURE_CONN_ID
assert operator.blob_source_key == BLOB_SOURCE_KEY
assert operator.teradata_conn_id == TERADATA_CONN_ID
assert operator.teradata_table == TERADATA_TABLE
assert operator.task_id == TASK_ID

@mock.patch("airflow.providers.teradata.transfers.azure_blob_to_teradata.WasbHook")
@mock.patch("airflow.providers.teradata.transfers.azure_blob_to_teradata.TeradataHook")
def test_execute(self, mock_hook_wasb, mock_hook_teradata):
op = AzureBlobStorageToTeradataOperator(
azure_conn_id=AZURE_CONN_ID,
teradata_conn_id=TERADATA_CONN_ID,
teradata_table=TERADATA_TABLE,
blob_source_key=BLOB_SOURCE_KEY,
task_id=TASK_ID,
)
op.execute(context=None)
mock_hook_wasb.assert_called_once_with(wasb_conn_id=AZURE_CONN_ID)
mock_hook_teradata.assert_called_once_with(teradata_conn_id=TERADATA_CONN_ID)
sql = "SQL"
mock_hook_teradata.run(sql)
