diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index ab06ebaa26803..7191bad5e0908 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.1.20 +- Allow using `requests.auth.AuthBase` as authenticators instead of custom CDK authenticators. +- Implement Oauth2Authenticator, MultipleTokenAuthenticator and TokenAuthenticator authenticators. +- Add support for both legacy and requests native authenticator to HttpStream class. + ## 0.1.19 No longer prints full config files on validation error to prevent exposing secrets to log file: https://github.com/airbytehq/airbyte/pull/5879 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/core.py index 5db5cfea1f750..97bcc3b58645b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/core.py @@ -26,7 +26,10 @@ from abc import ABC, abstractmethod from typing import Any, Mapping +from deprecated import deprecated + +@deprecated(version="0.1.20", reason="Use requests.auth.AuthBase instead") class HttpAuthenticator(ABC): """ Base abstract class for various HTTP Authentication strategies. Authentication strategies are generally @@ -40,6 +43,7 @@ def get_auth_header(self) -> Mapping[str, Any]: """ +@deprecated(version="0.1.20", reason="Set `authenticator=None` instead") class NoAuth(HttpAuthenticator): def get_auth_header(self) -> Mapping[str, Any]: return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/oauth.py index d7799e25ab736..b76cf962ffb94 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/oauth.py @@ -27,10 +27,12 @@ import pendulum import requests +from deprecated import deprecated from .core import HttpAuthenticator +@deprecated(version="0.1.20", reason="Use airbyte_cdk.sources.streams.http.requests_native_auth.Oauth2Authenticator instead") class Oauth2Authenticator(HttpAuthenticator): """ Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py index 294e19175d3e4..64da6c61f8e12 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py @@ -26,9 +26,12 @@ from itertools import cycle from typing import Any, List, Mapping +from deprecated import deprecated + from .core import HttpAuthenticator +@deprecated(version="0.1.20", reason="Use airbyte_cdk.sources.streams.http.requests_native_auth.TokenAuthenticator instead") class TokenAuthenticator(HttpAuthenticator): def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): self.auth_method = auth_method @@ -39,6 +42,7 @@ def get_auth_header(self) -> Mapping[str, Any]: return {self.auth_header: f"{self.auth_method} {self._token}"} +@deprecated(version="0.1.20", reason="Use airbyte_cdk.sources.streams.http.requests_native_auth.MultipleTokenAuthenticator instead") class MultipleTokenAuthenticator(HttpAuthenticator): def __init__(self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"): self.auth_method = auth_method diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 83f1e00a06437..9d7575bd81540 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -29,6 +29,7 @@ import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.core import Stream +from requests.auth import AuthBase from .auth.core import HttpAuthenticator, NoAuth from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException @@ -46,10 +47,16 @@ class HttpStream(Stream, ABC): source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table) page_size = None # Use this variable to define page size for API http requests with pagination support - def __init__(self, authenticator: HttpAuthenticator = NoAuth()): - self._authenticator = authenticator + # TODO: remove legacy HttpAuthenticator authenticator references + def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None): self._session = requests.Session() + self._authenticator = NoAuth() + if isinstance(authenticator, AuthBase): + self._session.auth = authenticator + elif authenticator: + self._authenticator = authenticator + @property @abstractmethod def url_base(self) -> str: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py new file mode 100644 index 0000000000000..8b62c71c24da3 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py @@ -0,0 +1,32 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +from .oauth import Oauth2Authenticator +from .token import MultipleTokenAuthenticator, TokenAuthenticator + +__all__ = [ + "Oauth2Authenticator", + "TokenAuthenticator", + "MultipleTokenAuthenticator", +] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py new file mode 100644 index 0000000000000..ee90164a70e9e --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -0,0 +1,104 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +from typing import Any, List, Mapping, MutableMapping, Tuple + +import pendulum +import requests +from requests.auth import AuthBase + + +class Oauth2Authenticator(AuthBase): + """ + Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. + The generated access token is attached to each request via the Authorization header. + """ + + def __init__( + self, + token_refresh_endpoint: str, + client_id: str, + client_secret: str, + refresh_token: str, + scopes: List[str] = None, + token_expiry_date: pendulum.datetime = None, + access_token_name: str = "access_token", + expires_in_name: str = "expires_in", + ): + self.token_refresh_endpoint = token_refresh_endpoint + self.client_secret = client_secret + self.client_id = client_id + self.refresh_token = refresh_token + self.scopes = scopes + self.access_token_name = access_token_name + self.expires_in_name = expires_in_name + + self._token_expiry_date = token_expiry_date or pendulum.now().subtract(days=1) + self._access_token = None + + def __call__(self, request): + request.headers.update(self.get_auth_header()) + return request + + def get_auth_header(self) -> Mapping[str, Any]: + return {"Authorization": f"Bearer {self.get_access_token()}"} + + def get_access_token(self): + if self.token_has_expired(): + t0 = pendulum.now() + token, expires_in = self.refresh_access_token() + self._access_token = token + self._token_expiry_date = t0.add(seconds=expires_in) + + return self._access_token + + def token_has_expired(self) -> bool: + return pendulum.now() > self._token_expiry_date + + def get_refresh_request_body(self) -> Mapping[str, Any]: + """Override to define additional parameters""" + payload: MutableMapping[str, Any] = { + "grant_type": "refresh_token", + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": self.refresh_token, + } + + if self.scopes: + payload["scopes"] = self.scopes + + return payload + + def refresh_access_token(self) -> Tuple[str, int]: + """ + returns a tuple of (access_token, token_lifespan_in_seconds) + """ + try: + response = requests.request(method="POST", url=self.token_refresh_endpoint, data=self.get_refresh_request_body()) + response.raise_for_status() + response_json = response.json() + return response_json[self.access_token_name], response_json[self.expires_in_name] + except Exception as e: + raise Exception(f"Error while refreshing access token: {e}") from e diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py new file mode 100644 index 0000000000000..925962993fba9 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py @@ -0,0 +1,59 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +from itertools import cycle +from typing import Any, List, Mapping + +from requests.auth import AuthBase + + +class MultipleTokenAuthenticator(AuthBase): + """ + Builds auth header, based on the list of tokens provided. + Auth header is changed per each `get_auth_header` call, using each token in cycle. + The token is attached to each request via the `auth_header` header. + """ + + def __init__(self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"): + self.auth_method = auth_method + self.auth_header = auth_header + self._tokens = tokens + self._tokens_iter = cycle(self._tokens) + + def __call__(self, request): + request.headers.update(self.get_auth_header()) + return request + + def get_auth_header(self) -> Mapping[str, Any]: + return {self.auth_header: f"{self.auth_method} {next(self._tokens_iter)}"} + + +class TokenAuthenticator(MultipleTokenAuthenticator): + """ + Builds auth header, based on the token provided. + The token is attached to each request via the `auth_header` header. + """ + + def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): + super().__init__([token], auth_method, auth_header) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 7e5223f33a268..3488ef6928528 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.18", + version="0.1.20", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -71,6 +71,7 @@ "pydantic~=1.6", "PyYAML~=5.4", "requests", + "Deprecated~=1.2", ], python_requires=">=3.7.0", extras_require={"dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]}, diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py b/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py new file mode 100644 index 0000000000000..f1a88dadc585a --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py @@ -0,0 +1,164 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import logging + +import requests +from airbyte_cdk.sources.streams.http.requests_native_auth import MultipleTokenAuthenticator, Oauth2Authenticator, TokenAuthenticator +from requests import Response + +LOGGER = logging.getLogger(__name__) + + +def test_token_authenticator(): + """ + Should match passed in token, no matter how many times token is retrieved. + """ + token_auth = TokenAuthenticator(token="test-token") + header1 = token_auth.get_auth_header() + header2 = token_auth.get_auth_header() + + prepared_request = requests.PreparedRequest() + prepared_request.headers = {} + token_auth(prepared_request) + + assert {"Authorization": "Bearer test-token"} == prepared_request.headers + assert {"Authorization": "Bearer test-token"} == header1 + assert {"Authorization": "Bearer test-token"} == header2 + + +def test_multiple_token_authenticator(): + multiple_token_auth = MultipleTokenAuthenticator(tokens=["token1", "token2"]) + header1 = multiple_token_auth.get_auth_header() + header2 = multiple_token_auth.get_auth_header() + header3 = multiple_token_auth.get_auth_header() + + prepared_request = requests.PreparedRequest() + prepared_request.headers = {} + multiple_token_auth(prepared_request) + + assert {"Authorization": "Bearer token2"} == prepared_request.headers + assert {"Authorization": "Bearer token1"} == header1 + assert {"Authorization": "Bearer token2"} == header2 + assert {"Authorization": "Bearer token1"} == header3 + + +class TestOauth2Authenticator: + """ + Test class for OAuth2Authenticator. + """ + + refresh_endpoint = "refresh_end" + client_id = "client_id" + client_secret = "client_secret" + refresh_token = "refresh_token" + + def test_get_auth_header_fresh(self, mocker): + """ + Should not retrieve new token if current token is valid. + """ + oauth = Oauth2Authenticator( + token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, + client_id=TestOauth2Authenticator.client_id, + client_secret=TestOauth2Authenticator.client_secret, + refresh_token=TestOauth2Authenticator.refresh_token, + ) + + mocker.patch.object(Oauth2Authenticator, "refresh_access_token", return_value=("access_token", 1000)) + header = oauth.get_auth_header() + assert {"Authorization": "Bearer access_token"} == header + + def test_get_auth_header_expired(self, mocker): + """ + Should retrieve new token if current token is expired. + """ + oauth = Oauth2Authenticator( + token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, + client_id=TestOauth2Authenticator.client_id, + client_secret=TestOauth2Authenticator.client_secret, + refresh_token=TestOauth2Authenticator.refresh_token, + ) + + expire_immediately = 0 + mocker.patch.object(Oauth2Authenticator, "refresh_access_token", return_value=("access_token_1", expire_immediately)) + oauth.get_auth_header() # Set the first expired token. + + valid_100_secs = 100 + mocker.patch.object(Oauth2Authenticator, "refresh_access_token", return_value=("access_token_2", valid_100_secs)) + header = oauth.get_auth_header() + assert {"Authorization": "Bearer access_token_2"} == header + + def test_refresh_request_body(self): + """ + Request body should match given configuration. + """ + scopes = ["scope1", "scope2"] + oauth = Oauth2Authenticator( + token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, + client_id=TestOauth2Authenticator.client_id, + client_secret=TestOauth2Authenticator.client_secret, + refresh_token=TestOauth2Authenticator.refresh_token, + scopes=scopes, + ) + body = oauth.get_refresh_request_body() + expected = { + "grant_type": "refresh_token", + "client_id": "client_id", + "client_secret": "client_secret", + "refresh_token": "refresh_token", + "scopes": scopes, + } + assert body == expected + + def test_refresh_access_token(self, mocker): + oauth = Oauth2Authenticator( + token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, + client_id=TestOauth2Authenticator.client_id, + client_secret=TestOauth2Authenticator.client_secret, + refresh_token=TestOauth2Authenticator.refresh_token, + ) + resp = Response() + resp.status_code = 200 + + mocker.patch.object(requests, "request", return_value=resp) + mocker.patch.object(resp, "json", return_value={"access_token": "access_token", "expires_in": 1000}) + token = oauth.refresh_access_token() + + assert ("access_token", 1000) == token + + def test_auth_call_method(self, mocker): + oauth = Oauth2Authenticator( + token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, + client_id=TestOauth2Authenticator.client_id, + client_secret=TestOauth2Authenticator.client_secret, + refresh_token=TestOauth2Authenticator.refresh_token, + ) + + mocker.patch.object(Oauth2Authenticator, "refresh_access_token", return_value=("access_token", 1000)) + prepared_request = requests.PreparedRequest() + prepared_request.headers = {} + oauth(prepared_request) + + assert {"Authorization": "Bearer access_token"} == prepared_request.headers diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index 84a53835243d3..591e2cc8003ee 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -32,15 +32,18 @@ import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth import NoAuth +from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator as HttpTokenAuthenticator from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator class StubBasicReadHttpStream(HttpStream): url_base = "https://test_base_url.com" primary_key = "" - def __init__(self): - super().__init__() + def __init__(self, **kwargs): + super().__init__(**kwargs) self.resp_counter = 1 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: @@ -63,6 +66,24 @@ def parse_response( yield stubResp +def test_default_authenticator(): + stream = StubBasicReadHttpStream() + assert isinstance(stream.authenticator, NoAuth) + assert stream._session.auth is None + + +def test_requests_native_token_authenticator(): + stream = StubBasicReadHttpStream(authenticator=TokenAuthenticator("test-token")) + assert isinstance(stream.authenticator, NoAuth) + assert isinstance(stream._session.auth, TokenAuthenticator) + + +def test_http_token_authenticator(): + stream = StubBasicReadHttpStream(authenticator=HttpTokenAuthenticator("test-token")) + assert isinstance(stream.authenticator, HttpTokenAuthenticator) + assert stream._session.auth is None + + def test_request_kwargs_used(mocker, requests_mock): stream = StubBasicReadHttpStream() request_kwargs = {"cert": None, "proxies": "google.com"}