Skip to content

Commit

Permalink
Fix datetime.utcnow() usages
Browse files Browse the repository at this point in the history
  • Loading branch information
nateprewitt committed Aug 15, 2023
1 parent aef2675 commit 96ca907
Show file tree
Hide file tree
Showing 14 changed files with 42 additions and 47 deletions.
4 changes: 2 additions & 2 deletions botocore/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def signature(self, string_to_sign, request):
def add_auth(self, request):
if self.credentials is None:
raise NoCredentialsError()
datetime_now = datetime.datetime.utcnow()
datetime_now = datetime.datetime.now(datetime.timezone.utc)
request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
# This could be a retry. Make sure the previous
# authorization header is removed first.
Expand Down Expand Up @@ -643,7 +643,7 @@ class S3SigV4PostAuth(SigV4Auth):
"""

def add_auth(self, request):
datetime_now = datetime.datetime.utcnow()
datetime_now = datetime.datetime.now(datetime.timezone.utc)
request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)

fields = {}
Expand Down
12 changes: 2 additions & 10 deletions botocore/crt/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ def add_auth(self, request):
if self.credentials is None:
raise NoCredentialsError()

# Use utcnow() because that's what gets mocked by tests, but set
# timezone because CRT assumes naive datetime is local time.
datetime_now = datetime.datetime.utcnow().replace(
tzinfo=datetime.timezone.utc
)
datetime_now = datetime.datetime.now(datetime.timezone.utc)

# Use existing 'X-Amz-Content-SHA256' header if able
existing_sha256 = self._get_existing_sha256(request)
Expand Down Expand Up @@ -251,11 +247,7 @@ def add_auth(self, request):
if self.credentials is None:
raise NoCredentialsError()

# Use utcnow() because that's what gets mocked by tests, but set
# timezone because CRT assumes naive datetime is local time.
datetime_now = datetime.datetime.utcnow().replace(
tzinfo=datetime.timezone.utc
)
datetime_now = datetime.datetime.now(datetime.timezone.utc)

# Use existing 'X-Amz-Content-SHA256' header if able
existing_sha256 = self._get_existing_sha256(request)
Expand Down
6 changes: 4 additions & 2 deletions botocore/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def prepare_request(self, request):
def _calculate_ttl(
self, response_received_timestamp, date_header, read_timeout
):
local_timestamp = datetime.datetime.utcnow()
local_timestamp = datetime.datetime.now(datetime.timezone.utc)
date_conversion = datetime.datetime.strptime(
date_header, "%a, %d %b %Y %H:%M:%S %Z"
)
Expand All @@ -169,7 +169,9 @@ def _set_ttl(self, retries_context, read_timeout, success_response):
has_streaming_input = retries_context.get('has_streaming_input')
if response_date_header and not has_streaming_input:
try:
response_received_timestamp = datetime.datetime.utcnow()
response_received_timestamp = datetime.datetime.now(
datetime.timezone.utc
)
retries_context['ttl'] = self._calculate_ttl(
response_received_timestamp,
response_date_header,
Expand Down
2 changes: 1 addition & 1 deletion botocore/signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def generate_presigned_post(
policy = {}

# Create an expiration date for the policy
datetime_now = datetime.datetime.utcnow()
datetime_now = datetime.datetime.now(datetime.timezone.utc)
expire_date = datetime_now + datetime.timedelta(seconds=expires_in)
policy['expiration'] = expire_date.strftime(botocore.auth.ISO8601)

Expand Down
2 changes: 1 addition & 1 deletion botocore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def _evaluate_expiration(self, credentials):
)
jitter = random.randint(120, 600) # Between 2 to 10 minutes
refresh_interval_with_jitter = refresh_interval + jitter
current_time = datetime.datetime.utcnow()
current_time = datetime.datetime.now(datetime.timezone.utc)
refresh_offset = datetime.timedelta(
seconds=refresh_interval_with_jitter
)
Expand Down
6 changes: 3 additions & 3 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,20 +568,20 @@ class FreezeTime(ContextDecorator):
:param module: reference to imported module to patch (e.g. botocore.auth.datetime)
:type date: datetime.datetime
:param date: datetime object specifying the output for utcnow()
:param date: datetime object specifying the output for now() in UTC
"""

def __init__(self, module, date=None):
if date is None:
date = datetime.datetime.utcnow()
date = datetime.datetime.now(datetime.timezone.utc)
self.date = date
self.datetime_patcher = mock.patch.object(
module, 'datetime', mock.Mock(wraps=datetime.datetime)
)

def __enter__(self, *args, **kwargs):
mock = self.datetime_patcher.start()
mock.utcnow.return_value = self.date
mock.now.return_value = self.date

def __exit__(self, *args, **kwargs):
self.datetime_patcher.stop()
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def setUp(self):
mock.Mock(wraps=datetime.datetime),
)
self.mocked_datetime = self.datetime_patch.start()
self.mocked_datetime.utcnow.return_value = self.now
self.mocked_datetime.now.return_value = self.now

def tearDown(self):
super().tearDown()
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_lex.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_unsigned_payload(self):
timestamp = datetime(2017, 3, 22, 0, 0)

with mock.patch('botocore.auth.datetime.datetime') as _datetime:
_datetime.utcnow.return_value = timestamp
_datetime.now.return_value = timestamp
self.http_stubber.add_response(body=b'{}')
with self.http_stubber:
self.client.post_content(**params)
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _retry_headers_test_cases(self):

# The first, third and seventh datetime values of each
# utcnow_side_effects list are side_effect values for when
# utcnow is called in SigV4 signing.
# datetime.now is called in SigV4 signing.
utcnow_side_effects = [
[
datetime.datetime(2019, 6, 1, 0, 0, 0, 0),
Expand Down Expand Up @@ -114,7 +114,7 @@ def _test_amz_sdk_request_header_with_test_case(
mock.Mock(wraps=datetime.datetime),
)
mocked_datetime = datetime_patcher.start()
mocked_datetime.utcnow.side_effect = utcnow_side_effects
mocked_datetime.now.side_effect = utcnow_side_effects

client = self.session.create_client(
'dynamodb', self.region, config=client_config
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_sts.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def setUp(self):
def test_presigned_url_contains_no_content_type(self):
timestamp = datetime(2017, 3, 22, 0, 0)
with mock.patch('botocore.auth.datetime.datetime') as _datetime:
_datetime.utcnow.return_value = timestamp
_datetime.now.return_value = timestamp
url = self.client.generate_presigned_url('get_caller_identity', {})

# There should be no 'content-type' in x-amz-signedheaders
Expand Down
14 changes: 5 additions & 9 deletions tests/unit/auth/test_signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def setUp(self):
self.fixed_date = datetime.datetime(2014, 3, 10, 17, 2, 55, 0)
self.datetime_patch = mock.patch('botocore.auth.datetime.datetime')
self.datetime_mock = self.datetime_patch.start()
self.datetime_mock.utcnow.return_value = self.fixed_date
self.datetime_mock.now.return_value = self.fixed_date
self.datetime_mock.strptime.return_value = self.fixed_date

def tearDown(self):
Expand Down Expand Up @@ -523,15 +523,15 @@ def test_thread_safe_timestamp(self):
) as mock_datetime:
original_utcnow = datetime.datetime(2014, 1, 1, 0, 0)

mock_datetime.utcnow.return_value = original_utcnow
mock_datetime.now.return_value = original_utcnow
# Go through the add_auth process once. This will attach
# a timestamp to the request at the beginning of auth.
auth.add_auth(request)
self.assertEqual(request.context['timestamp'], '20140101T000000Z')
# Ensure the date is in the Authorization header
self.assertIn('20140101', request.headers['Authorization'])
# Now suppose the utc time becomes the next day all of a sudden
mock_datetime.utcnow.return_value = datetime.datetime(
mock_datetime.now.return_value = datetime.datetime(
2014, 1, 2, 0, 0
)
# Smaller methods like the canonical request and string_to_sign
Expand Down Expand Up @@ -798,9 +798,7 @@ def setUp(self):
mock.Mock(wraps=datetime.datetime),
)
mocked_datetime = self.datetime_patcher.start()
mocked_datetime.utcnow.return_value = datetime.datetime(
2014, 1, 1, 0, 0
)
mocked_datetime.now.return_value = datetime.datetime(2014, 1, 1, 0, 0)

def tearDown(self):
self.datetime_patcher.stop()
Expand Down Expand Up @@ -1102,9 +1100,7 @@ def setUp(self):
mock.Mock(wraps=datetime.datetime),
)
mocked_datetime = self.datetime_patcher.start()
mocked_datetime.utcnow.return_value = datetime.datetime(
2014, 1, 1, 0, 0
)
mocked_datetime.now.return_value = datetime.datetime(2014, 1, 1, 0, 0)

def tearDown(self):
self.datetime_patcher.stop()
Expand Down
27 changes: 16 additions & 11 deletions tests/unit/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import shutil
import subprocess
import tempfile
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pytest
Expand Down Expand Up @@ -307,8 +307,9 @@ def test_expiration_in_datetime_format(self):
self.assertEqual(response, expected_response)

def test_retrieves_from_cache(self):
date_in_future = datetime.utcnow() + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat() + 'Z'
date_now = datetime.now(timezone.utc)
date_in_future = date_now + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat()
cache_key = '793d6e2f27667ab2da104824407e486bfec24a47'
cache = {
cache_key: {
Expand Down Expand Up @@ -741,8 +742,9 @@ def test_no_cache(self):
self.assertEqual(response, expected_response)

def test_retrieves_from_cache(self):
date_in_future = datetime.utcnow() + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat() + 'Z'
date_now = datetime.now(timezone.utc)
date_in_future = date_now + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat()
cache_key = '793d6e2f27667ab2da104824407e486bfec24a47'
cache = {
cache_key: {
Expand Down Expand Up @@ -859,8 +861,9 @@ def test_assume_role_with_no_cache(self):
mock_loader_cls.assert_called_with('/some/path/token.jwt')

def test_assume_role_retrieves_from_cache(self):
date_in_future = datetime.utcnow() + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat() + 'Z'
date_now = datetime.now(timezone.utc)
date_in_future = date_now + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat()

cache_key = 'c29461feeacfbed43017d20612606ff76abc073d'
cache = {
Expand Down Expand Up @@ -2029,8 +2032,9 @@ def test_assume_role_refresher_serializes_datetime(self):
self.assertEqual(expiry_time, '2016-11-06T01:30:00UTC')

def test_assume_role_retrieves_from_cache(self):
date_in_future = datetime.utcnow() + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat() + 'Z'
date_now = datetime.now(timezone.utc)
date_in_future = date_now + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat()
self.fake_config['profiles']['development']['role_arn'] = 'myrole'

cache_key = '793d6e2f27667ab2da104824407e486bfec24a47'
Expand Down Expand Up @@ -2058,8 +2062,9 @@ def test_assume_role_retrieves_from_cache(self):
self.assertEqual(creds.token, 'baz-cached')

def test_chain_prefers_cache(self):
date_in_future = datetime.utcnow() + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat() + 'Z'
date_now = datetime.now(timezone.utc)
date_in_future = date_now + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat()

# The profile we will be using has a cache entry, but the profile it
# is sourcing from does not. This should result in the cached
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ def setUp(self):
self.datetime_mock = self.datetime_patch.start()
self.fixed_date = datetime.datetime(2014, 3, 10, 17, 2, 55, 0)
self.fixed_delta = datetime.timedelta(seconds=3600)
self.datetime_mock.datetime.utcnow.return_value = self.fixed_date
self.datetime_mock.datetime.now.return_value = self.fixed_date
self.datetime_mock.timedelta.return_value = self.fixed_delta

def tearDown(self):
Expand Down Expand Up @@ -1146,7 +1146,7 @@ def test_generate_db_auth_token(self):
clock = datetime.datetime(2016, 11, 7, 17, 39, 33, tzinfo=tzutc())

with mock.patch('datetime.datetime') as dt:
dt.utcnow.return_value = clock
dt.now.return_value = clock
result = generate_db_auth_token(
self.client, hostname, port, username
)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3030,7 +3030,7 @@ def test_metadata_token_bad_request_yields_no_credentials(self):

def _get_datetime(self, dt=None, offset=None, offset_func=operator.add):
if dt is None:
dt = datetime.datetime.utcnow()
dt = datetime.datetime.now(datetime.timezone.utc)
if offset is not None:
dt = offset_func(dt, offset)

Expand Down

0 comments on commit 96ca907

Please sign in to comment.