Skip to content
Permalink
Browse files
feat(storage): improve v4 signature query parameters encoding (#48)
* feat(storage): improve v4 signature query parameters encoding

* add URL encoding test

* add query_parameters arg into conformance tests

* add tests for _quote_param() function

* declare test file encoding

* fix the param type

* add test with bytes

* Update _signing.py

Co-authored-by: Frank Natividad <frankyn@users.noreply.github.com>
Co-authored-by: Christopher Wilcox <crwilcox@google.com>
  • Loading branch information
3 people committed Mar 7, 2020
1 parent 0df8a91 commit 8df0b554a1904787889309707f08c6b8683cad44
Showing with 91 additions and 3 deletions.
  1. +33 −3 google/cloud/storage/_signing.py
  2. +58 −0 tests/unit/test__signing.py
@@ -509,7 +509,7 @@ def generate_signed_url_v4(
:type query_parameters: dict
:param query_parameters:
(Optional) Additional query paramtersto be included as part of the
(Optional) Additional query parameters to be included as part of the
signed URLs. See:
https://cloud.google.com/storage/docs/xml-api/reference-headers#query
@@ -585,8 +585,7 @@ def generate_signed_url_v4(
if generation is not None:
query_parameters["generation"] = generation

ordered_query_parameters = sorted(query_parameters.items())
canonical_query_string = six.moves.urllib.parse.urlencode(ordered_query_parameters)
canonical_query_string = _url_encode(query_parameters)

lowercased_headers = dict(ordered_headers)

@@ -672,3 +671,34 @@ def _sign_message(message, access_token, service_account_email):

data = json.loads(response.data.decode("utf-8"))
return data["signature"]


def _url_encode(query_params):
"""Encode query params into URL.
:type query_params: dict
:param query_params: Query params to be encoded.
:rtype: str
:returns: URL encoded query params.
"""
params = [
"{}={}".format(_quote_param(name), _quote_param(value))
for name, value in query_params.items()
]

return "&".join(sorted(params))


def _quote_param(param):
"""Quote query param.
:type param: Any
:param param: Query param to be encoded.
:rtype: str
:returns: URL encoded query param.
"""
if not isinstance(param, bytes):
param = str(param)
return six.moves.urllib.parse.quote(param, safe="~")
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -705,6 +707,61 @@ def test_sign_bytes_failure(self):
)


class TestCustomURLEncoding(unittest.TestCase):
def test_url_encode(self):
from google.cloud.storage._signing import _url_encode

# param1 includes safe symbol ~
# param# includes symbols, which must be encoded
query_params = {"param1": "value~1-2", "param#": "*value+value/"}

self.assertEqual(
_url_encode(query_params), "param%23=%2Avalue%2Bvalue%2F&param1=value~1-2"
)


class TestQuoteParam(unittest.TestCase):
def test_ascii_symbols(self):
from google.cloud.storage._signing import _quote_param

encoded_param = _quote_param("param")
self.assertIsInstance(encoded_param, str)
self.assertEqual(encoded_param, "param")

def test_quoted_symbols(self):
from google.cloud.storage._signing import _quote_param

encoded_param = _quote_param("!#$%&'()*+,/:;=?@[]")
self.assertIsInstance(encoded_param, str)
self.assertEqual(
encoded_param, "%21%23%24%25%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
)

def test_unquoted_symbols(self):
from google.cloud.storage._signing import _quote_param
import string

UNQUOTED = string.ascii_letters + string.digits + ".~_-"

encoded_param = _quote_param(UNQUOTED)
self.assertIsInstance(encoded_param, str)
self.assertEqual(encoded_param, UNQUOTED)

def test_unicode_symbols(self):
from google.cloud.storage._signing import _quote_param

encoded_param = _quote_param("ЁЙЦЯЩЯЩ")
self.assertIsInstance(encoded_param, str)
self.assertEqual(encoded_param, "%D0%81%D0%99%D0%A6%D0%AF%D0%A9%D0%AF%D0%A9")

def test_bytes(self):
from google.cloud.storage._signing import _quote_param

encoded_param = _quote_param(b"bytes")
self.assertIsInstance(encoded_param, str)
self.assertEqual(encoded_param, "bytes")


_DUMMY_SERVICE_ACCOUNT = None


@@ -731,6 +788,7 @@ def _run_conformance_test(resource, test_data):
method=test_data["method"],
_request_timestamp=test_data["timestamp"],
headers=test_data.get("headers"),
query_parameters=test_data.get("queryParameters"),
)

assert url == test_data["expectedUrl"]

0 comments on commit 8df0b55

Please sign in to comment.