Skip to content

Commit

Permalink
[7.x] Move client meta header logic to Transport
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
  • Loading branch information
github-actions[bot] and sethmlarson committed Dec 17, 2020
1 parent 82e42e6 commit fe53673
Show file tree
Hide file tree
Showing 17 changed files with 232 additions and 234 deletions.
6 changes: 5 additions & 1 deletion elasticsearch/_async/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ async def async_scan(

finally:
if scroll_id and clear_scroll:
await client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))
await client.clear_scroll(
body={"scroll_id": [scroll_id]},
ignore=(404,),
params={"__elastic_client_meta": (("h", "s"),)},
)


async def async_reindex(
Expand Down
18 changes: 4 additions & 14 deletions elasticsearch/_async/http_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from .compat import get_running_loop
from ..connection.base import (
Connection,
_get_client_meta_header,
_python_to_meta_version,
)
from ..compat import urlencode
from ..exceptions import (
Expand All @@ -34,6 +32,7 @@
ImproperlyConfigured,
SSLError,
)
from ..utils import _client_meta_version


# sentinel value for `verify_certs`.
Expand Down Expand Up @@ -72,6 +71,9 @@ async def close(self):


class AIOHttpConnection(AsyncConnection):

HTTP_CLIENT_META = ("ai", _client_meta_version(aiohttp.__version__))

def __init__(
self,
host="localhost",
Expand Down Expand Up @@ -222,11 +224,6 @@ async def perform_request(

orig_body = body
url_path = self.url_prefix + url
if params:
# Pop client metadata from parameters, if any.
client_meta = tuple(params.pop("_client_meta", ()))
else:
client_meta = ()
if params:
query_string = urlencode(params)
else:
Expand Down Expand Up @@ -277,13 +274,6 @@ async def perform_request(
body = self._gzip_compress(body)
req_headers["content-encoding"] = "gzip"

# Create meta header for aiohttp
if self.meta_header:
client_meta = (
("ai", _python_to_meta_version(aiohttp.__version__)),
) + client_meta
req_headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)

start = self.loop.time()
try:
async with self.session.request(
Expand Down
4 changes: 2 additions & 2 deletions elasticsearch/_async/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ async def perform_request(self, method, url, headers=None, params=None, body=Non
"""
await self._async_call()

method, params, body, ignore, timeout = self._resolve_request_args(
method, params, body
method, headers, params, body, ignore, timeout = self._resolve_request_args(
method, headers, params, body
)

for attempt in range(self.max_retries + 1):
Expand Down
28 changes: 2 additions & 26 deletions elasticsearch/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ class Connection(object):
:arg cloud_id: The Cloud ID from ElasticCloud. Convenient way to connect to cloud instances.
:arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header
For tracing all requests made by this transport.
:arg meta_header: If True will send the 'X-Elastic-Client-Meta' HTTP header containing
simple client metadata. Setting to False will disable the header. Defaults to True.
"""

HTTP_CLIENT_META = None

def __init__(
self,
host="localhost",
Expand Down Expand Up @@ -336,27 +336,3 @@ def _get_api_key_header_val(self, api_key):
s = "{0}:{1}".format(api_key[0], api_key[1]).encode("utf-8")
return "ApiKey " + binascii.b2a_base64(s).rstrip(b"\r\n").decode("utf-8")
return "ApiKey " + api_key


def _python_to_meta_version(version):
"""Transforms a Python package version to one
compatible with 'X-Elastic-Client-Meta'. Essentially
replaces any pre-release information with a 'p' suffix.
"""
version, version_pre = re.match(
r"^([0-9][0-9.]*[0-9]|[0-9])(.*)$", version
).groups()
if version_pre:
version += "p"
return version


def _get_client_meta_header(client_meta=()):
"""Builds an 'X-Elastic-Client-Meta' HTTP header"""
es_version = _python_to_meta_version(__versionstr__)
py_version = _python_to_meta_version(python_version())
# First three values have to be 'service', 'language', 'transport'
client_meta = (("es", es_version), ("py", py_version), ("t", es_version)) + tuple(
client_meta
)
return ",".join("%s=%s" % (k, v) for k, v in client_meta)
33 changes: 13 additions & 20 deletions elasticsearch/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@
import time
import warnings

try:
import requests

REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False

from .base import Connection, _get_client_meta_header, _python_to_meta_version
from .base import Connection
from ..exceptions import (
ConnectionError,
ImproperlyConfigured,
ConnectionTimeout,
SSLError,
)
from ..compat import urlencode, string_types
from ..utils import _client_meta_version

try:
import requests

REQUESTS_AVAILABLE = True
_REQUESTS_META_VERSION = _client_meta_version(requests.__version__)
except ImportError:
REQUESTS_AVAILABLE = False
_REQUESTS_META_VERSION = ""


class RequestsHttpConnection(Connection):
Expand All @@ -59,6 +62,8 @@ class RequestsHttpConnection(Connection):
For tracing all requests made by this transport.
"""

HTTP_CLIENT_META = ("rq", _REQUESTS_META_VERSION)

def __init__(
self,
host="localhost",
Expand Down Expand Up @@ -141,11 +146,6 @@ def perform_request(
):
url = self.base_url + url
headers = headers or {}
if params:
# Pop client metadata from parameters, if any.
client_meta = params.pop("_client_meta", ())
else:
client_meta = ()
if params:
url = "%s?%s" % (url, urlencode(params))

Expand All @@ -154,13 +154,6 @@ def perform_request(
body = self._gzip_compress(body)
headers["content-encoding"] = "gzip"

# Create meta header for requests
if self.meta_header:
client_meta = (
("rq", _python_to_meta_version(requests.__version__)),
) + client_meta
headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)

start = time.time()
request = requests.Request(method=method, headers=headers, url=url, data=body)
prepared_request = self.session.prepare_request(request)
Expand Down
19 changes: 4 additions & 15 deletions elasticsearch/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
from urllib3.util.retry import Retry # type: ignore
import warnings

from .base import Connection, _get_client_meta_header, _python_to_meta_version
from .base import Connection
from ..exceptions import (
ConnectionError,
ImproperlyConfigured,
ConnectionTimeout,
SSLError,
)
from ..compat import urlencode
from ..utils import _client_meta_version

# sentinel value for `verify_certs` and `ssl_show_warn`.
# This is used to detect if a user is passing in a value
Expand Down Expand Up @@ -96,6 +97,8 @@ class Urllib3HttpConnection(Connection):
For tracing all requests made by this transport.
"""

HTTP_CLIENT_META = ("ur", _client_meta_version(urllib3.__version__))

def __init__(
self,
host="localhost",
Expand Down Expand Up @@ -216,11 +219,6 @@ def perform_request(
self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
):
url = self.url_prefix + url
# Pop client metadata from parameters, if any.
if params:
client_meta = tuple(params.pop("_client_meta", ()))
else:
client_meta = ()
if params:
url = "%s?%s" % (url, urlencode(params))

Expand Down Expand Up @@ -248,15 +246,6 @@ def perform_request(
body = self._gzip_compress(body)
request_headers["content-encoding"] = "gzip"

# Create meta header for urllib3
if self.meta_header:
client_meta = (
("ur", _python_to_meta_version(urllib3.__version__)),
) + client_meta
request_headers["x-elastic-client-meta"] = _get_client_meta_header(
client_meta
)

response = self.pool.urlopen(
method, url, body, retries=Retry(False), headers=request_headers, **kw
)
Expand Down
4 changes: 2 additions & 2 deletions elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def _process_bulk_chunk(

def _add_helper_meta_to_kwargs(kwargs, helper_meta):
params = (kwargs or {}).pop("params", {})
params["_client_meta"] = (("h", helper_meta),)
params["__elastic_client_meta"] = (("h", helper_meta),)
kwargs["params"] = params
return kwargs

Expand Down Expand Up @@ -575,7 +575,7 @@ def scan(
client.clear_scroll(
body={"scroll_id": [scroll_id]},
ignore=(404,),
params={"_client_meta": (("h", "s"),)},
params={"__elastic_client_meta": (("h", "s"),)},
)


Expand Down
41 changes: 37 additions & 4 deletions elasticsearch/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
# under the License.

import time
from platform import python_version
from itertools import chain

from ._version import __versionstr__
from .connection import Urllib3HttpConnection
from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool
from .serializer import JSONSerializer, Deserializer, DEFAULT_SERIALIZERS
Expand All @@ -27,6 +29,7 @@
SerializationError,
ConnectionTimeout,
)
from .utils import _client_meta_version


def get_host_info(node_info, host):
Expand Down Expand Up @@ -76,6 +79,7 @@ def __init__(
retry_on_status=(502, 503, 504),
retry_on_timeout=False,
send_get_body_as="GET",
meta_header=True,
**kwargs
):
"""
Expand Down Expand Up @@ -110,13 +114,17 @@ def __init__(
don't support passing bodies with GET requests. If you set this to
'POST' a POST method will be used instead, if to 'source' then the body
will be serialized and passed as a query parameter `source`.
:arg meta_header: If True will send the 'X-Elastic-Client-Meta' HTTP header containing
simple client metadata. Setting to False will disable the header. Defaults to True.
Any extra keyword arguments will be passed to the `connection_class`
when creating and instance unless overridden by that connection's
options provided as part of the hosts parameter.
"""
if connection_class is None:
connection_class = self.DEFAULT_CONNECTION_CLASS
if not isinstance(meta_header, bool):
raise TypeError("meta_header must be of type bool")

# serialization config
_serializers = DEFAULT_SERIALIZERS.copy()
Expand All @@ -132,6 +140,7 @@ def __init__(
self.retry_on_timeout = retry_on_timeout
self.retry_on_status = retry_on_status
self.send_get_body_as = send_get_body_as
self.meta_header = meta_header

# data serializer
self.serializer = serializer
Expand Down Expand Up @@ -175,6 +184,20 @@ def __init__(
if sniff_on_start:
self.sniff_hosts(True)

# Create the default metadata for the x-elastic-client-meta
# HTTP header. Only requires adding the (service, service_version)
# tuple to the beginning of the client_meta
self._client_meta = (
("es", _client_meta_version(__versionstr__)),
("py", _client_meta_version(python_version())),
("t", _client_meta_version(__versionstr__)),
)

# Grab the 'HTTP_CLIENT_META' property from the connection class
http_client_meta = getattr(connection_class, "HTTP_CLIENT_META", None)
if http_client_meta:
self._client_meta += (http_client_meta,)

def add_connection(self, host):
"""
Create a new :class:`~elasticsearch.Connection` instance and add it to the pool.
Expand Down Expand Up @@ -347,8 +370,8 @@ def perform_request(self, method, url, headers=None, params=None, body=None):
:arg body: body of the request, will be serialized using serializer and
passed to the connection
"""
method, params, body, ignore, timeout = self._resolve_request_args(
method, params, body
method, headers, params, body, ignore, timeout = self._resolve_request_args(
method, headers, params, body
)

for attempt in range(self.max_retries + 1):
Expand Down Expand Up @@ -410,7 +433,7 @@ def close(self):
"""
self.connection_pool.close()

def _resolve_request_args(self, method, params, body):
def _resolve_request_args(self, method, headers, params, body):
"""Resolves parameters for .perform_request()"""
if body is not None:
body = self.serializer.dumps(body)
Expand Down Expand Up @@ -442,5 +465,15 @@ def _resolve_request_args(self, method, params, body):
ignore = params.pop("ignore", ())
if isinstance(ignore, int):
ignore = (ignore,)
client_meta = params.pop("__elastic_client_meta", ())
else:
client_meta = ()

if self.meta_header:
headers = headers or {}
client_meta = self._client_meta + client_meta
headers["x-elastic-client-meta"] = ",".join(
"%s=%s" % (k, v) for k, v in client_meta
)

return method, params, body, ignore, timeout
return method, headers, params, body, ignore, timeout
1 change: 1 addition & 0 deletions elasticsearch/transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Transport(object):
retry_on_status: Collection[int] = ...,
retry_on_timeout: bool = ...,
send_get_body_as: str = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
def add_connection(self, host: Any) -> None: ...
Expand Down

0 comments on commit fe53673

Please sign in to comment.