Skip to content
Permalink
Fetching contributors…
Cannot retrieve contributors at this time
504 lines (415 sloc) 18.6 KB
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# coding=utf-8
import time
import warnings
import json
import logging
import jmespath
import copy
import platform
import aliyunsdkcore
from aliyunsdkcore.vendored.six.moves.urllib.parse import urlencode
from aliyunsdkcore.vendored.requests import codes
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.acs_exception import error_code, error_msg
from aliyunsdkcore.http.http_response import HttpResponse
from aliyunsdkcore.request import AcsRequest
from aliyunsdkcore.http import format_type
from aliyunsdkcore.auth.signers.signer_factory import SignerFactory
from aliyunsdkcore.request import CommonRequest
from aliyunsdkcore.endpoint.resolver_endpoint_request import ResolveEndpointRequest
from aliyunsdkcore.endpoint.default_endpoint_resolver import DefaultEndpointResolver
import aliyunsdkcore.retry.retry_policy as retry_policy
from aliyunsdkcore.retry.retry_condition import RetryCondition
from aliyunsdkcore.retry.retry_policy_context import RetryPolicyContext
import aliyunsdkcore.utils
import aliyunsdkcore.utils.parameter_helper
import aliyunsdkcore.utils.validation
from aliyunsdkcore.vendored.requests.structures import CaseInsensitiveDict
from aliyunsdkcore.vendored.requests.structures import OrderedDict
"""
Acs default client module.
"""
DEFAULT_READ_TIMEOUT = 10
DEFAULT_CONNECTION_TIMEOUT = 5
# TODO: replace it with TimeoutHandler
_api_timeout_config_data = aliyunsdkcore.utils._load_json_from_data_dir("timeout_config.json")
logger = logging.getLogger(__name__)
class AcsClient:
LOG_FORMAT = '%(thread)d %(asctime)s %(name)s %(levelname)s %(message)s'
def __init__(
self,
ak=None,
secret=None,
region_id="cn-hangzhou",
auto_retry=True,
max_retry_time=None,
user_agent=None,
port=80,
connect_timeout=None,
timeout=None,
public_key_id=None,
private_key=None,
session_period=3600,
credential=None,
debug=False):
"""
constructor for AcsClient
:param ak: String, access key id
:param secret: String, access key secret
:param region_id: String, region id
:param auto_retry: Boolean
:param max_retry_time: Number
:return:
"""
self._max_retry_num = max_retry_time
self._auto_retry = auto_retry
self._ak = ak
self._secret = secret
self._region_id = region_id
self._user_agent = user_agent
self._port = port
self._connect_timeout = connect_timeout
self._read_timeout = timeout
self._extra_user_agent = {}
credential = {
'ak': ak,
'secret': secret,
'public_key_id': public_key_id,
'private_key': private_key,
'session_period': session_period,
'credential': credential,
}
self._signer = SignerFactory.get_signer(
credential, region_id, self._implementation_of_do_action, debug)
self._endpoint_resolver = DefaultEndpointResolver(self)
if self._auto_retry:
self._retry_policy = retry_policy.get_default_retry_policy(
max_retry_times=self._max_retry_num)
else:
self._retry_policy = retry_policy.NO_RETRY_POLICY
def get_region_id(self):
return self._region_id
def get_access_key(self):
return self._ak
def get_access_secret(self):
return self._secret
def is_auto_retry(self):
return self._auto_retry
def get_max_retry_num(self):
return self._max_retry_num
def get_user_agent(self):
return self._user_agent
def set_region_id(self, region):
self._region_id = region
def set_max_retry_num(self, num):
self._max_retry_num = num
def set_auto_retry(self, flag):
"""
set whether or not the client perform auto-retry
:param flag: Booleans
:return: None
"""
self._auto_retry = flag
def set_user_agent(self, agent):
"""
User agent set to client will overwrite the request setting.
:param agent:
:return:
"""
self._user_agent = agent
def append_user_agent(self, key, value):
self._extra_user_agent.update({key: value})
@staticmethod
def user_agent_header():
base = '%s (%s %s;%s)' \
% ('AlibabaCloud',
platform.system(),
platform.release(),
platform.machine()
)
return base
@staticmethod
def default_user_agent():
default_agent = OrderedDict()
default_agent['Python'] = platform.python_version()
default_agent['Core'] = __import__('aliyunsdkcore').__version__
default_agent['python-requests'] = __import__(
'aliyunsdkcore.vendored.requests.__version__', globals(), locals(),
['vendored', 'requests', '__version__'], 0).__version__
return CaseInsensitiveDict(default_agent)
def client_user_agent(self):
client_user_agent = {}
if self.get_user_agent() is not None:
client_user_agent.update({'client': self.get_user_agent()})
else:
client_user_agent.update(self._extra_user_agent)
return CaseInsensitiveDict(client_user_agent)
def get_port(self):
return self._port
def get_location_service(self):
return None
@staticmethod
def merge_user_agent(default_agent, extra_agent):
if default_agent is None:
return extra_agent
if extra_agent is None:
return default_agent
user_agent = default_agent.copy()
for key, value in extra_agent.items():
if key not in default_agent:
user_agent[key] = value
return user_agent
def handle_extra_agent(self, request):
client_agent = self.client_user_agent()
request_agent = request.request_user_agent()
if client_agent is None:
return request_agent
if request_agent is None:
return client_agent
for key in request_agent:
if key in client_agent:
client_agent.pop(key)
client_agent.update(request_agent)
return client_agent
def _make_http_response(self, endpoint, request, read_timeout, connect_timeout,
specific_signer=None):
body_params = request.get_body_params()
if body_params:
body = urlencode(body_params)
request.set_content(body)
request.set_content_type(format_type.APPLICATION_FORM)
elif request.get_content() and "Content-Type" not in request.get_headers():
request.set_content_type(format_type.APPLICATION_OCTET_STREAM)
method = request.get_method()
signer = self._signer if specific_signer is None else specific_signer
header, url = signer.sign(self._region_id, request)
base = self.user_agent_header()
extra_agent = self.handle_extra_agent(request)
default_agent = self.default_user_agent()
user_agent = self.merge_user_agent(default_agent, extra_agent)
for key, value in user_agent.items():
base += ' %s/%s' % (key, value)
header['User-Agent'] = base
header['x-sdk-client'] = 'python/2.0.0'
protocol = request.get_protocol_type()
response = HttpResponse(
endpoint,
url,
method,
header,
protocol,
request.get_content(),
self._port,
read_timeout=read_timeout,
connect_timeout=connect_timeout)
if body_params:
body = urlencode(request.get_body_params())
response.set_content(body, "utf-8", format_type.APPLICATION_FORM)
return response
def _implementation_of_do_action(self, request, signer=None):
if not isinstance(request, AcsRequest):
raise ClientException(
error_code.SDK_INVALID_REQUEST,
error_msg.get_msg('SDK_INVALID_REQUEST'))
# modify Accept-Encoding
request.add_header('Accept-Encoding', 'identity')
if isinstance(request, CommonRequest):
request.trans_to_acs_request()
if request.endpoint:
endpoint = request.endpoint
else:
endpoint = self._resolve_endpoint(request)
return self._handle_retry_and_timeout(endpoint, request, signer)
def implementation_of_do_action(self, request, signer=None):
# keep compatible
warnings.warn(
"implementation_of_do_action() method is deprecated",
DeprecationWarning)
status, headers, body, exception = self._implementation_of_do_action(request, signer)
return status, headers, body
def _add_request_client_token(self, request):
if hasattr(request, "set_ClientToken") and hasattr(request, "get_ClientToken"):
client_token = request.get_ClientToken()
if not client_token:
# ClientToken has not been set
client_token = aliyunsdkcore.utils.parameter_helper.get_uuid() # up to 60 chars
request.set_ClientToken(client_token)
def _get_request_read_timeout(self, request):
# TODO: replace it with a timeout_handler
if request._request_read_timeout:
return request._request_read_timeout
# if self._timeout:
# return self._timeout
if self._read_timeout:
return self._read_timeout
if request.get_product() is None:
return DEFAULT_READ_TIMEOUT
path = '"{0}"."{1}"."{2}"'.format(request.get_product().lower(), request.get_version(),
request.get_action_name())
timeout = jmespath.search(path, _api_timeout_config_data)
if timeout is None:
return DEFAULT_READ_TIMEOUT
else:
aliyunsdkcore.utils.validation.assert_integer_positive(timeout, "timeout")
return max(timeout, DEFAULT_READ_TIMEOUT)
def _get_request_connect_timeout(self, request):
if request._request_connect_timeout:
return request._request_connect_timeout
if self._connect_timeout:
return self._connect_timeout
return DEFAULT_CONNECTION_TIMEOUT
def _handle_retry_and_timeout(self, endpoint, request, signer):
# TODO: replace it with a retry_handler
# it's a temporary implementation. the long-term plan will be a group a normalized handlers
# which contains retry_handler and timeout_handler
# decide whether we should initialize a ClientToken for the request
retry_policy_context = RetryPolicyContext(request, None, 0, None)
if self._retry_policy.should_retry(retry_policy_context) & \
RetryCondition.SHOULD_RETRY_WITH_CLIENT_TOKEN:
self._add_request_client_token(request)
request_read_timeout = self._get_request_read_timeout(request)
request_connect_timeout = self._get_request_connect_timeout(request)
retries = 0
while True:
status, headers, body, exception = self._handle_single_request(endpoint,
request,
request_read_timeout,
request_connect_timeout,
signer)
retry_policy_context = RetryPolicyContext(request, exception, retries, status)
retryable = self._retry_policy.should_retry(retry_policy_context)
if retryable & RetryCondition.NO_RETRY:
break
logger.debug("Retry needed. Request:%s Retries :%d",
request.get_action_name(), retries)
retry_policy_context.retryable = retryable
time_to_sleep = self._retry_policy.compute_delay_before_next_retry(retry_policy_context)
time.sleep(time_to_sleep / 1000.0)
retries += 1
if isinstance(exception, ClientException):
raise exception
return status, headers, body, exception
def _handle_single_request(self, endpoint, request, read_timeout, connect_timeout, signer):
http_response = self._make_http_response(endpoint, request, read_timeout, connect_timeout,
signer)
params = copy.deepcopy(request.get_query_params())
params.pop('AccessKeyId', None)
logger.debug('Request received. Product:%s Endpoint:%s Params: %s',
request.get_product(), endpoint, str(params))
# Do the actual network thing
try:
status, headers, body = http_response.get_response_object()
except IOError as e:
exception = ClientException(error_code.SDK_HTTP_ERROR, str(e))
logger.error("HttpError occurred. Host:%s SDK-Version:%s ClientException:%s",
endpoint, aliyunsdkcore.__version__, str(exception))
return None, None, None, exception
exception = self._get_server_exception(status, body, endpoint, request.string_to_sign)
return status, headers, body, exception
@staticmethod
def _parse_error_info_from_response_body(response_body):
error_code_to_return = error_code.SDK_UNKNOWN_SERVER_ERROR
# TODO handle if response_body is too big
error_message_to_return = "ServerResponseBody: " + str(response_body)
try:
body_obj = json.loads(response_body)
if 'Code' in body_obj:
error_code_to_return = body_obj['Code']
if 'Message' in body_obj:
error_message_to_return = body_obj['Message']
except ValueError:
# failed to parse body as json format
logger.warning('Failed to parse response as json format. Response:%s', response_body)
return error_code_to_return, error_message_to_return
def _get_server_exception(self, http_status, response_body, endpoint, string_to_sign):
request_id = None
try:
body_obj = json.loads(response_body.decode('utf-8'))
request_id = body_obj.get('RequestId')
except (ValueError, TypeError, AttributeError):
# in case the response body is not a json string, return the raw
# data instead
logger.warning('Failed to parse response as json format. Response:%s', response_body)
if http_status < codes.OK or http_status >= codes.MULTIPLE_CHOICES:
server_error_code, server_error_message = self._parse_error_info_from_response_body(
response_body.decode('utf-8'))
if http_status == codes.BAD_REQUEST and server_error_code == 'SignatureDoesNotMatch':
if string_to_sign == server_error_message.split(':')[1]:
server_error_code = 'InvalidAccessKeySecret'
server_error_message = 'The AccessKeySecret is incorrect. ' \
'Please check your AccessKeyId and AccessKeySecret.'
exception = ServerException(
server_error_code,
server_error_message,
http_status=http_status,
request_id=request_id)
logger.error("ServerException occurred. Host:%s SDK-Version:%s ServerException:%s",
endpoint, aliyunsdkcore.__version__, str(exception))
return exception
def do_action_with_exception(self, acs_request):
# set server response format as json, because this function will
# parse the response so which format doesn't matter
acs_request.set_accept_format('JSON')
status, headers, body, exception = self._implementation_of_do_action(acs_request)
if exception:
raise exception
logger.debug('Response received. Product:%s Response-body: %s',
acs_request.get_product(), body)
return body
def _resolve_endpoint(self, request):
resolve_request = ResolveEndpointRequest(
self._region_id,
request.get_product(),
request.get_location_service_code(),
request.get_location_endpoint_type(),
)
return self._endpoint_resolver.resolve(resolve_request)
def do_action(self, acs_request):
warnings.warn(
"do_action() method is deprecated, please use do_action_with_exception() instead.",
DeprecationWarning)
status, headers, body, exception = self._implementation_of_do_action(acs_request)
return body
def get_response(self, acs_request):
return self.implementation_of_do_action(acs_request)
def add_endpoint(self, region_id, product_code, endpoint):
self._endpoint_resolver.put_endpoint_entry(
region_id, product_code, endpoint)
def set_stream_logger(self, log_level=logging.DEBUG, logger_name='aliyunsdkcore', stream=None,
format_string=None):
log = logging.getLogger(logger_name)
log.setLevel(log_level)
ch = logging.StreamHandler(stream)
ch.setLevel(log_level)
if format_string is None:
format_string = self.LOG_FORMAT
formatter = logging.Formatter(format_string)
ch.setFormatter(formatter)
log.addHandler(ch)
def set_file_logger(self, path, log_level=logging.DEBUG, logger_name='aliyunsdkcore'):
log = logging.getLogger(logger_name)
log.setLevel(log_level)
fh = logging.FileHandler(path)
fh.setLevel(log_level)
formatter = logging.Formatter(self.LOG_FORMAT)
fh.setFormatter(formatter)
log.addHandler(fh)
You can’t perform that action at this time.