Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion elastic_transport/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.

__version__ = "0.1.0b0"
__version__ = "7.11.0"
7 changes: 7 additions & 0 deletions elastic_transport/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,15 @@
except NameError:
string_types = (str, bytes)

try:
from collections.abc import Mapping, MutableMapping
except ImportError:
from collections import Mapping, MutableMapping

__all__ = [
"urlparse",
"urlencode",
"string_types",
"Mapping",
"MutableMapping",
]
6 changes: 4 additions & 2 deletions elastic_transport/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,17 @@ def log_request_fail(
if response is not None:
logger.debug("< %s", response)

def _raise_error(self, status, raw_data):
def _raise_error(self, status, headers, raw_data):
"""Locate appropriate exception and raise it. Attempts
to decode the raw data as JSON for better usability.
"""
try:
raw_data = json.loads(six.ensure_str(raw_data, "utf-8", "ignore"))
except Exception:
pass
raise HTTP_EXCEPTIONS.get(status, APIError)(message=raw_data, status=status)
raise HTTP_EXCEPTIONS.get(status, APIError)(
message=raw_data, status=status, headers=headers
)

def _gzip_compress(self, body):
buf = io.BytesIO()
Expand Down
4 changes: 3 additions & 1 deletion elastic_transport/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ def perform_request(
status=response.status_code,
response=raw_data,
)
self._raise_error(response.status_code, raw_data)
self._raise_error(
status=response.status_code, headers=response.headers, raw_data=raw_data
)

self.log_request_success(
method=method,
Expand Down
27 changes: 9 additions & 18 deletions elastic_transport/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.

import ssl
import time
import warnings

Expand All @@ -26,7 +25,7 @@

from ..compat import urlencode
from ..exceptions import ConnectionError, ConnectionTimeout
from ..utils import DEFAULT
from ..utils import DEFAULT, normalize_headers
from .base import Connection

CA_CERTS = None
Expand All @@ -39,25 +38,13 @@
pass


def create_ssl_context(**kwargs):
"""
A helper function around creating an SSL context

https://docs.python.org/3/library/ssl.html#context-creation

Accepts kwargs in the same manner as `create_default_context`.
"""
ctx = ssl.create_default_context(**kwargs)
return ctx


class Urllib3HttpConnection(Connection):
"""
Default connection class using the `urllib3` library and the http protocol.

:arg host: hostname of the node (default: localhost)
:arg port: port to use (integer, default: 9200)
:arg url_prefix: optional url prefix for elasticsearch
:arg port: port to use (integer)
:arg url_prefix: optional url prefix
:arg timeout: default timeout in seconds (float, default: 10)
:arg use_ssl: use ssl for the connection if `True`
:arg verify_certs: whether to verify SSL certificates
Expand Down Expand Up @@ -223,6 +210,7 @@ def perform_request(

request_headers = self.headers.copy()
request_headers.update(headers or ())
request_headers = normalize_headers(request_headers)

if self.http_compress and body:
body = self._gzip_compress(body)
Expand All @@ -236,6 +224,7 @@ def perform_request(
headers=request_headers,
**kw
)
response_headers = dict(response.headers)
duration = time.time() - start
raw_data = response.data.decode("utf-8", "surrogatepass")
except Exception as e:
Expand All @@ -262,7 +251,9 @@ def perform_request(
status=response.status,
response=raw_data,
)
self._raise_error(response.status, raw_data)
self._raise_error(
status=response.status, headers=response_headers, raw_data=raw_data
)

self.log_request_success(
method=method,
Expand All @@ -273,7 +264,7 @@ def perform_request(
duration=duration,
)

return response.status, response.getheaders(), raw_data
return response.status, response_headers, raw_data

def close(self):
"""
Expand Down
11 changes: 9 additions & 2 deletions elastic_transport/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from six import add_metaclass, python_2_unicode_compatible

from .response import Headers

HTTP_EXCEPTIONS = {}


Expand All @@ -38,17 +40,22 @@ class TransportError(Exception):
most recently raised (index=0) to least recently raised (index=N)

If an HTTP status code is available with the error it
will be stored under 'status'.
will be stored under 'status'. If HTTP headers are available
they are stored under 'headers'.
"""

status = None

def __init__(self, message, errors=(), status=None):
def __init__(self, message, errors=(), status=None, headers=None):
super(TransportError, self).__init__(message)
self.errors = tuple(errors)
self.message = message
if status is not None:
self.status = status
if headers is not None:
self.headers = Headers(headers)
else:
self.headers = None

def __repr__(self):
parts = [repr(self.message)]
Expand Down
70 changes: 64 additions & 6 deletions elastic_transport/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,71 @@
# specific language governing permissions and limitations
# under the License.

from .compat import Mapping


class Headers(Mapping):
"""HTTP headers"""

def __init__(self, initial=None):
self._internal = {}
if initial:
for key, val in dict(initial).items():
self._internal[self._normalize_key(key)] = (key, val)

def __getitem__(self, item):
return self._internal[self._normalize_key(item)][1]

def __eq__(self, other):
if isinstance(other, Mapping):
return dict(self.items()) == dict(other.items())
return NotImplemented

def __ne__(self, other):
if isinstance(other, Mapping):
return dict(self.items()) != dict(other.items())
return NotImplemented

def __iter__(self):
return iter(self.keys())

def __len__(self):
return len(self._internal)

def __contains__(self, item):
return self._normalize_key(item) in self._internal

def __repr__(self):
return repr(dict(self.items()))

def __str__(self):
return str(dict(self.items()))

def get(self, key, default=None):
return self._internal.get(self._normalize_key(key), (None, default))[1]

def keys(self):
return [key for _, (key, _) in self._internal.items()]

def values(self):
return [val for _, (_, val) in self._internal.items()]

def items(self):
return [(key, val) for _, (key, val) in self._internal.items()]

def copy(self):
return dict(self.items())

def _normalize_key(self, key):
return key.lower() if hasattr(key, "lower") else key


class Response(object):
"""HTTP response"""

def __init__(self, headers, status, body):
self.headers = headers
def __init__(self, status, headers, body):
self.status = status
self.headers = Headers(headers)
self.body = body

def __repr__(self):
Expand Down Expand Up @@ -67,12 +125,12 @@ def __ne__(self, other):


class DictResponse(Response, dict):
def __init__(self, headers, status, body):
Response.__init__(self, headers, status, body)
def __init__(self, status, headers, body):
Response.__init__(self, status=status, headers=headers, body=body)
dict.__init__(self, body)


class ListResponse(Response, list):
def __init__(self, headers, status, body):
Response.__init__(self, headers, status, body)
def __init__(self, status, headers, body):
Response.__init__(self, status=status, headers=headers, body=body)
list.__init__(self, body)
16 changes: 8 additions & 8 deletions elastic_transport/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def perform_request(
connection = self.get_connection()

try:
status, headers_response, data = connection.perform_request(
resp_status, resp_headers, data = connection.perform_request(
method,
path,
params,
Expand All @@ -253,7 +253,7 @@ def perform_request(
if method == "HEAD" and e.status == 404:
return Response(
status=404,
headers={},
headers=e.headers,
body=False,
)

Expand Down Expand Up @@ -289,14 +289,14 @@ def perform_request(

if method == "HEAD":
return Response(
status=status,
headers=headers_response,
body=200 <= status < 300,
status=resp_status,
headers=resp_headers,
body=200 <= resp_status < 300,
)

if data:
data = self.deserializer.loads(
data, headers_response.get("content-type")
data, resp_headers.get("content-type")
)

# After the body is deserialized put the data
Expand All @@ -307,8 +307,8 @@ def perform_request(
elif isinstance(data, dict):
response_cls = DictResponse
return response_cls(
status=status,
headers=headers_response,
status=resp_status,
headers=resp_headers,
body=data,
)

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
},
packages=packages,
install_requires=[
"urllib3>=1.21.1",
"urllib3>=1.21.1, <2",
"six>=1.12",
"certifi",
],
Expand All @@ -57,7 +57,7 @@
"develop": ["pytest", "pytest-cov", "pytest-mock", "mock", "requests"],
},
classifiers=[
"Development Status :: 4 - Beta",
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
Expand Down
3 changes: 3 additions & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def test_timeout_is_10_seconds_by_default(self):
with patch.object(conn.pool, "urlopen") as pool_urlopen:
resp = Mock()
resp.status = 200
resp.headers = {}
pool_urlopen.return_value = resp

conn.perform_request("GET", "/")
Expand All @@ -158,6 +159,7 @@ def test_timeout_override_default(self, request_timeout):
with patch.object(conn.pool, "urlopen") as pool_urlopen:
resp = Mock()
resp.status = 200
resp.headers = {}
pool_urlopen.return_value = resp

conn.perform_request("GET", "/", request_timeout=request_timeout)
Expand Down Expand Up @@ -244,6 +246,7 @@ def test_failed_request_logs(self, logger):
resp = Mock()
resp.data = b'{"answer":42}'
resp.status = 500
resp.headers = {}
pool_urlopen.return_value = resp

with pytest.raises(TransportError) as e:
Expand Down
Loading