Skip to content

Commit

Permalink
Merge pull request #419 from geopy/v2/add-asyncio-support
Browse files Browse the repository at this point in the history
geopy 2.0: Add optional asyncio support via AioHTTPAdapter
  • Loading branch information
KostyaEsmukov committed Jun 21, 2020
2 parents 3cb2113 + 88333c0 commit 83b371f
Show file tree
Hide file tree
Showing 42 changed files with 1,880 additions and 1,218 deletions.
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ before_install:
- travis_retry pip install -U pip wheel setuptools

install:
- travis_retry pip install -e ".[requests,timezone]"
- travis_retry pip install -e ".[aiohttp,requests,timezone]"

stages:
- lint
Expand Down Expand Up @@ -49,3 +49,8 @@ jobs:

# The `test` stage using the `python` matrix above is included implicitly.

# Run a single job with asyncio adapter:
# (not the whole matrix, to avoid spending extra quota)
- stage: test
python: "3.5"
env: GEOPY_TEST_ADAPTER=geopy.adapters.AioHTTPAdapter
7 changes: 6 additions & 1 deletion docs/changelog_2xx.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ New features
for HTTP requests, which doesn't support keepalives. Adapters is
a new mechanism which allows to use other HTTP client implementations.

There are 2 implementations coming out of the box:
There are 3 implementations coming out of the box:

+ :class:`geopy.adapters.RequestsAdapter` -- uses ``requests`` library
which supports keepalives (thus it is significantly more effective
Expand All @@ -33,7 +33,11 @@ New features
+ :class:`geopy.adapters.URLLibAdapter` -- uses ``urllib``, basically
it provides the same behavior as in geopy 1.x. It is used by default if
``requests`` package is not installed.
+ :class:`geopy.adapters.AioHTTPAdapter` -- uses ``aiohttp`` library.

- Added optional asyncio support in all geocoders via
:class:`.AioHTTPAdapter`, see the new :ref:`Async Mode <async_mode>`
doc section.

Packaging changes
~~~~~~~~~~~~~~~~~
Expand All @@ -42,6 +46,7 @@ Packaging changes
- New extras:

+ ``geopy[requests]`` for :class:`geopy.adapters.RequestsAdapter`.
+ ``geopy[aiohttp]`` for :class:`geopy.adapters.AioHTTPAdapter`.

Chores
~~~~~~
Expand Down
11 changes: 11 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ Supported Adapters
.. autoclass:: geopy.adapters.URLLibAdapter
:show-inheritance:

.. autoclass:: geopy.adapters.AioHTTPAdapter
:show-inheritance:


Base Classes
------------
Expand All @@ -415,6 +418,14 @@ Base Classes

.. automethod:: __init__

.. autoclass:: geopy.adapters.BaseSyncAdapter
:show-inheritance:
:members:

.. autoclass:: geopy.adapters.BaseAsyncAdapter
:show-inheritance:
:members:

Logging
~~~~~~~

Expand Down
221 changes: 207 additions & 14 deletions geopy/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,29 @@
.. _provisional basis: https://docs.python.org/3/glossary.html#term-provisional-api
"""
import abc
import asyncio
import contextlib
import json
import warnings
from socket import timeout as SocketTimeout
from ssl import SSLError
from urllib.error import HTTPError
from urllib.request import HTTPSHandler, ProxyHandler, Request, URLError, build_opener
from urllib.parse import urlparse
from urllib.request import (
HTTPSHandler,
ProxyHandler,
Request,
URLError,
build_opener,
getproxies,
)

from geopy.exc import (
GeocoderParseError,
GeocoderServiceError,
GeocoderTimedOut,
GeocoderUnavailable,
GeopyError,
)
from geopy.util import logger

Expand All @@ -38,6 +49,15 @@
RequestsHTTPAdapter = object
requests_available = False

try:
import aiohttp
import aiohttp.client_exceptions
import yarl

aiohttp_available = True
except ImportError:
aiohttp_available = False


class AdapterHTTPError(IOError):
"""An exception which must be raised by adapters when an HTTP response
Expand All @@ -63,9 +83,16 @@ def __init__(self, message, *, status_code, text):
class BaseAdapter(abc.ABC):
"""Base class for an Adapter.
To make geocoders use a custom adapter, add an implementation
of this class and specify it in
the :attr:`geopy.geocoders.options.default_adapter_factory` value.
There are two types of adapters:
- :class:`.BaseSyncAdapter` -- synchronous adapter,
- :class:`.BaseAsyncAdapter` -- asynchronous (asyncio) adapter.
Concrete adapter implementations must extend one of the two
base adapters above.
See :attr:`geopy.geocoders.options.default_adapter_factory`
for details on how to specify an adapter to be used by geocoders.
"""

Expand All @@ -89,7 +116,6 @@ def __init__(self, *, proxies, ssl_context):
See :attr:`geopy.geocoders.options.default_ssl_context`.
"""
pass

@abc.abstractmethod
def get_json(self, url, *, timeout, headers):
Expand All @@ -106,7 +132,6 @@ def get_json(self, url, *, timeout, headers):
:param dict headers: A dict with custom HTTP request headers.
"""
pass

@abc.abstractmethod
def get_text(self, url, *, timeout, headers):
Expand All @@ -130,10 +155,57 @@ def get_text(self, url, *, timeout, headers):
:param dict headers: A dict with custom HTTP request headers.
"""


class BaseSyncAdapter(BaseAdapter):
"""Base class for synchronous adapters.
"""

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass


class URLLibAdapter(BaseAdapter):
class BaseAsyncAdapter(BaseAdapter):
"""Base class for asynchronous adapters.
"""

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
pass


def _normalize_proxies(proxies):
"""Normalize user-supplied `proxies`:
- For `None` -- retrieve System proxies using
:func:`urllib.request.getproxies`
- Add `http://` scheme to proxy urls if missing.
"""
if proxies is None: # Use system proxy settings
proxies = getproxies()
if not proxies:
return {} # Disable proxies

normalized = {}
for scheme, url in proxies.items():
if url and "://" not in url:
# Without the scheme there are errors:
# from aiohttp:
# ValueError: Only http proxies are supported
# from requests (in some envs):
# urllib3.exceptions.ProxySchemeUnknown: Not supported
# proxy scheme localhost
url = "http://%s" % url
normalized[scheme] = url
return normalized


class URLLibAdapter(BaseSyncAdapter):
"""The fallback adapter which uses urllib from the Python standard
library, see :func:`urllib.request.urlopen`.
Expand All @@ -146,6 +218,7 @@ class URLLibAdapter(BaseAdapter):
"""

def __init__(self, *, proxies, ssl_context):
proxies = _normalize_proxies(proxies)
super().__init__(proxies=proxies, ssl_context=ssl_context)

# `ProxyHandler` should be present even when actually there're
Expand Down Expand Up @@ -224,7 +297,7 @@ def _decode_page(self, page):
raise GeocoderParseError("Unable to decode the response bytes")


class RequestsAdapter(BaseAdapter):
class RequestsAdapter(BaseSyncAdapter):
"""The adapter which uses `requests`_ library.
.. _requests: https://requests.readthedocs.io
Expand Down Expand Up @@ -254,14 +327,12 @@ def __init__(
"this command to install requests: "
'`pip install "geopy[requests]"`.'
)
proxies = _normalize_proxies(proxies)
super().__init__(proxies=proxies, ssl_context=ssl_context)

self.session = requests.Session()
if proxies is None:
# Use system proxies:
self.session.trust_env = True
else:
self.session.trust_env = False
self.session.proxies = proxies
self.session.trust_env = False # don't use system proxies
self.session.proxies = proxies

self.session.mount(
"http://",
Expand All @@ -283,6 +354,12 @@ def __init__(
),
)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()

def __del__(self):
# Cleanup keepalive connections when Geocoder (and, thus, Adapter)
# instances are getting garbage-collected.
Expand Down Expand Up @@ -330,6 +407,122 @@ def _request(self, url, *, timeout, headers):
return resp


class AioHTTPAdapter(BaseAsyncAdapter):
"""The adapter which uses `aiohttp`_ library.
.. _aiohttp: https://docs.aiohttp.org/
`aiohttp` supports keep-alives, persists Cookies, allows response
compression and uses HTTP/1.1 [currently].
``aiohttp`` package must be installed in order to use this adapter.
"""

is_available = aiohttp_available

def __init__(self, *, proxies, ssl_context):
if not aiohttp_available:
raise ImportError(
"`aiohttp` must be installed in order to use AioHTTPAdapter. "
"If you have installed geopy via pip, you may use "
"this command to install aiohttp: "
'`pip install "geopy[aiohttp]"`.'
)
proxies = _normalize_proxies(proxies)
super().__init__(proxies=proxies, ssl_context=ssl_context)

self.proxies = proxies
self.ssl_context = ssl_context

@property
def session(self):
# Lazy session creation, which allows to avoid "unclosed socket"
# warnings if a Geocoder instance is created without entering
# async context and making any requests.
session = self.__dict__.get("session")
if session is None:
session = aiohttp.ClientSession(
trust_env=False, # don't use system proxies
raise_for_status=False
)
self.__dict__["session"] = session
return session

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
# Might issue a warning if loop is immediately closed:
# ResourceWarning: unclosed transport <_SelectorSocketTransport fd=10>
# https://github.com/aio-libs/aiohttp/issues/1115#issuecomment-242278593
# https://github.com/python/asyncio/issues/466
await self.session.close()

async def get_text(self, url, *, timeout, headers):
with self._normalize_exceptions():
async with self._request(url, timeout=timeout, headers=headers) as resp:
await self._raise_for_status(resp)
return await resp.text()

async def get_json(self, url, *, timeout, headers):
with self._normalize_exceptions():
async with self._request(url, timeout=timeout, headers=headers) as resp:
await self._raise_for_status(resp)
try:
try:
return await resp.json()
except aiohttp.client_exceptions.ContentTypeError:
# `Attempt to decode JSON with unexpected mimetype:
# text/plain;charset=utf-8`
return json.loads(await resp.text())
except ValueError:
raise GeocoderParseError(
"Could not deserialize using deserializer:\n%s"
% (await resp.text())
)

async def _raise_for_status(self, resp):
if resp.status >= 400:
raise AdapterHTTPError(
"Non-successful status code %s" % resp.status,
status_code=resp.status,
text=await resp.text(),
)

def _request(self, url, *, timeout, headers):
if self.proxies:
scheme = urlparse(url).scheme
proxy = self.proxies.get(scheme.lower())
else:
proxy = None

# aiohttp accepts url as string or as yarl.URL.
# A string url might be re-encoded by yarl, which might cause
# a hashsum of params to change. Some geocoders use that
# to authenticate their requests (such as Baidu SK).
url = yarl.URL(url, encoded=True) # `encoded` param disables url re-encoding
return self.session.get(
url, timeout=timeout, headers=headers, proxy=proxy, ssl=self.ssl_context
)

@contextlib.contextmanager
def _normalize_exceptions(self):
try:
yield
except (GeopyError, AdapterHTTPError, AssertionError):
raise
except Exception as error:
message = str(error)
if isinstance(error, asyncio.TimeoutError):
raise GeocoderTimedOut("Service timed out")
elif isinstance(error, SSLError):
if "timed out" in message:
raise GeocoderTimedOut("Service timed out")
elif isinstance(error, aiohttp.ClientConnectionError):
raise GeocoderUnavailable(message)
raise GeocoderServiceError(message)


# https://github.com/kennethreitz/requests/issues/3774#issuecomment-267871876
class RequestsHTTPWithSSLContextAdapter(RequestsHTTPAdapter):
def __init__(self, *, ssl_context=None, **kwargs):
Expand Down
7 changes: 7 additions & 0 deletions geopy/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
try:
# >=3.7
from asyncio import current_task
except ImportError:
from asyncio import Task
current_task = Task.current_task
del Task

0 comments on commit 83b371f

Please sign in to comment.