Skip to content

Commit

Permalink
refactor: use 'Client._post_resource' in 'Bucket.lock_retention_policy'
Browse files Browse the repository at this point in the history
Toward #38.
  • Loading branch information
tseaver committed May 11, 2021
1 parent 2753bd4 commit 2acfb06
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 51 deletions.
8 changes: 4 additions & 4 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3311,13 +3311,13 @@ def lock_retention_policy(
query_params["userProject"] = self.user_project

path = "/b/{}/lockRetentionPolicy".format(self.name)
api_response = client._connection.api_request(
method="POST",
path=path,
api_response = client._post_resource(
path,
None,
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=retry,
_target_object=self,
)
self._set_properties(api_response)

Expand Down
108 changes: 61 additions & 47 deletions tests/unit/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3489,22 +3489,18 @@ def test_generate_upload_policy_bad_credentials(self):
bucket.generate_upload_policy([])

def test_lock_retention_policy_no_policy_set(self):
credentials = object()
connection = _Connection()
connection.credentials = credentials
client = _Client(connection)
client = mock.Mock(spec=["_post_resource"])
name = "name"
bucket = self._make_one(client=client, name=name)
bucket._properties["metageneration"] = 1234

with self.assertRaises(ValueError):
bucket.lock_retention_policy()

client._post_resource.assert_not_called()

def test_lock_retention_policy_no_metageneration(self):
credentials = object()
connection = _Connection()
connection.credentials = credentials
client = _Client(connection)
client = mock.Mock(spec=["_post_resource"])
name = "name"
bucket = self._make_one(client=client, name=name)
bucket._properties["retentionPolicy"] = {
Expand All @@ -3515,11 +3511,10 @@ def test_lock_retention_policy_no_metageneration(self):
with self.assertRaises(ValueError):
bucket.lock_retention_policy()

client._post_resource.assert_not_called()

def test_lock_retention_policy_already_locked(self):
credentials = object()
connection = _Connection()
connection.credentials = credentials
client = _Client(connection)
client = mock.Mock(spec=["_post_resource"])
name = "name"
bucket = self._make_one(client=client, name=name)
bucket._properties["metageneration"] = 1234
Expand All @@ -3532,69 +3527,88 @@ def test_lock_retention_policy_already_locked(self):
with self.assertRaises(ValueError):
bucket.lock_retention_policy()

def test_lock_retention_policy_ok(self):
client._post_resource.assert_not_called()

def test_lock_retention_policy_ok_w_timeout_w_retry(self):
name = "name"
response = {
effective_time = "2018-03-01T16:46:27.123456Z"
one_hundred_days = 86400 * 100 # seconds in 100 days
metageneration = 1234
api_response = {
"name": name,
"metageneration": 1235,
"metageneration": metageneration + 1,
"retentionPolicy": {
"effectiveTime": "2018-03-01T16:46:27.123456Z",
"effectiveTime": effective_time,
"isLocked": True,
"retentionPeriod": 86400 * 100, # 100 days
"retentionPeriod": one_hundred_days,
},
}
credentials = object()
connection = _Connection(response)
connection.credentials = credentials
client = _Client(connection)
metageneration = 1234
client = mock.Mock(spec=["_post_resource"])
client._post_resource.return_value = api_response
bucket = self._make_one(client=client, name=name)
bucket._properties["metageneration"] = 1234
bucket._properties["metageneration"] = metageneration
bucket._properties["retentionPolicy"] = {
"effectiveTime": "2018-03-01T16:46:27.123456Z",
"retentionPeriod": 86400 * 100, # 100 days
"effectiveTime": effective_time,
"retentionPeriod": one_hundred_days,
}
timeout = 42
retry = mock.Mock(spec=[])

bucket.lock_retention_policy(timeout=42)
bucket.lock_retention_policy(timeout=timeout, retry=retry)

(kw,) = connection._requested
self.assertEqual(kw["method"], "POST")
self.assertEqual(kw["path"], "/b/{}/lockRetentionPolicy".format(name))
self.assertEqual(kw["query_params"], {"ifMetagenerationMatch": 1234})
self.assertEqual(kw["timeout"], 42)
expected_path = "/b/{}/lockRetentionPolicy".format(name)
expected_data = None
expected_query_params = {"ifMetagenerationMatch": metageneration}
client._post_resource.assert_called_once_with(
expected_path,
expected_data,
query_params=expected_query_params,
timeout=timeout,
retry=retry,
_target_object=bucket,
)

def test_lock_retention_policy_w_user_project(self):
name = "name"
user_project = "user-project-123"
response = {
metageneration = 1234
effective_time = "2018-03-01T16:46:27.123456Z"
one_hundred_days = 86400 * 100 # seconds in 100 days
api_response = {
"name": name,
"metageneration": 1235,
"metageneration": metageneration + 1,
"retentionPolicy": {
"effectiveTime": "2018-03-01T16:46:27.123456Z",
"effectiveTime": effective_time,
"isLocked": True,
"retentionPeriod": 86400 * 100, # 100 days
"retentionPeriod": one_hundred_days,
},
}
credentials = object()
connection = _Connection(response)
connection.credentials = credentials
client = _Client(connection)
client = mock.Mock(spec=["_post_resource"])
client._post_resource.return_value = api_response
bucket = self._make_one(client=client, name=name, user_project=user_project)
bucket._properties["metageneration"] = 1234
bucket._properties["retentionPolicy"] = {
"effectiveTime": "2018-03-01T16:46:27.123456Z",
"retentionPeriod": 86400 * 100, # 100 days
"effectiveTime": effective_time,
"retentionPeriod": one_hundred_days,
}

bucket.lock_retention_policy()

(kw,) = connection._requested
self.assertEqual(kw["method"], "POST")
self.assertEqual(kw["path"], "/b/{}/lockRetentionPolicy".format(name))
self.assertEqual(
kw["query_params"],
{"ifMetagenerationMatch": 1234, "userProject": user_project},
expected_path = "/b/{}/lockRetentionPolicy".format(name)
expected_data = None
expected_query_params = {
"ifMetagenerationMatch": metageneration,
"userProject": user_project,
}
client._post_resource.assert_called_once_with(
expected_path,
expected_data,
query_params=expected_query_params,
timeout=self._get_default_timeout(),
retry=DEFAULT_RETRY,
_target_object=bucket,
)
self.assertEqual(kw["timeout"], self._get_default_timeout())

def test_generate_signed_url_w_invalid_version(self):
expiration = "2014-10-16T20:34:37.000Z"
Expand Down

0 comments on commit 2acfb06

Please sign in to comment.