Skip to content

Commit

Permalink
Add dynamic connection fields to Azure Connection (#15159)
Browse files Browse the repository at this point in the history
This PR adds custom connection form widgets and behaviors to the Azure Connections defined in the following hooks:  
- AzureBaseHook
- AzureContainerInstanceHook
-  AzureDataExplorerHook
- AzureCosmosDBHook
- AzureBatchHook
- AzureDataFactoryHook
- AzureDataLakeHook
- AzureContainerRegistryHook
- WasbHook

This PR also adds a new connection 'azure_container_registry' for the AzureContainerRegistryHook. The form for both the Azure connection, and the Azure Container Instance connection are identical.

screenshots:

AzureContainerInstanceHook
![Screenshot_2021-04-01 Add Connection - Airflow(1)](https://user-images.githubusercontent.com/63181127/113424206-ba51be80-939d-11eb-94f0-493fffcd9728.png)

AzureDataExplorerHook
![azure-data-explorer](https://user-images.githubusercontent.com/63181127/113774431-485adb80-96f5-11eb-9288-97c75b77f502.png)

AzureCosmosDBHook
![image](https://user-images.githubusercontent.com/63181127/113774462-53157080-96f5-11eb-9e21-71e63659de83.png)

AzureBatchHook
![azure-batch](https://user-images.githubusercontent.com/63181127/113774499-60caf600-96f5-11eb-82d6-b05fa57b25da.png)

AzureDataFactoryHook
![azure-data-factory](https://user-images.githubusercontent.com/63181127/113774580-76d8b680-96f5-11eb-802d-abd2c4784f84.png)

AzureDataLakeHook
![azure-data-lake](https://user-images.githubusercontent.com/63181127/113774715-a12a7400-96f5-11eb-890a-2e5fe5621492.png)


AzureContainerRegistryHook
![image](https://user-images.githubusercontent.com/63181127/113774632-8657ff80-96f5-11eb-878d-5293045b967b.png)

WasbHook
![Azure-blob](https://user-images.githubusercontent.com/63181127/113774745-aedff980-96f5-11eb-8cd2-4c64dbb3eaf7.png)
  • Loading branch information
sunkickr committed Apr 12, 2021
1 parent e4c0689 commit 1a85ba9
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 36 deletions.
68 changes: 61 additions & 7 deletions airflow/providers/microsoft/azure/hooks/adx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#

"""This module contains Azure Data Explorer hook"""
from typing import Dict, Optional
from typing import Any, Dict, Optional

from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.request import ClientRequestProperties, KustoClient, KustoConnectionStringBuilder
Expand Down Expand Up @@ -83,6 +83,49 @@ class AzureDataExplorerHook(BaseHook):
conn_type = 'azure_data_explorer'
hook_name = 'Azure Data Explorer'

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import PasswordField, StringField

return {
"extra__azure_data_explorer__auth_method": StringField(
lazy_gettext('Tenant ID'), widget=BS3TextFieldWidget()
),
"extra__azure_data_explorer__tenant": StringField(
lazy_gettext('Authentication Method'), widget=BS3TextFieldWidget()
),
"extra__azure_data_explorer__certificate": PasswordField(
lazy_gettext('Application PEM Certificate'), widget=BS3PasswordFieldWidget()
),
"extra__azure_data_explorer__thumbprint": PasswordField(
lazy_gettext('Application Certificate Thumbprint'), widget=BS3PasswordFieldWidget()
),
}

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['schema', 'port', 'extra'],
"relabeling": {
'login': 'Auth Username',
'password': 'Auth Password',
'host': 'Data Explorer Cluster Url',
},
"placeholders": {
'login': 'varies with authentication method',
'password': 'varies with authentication method',
'host': 'cluster url',
'extra__azure_data_explorer__auth_method': 'AAD_APP/AAD_APP_CERT/AAD_CREDS/AAD_DEVICE',
'extra__azure_data_explorer__tenant': 'used with AAD_APP/AAD_APP_CERT/AAD_CREDS',
'extra__azure_data_explorer__certificate': 'used with AAD_APP_CERT',
'extra__azure_data_explorer__thumbprint': 'used with AAD_APP_CERT',
},
}

def __init__(self, azure_data_explorer_conn_id: str = default_conn_name) -> None:
super().__init__()
self.conn_id = azure_data_explorer_conn_id
Expand All @@ -102,23 +145,34 @@ def get_required_param(name: str) -> str:
raise AirflowException(f'Extra connection option is missing required parameter: `{name}`')
return value

auth_method = get_required_param('auth_method')
auth_method = get_required_param('auth_method') or get_required_param(
'extra__azure_data_explorer__auth_method'
)

if auth_method == 'AAD_APP':
tenant = get_required_param('tenant') or get_required_param('extra__azure_data_explorer__tenant')
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
cluster, conn.login, conn.password, get_required_param('tenant')
cluster, conn.login, conn.password, tenant
)
elif auth_method == 'AAD_APP_CERT':
certificate = get_required_param('certificate') or get_required_param(
'extra__azure_data_explorer__certificate'
)
thumbprint = get_required_param('thumbprint') or get_required_param(
'extra__azure_data_explorer__thumbprint'
)
tenant = get_required_param('tenant') or get_required_param('extra__azure_data_explorer__tenant')
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_authentication(
cluster,
conn.login,
get_required_param('certificate'),
get_required_param('thumbprint'),
get_required_param('tenant'),
certificate,
thumbprint,
tenant,
)
elif auth_method == 'AAD_CREDS':
tenant = get_required_param('tenant') or get_required_param('extra__azure_data_explorer__tenant')
kcsb = KustoConnectionStringBuilder.with_aad_user_password_authentication(
cluster, conn.login, conn.password, get_required_param('tenant')
cluster, conn.login, conn.password, tenant
)
elif auth_method == 'AAD_DEVICE':
kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster)
Expand Down
35 changes: 33 additions & 2 deletions airflow/providers/microsoft/azure/hooks/azure_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#
import time
from datetime import timedelta
from typing import Optional, Set
from typing import Any, Dict, Optional, Set

from azure.batch import BatchServiceClient, batch_auth, models as batch_models
from azure.batch.models import JobAddParameter, PoolAddParameter, TaskAddParameter
Expand All @@ -42,6 +42,35 @@ class AzureBatchHook(BaseHook):
conn_type = 'azure_batch'
hook_name = 'Azure Batch Service'

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField

return {
"extra__azure_batch__account_url": StringField(
lazy_gettext('Azure Batch Account URl'), widget=BS3TextFieldWidget()
),
}

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['schema', 'port', 'host', 'extra'],
"relabeling": {
'login': 'Azure Batch Account Name',
'password': 'Azure Batch Key',
},
"placeholders": {
'login': 'batch account',
'password': 'key',
'extra__azure_batch__account_url': 'account url',
},
}

def __init__(self, azure_batch_conn_id: str = default_conn_name) -> None:
super().__init__()
self.conn_id = azure_batch_conn_id
Expand All @@ -68,7 +97,9 @@ def _get_required_param(name):
raise AirflowException(f'Extra connection option is missing required parameter: `{name}`')
return value

batch_account_url = _get_required_param('account_url')
batch_account_url = _get_required_param('account_url') or _get_required_param(
'extra__azure_batch__account_url'
)
credentials = batch_auth.SharedKeyCredentials(conn.login, conn.password)
batch_client = BatchServiceClient(credentials, batch_url=batch_account_url)
return batch_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#

import warnings
from typing import Any
from typing import Any, Dict

from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from azure.mgmt.containerinstance.models import ContainerGroup
Expand All @@ -41,11 +41,53 @@ class AzureContainerInstanceHook(AzureBaseHook):
:type conn_id: str
"""

conn_name_attr = 'conn_id'
conn_name_attr = 'azure_conn_id'
default_conn_name = 'azure_default'
conn_type = 'azure_container_instances'
hook_name = 'Azure Container Instance'

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField

return {
"extra__azure__tenantId": StringField(
lazy_gettext('Azure Tenant ID'), widget=BS3TextFieldWidget()
),
"extra__azure__subscriptionId": StringField(
lazy_gettext('Azure Subscription ID'), widget=BS3TextFieldWidget()
),
}

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
import json

return {
"hidden_fields": ['schema', 'port', 'host'],
"relabeling": {
'login': 'Azure Client ID',
'password': 'Azure Secret',
},
"placeholders": {
'extra': json.dumps(
{
"key_path": "path to json file for auth",
"key_json": "specifies json dict for auth",
},
indent=1,
),
'login': 'client id (token credentials auth)',
'password': 'secret (token credentials auth)',
'extra__azure__tenantId': 'tenant id (token credentials auth)',
'extra__azure__subscriptionId': 'subscription id (token credentials auth)',
},
}

def __init__(self, conn_id: str = default_conn_name) -> None:
super().__init__(sdk_client=ContainerInstanceManagementClient, conn_id=conn_id)
self.connection = self.get_conn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
"""Hook for Azure Container Registry"""

from typing import Dict

from azure.mgmt.containerinstance.models import ImageRegistryCredential

from airflow.hooks.base import BaseHook
Expand All @@ -31,6 +33,28 @@ class AzureContainerRegistryHook(BaseHook):
:type conn_id: str
"""

conn_name_attr = 'azure_container_registry_conn_id'
default_conn_name = 'azure_container_registry_default'
conn_type = 'azure_container_registry'
hook_name = 'Azure Container Registry'

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['schema', 'port', 'extra'],
"relabeling": {
'login': 'Registry Username',
'password': 'Registry Password',
'host': 'Registry Server',
},
"placeholders": {
'login': 'private registry username',
'password': 'private registry password',
'host': 'docker image registry server',
},
}

def __init__(self, conn_id: str = 'azure_registry') -> None:
super().__init__()
self.conn_id = conn_id
Expand Down
43 changes: 40 additions & 3 deletions airflow/providers/microsoft/azure/hooks/azure_cosmos.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
the default database and collection to use (see connection `azure_cosmos_default` for an example).
"""
import uuid
from typing import Optional
from typing import Any, Dict, Optional

from azure.cosmos.cosmos_client import CosmosClient
from azure.cosmos.errors import HTTPFailure
Expand All @@ -50,6 +50,39 @@ class AzureCosmosDBHook(BaseHook):
conn_type = 'azure_cosmos'
hook_name = 'Azure CosmosDB'

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField

return {
"extra__azure_cosmos__database_name": StringField(
lazy_gettext('Cosmos Database Name (optional)'), widget=BS3TextFieldWidget()
),
"extra__azure_cosmos__collection_name": StringField(
lazy_gettext('Cosmos Collection Name (optional)'), widget=BS3TextFieldWidget()
),
}

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['schema', 'port', 'host', 'extra'],
"relabeling": {
'login': 'Cosmos Endpoint URI',
'password': 'Cosmos Master Key Token',
},
"placeholders": {
'login': 'endpoint uri',
'password': 'master key',
'extra__azure_cosmos__database_name': 'database name',
'extra__azure_cosmos__collection_name': 'collection name',
},
}

def __init__(self, azure_cosmos_conn_id: str = default_conn_name) -> None:
super().__init__()
self.conn_id = azure_cosmos_conn_id
Expand All @@ -66,8 +99,12 @@ def get_conn(self) -> CosmosClient:
endpoint_uri = conn.login
master_key = conn.password

self.default_database_name = extras.get('database_name')
self.default_collection_name = extras.get('collection_name')
self.default_database_name = extras.get('database_name') or extras.get(
'extra__azure_cosmos__database_name'
)
self.default_collection_name = extras.get('collection_name') or extras.get(
'extra__azure_cosmos__collection_name'
)

# Initialize the Python Azure Cosmos DB client
self._conn = CosmosClient(endpoint_uri, {'masterKey': master_key})
Expand Down
Loading

0 comments on commit 1a85ba9

Please sign in to comment.