Navigation Menu

Skip to content

Commit

Permalink
Add unit test for Azure header value formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
c-w committed Aug 12, 2019
1 parent 73055d4 commit 7b49fda
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 27 deletions.
62 changes: 35 additions & 27 deletions libcloud/common/azure.py
Expand Up @@ -159,22 +159,8 @@ def _get_azure_auth_signature(self,
CanonicalizedHeaders +
CanonicalizedResource;
"""
special_header_values = []
xms_header_values = []
param_list = []
special_header_keys = [
'content-encoding',
'content-language',
'content-length',
'content-md5',
'content-type',
'date',
'if-modified-since',
'if-match',
'if-none-match',
'if-unmodified-since',
'range'
]

# Split the x-ms headers and normal headers and make everything
# lower case
Expand All @@ -187,20 +173,9 @@ def _get_azure_auth_signature(self,
else:
headers_copy[header] = value

is_change = method not in ("GET", "HEAD")
is_old_api = self.API_VERSION <= "2014-02-14"

# Get the values for the headers in the specific order
for header in special_header_keys:
header = header.lower() # Just for safety
if header in headers_copy:
special_header_values.append(headers_copy[header])
elif header == "content-length" and is_change and is_old_api:
# For old API versions, the Content-Length header must be '0'
# https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#content-length-header-in-version-2014-02-14-and-earlier
special_header_values.append('0')
else:
special_header_values.append('')
special_header_values = self._format_special_header_values(
headers_copy, method)

# Prepare the first section of the string to be signed
values_to_sign = [method] + special_header_values
Expand Down Expand Up @@ -232,6 +207,39 @@ def _get_azure_auth_signature(self,

return 'SharedKey %s:%s' % (self.user_id, b64_hmac.decode('utf-8'))

def _format_special_header_values(self, headers, method):
is_change = method not in ('GET', 'HEAD')
is_old_api = self.API_VERSION <= '2014-02-14'

special_header_keys = [
'content-encoding',
'content-language',
'content-length',
'content-md5',
'content-type',
'date',
'if-modified-since',
'if-match',
'if-none-match',
'if-unmodified-since',
'range'
]

special_header_values = []

for header in special_header_keys:
header = header.lower() # Just for safety
if header in headers:
special_header_values.append(headers[header])
elif header == 'content-length' and is_change and is_old_api:
# For old API versions, the Content-Length header must be '0'
# https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#content-length-header-in-version-2014-02-14-and-earlier
special_header_values.append('0')
else:
special_header_values.append('')

return special_header_values


class AzureBaseDriver(object):
name = "Microsoft Azure Service Management API"
Expand Down
64 changes: 64 additions & 0 deletions libcloud/test/common/test_azure.py
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import unittest

from libcloud.common.azure import AzureConnection
from libcloud.test import LibcloudTestCase


class AzureConnectionTestCase(LibcloudTestCase):
def setUp(self):
self.conn = AzureConnection('user', 'key')

def test_content_length_is_used_if_set(self):
headers = {'content-length': '123'}
method = 'PUT'

values = self.conn._format_special_header_values(headers, method)

self.assertEqual(values[2], '123')

def test_content_length_is_blank_if_new_api_version(self):
headers = {}
method = 'PUT'
self.conn.API_VERSION = '2018-11-09'

values = self.conn._format_special_header_values(headers, method)

self.assertEqual(values[2], '')

def test_content_length_is_zero_if_write_and_old_api_version(self):
headers = {}
method = 'PUT'
self.conn.API_VERSION = '2011-08-18'

values = self.conn._format_special_header_values(headers, method)

self.assertEqual(values[2], '0')

def test_content_length_is_blank_if_read_and_old_api_version(self):
headers = {}
method = 'GET'
self.conn.API_VERSION = '2011-08-18'

values = self.conn._format_special_header_values(headers, method)

self.assertEqual(values[2], '')


if __name__ == '__main__':
sys.exit(unittest.main())

0 comments on commit 7b49fda

Please sign in to comment.