Skip to content

Commit

Permalink
Fix Aws base async hook on connection details
Browse files Browse the repository at this point in the history
  • Loading branch information
bharanidharan14 committed Nov 15, 2022
1 parent 3de8b00 commit 40bc84f
Showing 1 changed file with 12 additions and 40 deletions.
52 changes: 12 additions & 40 deletions astronomer/providers/amazon/aws/hooks/base_aws.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from aiobotocore.client import AioBaseClient
from aiobotocore.session import get_session
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook, _parse_s3_config
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
from asgiref.sync import sync_to_async


Expand Down Expand Up @@ -34,50 +35,21 @@ async def get_client_async(self) -> AioBaseClient:
"""Create an Async Client object to communicate with AWS services."""
# Fetch the Airflow connection object
connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
extra_config = connection_object.extra_dejson

aws_access_key_id = None
aws_secret_access_key = None
aws_session_token = None
if connection_object.login:
aws_access_key_id = connection_object.login
aws_secret_access_key = connection_object.password
self.log.info("Credentials retrieved from login")
elif "aws_access_key_id" in extra_config and "aws_secret_access_key" in extra_config:
aws_access_key_id = extra_config["aws_access_key_id"]
aws_secret_access_key = extra_config["aws_secret_access_key"]
aws_session_token = extra_config.get("aws_session_token")
self.log.info("Credentials retrieved from extra_config")
elif "s3_config_file" in extra_config:
aws_access_key_id, aws_secret_access_key = await sync_to_async(
_parse_s3_config
)( # pragma: no cover
extra_config["s3_config_file"],
extra_config.get("s3_config_format"),
extra_config.get("profile"),
)
self.log.info("Credentials retrieved from extra_config['s3_config_file']") # pragma: no cover
else:
self.log.info("No credentials retrieved from Connection")

region_name = self.region_name
if self.region_name is None and "region_name" in extra_config:
self.log.info("Retrieving region_name from Connection.extra_config['region_name']")
region_name = extra_config["region_name"] # pragma: no cover

if "aws_session_token" in extra_config:
self.log.info(
"session token retrieved from extra, please note you are responsible for renewing these.",
)
aws_session_token = extra_config.get("aws_session_token")
conn_config = AwsConnectionWrapper(
conn=connection_object,
region_name=self.region_name,
botocore_config=self.config,
verify=self.verify,
)

async_connection = get_session()
return async_connection.create_client(
service_name=self.client_type,
region_name=region_name,
aws_secret_access_key=aws_secret_access_key,
aws_access_key_id=aws_access_key_id,
aws_session_token=aws_session_token,
region_name=conn_config.region_name,
aws_secret_access_key=conn_config.aws_secret_access_key,
aws_access_key_id=conn_config.aws_access_key_id,
aws_session_token=conn_config.aws_session_token,
verify=self.verify,
config=self.config,
)

0 comments on commit 40bc84f

Please sign in to comment.