Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Destination AWS Datalake: Enable STS Role Credential Refresh for Long Syncs #33853

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

import awswrangler as wr
import boto3
import botocore
import pandas as pd
from airbyte_cdk.destinations import Destination
from awswrangler import _data_types
from botocore.credentials import AssumeRoleCredentialFetcher, CredentialResolver, DeferredRefreshableCredentials, JSONFileCache
from botocore.exceptions import ClientError
from retrying import retry

Expand Down Expand Up @@ -64,6 +66,32 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t
_data_types._cast_pandas_column = _cast_pandas_column


# This class created to support refreshing sts role assumption credentials for long running syncs
class AssumeRoleProvider(object):
METHOD = "assume-role"

def __init__(self, fetcher):
self._fetcher = fetcher

def load(self):
return DeferredRefreshableCredentials(self._fetcher.fetch_credentials, self.METHOD)

@staticmethod
def assume_role_refreshable(
session: botocore.session.Session, role_arn: str, duration: int = 3600, session_name: str = None
) -> botocore.session.Session:
fetcher = AssumeRoleCredentialFetcher(
session.create_client,
session.get_credentials(),
role_arn,
extra_args={"DurationSeconds": duration, "RoleSessionName": session_name},
cache=JSONFileCache(),
)
role_session = botocore.session.Session()
role_session.register_component("credential_provider", CredentialResolver([AssumeRoleProvider(fetcher)]))
return role_session


class AwsHandler:
def __init__(self, connector_config: ConnectorConfig, destination: Destination) -> None:
self._config: ConnectorConfig = connector_config
Expand All @@ -87,18 +115,10 @@ def create_session(self) -> None:
)

elif self._config.credentials_type == CredentialsType.IAM_ROLE:
client = boto3.client("sts")
role = client.assume_role(
RoleArn=self._config.role_arn,
RoleSessionName="airbyte-destination-aws-datalake",
)
creds = role.get("Credentials", {})
self._session = boto3.Session(
aws_access_key_id=creds.get("AccessKeyId"),
aws_secret_access_key=creds.get("SecretAccessKey"),
aws_session_token=creds.get("SessionToken"),
region_name=self._config.region,
botocore_session = AssumeRoleProvider.assume_role_refreshable(
session=botocore.session.Session(), role_arn=self._config.role_arn, session_name="airbyte-destination-aws-datalake"
)
self._session = boto3.session.Session(region_name=self._config.region, botocore_session=botocore_session)

def _get_s3_path(self, database: str, table: str) -> str:
bucket = f"s3://{self._config.bucket_name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
definitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
dockerRepository: airbyte/destination-aws-datalake
githubIssueLabel: destination-aws-datalake
icon: awsdatalake.svg
Expand Down