Skip to content

Commit

Permalink
🐛 Destination AWS Datalake: Enable STS Role Credential Refresh for Lo…
Browse files Browse the repository at this point in the history
…ng Syncs (#33853)

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
  • Loading branch information
3 people committed Apr 29, 2024
1 parent 8d6f159 commit 3b5c44a
Show file tree
Hide file tree
Showing 5 changed files with 484 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

import awswrangler as wr
import boto3
import botocore
import pandas as pd
from airbyte_cdk.destinations import Destination
from awswrangler import _data_types
from botocore.credentials import AssumeRoleCredentialFetcher, CredentialResolver, DeferredRefreshableCredentials, JSONFileCache
from botocore.exceptions import ClientError
from retrying import retry

Expand Down Expand Up @@ -64,6 +66,32 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t
_data_types._cast_pandas_column = _cast_pandas_column


# This class was created to support refreshing STS role-assumption credentials for long-running syncs
class AssumeRoleProvider:
    """Credential provider that serves assume-role credentials from a fetcher.

    Exists so that long-running syncs can keep using STS role-assumption
    credentials: instead of a fixed set of keys, the session is given
    botocore's ``DeferredRefreshableCredentials`` backed by an
    ``AssumeRoleCredentialFetcher``.
    """

    # Method label passed to DeferredRefreshableCredentials in ``load``.
    METHOD = "assume-role"

    def __init__(self, fetcher):
        """Store the fetcher whose ``fetch_credentials`` supplies credentials."""
        self._fetcher = fetcher

    def load(self):
        """Return credentials that call the fetcher's ``fetch_credentials``."""
        refresh_callable = self._fetcher.fetch_credentials
        return DeferredRefreshableCredentials(refresh_callable, self.METHOD)

    @staticmethod
    def assume_role_refreshable(
        session: botocore.session.Session, role_arn: str, duration: int = 3600, session_name: str = None
    ) -> botocore.session.Session:
        """Build a botocore session whose credentials come from assuming a role.

        Args:
            session: Source session; its ``create_client`` and current
                credentials are handed to the assume-role fetcher.
            role_arn: ARN of the role to assume.
            duration: Forwarded to the fetcher as ``DurationSeconds``.
            session_name: Forwarded to the fetcher as ``RoleSessionName``.

        Returns:
            A new botocore session whose ``credential_provider`` component is
            a resolver containing only an ``AssumeRoleProvider``.
        """
        client_factory = session.create_client
        source_credentials = session.get_credentials()
        assume_role_args = {"DurationSeconds": duration, "RoleSessionName": session_name}

        credential_fetcher = AssumeRoleCredentialFetcher(
            client_factory,
            source_credentials,
            role_arn,
            extra_args=assume_role_args,
            cache=JSONFileCache(),
        )

        # Replace the default provider chain on a fresh session with a resolver
        # that knows only about the assume-role provider.
        refreshable_session = botocore.session.Session()
        resolver = CredentialResolver([AssumeRoleProvider(credential_fetcher)])
        refreshable_session.register_component("credential_provider", resolver)
        return refreshable_session


class AwsHandler:
def __init__(self, connector_config: ConnectorConfig, destination: Destination) -> None:
self._config: ConnectorConfig = connector_config
Expand All @@ -87,18 +115,10 @@ def create_session(self) -> None:
)

elif self._config.credentials_type == CredentialsType.IAM_ROLE:
client = boto3.client("sts")
role = client.assume_role(
RoleArn=self._config.role_arn,
RoleSessionName="airbyte-destination-aws-datalake",
)
creds = role.get("Credentials", {})
self._session = boto3.Session(
aws_access_key_id=creds.get("AccessKeyId"),
aws_secret_access_key=creds.get("SecretAccessKey"),
aws_session_token=creds.get("SessionToken"),
region_name=self._config.region,
botocore_session = AssumeRoleProvider.assume_role_refreshable(
session=botocore.session.Session(), role_arn=self._config.role_arn, session_name="airbyte-destination-aws-datalake"
)
self._session = boto3.session.Session(region_name=self._config.region, botocore_session=botocore_session)

def _get_s3_path(self, database: str, table: str) -> str:
bucket = f"s3://{self._config.bucket_name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
definitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
dockerRepository: airbyte/destination-aws-datalake
githubIssueLabel: destination-aws-datalake
icon: awsdatalake.svg
Expand Down

0 comments on commit 3b5c44a

Please sign in to comment.