Skip to content

Commit

Permalink
Merge d1c213c into dd9ad49
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred82santa authored Mar 3, 2021
2 parents dd9ad49 + d1c213c commit 2011658
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 149 deletions.
File renamed without changes.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
language: python
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"
- "3.9"
# command to install dependencies
install:
- make requirements-test
Expand Down
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ In order to send a payload you must use ``payload`` keyword on call:
Changelog
=========


v0.7.1
------

- Python 3.9 compatible.
- Update aiohttp.


v0.6.1
------

Expand Down
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ flake8
coverage
nose
asynctest
autopep8
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
dirty-loader >= 0.2.2
git+https://github.com/alfred82santa/configure.git
aiohttp >= 3.1.1
configure-fork
aiohttp >= 3.7.4
14 changes: 9 additions & 5 deletions service_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import logging
from asyncio import get_event_loop, ensure_future
from asyncio.tasks import Task
from asyncio import Task, ensure_future, get_event_loop
from urllib.parse import urlparse, urlunsplit

try:
current_task = Task.current_task
except AttributeError: # pragma: no cover
from asyncio import current_task

from aiohttp.client import ClientSession
from aiohttp.client_reqrep import ClientResponse
from aiohttp.connector import TCPConnector
from yarl import URL

from .utils import ObjectWrapper

__version__ = '0.7.0'
__version__ = '0.7.1'


class ServiceClient:
Expand All @@ -36,7 +40,7 @@ def __init__(self, name='GenericService', spec=None, plugins=None, config=None,

def create_response(self, *args, **kwargs):
response = ObjectWrapper(ClientResponse(*args, **kwargs))
task = Task.current_task(loop=self.loop)
task = current_task(loop=self.loop)

self._execute_plugin_hooks_sync('prepare_response',
endpoint_desc=task.endpoint_desc, session=task.session,
Expand Down Expand Up @@ -78,7 +82,7 @@ async def call(self, endpoint, payload=None, **kwargs):
request_params=request_params)

await self.before_request(endpoint_desc, session, request_params)
task = Task.current_task(loop=self.loop)
task = current_task(loop=self.loop)
task.session = session
task.endpoint_desc = endpoint_desc
task.request_params = request_params
Expand Down
5 changes: 3 additions & 2 deletions service_client/formatters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from urllib.parse import urlencode
from http.server import BaseHTTPRequestHandler
from logging import Formatter, PercentStyle, StrFormatStyle, StringTemplateStyle
from urllib.parse import urlencode

_STYLES = {
'%': PercentStyle,
Expand Down Expand Up @@ -70,7 +70,8 @@ def formatMessage(self, record):
pass

try:
record.exception_repr = repr(record.exception)
record.exception_repr = "{type}('{message}')".format(type=type(record.exception).__name__,
message=str(record.exception))
except AttributeError:
pass

Expand Down
36 changes: 28 additions & 8 deletions service_client/mocks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from asyncio import get_event_loop
from asyncio.futures import Future
from functools import wraps

from aiohttp import RequestInfo
from aiohttp.client_reqrep import ClientResponse
from aiohttp.helpers import TimerContext
from dirty_loader import LoaderNamespaceReversedCached
from functools import wraps
from multidict import CIMultiDict
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL

from .plugins import BasePlugin
Expand Down Expand Up @@ -189,6 +191,11 @@ def __init__(self, endpoint_desc, session, request_params,
self.loop = loop or get_event_loop()

async def __call__(self, *args, **kwargs):
try:
from asyncio import create_task
except ImportError: # pragma: no cover
create_task = self.loop.create_task

args = list(args)
try:
method = kwargs['method']
Expand All @@ -204,14 +211,27 @@ async def __call__(self, *args, **kwargs):
self.url = url
self.args = args
self.kwargs = kwargs
self.response = ClientResponse(method, URL(url),
writer=None, continue100=False, timer=None,
request_info=RequestInfo(URL(url), method, kwargs.get('headers', [])),
auto_decompress=False,
traces=[], loop=self.loop, session=self.session)

async def writer(*args, **kwargs):
return None

continue100 = Future()
continue100.set_result(False)

self.response = ClientResponse(method,
URL(url),
writer=create_task(writer()),
continue100=continue100,
timer=TimerContext(loop=self.loop),
request_info=RequestInfo(URL(url),
method,
kwargs.get('headers', [])),
traces=[],
loop=self.loop,
session=self.session)

self.response.status = self.mock_desc.get('status', 200)
self.response.headers = CIMultiDict(self.mock_desc.get('headers', {}))
self.response._headers = CIMultiDictProxy(CIMultiDict(self.mock_desc.get('headers', {})))

await self.prepare_response()

Expand Down
38 changes: 19 additions & 19 deletions service_client/plugins.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
from asyncio import wait_for, TimeoutError
import weakref
from asyncio import TimeoutError, wait_for
from datetime import datetime
from functools import wraps
from urllib.parse import quote_plus

import weakref
from async_timeout import timeout as TimeoutContext
from functools import wraps
from multidict import CIMultiDict

from service_client.utils import IncompleteFormatter, random_token
Expand Down Expand Up @@ -336,16 +336,15 @@ def __init__(self, limit=1, timeout=None, hard_limit=None):
self.limit = limit
self._counter = 0
self._fut = None
self._pending = 0
self._pending_futs = []
self._timeout = timeout
self._hard_limit = hard_limit

@property
def pending(self):
return self._pending
return len(self._pending_futs)

async def _acquire(self):

timeout = self._timeout
while True:
if self._counter < self.limit:
Expand All @@ -355,26 +354,26 @@ async def _acquire(self):
if self._hard_limit is not None and self._hard_limit < self.pending:
raise TooManyRequestsPendingError(self.TOO_MANY_REQ_PENDING_MSG)

if self._fut is None:
self._fut = self.service_client.loop.create_future()
self._pending += 1
fut = self.service_client.loop.create_future()
self._pending_futs.append(fut)

try:
now = self.service_client.loop.time()
await wait_for(self._fut, timeout=timeout, loop=self.service_client.loop)
await wait_for(fut, timeout=timeout, loop=self.service_client.loop)
if timeout is not None:
timeout -= self.service_client.loop.time() - now
if timeout <= 0:
raise TimeoutError()
except TimeoutError:
raise TooMuchTimePendingError(self.TOO_MUCH_TIME_MSG)
finally:
self._pending -= 1

def _release(self):
self._counter -= 1
if self._fut is not None:
self._fut.set_result(None)

try:
fut = self._pending_futs.pop(0)
except IndexError:
pass
else:
fut.set_result(None)

async def before_request(self, endpoint_desc, session, request_params):
start = self.service_client.loop.time()
Expand All @@ -386,9 +385,10 @@ async def before_request(self, endpoint_desc, session, request_params):
self.service_client.loop.time() - start)

def close(self):
if self._fut is not None:
from service_client import ConnectionClosedError
self._fut.set_exception(ConnectionClosedError('Connection closed'))
from service_client import ConnectionClosedError
while len(self._pending_futs):
fut = self._pending_futs.pop(0)
fut.set_exception(ConnectionClosedError('Connection closed'))


class Pool(BaseLimitPlugin):
Expand Down
6 changes: 2 additions & 4 deletions service_client/spec_loaders.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@


def json_loader(filename):
from json import load
with open(filename) as f:
return load(f)


def yaml_loader(filename):
from yaml import load
from yaml import load, FullLoader
with open(filename) as f:
return load(f)
return load(f, Loader=FullLoader)


def configuration_loader(filename):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
'Development Status :: 4 - Beta'],
packages=['service_client'],
include_package_data=False,
install_requires=['dirty-loader>=0.2.2', 'aiohttp>=3.1.1'],
install_requires=['dirty-loader>=0.2.2', 'aiohttp>=3.7.4', 'configure-fork'],
description="Service Client Framework powered by Python asyncio.",
long_description=open(os.path.join(os.path.dirname(__file__), 'README.rst')).read(),
test_suite="nose.collector",
Expand Down
28 changes: 23 additions & 5 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
from asyncio import get_event_loop
from asyncio import Future, get_event_loop

from aiohttp import ClientResponse, RequestInfo
from aiohttp.helpers import TimerContext
from yarl import URL


async def create_fake_response(method, url, *, session, headers=None, loop=None):
loop = loop or get_event_loop()

try:
from asyncio import create_task
except ImportError: # pragma: no cover
create_task = loop.create_task

async def writer(*args, **kwargs):
return None

continue100 = Future()
continue100.set_result(False)

return ClientResponse(method, URL(url),
writer=None, continue100=False, timer=None,
request_info=RequestInfo(URL(url), method,
writer=create_task(writer()),
continue100=continue100,
timer=TimerContext(loop=loop),
request_info=RequestInfo(URL(url),
method,
headers or []),
auto_decompress=False,
traces=[], loop=loop or get_event_loop(), session=session)
traces=[],
loop=loop,
session=session)
4 changes: 2 additions & 2 deletions tests/tests_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_on_parse_exception(self):
'status_code': 404,
'elapsed': timedelta(seconds=0.214),
'exception': AttributeError('test exception')})
log_text = """EXCEPTION | GET http://example.com | 404 Not Found | 214 ms | AttributeError('test exception',)
log_text = """EXCEPTION | GET http://example.com | 404 Not Found | 214 ms | AttributeError('test exception')
Headers:
Test-Header-1: header value 1
Test-Header-2: header value 2
Expand All @@ -110,7 +110,7 @@ def test_on_exception(self):
'method': 'GET',
'url': 'http://example.com',
'exception': AttributeError('test exception')})
log_text = "EXCEPTION | GET http://example.com | AttributeError('test exception',)"
log_text = "EXCEPTION | GET http://example.com | AttributeError('test exception')"

self.assertEqual(self.formatter.formatMessage(log_entry), log_text)

Expand Down
3 changes: 2 additions & 1 deletion tests/tests_mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from aiohttp import hdrs
from aiohttp.client import ClientSession
from asynctest.case import TestCase
from multidict import CIMultiDict

from service_client.mocks import Mock, mock_manager, RawFileMock
from service_client.utils import ObjectWrapper
Expand Down Expand Up @@ -124,7 +125,7 @@ async def test_calling_mock_same_endpoint(self):

@mock_manager.patch_mock_desc(patch={'mock_type': 'default:RawFileMock',
'file': os.path.join(MOCKS_DIR, 'test_mock_text.data'),
'headers': {hdrs.CONTENT_TYPE: "text/plain; charset=utf8"}})
'headers': CIMultiDict({hdrs.CONTENT_TYPE: "text/plain; charset=utf8"})})
async def test_patch_mock(self):
await self.plugin.prepare_session(self.service_desc, self.session, {})
self.assertIsInstance(self.session.request, RawFileMock)
Expand Down
Loading

0 comments on commit 2011658

Please sign in to comment.