Skip to content

Commit

Permalink
Allow and prefer non-prefixed extra fields for AsanaHook (#27043)
Browse files Browse the repository at this point in the history
From airflow version 2.3, extra prefixes are not required so we enable them here.
  • Loading branch information
dstandish committed Oct 22, 2022
1 parent a770e9c commit 81b5f50
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 44 deletions.
14 changes: 14 additions & 0 deletions airflow/providers/asana/CHANGELOG.rst
Expand Up @@ -23,6 +23,20 @@
Changelog
---------

3.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

* This release of provider is only available for Airflow 2.3+ as explained in the Apache Airflow
providers support policy https://github.com/apache/airflow/blob/main/README.md#support-for-providers

Misc
~~~~

* In AsanaHook, non-prefixed extra fields are supported and are preferred. So if you should update your connection to replace ``extra__asana__workspace`` with ``workspace`` etc.

2.0.1
.....

Expand Down
55 changes: 49 additions & 6 deletions airflow/providers/asana/hooks/asana.py
Expand Up @@ -18,6 +18,7 @@
"""Connect to Asana."""
from __future__ import annotations

from functools import wraps
from typing import Any

from asana import Client # type: ignore[attr-defined]
Expand All @@ -27,6 +28,34 @@
from airflow.hooks.base import BaseHook


def _ensure_prefixes(conn_type):
"""
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""

def dec(func):
@wraps(func)
def inner():
field_behaviors = func()
conn_attrs = {'host', 'schema', 'login', 'password', 'port', 'extra'}

def _ensure_prefix(field):
if field not in conn_attrs and not field.startswith('extra__'):
return f"extra__{conn_type}__{field}"
else:
return field

if 'placeholders' in field_behaviors:
placeholders = field_behaviors['placeholders']
field_behaviors['placeholders'] = {_ensure_prefix(k): v for k, v in placeholders.items()}
return field_behaviors

return inner

return dec


class AsanaHook(BaseHook):
"""Wrapper around Asana Python client library."""

Expand All @@ -39,8 +68,21 @@ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.connection = self.get_connection(conn_id)
extras = self.connection.extra_dejson
self.workspace = extras.get("extra__asana__workspace") or None
self.project = extras.get("extra__asana__project") or None
self.workspace = self._get_field(extras, "workspace") or None
self.project = self._get_field(extras, "project") or None

def _get_field(self, extras: dict, field_name: str):
"""Get field from extra, first checking short name, then for backcompat we check for prefixed name."""
backcompat_prefix = "extra__asana__"
if field_name.startswith('extra__'):
raise ValueError(
f"Got prefixed name {field_name}; please remove the '{backcompat_prefix}' prefix "
"when using this method."
)
if field_name in extras:
return extras[field_name] or None
prefixed_name = f"{backcompat_prefix}{field_name}"
return extras.get(prefixed_name) or None

def get_conn(self) -> Client:
return self.client
Expand All @@ -53,20 +95,21 @@ def get_connection_form_widgets() -> dict[str, Any]:
from wtforms import StringField

return {
"extra__asana__workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}

@staticmethod
@_ensure_prefixes(conn_type='asana')
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"extra__asana__workspace": "Asana workspace gid",
"extra__asana__project": "Asana project gid",
"workspace": "Asana workspace gid",
"project": "Asana project gid",
},
}

Expand Down
109 changes: 71 additions & 38 deletions tests/providers/asana/hooks/test_asana.py
Expand Up @@ -16,16 +16,19 @@
# under the License.
from __future__ import annotations

import unittest
import os
from unittest.mock import patch

import pytest
from asana import Client
from pytest import param

from airflow.models import Connection
from airflow.providers.asana.hooks.asana import AsanaHook
from tests.test_utils.providers import get_provider_min_airflow_version, object_exists


class TestAsanaHook(unittest.TestCase):
class TestAsanaHook:
"""
Tests for AsanaHook Asana client retrieval
"""
Expand All @@ -40,7 +43,7 @@ def test_asana_client_retrieved(self):
):
hook = AsanaHook()
client = hook.get_conn()
self.assertEqual(type(client), Client)
assert type(client) == Client

def test_missing_password_raises(self):
"""
Expand All @@ -49,7 +52,7 @@ def test_missing_password_raises(self):
"""
with patch.object(AsanaHook, "get_connection", return_value=Connection(conn_type="asana")):
hook = AsanaHook()
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
hook.get_conn()

def test_merge_create_task_parameters_default_project(self):
Expand All @@ -62,7 +65,7 @@ def test_merge_create_task_parameters_default_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["1"]}
self.assertEqual(expected_merged_params, hook._merge_create_task_parameters("test", {}))
assert hook._merge_create_task_parameters("test", {}) == expected_merged_params

def test_merge_create_task_parameters_specified_project(self):
"""
Expand All @@ -74,10 +77,7 @@ def test_merge_create_task_parameters_specified_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["1", "2"]}
self.assertEqual(
expected_merged_params,
hook._merge_create_task_parameters("test", {"projects": ["1", "2"]}),
)
assert hook._merge_create_task_parameters("test", {"projects": ["1", "2"]}) == expected_merged_params

def test_merge_create_task_parameters_specified_workspace(self):
"""
Expand All @@ -89,7 +89,7 @@ def test_merge_create_task_parameters_specified_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "workspace": "1"}
self.assertEqual(expected_merged_params, hook._merge_create_task_parameters("test", {}))
assert hook._merge_create_task_parameters("test", {}) == expected_merged_params

def test_merge_create_task_parameters_default_project_overrides_default_workspace(self):
"""
Expand All @@ -105,7 +105,7 @@ def test_merge_create_task_parameters_default_project_overrides_default_workspac
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["1"]}
self.assertEqual(expected_merged_params, hook._merge_create_task_parameters("test", {}))
assert hook._merge_create_task_parameters("test", {}) == expected_merged_params

def test_merge_create_task_parameters_specified_project_overrides_default_workspace(self):
"""
Expand All @@ -121,10 +121,7 @@ def test_merge_create_task_parameters_specified_project_overrides_default_worksp
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["2"]}
self.assertEqual(
expected_merged_params,
hook._merge_create_task_parameters("test", {"projects": ["2"]}),
)
assert hook._merge_create_task_parameters("test", {"projects": ["2"]}) == expected_merged_params

def test_merge_find_task_parameters_default_project(self):
"""
Expand All @@ -136,7 +133,7 @@ def test_merge_find_task_parameters_default_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "1"}
self.assertEqual(expected_merged_params, hook._merge_find_task_parameters({}))
assert hook._merge_find_task_parameters({}) == expected_merged_params

def test_merge_find_task_parameters_specified_project(self):
"""
Expand All @@ -148,10 +145,7 @@ def test_merge_find_task_parameters_specified_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "2"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"project": "2"}),
)
assert hook._merge_find_task_parameters({"project": "2"}) == expected_merged_params

def test_merge_find_task_parameters_default_workspace(self):
"""
Expand All @@ -163,10 +157,7 @@ def test_merge_find_task_parameters_default_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "1", "assignee": "1"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"assignee": "1"}),
)
assert hook._merge_find_task_parameters({"assignee": "1"}) == expected_merged_params

def test_merge_find_task_parameters_specified_workspace(self):
"""
Expand All @@ -178,10 +169,7 @@ def test_merge_find_task_parameters_specified_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "2", "assignee": "1"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"workspace": "2", "assignee": "1"}),
)
assert hook._merge_find_task_parameters({"workspace": "2", "assignee": "1"}) == expected_merged_params

def test_merge_find_task_parameters_default_project_overrides_workspace(self):
"""
Expand All @@ -196,7 +184,7 @@ def test_merge_find_task_parameters_default_project_overrides_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "1"}
self.assertEqual(expected_merged_params, hook._merge_find_task_parameters({}))
assert hook._merge_find_task_parameters({}) == expected_merged_params

def test_merge_find_task_parameters_specified_project_overrides_workspace(self):
"""
Expand All @@ -212,10 +200,7 @@ def test_merge_find_task_parameters_specified_project_overrides_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "2"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"project": "2"}),
)
assert hook._merge_find_task_parameters({"project": "2"}) == expected_merged_params

def test_merge_project_parameters(self):
"""
Expand All @@ -226,7 +211,7 @@ def test_merge_project_parameters(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "1", "name": "name"}
self.assertEqual(expected_merged_params, hook._merge_project_parameters({"name": "name"}))
assert hook._merge_project_parameters({"name": "name"}) == expected_merged_params

def test_merge_project_parameters_override(self):
"""
Expand All @@ -237,7 +222,55 @@ def test_merge_project_parameters_override(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "2"}
self.assertEqual(
expected_merged_params,
hook._merge_project_parameters({"workspace": "2"}),
)
assert hook._merge_project_parameters({"workspace": "2"}) == expected_merged_params

def test__ensure_prefixes_removal(self):
"""Ensure that _ensure_prefixes is removed from snowflake when airflow min version >= 2.5.0."""
path = 'airflow.providers.asana.hooks.asana._ensure_prefixes'
if not object_exists(path):
raise Exception(
"You must remove this test. It only exists to "
"remind us to remove decorator `_ensure_prefixes`."
)

if get_provider_min_airflow_version('apache-airflow-providers-asana') >= (2, 5):
raise Exception(
"You must now remove `_ensure_prefixes` from AsanaHook."
" The functionality is now taken care of by providers manager."
)

def test__ensure_prefixes(self):
"""
Check that ensure_prefixes decorator working properly
Note: remove this test when removing ensure_prefixes (after min airflow version >= 2.5.0
"""
assert list(AsanaHook.get_ui_field_behaviour()['placeholders'].keys()) == [
'password',
'extra__asana__workspace',
'extra__asana__project',
]

@pytest.mark.parametrize(
'uri',
[
param(
'a://?extra__asana__workspace=abc&extra__asana__project=abc',
id='prefix',
),
param('a://?workspace=abc&project=abc', id='no-prefix'),
],
)
def test_backcompat_prefix_works(self, uri):
with patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}):
hook = AsanaHook('my_conn')
assert hook.workspace == 'abc'
assert hook.project == 'abc'

def test_backcompat_prefix_both_prefers_short(self):
with patch.dict(
os.environ,
{"AIRFLOW_CONN_MY_CONN": 'a://?workspace=non-prefixed&extra__asana__workspace=prefixed'},
):
hook = AsanaHook('my_conn')
assert hook.workspace == 'non-prefixed'

0 comments on commit 81b5f50

Please sign in to comment.