D205 Support - Providers: GRPC to Oracle (inclusive) (#32357)
ferruzzi committed Jul 5, 2023
1 parent f859350 commit 1240dcc
Showing 37 changed files with 115 additions and 99 deletions.
15 changes: 7 additions & 8 deletions airflow/providers/hashicorp/_internal_client/vault_client.py
@@ -48,10 +48,12 @@

class _VaultClient(LoggingMixin):
"""
Retrieves Authenticated client from Hashicorp Vault. This is purely internal class promoting
authentication code reuse between the Hook and the SecretBackend, it should not be used directly in
Airflow DAGs. Use VaultBackend for backend integration and Hook in case you want to communicate
with VaultHook using standard Airflow Connection definition.
Retrieves Authenticated client from Hashicorp Vault.
This is purely internal class promoting authentication code reuse between the Hook and the
SecretBackend, it should not be used directly in Airflow DAGs. Use VaultBackend for backend
integration and Hook in case you want to communicate with VaultHook using standard Airflow
Connection definition.
:param url: Base URL for the Vault instance being addressed.
:param auth_type: Authentication Type for Vault. Default is ``token``. Available values are in
@@ -172,12 +174,9 @@ def __init__(
@property
def client(self):
"""
Authentication to Vault can expire. This wrapper function checks that
it is still authenticated to Vault, and invalidates the cache if this
is not the case.
Checks that it is still authenticated to Vault and invalidates the cache if this is not the case.
:return: Vault Client
"""
if not self._client.is_authenticated():
# Invalidate the cache:
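For context, the cached-client pattern in this hunk can be sketched as follows; this is a minimal illustration using the hvac library directly, not the provider's actual class:

import hvac
from functools import cached_property

class VaultClientSketch:
    """Minimal sketch of the re-authentication check above; illustrative only."""

    def __init__(self, url: str, token: str):
        self.url = url
        self.token = token

    @cached_property
    def _client(self) -> hvac.Client:
        # The expensive login happens once and is cached on the instance.
        return hvac.Client(url=self.url, token=self.token)

    @property
    def client(self) -> hvac.Client:
        # Vault authentication can expire; deleting the cached_property
        # value invalidates the cache and forces a fresh login next access.
        if not self._client.is_authenticated():
            del self._client
        return self._client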
5 changes: 3 additions & 2 deletions airflow/providers/hashicorp/secrets/vault.py
@@ -209,8 +209,9 @@ def get_conn_uri(self, conn_id: str) -> str | None:

def get_connection(self, conn_id: str) -> Connection | None:
"""
Get connection from Vault as secret. Prioritize conn_uri if exists,
if not fall back to normal Connection creation.
Get connection from Vault as secret.
Prioritize conn_uri if exists, if not fall back to normal Connection creation.
:return: A Connection object constructed from Vault data
"""
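A hedged usage sketch for the method above; the mount point, path, and conn id are placeholders, and in practice the backend is configured under [secrets] in airflow.cfg rather than instantiated by hand:

from airflow.providers.hashicorp.secrets.vault import VaultBackend

backend = VaultBackend(
    connections_path="connections",
    mount_point="airflow",
    url="https://vault.example.com:8200",
)
conn = backend.get_connection("smtp_default")  # None if the secret is absent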
3 changes: 1 addition & 2 deletions airflow/providers/http/sensors/http.py
@@ -29,8 +29,7 @@

class HttpSensor(BaseSensorOperator):
"""
Executes a HTTP GET statement and returns False on failure caused by
404 Not Found or `response_check` returning False.
Execute HTTP GET statement; return False on failure 404 Not Found or `response_check` returning False.
HTTP Error codes other than 404 (like 403) or Connection Refused Error
would raise an exception and fail the sensor itself directly (no more poking).
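For reference, a typical instantiation of the sensor described above; the connection id, endpoint, and check are illustrative:

from airflow.providers.http.sensors.http import HttpSensor

wait_for_api = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="http_default",
    endpoint="health",
    # Returning False keeps the sensor poking; a 404 also counts as "not yet".
    response_check=lambda response: response.json().get("status") == "ok",
    poke_interval=60,
)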
4 changes: 2 additions & 2 deletions airflow/providers/imap/hooks/imap.py
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
"""
This module provides everything to be able to search in mails for a specific attachment
and also to download it.
This module provides everything to search mail for a specific attachment and download it.
It uses the imaplib library that is already integrated in python 3.
"""
from __future__ import annotations
6 changes: 4 additions & 2 deletions airflow/providers/jenkins/hooks/jenkins.py
@@ -27,8 +27,10 @@

def _ensure_prefixes(conn_type):
"""
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
Deprecated.
Remove when provider min airflow version >= 2.5.0 since
this is handled by provider manager from that version.
"""

def dec(func):
4 changes: 2 additions & 2 deletions airflow/providers/jenkins/operators/jenkins_job_trigger.py
@@ -21,7 +21,7 @@
import json
import socket
import time
from typing import Any, Iterable, List, Mapping, Sequence, Union
from typing import Any, Iterable, Mapping, Sequence, Union
from urllib.error import HTTPError, URLError

import jenkins
@@ -33,7 +33,7 @@
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook

JenkinsRequest = Mapping[str, Any]
ParamType = Union[str, dict, List, None]
ParamType = Union[str, dict, list, None]


def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> JenkinsRequest | None:
5 changes: 3 additions & 2 deletions airflow/providers/microsoft/azure/hooks/adx.py
@@ -186,8 +186,9 @@ def get_required_param(name: str) -> str:

def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSetV2:
"""
Run KQL query using provided configuration, and return
`azure.kusto.data.response.KustoResponseDataSet` instance.
Run KQL query using provided configuration, and return KustoResponseDataSet instance.
See: `azure.kusto.data.response.KustoResponseDataSet`
If query is unsuccessful AirflowException is raised.
:param query: KQL query to run
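A usage sketch for run_query as documented above; the connection id, database, and KQL text are placeholders:

from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook

hook = AzureDataExplorerHook(azure_data_explorer_conn_id="azure_data_explorer_default")
response = hook.run_query(
    query="StormEvents | take 10",
    database="Samples",
    options={"servertimeout": "60s"},
)
# The KustoResponseDataSet exposes rows of the first primary result set.
for row in response.primary_results[0]:
    print(row)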
6 changes: 4 additions & 2 deletions airflow/providers/microsoft/azure/hooks/base_azure.py
@@ -27,8 +27,10 @@

class AzureBaseHook(BaseHook):
"""
This hook acts as a base hook for azure services. It offers several authentication mechanisms to
authenticate the client library used for upstream azure hooks.
This hook acts as a base hook for azure services.
It offers several authentication mechanisms to authenticate
the client library used for upstream azure hooks.
:param sdk_client: The SDKClient to use.
:param conn_id: The :ref:`Azure connection id<howto/connection:azure>`
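A short sketch of the base hook's contract as described above; the SDK client class and connection id are placeholders, and get_conn() is assumed to return the authenticated client instance:

from azure.mgmt.resource import ResourceManagementClient
from airflow.providers.microsoft.azure.hooks.base_azure import AzureBaseHook

hook = AzureBaseHook(sdk_client=ResourceManagementClient, conn_id="azure_default")
client = hook.get_conn()  # SDK client authenticated per the Airflow connection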
5 changes: 1 addition & 4 deletions airflow/providers/microsoft/azure/hooks/cosmos.py
@@ -237,10 +237,7 @@ def delete_collection(self, collection_name: str, database_name: str | None = No
)

def upsert_document(self, document, database_name=None, collection_name=None, document_id=None):
"""
Inserts a new document (or updates an existing one) into an existing
collection in the CosmosDB database.
"""
"""Insert or update a document into an existing collection in the CosmosDB database."""
# Assign unique ID if one isn't provided
if document_id is None:
document_id = str(uuid.uuid4())
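An illustrative call matching the signature above; the connection id, names, and document body are placeholders:

from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook

hook = AzureCosmosDBHook(azure_cosmos_conn_id="azure_cosmos_default")
hook.upsert_document(
    {"name": "example"},
    database_name="mydb",
    collection_name="mycollection",
    document_id="doc-1",  # omitted -> a uuid4 string is assigned, per the code above
)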
2 changes: 2 additions & 0 deletions airflow/providers/microsoft/azure/hooks/fileshare.py
@@ -28,6 +28,8 @@

def _ensure_prefixes(conn_type):
"""
Deprecated.
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
2 changes: 2 additions & 0 deletions airflow/providers/microsoft/azure/hooks/wasb.py
@@ -53,6 +53,8 @@

def _ensure_prefixes(conn_type):
"""
Deprecated.
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
13 changes: 6 additions & 7 deletions airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -45,9 +45,9 @@ def get_default_delete_local_copy():

class WasbTaskHandler(FileTaskHandler, LoggingMixin):
"""
WasbTaskHandler is a python log handler that handles and reads
task instance logs. It extends airflow FileTaskHandler and
uploads to and reads from Wasb remote storage.
WasbTaskHandler is a python log handler that handles and reads task instance logs.
It extends airflow FileTaskHandler and uploads to and reads from Wasb remote storage.
"""

trigger_should_wrap = True
@@ -171,6 +171,7 @@ def _read(
) -> tuple[str, dict[str, bool]]:
"""
Read logs of given task instance and try_number from Wasb remote storage.
If failed, read the log from task instance host machine.
todo: when min airflow version >= 2.6, remove this method
@@ -207,8 +208,7 @@ def wasb_log_exists(self, remote_log_location: str) -> bool:

def wasb_read(self, remote_log_location: str, return_error: bool = False):
"""
Returns the log found at the remote_log_location. Returns '' if no
logs are found or there is an error.
Return the log found at the remote_log_location. Returns '' if no logs are found or there is an error.
:param remote_log_location: the log's location in remote storage
:param return_error: if True, returns a string error message if an
@@ -226,8 +226,7 @@ def wasb_read(self, remote_log_location: str, return_error: bool = False):

def wasb_write(self, log: str, remote_log_location: str, append: bool = True) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
Writes the log to the remote_log_location. Fails silently if no hook was created.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
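Taken together, the two helpers behave roughly as below; the remote path is a placeholder and `handler` is assumed to be an already-configured WasbTaskHandler:

remote = "dag_id/task_id/2023-07-05/1.log"
text = handler.wasb_read(remote, return_error=True)  # "" or an error string on failure
ok = handler.wasb_write(text + "\n...appended...", remote, append=True)
if not ok:
    # wasb_write returns False instead of raising when no hook could be created
    handler.log.warning("remote write skipped")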
10 changes: 2 additions & 8 deletions airflow/providers/microsoft/azure/operators/asb.py
@@ -155,10 +155,7 @@ def __init__(
self.max_wait_time = max_wait_time

def execute(self, context: Context) -> None:
"""
Receive Message in specific queue in Service Bus namespace,
by connecting to Service Bus client.
"""
"""Receive Message in specific queue in Service Bus namespace by connecting to Service Bus client."""
# Create the hook
hook = MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

@@ -527,10 +524,7 @@ def __init__(
self.azure_service_bus_conn_id = azure_service_bus_conn_id

def execute(self, context: Context) -> None:
"""
Receive Message in specific queue in Service Bus namespace,
by connecting to Service Bus client.
"""
"""Receive Message in specific queue in Service Bus namespace by connecting to Service Bus client."""
# Create the hook
hook = MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/operators/data_factory.py
@@ -233,8 +233,8 @@ def execute(self, context: Context) -> None:
def execute_complete(self, context: Context, event: dict[str, str]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
if event["status"] == "error":
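The truncated execute_complete bodies in this and the following files all share the same deferral-callback shape; a sketch, with the event field names assumed rather than confirmed by these hunks:

from airflow.exceptions import AirflowException

def execute_complete(self, context, event):
    # The trigger reports a status; anything other than success is fatal.
    if event:
        if event["status"] == "error":
            raise AirflowException(event["message"])  # assumed field name
        self.log.info(event["message"])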
1 change: 1 addition & 0 deletions airflow/providers/microsoft/azure/secrets/key_vault.py
@@ -162,6 +162,7 @@ def get_config(self, key: str) -> str | None:
def build_path(path_prefix: str, secret_id: str, sep: str = "-") -> str:
"""
Given a path_prefix and secret_id, build a valid secret name for the Azure Key Vault Backend.
Also replaces underscore in the path with dashes to support easy switching between
environment variables, so ``connection_default`` becomes ``connection-default``.
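Given the description above, the helper's observable behavior would be roughly this; the expected result follows the docstring's underscore-to-dash rule and is illustrative, not a captured output:

from airflow.providers.microsoft.azure.secrets.key_vault import AzureKeyVaultBackend

name = AzureKeyVaultBackend.build_path("airflow-connections", "smtp_default")
# Expected: "airflow-connections-smtp-default" -- underscores become dashes
# because Key Vault secret names may not contain "_".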
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/sensors/data_factory.py
@@ -113,8 +113,8 @@ def execute(self, context: Context) -> None:
def execute_complete(self, context: Context, event: dict[str, str]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
if event["status"] == "error":
8 changes: 4 additions & 4 deletions airflow/providers/microsoft/azure/sensors/wasb.py
@@ -97,8 +97,8 @@ def execute(self, context: Context) -> None:
def execute_complete(self, context: Context, event: dict[str, str]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
if event["status"] == "error":
@@ -193,8 +193,8 @@ def execute(self, context: Context) -> None:
def execute_complete(self, context: Context, event: dict[str, str]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
if event["status"] == "error":
@@ -102,6 +102,7 @@ def execute(self, context: Context) -> None:
class LocalToAzureDataLakeStorageOperator(LocalFilesystemToADLSOperator):
"""
This class is deprecated.
Please use `airflow.providers.microsoft.azure.transfers.local_to_adls.LocalFilesystemToADLSOperator`.
"""

@@ -32,9 +32,7 @@

class OracleToAzureDataLakeOperator(BaseOperator):
"""
Moves data from Oracle to Azure Data Lake. The operator runs the query against
Oracle and stores the file locally before loading it into Azure Data Lake.
Runs the query against Oracle and stores the file locally before loading it into Azure Data Lake.
:param filename: file name to be used by the csv file.
:param azure_data_lake_conn_id: destination azure data lake connection.
8 changes: 5 additions & 3 deletions airflow/providers/microsoft/azure/triggers/wasb.py
@@ -25,8 +25,9 @@

class WasbBlobSensorTrigger(BaseTrigger):
"""
WasbBlobSensorTrigger is fired as deferred class with params to run the task in
trigger worker to check for existence of the given blob in the provided container.
Checks for existence of the given blob in the provided container.
WasbBlobSensorTrigger is fired as deferred class with params to run the task in trigger worker.
:param container_name: name of the container in which the blob should be searched for
:param blob_name: name of the blob to check existence for
@@ -90,8 +91,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:

class WasbPrefixSensorTrigger(BaseTrigger):
"""
Checks for the existence of a blob with the given prefix in the provided container.
WasbPrefixSensorTrigger is fired as a deferred class with params to run the task in trigger.
It checks for the existence of a blob with the given prefix in the provided container.
:param container_name: name of the container in which the blob should be searched for
:param prefix: prefix of the blob to check existence for
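For context, a deferrable sensor would hand off to the trigger described above roughly like this, from inside its execute(); the container and blob names are placeholders:

from airflow.providers.microsoft.azure.triggers.wasb import WasbBlobSensorTrigger

self.defer(
    trigger=WasbBlobSensorTrigger(
        container_name="mycontainer",
        blob_name="incoming/data.csv",
        wasb_conn_id="wasb_default",
    ),
    method_name="execute_complete",
)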
2 changes: 2 additions & 0 deletions airflow/providers/microsoft/azure/utils.py
@@ -23,6 +23,8 @@

def _ensure_prefixes(conn_type):
"""
Deprecated.
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
16 changes: 9 additions & 7 deletions airflow/providers/microsoft/mssql/hooks/mssql.py
@@ -26,7 +26,7 @@


class MsSqlHook(DbApiHook):
"""Interact with Microsoft SQL Server."""
"""
Interact with Microsoft SQL Server.
:param args: passed to DBApiHook
:param sqlalchemy_scheme: Scheme sqlalchemy connection. Default is ``mssql+pymssql`` Only used for
``get_sqlalchemy_engine`` and ``get_sqlalchemy_connection`` methods.
:param kwargs: passed to DbApiHook
"""

conn_name_attr = "mssql_conn_id"
default_conn_name = "mssql_default"
@@ -41,12 +48,6 @@ def __init__(
sqlalchemy_scheme: str | None = None,
**kwargs,
) -> None:
"""
:param args: passed to DBApiHook
:param sqlalchemy_scheme: Scheme sqlalchemy connection. Default is ``mssql+pymssql`` Only used for
``get_sqlalchemy_engine`` and ``get_sqlalchemy_connection`` methods.
:param kwargs: passed to DbApiHook
"""
super().__init__(*args, **kwargs)
self.schema = kwargs.pop("schema", None)
self._sqlalchemy_scheme = sqlalchemy_scheme
@@ -55,6 +56,7 @@
def connection_extra_lower(self) -> dict:
"""
``connection.extra_dejson`` but where keys are converted to lower case.
This is used internally for case-insensitive access of mssql params.
"""
conn = self.get_connection(self.mssql_conn_id) # type: ignore[attr-defined]
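A usage sketch for the parameter documentation relocated above; the connection id and scheme are placeholders:

from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

hook = MsSqlHook(mssql_conn_id="mssql_default", sqlalchemy_scheme="mssql+pymssql")
engine = hook.get_sqlalchemy_engine()  # the scheme affects only the sqlalchemy methods
rows = hook.get_records("SELECT TOP 5 * FROM information_schema.tables")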
5 changes: 3 additions & 2 deletions airflow/providers/microsoft/psrp/hooks/psrp.py
@@ -158,8 +158,9 @@ def apply_extra(d, keys):
@contextmanager
def invoke(self) -> Generator[PowerShell, None, None]:
"""
Context manager that yields a PowerShell object to which commands can be
added. Upon exit, the commands will be invoked.
Yields a PowerShell object to which commands can be added.
Upon exit, the commands will be invoked.
"""
logger = copy(self.log)
logger.setLevel(self._logging_level)
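A sketch of the context-manager contract described above; the connection id and script are placeholders:

from airflow.providers.microsoft.psrp.hooks.psrp import PsrpHook

hook = PsrpHook(psrp_conn_id="psrp_default")
with hook.invoke() as ps:
    ps.add_script("Get-Date")  # commands are queued here...
# ...and invoked when the context manager exits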
1 change: 1 addition & 0 deletions airflow/providers/mongo/hooks/mongo.py
@@ -98,6 +98,7 @@ def get_conn(self) -> MongoClient:
def _create_uri(self) -> str:
"""
Create URI string from the given credentials.
:return: URI string.
"""
srv = self.extras.pop("srv", False)
7 changes: 4 additions & 3 deletions airflow/providers/mysql/transfers/presto_to_mysql.py
@@ -29,9 +29,10 @@

class PrestoToMySqlOperator(BaseOperator):
"""
Moves data from Presto to MySQL, note that for now the data is loaded
into memory before being pushed to MySQL, so this operator should
be used for smallish amount of data.
Moves data from Presto to MySQL.
Note that for now the data is loaded into memory before being pushed
to MySQL, so this operator should be used for smallish amount of data.
:param sql: SQL query to execute against Presto. (templated)
:param mysql_table: target MySQL table, use dot notation to target a
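Given the memory caveat restated above, a typical small-transfer instantiation; the query and table are placeholders:

from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator

load_dim = PrestoToMySqlOperator(
    task_id="presto_to_mysql",
    sql="SELECT * FROM small_dim_table",      # keep result sets small: loaded in memory
    mysql_table="analytics.small_dim_table",  # dot notation targets a specific database
)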
