Skip to content

Commit

Permalink
🎉 CDK: Add requests native authenticator support (#5731)
Browse files Browse the repository at this point in the history
* Add requests native auth class

* Update init file.
Update type annotations.
Bump version.

* Update TokenAuthenticator implementation.
Update Oauth2Authenticator implementation.
Add CHANGELOG.md record.

* Update Oauth2Authenticator default value setting.
Update CHANGELOG.md

* Add requests native authenticator tests

* Add CDK requests native __call__ method tests.
Update CHANGELOG.md

* Add outdated auth deprecation messages

* Update requests native auth __call__ method tests

* Bump CDK version to 0.1.20
  • Loading branch information
htrueman committed Sep 15, 2021
1 parent 278cb7d commit 4a0d364
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 5 deletions.
5 changes: 5 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 0.1.20
- Allow using `requests.auth.AuthBase` as authenticators instead of custom CDK authenticators.
- Implement Oauth2Authenticator, MultipleTokenAuthenticator and TokenAuthenticator authenticators.
- Add support for both legacy and requests native authenticator to HttpStream class.

## 0.1.19
No longer prints full config files on validation error to prevent exposing secrets to log file: https://github.com/airbytehq/airbyte/pull/5879

Expand Down
Expand Up @@ -26,7 +26,10 @@
from abc import ABC, abstractmethod
from typing import Any, Mapping

from deprecated import deprecated


@deprecated(version="0.1.20", reason="Use requests.auth.AuthBase instead")
class HttpAuthenticator(ABC):
"""
Base abstract class for various HTTP Authentication strategies. Authentication strategies are generally
Expand All @@ -40,6 +43,7 @@ def get_auth_header(self) -> Mapping[str, Any]:
"""


@deprecated(version="0.1.20", reason="Set `authenticator=None` instead")
class NoAuth(HttpAuthenticator):
    """Legacy authenticator that contributes no authentication headers."""

    def get_auth_header(self) -> Mapping[str, Any]:
        """Return an empty mapping, i.e. no headers are added to the request."""
        no_headers: Mapping[str, Any] = {}
        return no_headers
Expand Up @@ -27,10 +27,12 @@

import pendulum
import requests
from deprecated import deprecated

from .core import HttpAuthenticator


@deprecated(version="0.1.20", reason="Use airbyte_cdk.sources.streams.http.requests_native_auth.Oauth2Authenticator instead")
class Oauth2Authenticator(HttpAuthenticator):
"""
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
Expand Down
Expand Up @@ -26,9 +26,12 @@
from itertools import cycle
from typing import Any, List, Mapping

from deprecated import deprecated

from .core import HttpAuthenticator


@deprecated(version="0.1.20", reason="Use airbyte_cdk.sources.streams.http.requests_native_auth.TokenAuthenticator instead")
class TokenAuthenticator(HttpAuthenticator):
def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
self.auth_method = auth_method
Expand All @@ -39,6 +42,7 @@ def get_auth_header(self) -> Mapping[str, Any]:
return {self.auth_header: f"{self.auth_method} {self._token}"}


@deprecated(version="0.1.20", reason="Use airbyte_cdk.sources.streams.http.requests_native_auth.MultipleTokenAuthenticator instead")
class MultipleTokenAuthenticator(HttpAuthenticator):
def __init__(self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"):
self.auth_method = auth_method
Expand Down
11 changes: 9 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Expand Up @@ -29,6 +29,7 @@
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from requests.auth import AuthBase

from .auth.core import HttpAuthenticator, NoAuth
from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
Expand All @@ -46,10 +47,16 @@ class HttpStream(Stream, ABC):
source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
page_size = None # Use this variable to define page size for API http requests with pagination support

def __init__(self, authenticator: HttpAuthenticator = NoAuth()):
self._authenticator = authenticator
# TODO: remove legacy HttpAuthenticator authenticator references
def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None):
    """
    Create the stream's HTTP session and wire up authentication.

    :param authenticator: either a requests-native ``AuthBase`` instance, which is attached
        directly to the session, or a legacy ``HttpAuthenticator``, which is stored on the
        stream. When nothing (or a falsy value) is given, a ``NoAuth`` instance is stored.
    """
    session = requests.Session()
    legacy_authenticator = NoAuth()

    if isinstance(authenticator, AuthBase):
        # Native authenticators are applied by requests itself through the session.
        session.auth = authenticator
    elif authenticator:
        # Anything else that is truthy is treated as a legacy CDK authenticator.
        legacy_authenticator = authenticator

    self._session = session
    self._authenticator = legacy_authenticator

@property
@abstractmethod
def url_base(self) -> str:
Expand Down
@@ -0,0 +1,32 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from .oauth import Oauth2Authenticator
from .token import MultipleTokenAuthenticator, TokenAuthenticator

__all__ = [
"Oauth2Authenticator",
"TokenAuthenticator",
"MultipleTokenAuthenticator",
]
@@ -0,0 +1,104 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


from typing import Any, List, Mapping, MutableMapping, Tuple

import pendulum
import requests
from requests.auth import AuthBase


class Oauth2Authenticator(AuthBase):
    """
    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
    The generated access token is attached to each request via the Authorization header.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: List[str] = None,
        token_expiry_date: pendulum.DateTime = None,
        access_token_name: str = "access_token",
        expires_in_name: str = "expires_in",
    ):
        """
        :param token_refresh_endpoint: URL that the refresh request is POSTed to.
        :param client_id: sent as ``client_id`` in the refresh request body.
        :param client_secret: sent as ``client_secret`` in the refresh request body.
        :param refresh_token: sent as ``refresh_token`` in the refresh request body.
        :param scopes: optional list, sent as ``scopes`` in the refresh request body when non-empty.
        :param token_expiry_date: moment after which the current token counts as expired.
        :param access_token_name: key of the access token in the refresh response JSON.
        :param expires_in_name: key of the token lifespan (seconds) in the refresh response JSON.
        """
        self.token_refresh_endpoint = token_refresh_endpoint
        self.client_secret = client_secret
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.scopes = scopes
        self.access_token_name = access_token_name
        self.expires_in_name = expires_in_name

        # Without an explicit expiry date, start "already expired" so the first
        # call to get_access_token() fetches a fresh token.
        self._token_expiry_date = token_expiry_date or pendulum.now().subtract(days=1)
        self._access_token = None

    def __call__(self, request):
        """Attach the Authorization header to the outgoing request and return it."""
        request.headers.update(self.get_auth_header())
        return request

    def get_auth_header(self) -> Mapping[str, Any]:
        """Return the ``Authorization: Bearer <token>`` header, refreshing the token if needed."""
        return {"Authorization": f"Bearer {self.get_access_token()}"}

    def get_access_token(self) -> str:
        """Return the cached access token, refreshing it first when it has expired."""
        if self.token_has_expired():
            # Take the timestamp before the refresh call so the computed expiry
            # is never later than the real one.
            t0 = pendulum.now()
            token, expires_in = self.refresh_access_token()
            self._access_token = token
            self._token_expiry_date = t0.add(seconds=expires_in)

        return self._access_token

    def token_has_expired(self) -> bool:
        """Return True when the current time is past the stored expiry date."""
        return pendulum.now() > self._token_expiry_date

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Override to define additional parameters"""
        payload: MutableMapping[str, Any] = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
        }

        if self.scopes:
            payload["scopes"] = self.scopes

        return payload

    def refresh_access_token(self) -> Tuple[str, int]:
        """
        returns a tuple of (access_token, token_lifespan_in_seconds)

        :raises Exception: wrapping any failure of the request, of the HTTP status check,
            or of reading the expected keys from the response.
        """
        try:
            response = requests.request(method="POST", url=self.token_refresh_endpoint, data=self.get_refresh_request_body())
            response.raise_for_status()
            response_json = response.json()
            # The lifespan may arrive as a numeric string in the JSON; coerce it so the
            # declared int contract holds and date arithmetic in get_access_token() works.
            return response_json[self.access_token_name], int(response_json[self.expires_in_name])
        except Exception as e:
            raise Exception(f"Error while refreshing access token: {e}") from e
@@ -0,0 +1,59 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from itertools import cycle
from typing import Any, List, Mapping

from requests.auth import AuthBase


class MultipleTokenAuthenticator(AuthBase):
    """
    Rotates through a list of tokens when building the auth header.
    Every `get_auth_header` call advances to the next token, wrapping around at the end.
    The resulting value is set on each request under the `auth_header` header name.
    """

    def __init__(self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """
        :param tokens: tokens to rotate through.
        :param auth_method: prefix placed before the token in the header value.
        :param auth_header: name of the header that carries the token.
        """
        self._tokens = tokens
        self._tokens_iter = cycle(tokens)
        self.auth_header = auth_header
        self.auth_method = auth_method

    def __call__(self, request):
        """Add the auth header to the outgoing request and hand the request back."""
        auth_headers = self.get_auth_header()
        request.headers.update(auth_headers)
        return request

    def get_auth_header(self) -> Mapping[str, Any]:
        """Build the header mapping using the next token in the rotation."""
        current_token = next(self._tokens_iter)
        header_value = f"{self.auth_method} {current_token}"
        return {self.auth_header: header_value}


class TokenAuthenticator(MultipleTokenAuthenticator):
    """
    Single-token variant of the multiple-token authenticator.
    The token is set on each request under the `auth_header` header name.
    """

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """Wrap the single token in a one-element list and delegate to the parent class."""
        super().__init__(tokens=[token], auth_method=auth_method, auth_header=auth_header)
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.18",
version="0.1.20",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -71,6 +71,7 @@
"pydantic~=1.6",
"PyYAML~=5.4",
"requests",
"Deprecated~=1.2",
],
python_requires=">=3.7.0",
extras_require={"dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]},
Expand Down

0 comments on commit 4a0d364

Please sign in to comment.