Skip to content

Commit

Permalink
[7.x] Add the 'X-Elastic-Client-Meta' header
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
  • Loading branch information
github-actions[bot] and sethmlarson committed Dec 14, 2020
1 parent 2e06989 commit b894e35
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 12 deletions.
18 changes: 17 additions & 1 deletion elasticsearch/_async/http_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import warnings
from ._extra_imports import aiohttp_exceptions, aiohttp, yarl
from .compat import get_running_loop
from ..connection import Connection
from ..connection.base import (
Connection,
_get_client_meta_header,
_python_to_meta_version,
)
from ..compat import urlencode
from ..exceptions import (
ConnectionError,
Expand Down Expand Up @@ -218,6 +222,11 @@ async def perform_request(

orig_body = body
url_path = self.url_prefix + url
if params:
# Pop client metadata from parameters, if any.
client_meta = tuple(params.pop("_client_meta", ()))
else:
client_meta = ()
if params:
query_string = urlencode(params)
else:
Expand Down Expand Up @@ -268,6 +277,13 @@ async def perform_request(
body = self._gzip_compress(body)
req_headers["content-encoding"] = "gzip"

# Create meta header for aiohttp
if self.meta_header:
client_meta = (
("ai", _python_to_meta_version(aiohttp.__version__)),
) + client_meta
req_headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)

start = self.loop.time()
try:
async with self.session.request(
Expand Down
7 changes: 4 additions & 3 deletions elasticsearch/_async/http_aiohttp.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
# under the License.

from ._extra_imports import aiohttp # type: ignore
from typing import Optional, Mapping, Collection, Union, Any, Tuple
from typing import Optional, Mapping, MutableMapping, Collection, Union, Any, Tuple
from ..connection import Connection

class AsyncConnection(Connection):
async def perform_request( # type: ignore
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = ...,
params: Optional[MutableMapping[str, Any]] = ...,
body: Optional[bytes] = ...,
timeout: Optional[Union[int, float]] = ...,
ignore: Collection[int] = ...,
headers: Optional[Mapping[str, str]] = ...,
headers: Optional[MutableMapping[str, str]] = ...,
) -> Tuple[int, Mapping[str, str], str]: ...
async def close(self) -> None: ...

Expand All @@ -55,6 +55,7 @@ class AIOHttpConnection(AsyncConnection):
cloud_id: Optional[str] = ...,
api_key: Optional[Any] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
loop: Any = ...,
**kwargs: Any,
) -> None: ...
30 changes: 30 additions & 0 deletions elasticsearch/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io
import re
from platform import python_version
import sys
import warnings

try:
Expand Down Expand Up @@ -65,6 +66,8 @@ class Connection(object):
:arg cloud_id: The Cloud ID from ElasticCloud. Convenient way to connect to cloud instances.
:arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header
For tracing all requests made by this transport.
:arg meta_header: If True will send the 'X-Elastic-Client-Meta' HTTP header containing
simple client metadata. Setting to False will disable the header. Defaults to True.
"""

def __init__(
Expand All @@ -79,6 +82,7 @@ def __init__(
cloud_id=None,
api_key=None,
opaque_id=None,
meta_header=True,
**kwargs
):

Expand Down Expand Up @@ -148,6 +152,10 @@ def __init__(
self.url_prefix = url_prefix
self.timeout = timeout

if not isinstance(meta_header, bool):
raise TypeError("meta_header must be of type bool")
self.meta_header = meta_header

def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self.host)

Expand Down Expand Up @@ -329,3 +337,25 @@ def _get_api_key_header_val(self, api_key):
s = "{0}:{1}".format(api_key[0], api_key[1]).encode("utf-8")
return "ApiKey " + binascii.b2a_base64(s).rstrip(b"\r\n").decode("utf-8")
return "ApiKey " + api_key


def _python_to_meta_version(version):
"""Transforms a Python package version to one
compatible with 'X-Elastic-Client-Meta'. Essentially
replaces any pre-release information with a 'p' suffix.
"""
version, version_pre = re.match(r"^([0-9.]+)(.*)$", version).groups()
if version_pre:
version += "p"
return version


def _get_client_meta_header(client_meta=()):
"""Builds an 'X-Elastic-Client-Meta' HTTP header"""
es_version = _python_to_meta_version(__versionstr__)
py_version = python_version() + ("p" if sys.version_info[3] != "final" else "")
# First three values have to be 'service', 'language', 'transport'
client_meta = (("es", es_version), ("py", py_version), ("t", es_version)) + tuple(
client_meta
)
return ",".join("%s=%s" % (k, v) for k, v in client_meta)
8 changes: 5 additions & 3 deletions elasticsearch/connection/base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ from typing import (
Union,
Optional,
Mapping,
MutableMapping,
Tuple,
List,
NoReturn,
Dict,
Sequence,
Any,
AnyStr,
Collection,
)

Expand All @@ -44,6 +44,7 @@ class Connection(object):
host: str
url_prefix: str
timeout: Optional[Union[float, int]]
meta_header: bool
def __init__(
self,
host: str = ...,
Expand All @@ -56,6 +57,7 @@ class Connection(object):
cloud_id: Optional[str] = ...,
api_key: Optional[Union[Tuple[str, str], List[str], str]] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
def __repr__(self) -> str: ...
Expand All @@ -77,11 +79,11 @@ class Connection(object):
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = ...,
params: Optional[MutableMapping[str, Any]] = ...,
body: Optional[bytes] = ...,
timeout: Optional[Union[int, float]] = ...,
ignore: Collection[int] = ...,
headers: Optional[Mapping[str, str]] = ...,
headers: Optional[MutableMapping[str, str]] = ...,
) -> Tuple[int, Mapping[str, str], str]: ...
def log_request_success(
self,
Expand Down
16 changes: 14 additions & 2 deletions elasticsearch/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
except ImportError:
REQUESTS_AVAILABLE = False

from .base import Connection
from .base import Connection, _get_client_meta_header, _python_to_meta_version
from ..exceptions import (
ConnectionError,
ImproperlyConfigured,
Expand Down Expand Up @@ -142,13 +142,25 @@ def perform_request(
url = self.base_url + url
headers = headers or {}
if params:
url = "%s?%s" % (url, urlencode(params or {}))
# Pop client metadata from parameters, if any.
client_meta = params.pop("_client_meta", ())
else:
client_meta = ()
if params:
url = "%s?%s" % (url, urlencode(params))

orig_body = body
if self.http_compress and body:
body = self._gzip_compress(body)
headers["content-encoding"] = "gzip"

# Create meta header for requests
if self.meta_header:
client_meta = (
("rq", _python_to_meta_version(requests.__version__)),
) + client_meta
headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)

start = time.time()
request = requests.Request(method=method, headers=headers, url=url, data=body)
prepared_request = self.session.prepare_request(request)
Expand Down
1 change: 1 addition & 0 deletions elasticsearch/connection/http_requests.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ class RequestsHttpConnection(Connection):
cloud_id: Optional[str] = ...,
api_key: Optional[Any] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
17 changes: 16 additions & 1 deletion elasticsearch/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from urllib3.util.retry import Retry # type: ignore
import warnings

from .base import Connection
from .base import Connection, _get_client_meta_header, _python_to_meta_version
from ..exceptions import (
ConnectionError,
ImproperlyConfigured,
Expand Down Expand Up @@ -216,8 +216,14 @@ def perform_request(
self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
):
url = self.url_prefix + url
# Pop client metadata from parameters, if any.
if params:
client_meta = tuple(params.pop("_client_meta", ()))
else:
client_meta = ()
if params:
url = "%s?%s" % (url, urlencode(params))

full_url = self.host + url

start = time.time()
Expand All @@ -242,6 +248,15 @@ def perform_request(
body = self._gzip_compress(body)
request_headers["content-encoding"] = "gzip"

# Create meta header for urllib3
if self.meta_header:
client_meta = (
("ur", _python_to_meta_version(urllib3.__version__)),
) + client_meta
request_headers["x-elastic-client-meta"] = _get_client_meta_header(
client_meta
)

response = self.pool.urlopen(
method, url, body, retries=Retry(False), headers=request_headers, **kw
)
Expand Down
1 change: 1 addition & 0 deletions elasticsearch/connection/http_urllib3.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ class Urllib3HttpConnection(Connection):
cloud_id: Optional[str] = ...,
api_key: Optional[Any] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
16 changes: 15 additions & 1 deletion elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ def _process_bulk_chunk(
"""
Send a bulk request to elasticsearch and process the output.
"""
kwargs = _add_helper_meta_to_kwargs(kwargs, "bp")

try:
# send the actual request
resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
Expand All @@ -248,6 +250,13 @@ def _process_bulk_chunk(
yield item


def _add_helper_meta_to_kwargs(kwargs, helper_meta):
params = (kwargs or {}).pop("params", {})
params["_client_meta"] = (("h", helper_meta),)
kwargs["params"] = params
return kwargs


def streaming_bulk(
client,
actions,
Expand Down Expand Up @@ -515,6 +524,7 @@ def scan(
"""
scroll_kwargs = scroll_kwargs or {}
_add_helper_meta_to_kwargs(scroll_kwargs, "s")

if not preserve_order:
query = query.copy() if query else {}
Expand Down Expand Up @@ -562,7 +572,11 @@ def scan(

finally:
if scroll_id and clear_scroll:
client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))
client.clear_scroll(
body={"scroll_id": [scroll_id]},
ignore=(404,),
params={"_client_meta": (("h", "s"),)},
)


def reindex(
Expand Down
41 changes: 41 additions & 0 deletions test_elasticsearch/test_async/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

import ssl
import re
import gzip
import io
from mock import patch
Expand Down Expand Up @@ -316,3 +317,43 @@ async def test_surrogatepass_into_bytes(self):
con = await self._get_mock_connection(response_body=buf)
status, headers, data = await con.perform_request("GET", "/")
assert u"你好\uda6a" == data

async def test_meta_header_value(self):
con = await self._get_mock_connection()
assert con.meta_header is True

await con.perform_request("GET", "/", body=b"{}")

_, kwargs = con.session.request.call_args
headers = kwargs["headers"]
assert re.match(
r"^es=[0-9]+\.[0-9]+\.[0-9]+p?,py=[0-9]+\.[0-9]+\.[0-9]+p?,"
r"t=[0-9]+\.[0-9]+\.[0-9]+p?,ai=[0-9]+\.[0-9]+\.[0-9]+p?$",
headers["x-elastic-client-meta"],
)

con = await self._get_mock_connection()
assert con.meta_header is True

await con.perform_request(
"GET", "/", body=b"{}", params={"_client_meta": (("h", "bp"),)}
)

(method, url), kwargs = con.session.request.call_args
headers = kwargs["headers"]
assert method == "GET"
assert str(url) == "http://localhost:9200/"
assert re.match(
r"^es=[0-9]+\.[0-9]+\.[0-9]+p?,py=[0-9]+\.[0-9]+\.[0-9]+p?,"
r"t=[0-9]+\.[0-9]+\.[0-9]+p?,ai=[0-9]+\.[0-9]+\.[0-9]+p?,h=bp$",
headers["x-elastic-client-meta"],
)

con = await self._get_mock_connection(connection_params={"meta_header": False})
assert con.meta_header is False

await con.perform_request("GET", "/", body=b"{}")

_, kwargs = con.session.request.call_args
headers = kwargs["headers"]
assert "x-elastic-client-meta" not in (x.lower() for x in headers)

0 comments on commit b894e35

Please sign in to comment.