Skip to content

Commit

Permalink
Pipeline (#106)
Browse files Browse the repository at this point in the history
* Pipeline renaming

* Work in progress

* Work in progress 2

* Move formData to ClientRequest

* Pure ClientRequest with no requests

* Add kwargs for send

* Pipeline update

* First pass on tests

* Some typehints

* Fixing all tests

* Full Pipeline mypy

* Py3 mock compat

* Pipeline and stream download

* First pass on Autorest testserver

* While making coverage report, don't cry for async

* Pylint

* Add absolute_import for Py2.7

* Fix ABC for 2.7

* Add absolute_import to another file for 2.7

* Pipeline is a context manager

* Some async ABC

* Move mypy to 3.6

* Fix empty policies

* aiohttp proof of concepts

* Simplify ClientResponse

* Improve response handling + async fixes

* Fix mypy

* Create a basic HTTPSender

* async dependencies

* Fix Pipfile for asyncio

* Py3.5 compat

* Add universal to async tests

* Basic requests as asyncio impl

* Remove check_redirect from configuration

* Improve default pipeline

* Make pipeline a public attribute

* Split configuration for pipeline

* Refactor config

* Restore exception

* mypy happiness

* Split requests configuration

* Simplify keep_alive behavior

* Default parameter

* Rename to on_request/on_response after feedback

* Multi-thread compatible HTTP requests sender

* Squashed commit of the following:

commit 3246847
Merge: 18cb696 388e8d0
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Thu Jun 14 15:26:57 2018 -0700

    Merge remote-tracking branch 'origin/master' into async2

commit 18cb696
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Wed May 23 11:30:00 2018 -0700

    MyPy happiness

commit bd71233
Merge: a997e97 3a8b79d
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Wed May 23 11:23:44 2018 -0700

    Merge branch 'master' into async2

commit a997e97
Merge: 4130eca 2b7d778
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Wed May 9 13:54:11 2018 -0700

    Merge remote-tracking branch 'origin/master' into async2

commit 4130eca
Merge: 8ffedd8 9d81113
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Fri Apr 20 10:38:45 2018 -0700

    Merge remote-tracking branch 'origin/master' into async2

commit 8ffedd8
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Mar 20 15:36:40 2018 -0700

    Refactor a little async stream download

commit bbf1259
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Fri Mar 16 17:20:07 2018 -0700

    Add stream upload support

commit 2d26003
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Feb 27 16:25:33 2018 -0800

    Fix incorrect request call

commit 6b55d4f
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Wed Jan 17 13:39:18 2018 -0800

    Add status/finished to async poller

commit 02c333e
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jan 16 15:50:06 2018 -0800

    Port stream to async implementation

commit b3f0ac7
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jan 16 15:32:23 2018 -0800

    Add AsyncPoller

commit 3e9e178
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Thu Dec 7 16:13:06 2017 -0800

    Sync ServiceClientAsync with latest fixes

commit 5483e28
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Thu Jul 20 11:27:05 2017 -0700

    Address feedback from @brettcannon on async

commit c99f4b7
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 13:14:27 2017 -0700

    Robust coverage xml report

commit e0c6d3e
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 12:23:12 2017 -0700

    Rename SC mixin

commit 8e029ff
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 12:12:12 2017 -0700

    Add async_get to paging

commit f3dfaf6
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 12:06:20 2017 -0700

    Rename paging mixin

commit 2f7142d
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 12:05:34 2017 -0700

    async_get_next in paging

commit 9e82100
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 11:20:24 2017 -0700

    async send form data

commit 17045f7
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 11:18:46 2017 -0700

    Add async client mixin

commit 3294115
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 11:08:37 2017 -0700

    Fix Py3.5 async tests

commit 615f672
Author: Laurent Mazuel <laurent.mazuel@gmail.com>
Date:   Tue Jul 18 10:31:54 2017 -0700

    Async paging with mixin

* Remove useless tox line

* Add credentials to async requests

* Add trio support

* Revamp async stream download

* Add pipeline Response wrapper

* Introduce a raw deserializer as a policy

* SansIO on_exception

* Fix deserialization tests

* Implement #116 - Env variable for UserAgent

* Revamp streaming

* Logger should not log streamable response

* Put pipeline in config

* Mypy fixes

* Update mypy 0.620

* Fix trio dep

* Create universal HTTP package

* Pipeline as a universal HTTP implementation, no mypy

* Pipeline as a universal HTTP implementation, with mypy

* Backward compatible ServiceClient

* Doc update

* Make config optional in requests engine

* Fix types for MyPY 0.630

* Credentials can be directly a Policy

* msrest[async]

* 0.6.0rc1 first ChangeLog

* Don't coverage report the TYPE_CHECKING import

* Fix dev install on 3.5 and more

* 0.6.0rc1

* Pre-load aiohttp body

* sync ServiceClient returns requersts.Response

* Fix import issue

* Remove concept of requests_kwargs

* Fix hooks parameter
  • Loading branch information
lmazuel committed Oct 2, 2018
1 parent 50c5546 commit 3653d29
Show file tree
Hide file tree
Showing 47 changed files with 4,408 additions and 1,212 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[report]
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ _autorest_install: &_autorest_install
jobs:
include:
- stage: MyPy
python: 3.5
python: 3.6
install:
- pip install mypy
script:
- mypy msrest --ignore-missing-imports
- mypy msrest
- stage: Test
python: 2.7
env: TOXENV=py27
Expand Down
4 changes: 3 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ verify_ssl = true
name = "pypi"

[packages]
"e1839a8" = {path = ".", editable = true}
"e1839a8" = {path = ".", extras = ["async"], editable = true}

[dev-packages]
pytest = "*"
pytest-cov = "*"
pytest-asyncio = {version = "*", markers="python_version >= '3.5'"}
httpretty = ">=0.8.10"
mock = {version = "*", markers="python_version <= '2.7'"}
mypy = {version = "==0.630", markers="python_version > '2.7'"}
pylint = "*"
trio = {version = "*", markers="python_version >= '3.5'"}
35 changes: 35 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,41 @@ To install:
Release History
---------------

2018-XX-XX Version 0.6.0rc1
+++++++++++++++++++++++++++

**Features**

- The environment variable AZURE_HTTP_USER_AGENT, if present, is now injected part of the UserAgent
- New msrest.universal_http module. Provide tools to generic HTTP management (sync/async, requests/aiohttp, etc.)
- New **preview** msrest.pipeline implementation:

- A Pipeline is an ordered list of Policies than can process an HTTP request and response in a generic way.
- More details in the wiki page about Pipeline: https://github.com/Azure/msrest-for-python/wiki/msrest-0.6.0---Pipeline

- Adding new attribute to Configuration instance:

- http_logger_policy - Policy to handle HTTP logging
- user_agent_policy - Policy to handle HTTP logging
- pipeline - The current pipeline used by the SDK client
- async_pipeline - The current async pipeline used by the SDK client

- Installing "msrest[async]" now install the **experimental** async support

**Breaking changes**

- The HTTPDriver API introduced in 0.5.0 has been replaced by Pipeline.

- The following classes have been moved from "msrest.pipeline" to "msrest.universal_http":

- ClientRedirectPolicy
- ClientProxies
- ClientConnection

- The following classes have been moved from "msrest.pipeline" to "msrest.universal_http.requests":

- ClientRetryPolicy

2018-09-04 Version 0.5.5
++++++++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion msrest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
#
# --------------------------------------------------------------------------

from .version import msrest_version
from .configuration import Configuration
from .service_client import ServiceClient, SDKClient
from .serialization import Serializer, Deserializer
from .version import msrest_version

__all__ = [
"ServiceClient",
Expand Down
125 changes: 125 additions & 0 deletions msrest/async_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------

import asyncio
import functools
import logging

from typing import Any, Dict, List, Union, TYPE_CHECKING

from .universal_http import ClientRequest
from .universal_http.async_requests import AsyncRequestsHTTPSender
from .pipeline import Request, AsyncPipeline, AsyncHTTPPolicy, SansIOHTTPPolicy
from .pipeline.async_requests import (
AsyncPipelineRequestsHTTPSender,
AsyncRequestsCredentialsPolicy
)
from .pipeline.universal import (
HTTPLogger,
RawDeserializer,
)

if TYPE_CHECKING:
from .configuration import Configuration # pylint: disable=unused-import

_LOGGER = logging.getLogger(__name__)


class AsyncSDKClientMixin:
"""The base class of all generated SDK client.
"""
async def __aenter__(self):
await self._client.__aenter__()
return self

async def __aexit__(self, *exc_details):
await self._client.__aexit__(*exc_details)


class AsyncServiceClientMixin:

def __init__(self, creds: Any, config: 'Configuration') -> None:
# Don't do super, since I know it will be "object"
# super(AsyncServiceClientMixin, self).__init__(creds, config)

# "async_pipeline" be should accessible from "config"
# In legacy mode this is weird, this config is a parameter of "pipeline"
# Should be revamp one day.
self.config.async_pipeline = self._create_default_async_pipeline() # type: ignore

def _create_default_async_pipeline(self):

policies = [
self.config.user_agent_policy, # UserAgent policy
RawDeserializer(), # Deserialize the raw bytes
self.config.http_logger_policy # HTTP request/response log
] # type: List[Union[AsyncHTTPPolicy, SansIOHTTPPolicy]]
if self._creds:
if isinstance(self._creds, (AsyncHTTPPolicy, SansIOHTTPPolicy)):
policies.insert(1, self._creds)
else:
# Assume this is the old credentials class, and then requests. Wrap it.
policies.insert(1, AsyncRequestsCredentialsPolicy(self._creds))

return AsyncPipeline(
policies,
AsyncPipelineRequestsHTTPSender(
AsyncRequestsHTTPSender(self.config) # Send HTTP request using requests
)
)

async def __aenter__(self):
await self.config.async_pipeline.__aenter__()
return self

async def __aexit__(self, *exc_details):
await self.config.async_pipeline.__aexit__(*exc_details)

async def async_send(self, request, **kwargs):
"""Prepare and send request object according to configuration.
:param ClientRequest request: The request object to be sent.
:param dict headers: Any headers to add to the request.
:param content: Any body data to add to the request.
:param config: Any specific config overrides
"""
kwargs.setdefault('stream', True)
# In the current backward compatible implementation, return the HTTP response
# and plug context inside. Could be remove if we modify Autorest,
# but we still need it to be backward compatible
pipeline_response = await self.config.async_pipeline.run(request, **kwargs)
response = pipeline_response.http_response
response.context = pipeline_response.context
return response

def stream_download_async(self, response, user_callback):
"""Async Generator for streaming request body data.
:param response: The initial response
:param user_callback: Custom callback for monitoring progress.
"""
block = self.config.connection.data_block_size
return response.stream_download(block, user_callback)
74 changes: 74 additions & 0 deletions msrest/async_paging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from collections.abc import AsyncIterator
import logging

_LOGGER = logging.getLogger(__name__)

class AsyncPagedMixin(AsyncIterator):

def __init__(self, *args, **kwargs):
"""Bring async to Paging.
"async_command" is mandatory keyword argument for this mixin to work.
"""
self._async_get_next = kwargs.get("async_command")
if not self._async_get_next:
_LOGGER.warning("Paging async iterator protocol is not available for %s",
self.__class__.__name__)

async def async_get(self, url):
"""Get an arbitrary page.
This resets the iterator and then fully consumes it to return the
specific page **only**.
:param str url: URL to arbitrary page results.
"""
self.reset()
self.next_link = url
return await self.async_advance_page()

async def async_advance_page(self):
if self.next_link is None:
raise StopAsyncIteration("End of paging")
self._current_page_iter_index = 0
self._response = await self._async_get_next(self.next_link)
self._derserializer(self, self._response)
return self.current_page

async def __anext__(self):
"""Iterate through responses."""
# Storing the list iterator might work out better, but there's no
# guarantee that some code won't replace the list entirely with a copy,
# invalidating an list iterator that might be saved between iterations.
if self.current_page and self._current_page_iter_index < len(self.current_page):
response = self.current_page[self._current_page_iter_index]
self._current_page_iter_index += 1
return response
else:
await self.async_advance_page()
return await self.__anext__()
Loading

0 comments on commit 3653d29

Please sign in to comment.