/
base_aws.py
627 lines (549 loc) · 26.8 KB
/
base_aws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Base AWS Hook.
.. seealso::
For more information on how to use this hook, take a look at the guide:
:ref:`howto/connection:AWSHook`
"""
import configparser
import datetime
import logging
import warnings
from functools import wraps
from typing import Any, Callable, Dict, Optional, Tuple, Union
import boto3
import botocore
import botocore.session
import requests
import tenacity
from botocore.config import Config
from botocore.credentials import ReadOnlyCredentials
from slugify import slugify
try:
from functools import cached_property
except ImportError:
from cached_property import cached_property
from dateutil.tz import tzlocal
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.utils.log.logging_mixin import LoggingMixin
class _SessionFactory(LoggingMixin):
def __init__(self, conn: Connection, region_name: Optional[str], config: Config) -> None:
super().__init__()
self.conn = conn
self.region_name = region_name
self.config = config
self.extra_config = self.conn.extra_dejson
self.basic_session = None
self.role_arn = None
def create_session(self) -> boto3.session.Session:
"""Create AWS session."""
session_kwargs = {}
if "session_kwargs" in self.extra_config:
self.log.info(
"Retrieving session_kwargs from Connection.extra_config['session_kwargs']: %s",
self.extra_config["session_kwargs"],
)
session_kwargs = self.extra_config["session_kwargs"]
self.basic_session = self._create_basic_session(session_kwargs=session_kwargs)
self.role_arn = self._read_role_arn_from_extra_config()
# If role_arn was specified then STS + assume_role
if self.role_arn is None:
return self.basic_session
return self._create_session_with_assume_role(session_kwargs=session_kwargs)
def _create_basic_session(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
aws_access_key_id, aws_secret_access_key = self._read_credentials_from_connection()
aws_session_token = self.extra_config.get("aws_session_token")
region_name = self.region_name
if self.region_name is None and 'region_name' in self.extra_config:
self.log.info("Retrieving region_name from Connection.extra_config['region_name']")
region_name = self.extra_config["region_name"]
self.log.info(
"Creating session with aws_access_key_id=%s region_name=%s",
aws_access_key_id,
region_name,
)
return boto3.session.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
aws_session_token=aws_session_token,
**session_kwargs,
)
def _create_session_with_assume_role(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
assume_role_method = self.extra_config.get('assume_role_method', 'assume_role')
self.log.info("assume_role_method=%s", assume_role_method)
supported_methods = ['assume_role', 'assume_role_with_saml', 'assume_role_with_web_identity']
if assume_role_method not in supported_methods:
raise NotImplementedError(
f'assume_role_method={assume_role_method} in Connection {self.conn.conn_id} Extra.'
f'Currently {supported_methods} are supported.'
'(Exclude this setting will default to "assume_role").'
)
if assume_role_method == 'assume_role_with_web_identity':
# Deferred credentials have no initial credentials
credential_fetcher = self._get_web_identity_credential_fetcher()
credentials = botocore.credentials.DeferredRefreshableCredentials(
method='assume-role-with-web-identity',
refresh_using=credential_fetcher.fetch_credentials,
time_fetcher=lambda: datetime.datetime.now(tz=tzlocal()),
)
else:
# Refreshable credentials do have initial credentials
credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
metadata=self._refresh_credentials(),
refresh_using=self._refresh_credentials,
method="sts-assume-role",
)
session = botocore.session.get_session()
session._credentials = credentials # pylint: disable=protected-access
region_name = self.basic_session.region_name
session.set_config_variable("region", region_name)
return boto3.session.Session(botocore_session=session, **session_kwargs)
def _refresh_credentials(self) -> Dict[str, Any]:
self.log.info('Refreshing credentials')
assume_role_method = self.extra_config.get('assume_role_method', 'assume_role')
sts_session = self.basic_session
if assume_role_method == 'assume_role':
sts_client = sts_session.client("sts", config=self.config)
sts_response = self._assume_role(sts_client=sts_client)
elif assume_role_method == 'assume_role_with_saml':
sts_client = sts_session.client("sts", config=self.config)
sts_response = self._assume_role_with_saml(sts_client=sts_client)
else:
raise NotImplementedError(f'assume_role_method={assume_role_method} not expected')
sts_response_http_status = sts_response['ResponseMetadata']['HTTPStatusCode']
if not sts_response_http_status == 200:
raise Exception(f'sts_response_http_status={sts_response_http_status}')
credentials = sts_response['Credentials']
expiry_time = credentials.get('Expiration').isoformat()
self.log.info(f'New credentials expiry_time:{expiry_time}')
credentials = {
"access_key": credentials.get("AccessKeyId"),
"secret_key": credentials.get("SecretAccessKey"),
"token": credentials.get("SessionToken"),
"expiry_time": expiry_time,
}
return credentials
def _read_role_arn_from_extra_config(self) -> Optional[str]:
aws_account_id = self.extra_config.get("aws_account_id")
aws_iam_role = self.extra_config.get("aws_iam_role")
role_arn = self.extra_config.get("role_arn")
if role_arn is None and aws_account_id is not None and aws_iam_role is not None:
self.log.info("Constructing role_arn from aws_account_id and aws_iam_role")
role_arn = f"arn:aws:iam::{aws_account_id}:role/{aws_iam_role}"
self.log.info("role_arn is %s", role_arn)
return role_arn
def _read_credentials_from_connection(self) -> Tuple[Optional[str], Optional[str]]:
aws_access_key_id = None
aws_secret_access_key = None
if self.conn.login:
aws_access_key_id = self.conn.login
aws_secret_access_key = self.conn.password
self.log.info("Credentials retrieved from login")
elif "aws_access_key_id" in self.extra_config and "aws_secret_access_key" in self.extra_config:
aws_access_key_id = self.extra_config["aws_access_key_id"]
aws_secret_access_key = self.extra_config["aws_secret_access_key"]
self.log.info("Credentials retrieved from extra_config")
elif "s3_config_file" in self.extra_config:
aws_access_key_id, aws_secret_access_key = _parse_s3_config(
self.extra_config["s3_config_file"],
self.extra_config.get("s3_config_format"),
self.extra_config.get("profile"),
)
self.log.info("Credentials retrieved from extra_config['s3_config_file']")
else:
self.log.info("No credentials retrieved from Connection")
return aws_access_key_id, aws_secret_access_key
def _strip_invalid_session_name_characters(self, role_session_name: str) -> str:
return slugify(role_session_name, regex_pattern=r'[^\w+=,.@-]+')
def _assume_role(self, sts_client: boto3.client) -> Dict:
assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
if "external_id" in self.extra_config: # Backwards compatibility
assume_role_kwargs["ExternalId"] = self.extra_config.get("external_id")
role_session_name = self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}")
self.log.info(
"Doing sts_client.assume_role to role_arn=%s (role_session_name=%s)",
self.role_arn,
role_session_name,
)
return sts_client.assume_role(
RoleArn=self.role_arn, RoleSessionName=role_session_name, **assume_role_kwargs
)
def _assume_role_with_saml(self, sts_client: boto3.client) -> Dict[str, Any]:
saml_config = self.extra_config['assume_role_with_saml']
principal_arn = saml_config['principal_arn']
idp_auth_method = saml_config['idp_auth_method']
if idp_auth_method == 'http_spegno_auth':
saml_assertion = self._fetch_saml_assertion_using_http_spegno_auth(saml_config)
else:
raise NotImplementedError(
f'idp_auth_method={idp_auth_method} in Connection {self.conn.conn_id} Extra.'
'Currently only "http_spegno_auth" is supported, and must be specified.'
)
self.log.info("Doing sts_client.assume_role_with_saml to role_arn=%s", self.role_arn)
assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
return sts_client.assume_role_with_saml(
RoleArn=self.role_arn,
PrincipalArn=principal_arn,
SAMLAssertion=saml_assertion,
**assume_role_kwargs,
)
def _get_idp_response(
self, saml_config: Dict[str, Any], auth: requests.auth.AuthBase
) -> requests.models.Response:
idp_url = saml_config["idp_url"]
self.log.info("idp_url= %s", idp_url)
session = requests.Session()
# Configurable Retry when querying the IDP endpoint
if "idp_request_retry_kwargs" in saml_config:
idp_request_retry_kwargs = saml_config["idp_request_retry_kwargs"]
self.log.info("idp_request_retry_kwargs= %s", idp_request_retry_kwargs)
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
retry_strategy = Retry(**idp_request_retry_kwargs)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
idp_request_kwargs = {}
if "idp_request_kwargs" in saml_config:
idp_request_kwargs = saml_config["idp_request_kwargs"]
idp_response = session.get(idp_url, auth=auth, **idp_request_kwargs)
idp_response.raise_for_status()
return idp_response
def _fetch_saml_assertion_using_http_spegno_auth(self, saml_config: Dict[str, Any]) -> str:
# requests_gssapi will need paramiko > 2.6 since you'll need
# 'gssapi' not 'python-gssapi' from PyPi.
# https://github.com/paramiko/paramiko/pull/1311
import requests_gssapi
from lxml import etree
auth = requests_gssapi.HTTPSPNEGOAuth()
if 'mutual_authentication' in saml_config:
mutual_auth = saml_config['mutual_authentication']
if mutual_auth == 'REQUIRED':
auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.REQUIRED)
elif mutual_auth == 'OPTIONAL':
auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.OPTIONAL)
elif mutual_auth == 'DISABLED':
auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.DISABLED)
else:
raise NotImplementedError(
f'mutual_authentication={mutual_auth} in Connection {self.conn.conn_id} Extra.'
'Currently "REQUIRED", "OPTIONAL" and "DISABLED" are supported.'
'(Exclude this setting will default to HTTPSPNEGOAuth() ).'
)
# Query the IDP
idp_response = self._get_idp_response(saml_config, auth=auth)
# Assist with debugging. Note: contains sensitive info!
xpath = saml_config['saml_response_xpath']
log_idp_response = 'log_idp_response' in saml_config and saml_config['log_idp_response']
if log_idp_response:
self.log.warning(
'The IDP response contains sensitive information, but log_idp_response is ON (%s).',
log_idp_response,
)
self.log.info('idp_response.content= %s', idp_response.content)
self.log.info('xpath= %s', xpath)
# Extract SAML Assertion from the returned HTML / XML
xml = etree.fromstring(idp_response.content)
saml_assertion = xml.xpath(xpath)
if isinstance(saml_assertion, list):
if len(saml_assertion) == 1:
saml_assertion = saml_assertion[0]
if not saml_assertion:
raise ValueError('Invalid SAML Assertion')
return saml_assertion
def _get_web_identity_credential_fetcher(
self,
) -> botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher:
base_session = self.basic_session._session or botocore.session.get_session()
client_creator = base_session.create_client
federation = self.extra_config.get('assume_role_with_web_identity_federation')
if federation == 'google':
web_identity_token_loader = self._get_google_identity_token_loader()
else:
raise AirflowException(
f'Unsupported federation: {federation}. Currently "google" only are supported.'
)
assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
return botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher(
client_creator=client_creator,
web_identity_token_loader=web_identity_token_loader,
role_arn=self.role_arn,
extra_args=assume_role_kwargs,
)
def _get_google_identity_token_loader(self):
from google.auth.transport import requests as requests_transport
from airflow.providers.google.common.utils.id_token_credentials import (
get_default_id_token_credentials,
)
audience = self.extra_config.get('assume_role_with_web_identity_federation_audience')
google_id_token_credentials = get_default_id_token_credentials(target_audience=audience)
def web_identity_token_loader():
if not google_id_token_credentials.valid:
request_adapter = requests_transport.Request()
google_id_token_credentials.refresh(request=request_adapter)
return google_id_token_credentials.token
return web_identity_token_loader
class AwsBaseHook(BaseHook):
"""
Interact with AWS.
This class is a thin wrapper around the boto3 python library.
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node).
:type aws_conn_id: str
:param verify: Whether or not to verify SSL certificates.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
:type verify: Union[bool, str, None]
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
:type region_name: Optional[str]
:param client_type: boto3.client client_type. Eg 's3', 'emr' etc
:type client_type: Optional[str]
:param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
:type resource_type: Optional[str]
:param config: Configuration for botocore client.
(https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)
:type config: Optional[botocore.client.Config]
"""
conn_name_attr = 'aws_conn_id'
default_conn_name = 'aws_default'
conn_type = 'aws'
hook_name = 'Amazon Web Services'
def __init__(
self,
aws_conn_id: Optional[str] = default_conn_name,
verify: Union[bool, str, None] = None,
region_name: Optional[str] = None,
client_type: Optional[str] = None,
resource_type: Optional[str] = None,
config: Optional[Config] = None,
) -> None:
super().__init__()
self.aws_conn_id = aws_conn_id
self.verify = verify
self.client_type = client_type
self.resource_type = resource_type
self.region_name = region_name
self.config = config
if not (self.client_type or self.resource_type):
raise AirflowException('Either client_type or resource_type must be provided.')
def _get_credentials(self, region_name: Optional[str]) -> Tuple[boto3.session.Session, Optional[str]]:
if not self.aws_conn_id:
session = boto3.session.Session(region_name=region_name)
return session, None
self.log.info("Airflow Connection: aws_conn_id=%s", self.aws_conn_id)
try:
# Fetch the Airflow connection object
connection_object = self.get_connection(self.aws_conn_id)
extra_config = connection_object.extra_dejson
endpoint_url = extra_config.get("host")
# https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html#botocore.config.Config
if "config_kwargs" in extra_config:
self.log.info(
"Retrieving config_kwargs from Connection.extra_config['config_kwargs']: %s",
extra_config["config_kwargs"],
)
self.config = Config(**extra_config["config_kwargs"])
session = _SessionFactory(
conn=connection_object, region_name=region_name, config=self.config
).create_session()
return session, endpoint_url
except AirflowException:
self.log.warning("Unable to use Airflow Connection for credentials.")
self.log.info("Fallback on boto3 credential strategy")
# http://boto3.readthedocs.io/en/latest/guide/configuration.html
self.log.info(
"Creating session using boto3 credential strategy region_name=%s",
region_name,
)
session = boto3.session.Session(region_name=region_name)
return session, None
def get_client_type(
self,
client_type: Optional[str] = None,
region_name: Optional[str] = None,
config: Optional[Config] = None,
) -> boto3.client:
"""Get the underlying boto3 client using boto3 session"""
session, endpoint_url = self._get_credentials(region_name)
if client_type:
warnings.warn(
"client_type is deprecated. Set client_type from class attribute.",
DeprecationWarning,
stacklevel=2,
)
else:
client_type = self.client_type
# No AWS Operators use the config argument to this method.
# Keep backward compatibility with other users who might use it
if config is None:
config = self.config
return session.client(client_type, endpoint_url=endpoint_url, config=config, verify=self.verify)
def get_resource_type(
self,
resource_type: Optional[str] = None,
region_name: Optional[str] = None,
config: Optional[Config] = None,
) -> boto3.resource:
"""Get the underlying boto3 resource using boto3 session"""
session, endpoint_url = self._get_credentials(region_name)
if resource_type:
warnings.warn(
"resource_type is deprecated. Set resource_type from class attribute.",
DeprecationWarning,
stacklevel=2,
)
else:
resource_type = self.resource_type
# No AWS Operators use the config argument to this method.
# Keep backward compatibility with other users who might use it
if config is None:
config = self.config
return session.resource(resource_type, endpoint_url=endpoint_url, config=config, verify=self.verify)
@cached_property
def conn(self) -> Union[boto3.client, boto3.resource]:
"""
Get the underlying boto3 client/resource (cached)
:return: boto3.client or boto3.resource
:rtype: Union[boto3.client, boto3.resource]
"""
if self.client_type:
return self.get_client_type(self.client_type, region_name=self.region_name)
elif self.resource_type:
return self.get_resource_type(self.resource_type, region_name=self.region_name)
else:
# Rare possibility - subclasses have not specified a client_type or resource_type
raise NotImplementedError('Could not get boto3 connection!')
def get_conn(self) -> Union[boto3.client, boto3.resource]:
"""
Get the underlying boto3 client/resource (cached)
Implemented so that caching works as intended. It exists for compatibility
with subclasses that rely on a super().get_conn() method.
:return: boto3.client or boto3.resource
:rtype: Union[boto3.client, boto3.resource]
"""
# Compat shim
return self.conn
def get_session(self, region_name: Optional[str] = None) -> boto3.session.Session:
"""Get the underlying boto3.session."""
session, _ = self._get_credentials(region_name)
return session
def get_credentials(self, region_name: Optional[str] = None) -> ReadOnlyCredentials:
"""
Get the underlying `botocore.Credentials` object.
This contains the following authentication attributes: access_key, secret_key and token.
"""
session, _ = self._get_credentials(region_name)
# Credentials are refreshable, so accessing your access key and
# secret key separately can lead to a race condition.
# See https://stackoverflow.com/a/36291428/8283373
return session.get_credentials().get_frozen_credentials()
def expand_role(self, role: str) -> str:
"""
If the IAM role is a role name, get the Amazon Resource Name (ARN) for the role.
If IAM role is already an IAM role ARN, no change is made.
:param role: IAM role name or ARN
:return: IAM role ARN
"""
if "/" in role:
return role
else:
return self.get_client_type("iam").get_role(RoleName=role)["Role"]["Arn"]
@staticmethod
def retry(should_retry: Callable[[Exception], bool]):
"""
A decorator that provides a mechanism to repeat requests in response to exceeding a temporary quote
limit.
"""
def retry_decorator(fun: Callable):
@wraps(fun)
def decorator_f(self, *args, **kwargs):
retry_args = getattr(self, 'retry_args', None)
if retry_args is None:
return fun(self, *args, **kwargs)
multiplier = retry_args.get('multiplier', 1)
min_limit = retry_args.get('min', 1)
max_limit = retry_args.get('max', 1)
stop_after_delay = retry_args.get('stop_after_delay', 10)
tenacity_logger = tenacity.before_log(self.log, logging.DEBUG) if self.log else None
default_kwargs = {
'wait': tenacity.wait_exponential(multiplier=multiplier, max=max_limit, min=min_limit),
'retry': tenacity.retry_if_exception(should_retry),
'stop': tenacity.stop_after_delay(stop_after_delay),
'before': tenacity_logger,
'after': tenacity_logger,
}
return tenacity.retry(**default_kwargs)(fun)(self, *args, **kwargs)
return decorator_f
return retry_decorator
def _parse_s3_config(
config_file_name: str, config_format: Optional[str] = "boto", profile: Optional[str] = None
) -> Tuple[Optional[str], Optional[str]]:
"""
Parses a config file for s3 credentials. Can currently
parse boto, s3cmd.conf and AWS SDK config formats
:param config_file_name: path to the config file
:type config_file_name: str
:param config_format: config type. One of "boto", "s3cmd" or "aws".
Defaults to "boto"
:type config_format: str
:param profile: profile name in AWS type config file
:type profile: str
"""
config = configparser.ConfigParser()
if config.read(config_file_name): # pragma: no cover
sections = config.sections()
else:
raise AirflowException(f"Couldn't read {config_file_name}")
# Setting option names depending on file format
if config_format is None:
config_format = "boto"
conf_format = config_format.lower()
if conf_format == "boto": # pragma: no cover
if profile is not None and "profile " + profile in sections:
cred_section = "profile " + profile
else:
cred_section = "Credentials"
elif conf_format == "aws" and profile is not None:
cred_section = profile
else:
cred_section = "default"
# Option names
if conf_format in ("boto", "aws"): # pragma: no cover
key_id_option = "aws_access_key_id"
secret_key_option = "aws_secret_access_key"
# security_token_option = 'aws_security_token'
else:
key_id_option = "access_key"
secret_key_option = "secret_key"
# Actual Parsing
if cred_section not in sections:
raise AirflowException("This config file format is not recognized")
else:
try:
access_key = config.get(cred_section, key_id_option)
secret_key = config.get(cred_section, secret_key_option)
except Exception:
logging.warning("Option Error in parsing s3 config file")
raise
return access_key, secret_key