Skip to content

Commit

Permalink
Move raven-aiohttp code inside (#150)
Browse files Browse the repository at this point in the history
* Move raven-aiohttp code inside and add licence attribution
  • Loading branch information
mosquito committed Nov 7, 2022
1 parent ce44961 commit 4f27ff7
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 43 deletions.
2 changes: 1 addition & 1 deletion aiomisc/circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ async def async_wrapper(
return await circuit_breaker.call_async(func, *args, **kw)

@wraps(func)
def wrapper(*args: Any, **kw: Any) -> T:
def wrapper(*args: Any, **kw: Any) -> Any:
return circuit_breaker.call(func, *args, **kw)

if asyncio.iscoroutinefunction(func):
Expand Down
8 changes: 3 additions & 5 deletions aiomisc/service/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,9 @@ async def stop(self, exception: Exception = None) -> Any:
)

if stop_result:
stop_result = (
await self.loop.run_in_executor(
None, process.join, self.process_stop_timeout,
)
) is not None
await self.loop.run_in_executor(
None, process.join, self.process_stop_timeout,
)

if not stop_result and process.is_alive():
process.kill()
Expand Down
300 changes: 270 additions & 30 deletions aiomisc/service/raven.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,303 @@
import abc
import asyncio
import inspect
import logging
import socket
import sys
from asyncio import Queue, ensure_future
from http import HTTPStatus
from types import MappingProxyType
from typing import Any, Iterable, Mapping
from typing import Any, Awaitable, Callable, Iterable, Mapping, Optional, Set

import aiohttp
import yarl
from aiohttp import ClientSession, TCPConnector
from raven import Client # type: ignore
from raven.conf import defaults # type: ignore
from raven.exceptions import APIError, RateLimited # type: ignore
from raven.handlers.logging import SentryHandler # type: ignore
from raven.transport import Transport # type: ignore
from raven_aiohttp import QueuedAioHttpTransport # type: ignore
from raven.transport.base import AsyncTransport # type: ignore
from raven.transport.http import HTTPTransport # type: ignore

from aiomisc.service import Service
from aiomisc.utils import TimeoutType


log = logging.getLogger(__name__)


__doc__ = """
This module based on https://github.com/getsentry/raven-aiohttp
Copyright (c) 2018 Functional Software, Inc and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the Raven nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""


class DummyTransport(Transport): # type: ignore
def send(self, url, data, headers): # type: ignore
pass


class QueuedPatchedAioHttpTransport(QueuedAioHttpTransport): # type: ignore
class AioHttpTransportBase(
AsyncTransport, HTTPTransport, metaclass=abc.ABCMeta, # type: ignore
):

def __init__(
self,
*args: Any,
workers: int = 1,
qsize: int = 1000,
**kwargs: Any
self, parsed_url: Optional[str] = None, *, verify_ssl: bool = True,
timeout: TimeoutType = defaults.TIMEOUT, keepalive: bool = True,
family: int = socket.AF_INET,
):
super(QueuedAioHttpTransport, self).__init__(*args, **kwargs)
loop_args = (
{"loop": self._loop} if sys.version_info < (3, 8) else {}
self._keepalive = keepalive
self._family = family
self._loop = asyncio.get_event_loop()

if parsed_url is not None:
raise TypeError(
"Transport accepts no URLs for this version of raven.",
)
super().__init__(timeout, verify_ssl)

if self.keepalive:
self._client_session = self._client_session_factory()

self._closing = False

@property
def keepalive(self) -> bool:
return self._keepalive

@property
def family(self) -> int:
return self._family

def _client_session_factory(self) -> aiohttp.ClientSession:
connector = aiohttp.TCPConnector(
verify_ssl=self.verify_ssl, family=self.family,
)
return aiohttp.ClientSession(
connector=connector,
)
self._queue: Queue = Queue(maxsize=qsize, **loop_args)

async def _do_send(
self, url: str, data: Any, headers: Mapping,
callback: Callable[[], Any], errorback: Callable[[Any], Any],
) -> None:
if self.keepalive:
session = self._client_session
else:
session = self._client_session_factory()

resp = None

try:
resp = await session.post(
url, data=data, compress=False,
headers=headers, timeout=self.timeout,
)

code = resp.status
if code != HTTPStatus.OK:
msg = resp.headers.get("x-sentry-error")
if code == HTTPStatus.TOO_MANY_REQUESTS:
try:
retry_after = int(resp.headers.get("retry-after", "0"))
except (ValueError, TypeError):
retry_after = 0
errorback(RateLimited(msg, retry_after))
else:
errorback(APIError(msg, code))
else:
callback()
except asyncio.CancelledError:
# do not mute asyncio.CancelledError
raise
except Exception as exc:
errorback(exc)
finally:
if resp is not None:
resp.release()
if not self.keepalive:
await session.close()

@abc.abstractmethod
def _async_send(
self, url: str, data: Any, headers: Mapping,
success_cb: Callable[[], Any], failure_cb: Callable[[Any], Any],
) -> None: # pragma: no cover
pass

@abc.abstractmethod
async def _close(self) -> None: # pragma: no cover
pass

def async_send(
self, url: str, data: Any, headers: Mapping,
success_cb: Callable[[], Any], failure_cb: Callable[[Any], Any],
) -> None:
if self._closing:
failure_cb(RuntimeError(f"{self.__class__.__name__} is closed"))
return

self._loop.call_soon_threadsafe(
self._async_send, url, data, headers, success_cb, failure_cb,
)

async def _close_coro(self, *, timeout: TimeoutType = None) -> None:
try:
await asyncio.wait_for(
self._close(), timeout=timeout,
)
except asyncio.TimeoutError:
pass
finally:
if self.keepalive:
await self._client_session.close()

def close(self, *, timeout: TimeoutType = None) -> Awaitable[Any]:
if self._closing:
async def dummy() -> None:
pass

return dummy()

self._closing = True

return self._loop.create_task(self._close_coro(timeout=timeout))


class AioHttpTransport(AioHttpTransportBase):

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)

self._tasks: Set[asyncio.Task] = set()

def _async_send(
self, url: str, data: Any, headers: Mapping,
success_cb: Callable[[], Any], failure_cb: Callable[[Any], Any],
) -> None:
coro = self._do_send(url, data, headers, success_cb, failure_cb)

task = self._loop.create_task(coro)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)

async def _close(self) -> None:
await asyncio.gather(
*self._tasks,
return_exceptions=True,
)

assert len(self._tasks) == 0


class QueuedAioHttpTransport(AioHttpTransportBase):

def __init__(
self, *args: Any, workers: int = 1, qsize: int = 1000, **kwargs: Any
):
super().__init__(*args, **kwargs)

self._queue: asyncio.Queue = asyncio.Queue(maxsize=qsize)

self._workers = set()

for _ in range(workers):
worker = ensure_future(self._worker())
worker: asyncio.Task = self._loop.create_task(self._worker())
self._workers.add(worker)
worker.add_done_callback(self._workers.remove)

async def _worker(self) -> None:
while True:
data = await self._queue.get()

class QueuedKeepaliveAioHttpTransport(
QueuedPatchedAioHttpTransport,
):
try:
if data is ...:
self._queue.put_nowait(...)
break

url, data, headers, success_cb, failure_cb = data

await self._do_send(
url, data, headers, success_cb, failure_cb,
)
finally:
self._queue.task_done()

def _async_send(
self, url: str, data: Any, headers: Mapping,
callback: Callable[[], Any], errorback: Callable[[Any], Any],
) -> None:
payload = url, data, headers, callback, errorback

try:
self._queue.put_nowait(payload)
except asyncio.QueueFull:
skipped = self._queue.get_nowait()
self._queue.task_done()

*_, errorback = skipped

errorback(
RuntimeError("QueuedAioHttpTransport internal queue is full"),
)

self._queue.put_nowait(payload)

async def _close(self) -> None:
try:
self._queue.put_nowait(...)
except asyncio.QueueFull:
skipped = self._queue.get_nowait()
self._queue.task_done()

*_, failure_cb = skipped

failure_cb(
RuntimeError("QueuedAioHttpTransport internal queue was full"),
)

self._queue.put_nowait(...)

await asyncio.gather(
*self._workers,
return_exceptions=True,
)

assert len(self._workers) == 0
assert self._queue.qsize() == 1
try:
assert self._queue.get_nowait() is ...
finally:
self._queue.task_done()


class QueuedKeepaliveAioHttpTransport(QueuedAioHttpTransport):
DNS_CACHE_TTL = 600
DNS_CACHE = True
TCP_CONNECTION_LIMIT = 32
Expand All @@ -60,7 +307,6 @@ class QueuedKeepaliveAioHttpTransport(

def __init__(
self, *args: Any, family: int = socket.AF_UNSPEC,
loop: asyncio.AbstractEventLoop = None,
dns_cache: bool = DNS_CACHE,
dns_cache_ttl: int = DNS_CACHE_TTL,
connection_limit: int = TCP_CONNECTION_LIMIT,
Expand All @@ -73,7 +319,7 @@ def __init__(
self.dns_cache_ttl = dns_cache_ttl

super().__init__(
*args, family=family, loop=loop, keepalive=True,
*args, family=family, keepalive=True,
workers=workers, qsize=qsize, **kwargs
)

Expand All @@ -92,15 +338,9 @@ def _client_session_factory(self) -> ClientSession:
connector_owner=False,
)

async def _close(self) -> Transport:
transport = await super()._close()

if inspect.iscoroutinefunction(self.connector.close()):
await self.connection.close()
else:
# noinspection PyAsyncCall
self.connector.close()
return transport
async def _close(self) -> None:
await super()._close()
await self.connector.close()


class RavenSender(Service):
Expand Down
3 changes: 1 addition & 2 deletions aiomisc/thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ def wrap(*args: Any, **kwargs: Any) -> Any:
future = run_in_new_thread(
func, args=args, kwargs=kwargs, detach=detach,
)

return _awaiter(future)
return future

return wrap

Expand Down

0 comments on commit 4f27ff7

Please sign in to comment.