
Use async runners if needed (#80)
Due to elastic/rally#852 we will implement a compatibility layer in the
current load generator that will also use the asyncio API and will thus
require custom runners to be registered differently (by specifying
`async_runner=True`). Rally's runner registry will also expose a new
attribute `async_runner` that is set to `True` if Rally requires runners
to be registered as described above.

With this commit we introduce a (temporary) compatibility layer for all
custom runners that allows older Rally versions to work with the classic
runners and newer Rally versions with the async runners.
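
A minimal sketch of that compatibility layer, mirroring the track.py change
in this commit and shown here for a single runner only:

    def register(registry):
        # Newer Rally versions (elastic/rally#852) signal that runners must be
        # registered as asyncio-based; older versions fall back to the classic API.
        if registry.meta_data.get("async_runner", False):
            registry.register_runner("rollover", rollover_runner.rollover_async, async_runner=True)
        else:
            registry.register_runner("rollover", rollover_runner.rollover)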

Relates elastic/rally#852
Relates elastic/rally#916
danielmitterdorfer committed Feb 21, 2020
1 parent 93b176d commit b4be633
Showing 11 changed files with 370 additions and 26 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -29,10 +29,10 @@ PYENV_PREREQ_HELP = "\033[0;31mIMPORTANT\033[0m: please add \033[0;31meval \"\$$
VE_MISSING_HELP = "\033[0;31mIMPORTANT\033[0m: Couldn't find $(PWD)/$(VENV_NAME); have you executed make venv-create?\033[0m\n"

prereq: make-requirements.txt
-	pyenv install --skip-existing $(PY35)
 	pyenv install --skip-existing $(PY36)
 	pyenv install --skip-existing $(PY37)
-	pyenv global system $(PY35) $(PY36) $(PY37)
+	pyenv install --skip-existing $(PY38)
+	pyenv local $(PY36) $(PY37) $(PY38)
@# Ensure all Python versions are registered for this project
@awk -F'=' '{print $$2}' make-requirements.txt > .python-version
-@ printf $(PYENV_PREREQ_HELP)
69 changes: 67 additions & 2 deletions eventdata/runners/deleteindex_runner.py
@@ -18,7 +18,8 @@

from fnmatch import fnmatch

-def deleteindex(es, params):
+
+async def deleteindex_async(es, params):
"""
Deletes all indices in Elasticsearch matching either the specified index pattern or
the suffix of the index against a more complex pattern.
@@ -65,13 +66,77 @@ def get_suffix(name, separator):
suffix_separator = params.get('suffix_separator', '-')

if max_indices:
-        indices = es.cat.indices(h='index').split("\n")
+        indices = (await es.cat.indices(h='index')).split("\n")
indices_by_suffix = {get_suffix(idx, suffix_separator): idx
for idx in indices
if fnmatch(idx, index_pattern) and
get_suffix(idx, suffix_separator) is not None
}

sorted_suffixes = sorted(list(indices_by_suffix.keys()))
if len(sorted_suffixes) > max_indices:
indices_to_delete = ",".join([indices_by_suffix[key] for key in sorted_suffixes[:(len(sorted_suffixes)-max_indices)]])
await es.indices.delete(indices_to_delete)
else:
await es.indices.delete(index=index_pattern)

return 1, "ops"


def deleteindex(es, params):
"""
Deletes all indices in Elasticsearch matching either the specified index pattern or
the suffix of the index against a more complex pattern.
:param es: Elasticsearch client to execute the delete request against.
:type es: elasticsearch.Elasticsearch
:param params: Parameter hash containing one of the keys documented below.
:type params: dict
"index_pattern" - Mandatory.
Specifies the index pattern to delete.
"max_indices" - Optional.
int specifying how many rolled over indices to retain at max.
The eligible indices need to satisfy `index_pattern`.
'suffix_separator' is used to retrieve the integer suffixes to calculate indices to delete.
Example:
For the indices: 'elasticlogs-000001', 'elasticlogs-000002', ... 000011
(index currently written to is 'elasticlogs-000011')
using:
suffix_separator='-' and
max_indices=8
will result in deleting indices 'elasticlogs-000001' and 'elasticlogs-000002'
"suffix_separator" - Defaults to '-'. Used only when 'max_indices' is specified.
Specifies string separator used to extract the index suffix, e.g. '-'.
"""
def get_suffix(name, separator):
if separator in name:
name_parts = name.split(separator)
if len(name_parts) > 1:
try:
return int(name_parts[-1])
except ValueError:
# TODO: log that suffix is not integer
return None
return None

index_pattern = params['index_pattern']
max_indices = params.get('max_indices', None)
suffix_separator = params.get('suffix_separator', '-')

if max_indices:
indices = es.cat.indices(h='index').split("\n")
indices_by_suffix = {get_suffix(idx, suffix_separator): idx
for idx in indices
if fnmatch(idx, index_pattern) and
get_suffix(idx, suffix_separator) is not None
}

sorted_suffixes = sorted(list(indices_by_suffix.keys()))
if len(sorted_suffixes) > max_indices:
indices_to_delete = ",".join([indices_by_suffix[key] for key in sorted_suffixes[:(len(sorted_suffixes)-max_indices)]])
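
For illustration only (values are made up), a parameter hash like the following
would make the delete_indices runner keep the eight most recent rolled-over
indices matching the pattern and delete the rest:

    params = {
        "index_pattern": "elasticlogs-*",  # mandatory
        "max_indices": 8,                  # retain at most 8 matching indices
        "suffix_separator": "-"            # separator used to extract the numeric suffix
    }
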
63 changes: 63 additions & 0 deletions eventdata/runners/fieldstats_runner.py
@@ -20,6 +20,69 @@
import logging


async def fieldstats_async(es, params):
"""
Looks up minimum and maximum values for a specified field for an index pattern and stores
this information in a global variable that can be accessed by other components of the track.
It expects a parameter dictionary with the following keys:
* index_pattern (mandatory): Index pattern statistics are retrieved for.
* fieldname (optional): Field to extract statistics for. Defaults to "@timestamp".
* ignore_throttled (optional): Whether throttled (frozen) indices are skipped. Defaults to True; set to False to also query throttled indices.
"""
index_pattern = params["index_pattern"]
field_name = params.get("fieldname", "@timestamp")
ignore_throttled = params.get("ignore_throttled", True)

if ignore_throttled:
query_params = {}
else:
query_params = {"ignore_throttled": "false"}

result = await es.search(index=index_pattern,
body={
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"maxval": {
"max": {
"field": field_name
}
},
"minval": {
"min": {
"field": field_name
}
}
}
},
params=query_params)

hits = result["hits"]["total"]
# ES 7.0+
if isinstance(hits, dict):
total_hits = hits["value"]
else:
total_hits = hits

if total_hits > 0:
key = "{}_{}".format(index_pattern, field_name)
min_field_value = int(result["aggregations"]["minval"]["value"])
max_field_value = int(result["aggregations"]["maxval"]["value"])
gs.global_fieldstats[key] = {
"max": max_field_value,
"min": min_field_value
}
logger = logging.getLogger("track.eventdata.fieldstats")
logger.info("Identified statistics for field '%s' in '%s'. Min: %d, Max: %d",
field_name, index_pattern, min_field_value, max_field_value)
else:
raise AssertionError("No matching data found for field '{}' in pattern '{}'.".format(field_name, index_pattern))


def fieldstats(es, params):
"""
Looks up minimum and maximum values for a specified field for an index pattern and stores
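
Other components of the track can then read the stored statistics back from the
shared state; a rough sketch, assuming the same gs globals module imported at
the top of this runner and the key scheme used above:

    # Key scheme: "<index_pattern>_<field_name>", values as stored by fieldstats_async.
    stats = gs.global_fieldstats["elasticlogs-*_@timestamp"]
    value_range = stats["max"] - stats["min"]  # e.g. the covered time span in epoch millis for a date field
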
59 changes: 59 additions & 0 deletions eventdata/runners/indicesstats_runner.py
@@ -23,6 +23,65 @@

logger = logging.getLogger("track.eventdata")


async def indicesstats_async(es, params):
"""
Retrieves index stats for an index or index pattern.
It expects the parameter hash to contain the following keys:
"index_pattern" - Index pattern that storage statistics are retrieved for.
"""
index_pattern = params['index_pattern']
response = {
"weight": 1,
"unit": "ops",
"index_pattern": index_pattern
}

try:
result = await es.indices.stats(index=index_pattern, metric='store,docs,segments')

if result['_all']:
a = result['_all']

if a['primaries']['docs']['count']:
response['primary_doc_count'] = a['primaries']['docs']['count']

if a['total']['docs']['count']:
response['total_doc_count'] = a['total']['docs']['count']

if a['primaries']['store']['size_in_bytes']:
response['primary_size_bytes'] = a['primaries']['store']['size_in_bytes']

if a['total']['store']['size_in_bytes']:
response['total_size_bytes'] = a['total']['store']['size_in_bytes']

if a['primaries']['segments']['count']:
response['primary_segment_count'] = a['primaries']['segments']['count']

if a['total']['segments']['count']:
response['total_segment_count'] = a['total']['segments']['count']

if a['primaries']['segments']['memory_in_bytes']:
response['primary_segments_memory_in_bytes'] = a['primaries']['segments']['memory_in_bytes']

if a['total']['segments']['memory_in_bytes']:
response['total_segment_memory_in_bytes'] = a['total']['segments']['memory_in_bytes']

if a['primaries']['segments']['terms_memory_in_bytes']:
response['primary_segment_terms_memory_in_bytes'] = a['primaries']['segments']['terms_memory_in_bytes']

if a['total']['segments']['terms_memory_in_bytes']:
response['total_segment_terms_memory_in_bytes'] = a['total']['segments']['terms_memory_in_bytes']

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Indices stats for {} => {}".format(index_pattern, json.dumps(result)))
except elasticsearch.TransportError as e:
logger.info("[indicesstats_runner] Error: {}".format(e))

return response


def indicesstats(es, params):
"""
Retrieves index stats for an index or index pattern.
54 changes: 54 additions & 0 deletions eventdata/runners/kibana_runner.py
@@ -23,6 +23,60 @@
logger = logging.getLogger("track.eventdata")


async def kibana_async(es, params):
"""
Simulates Kibana msearch dashboard queries.
It expects the parameter hash to contain the following keys:
"body" - msearch request body representing the Kibana dashboard in the form of an array of dicts.
"meta_data" - Dictionary containing meta data information to be carried through into metrics.
"""
request = params["body"]
meta_data = params["meta_data"]

if meta_data["debug"]:
logger.info("Request:\n=====\n{}\n=====".format(json.dumps(request)))

visualisations = int(len(request) / 2)

response = {}

for key in meta_data.keys():
response[key] = meta_data[key]

response["weight"] = 1
response["unit"] = "ops"
response["visualisation_count"] = visualisations

if "pre_filter_shard_size" in meta_data:
result = await es.msearch(body=request, params={"pre_filter_shard_size": meta_data["pre_filter_shard_size"]})
else:
result = await es.msearch(body=request)

sum_hits = 0
max_took = 0
for r in result["responses"]:
hits = r.get("hits", {}).get("total", 0)
if isinstance(hits, dict):
sum_hits += hits["value"]
else:
sum_hits += hits
max_took = max(max_took, r["took"])

# use the request's took if possible but approximate it using the maximum of all responses
response["took"] = result.get("took", max_took)
response["hits"] = sum_hits

if meta_data["debug"]:
for r in result["responses"]:
# clear hits otherwise we'll spam the log
r["hits"]["hits"] = []
r["aggregations"] = {}
logger.info("Response (excluding specific hits):\n=====\n{}\n=====".format(json.dumps(result)))

return response


def kibana(es, params):
"""
Simulates Kibana msearch dashboard queries.
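
A hypothetical parameter hash for this runner (index name and values are
illustrative; the body alternates msearch header and query lines, one pair per
visualisation):

    params = {
        "body": [
            {"index": "elasticlogs-*"},
            {"query": {"match_all": {}}, "size": 0}
        ],
        "meta_data": {
            "debug": False,             # read unconditionally by the runner
            "pre_filter_shard_size": 1  # optional; forwarded to msearch when present
        }
    }
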
44 changes: 44 additions & 0 deletions eventdata/runners/nodestorage_runner.py
@@ -25,6 +25,50 @@
BYTES_PER_TB = 1024 * 1024 * 1024 * 1024


async def nodestorage_async(es, params):
"""
Calculates the total data volume in the cluster as well as average volume per data node.
It takes no parameters.
"""
response = {
"weight": 1,
"unit": "ops"
}

try:
# get number of data nodes
node_role_list = await es.cat.nodes(h="node.role")

data_node_count = 0

for node_role in node_role_list:
if 'd' in node_role:
data_node_count += 1

result = await es.indices.stats(index='*', metric='store')
total_data_size = 0

if result['_all']:
if result['_all']['total']['store']['size_in_bytes']:
total_data_size = result['_all']['total']['store']['size_in_bytes']

total_data_size_tb = float(total_data_size) / BYTES_PER_TB

volume_per_data_node = int(total_data_size / data_node_count)
volume_per_data_node_tb = total_data_size_tb / data_node_count

response['total_data_volume_bytes'] = total_data_size
response['total_data_volume_tb'] = total_data_size_tb
response['average_data_volume_per_node_bytes'] = volume_per_data_node
response['average_data_volume_per_node_tb'] = volume_per_data_node_tb

except elasticsearch.TransportError as e:
logger.info("[nodestorage_runner] Error: {}".format(e))

return response


def nodestorage(es, params):
"""
Calculates the total data volume in the cluster as well as average volume per data node.
12 changes: 12 additions & 0 deletions eventdata/runners/rollover_runner.py
@@ -16,6 +16,18 @@
# under the License.


async def rollover_async(es, params):
"""
Runs a rollover operation against Elasticsearch.
It expects the parameter hash to contain a key "alias" specifying the alias to rollover
as well as a key "body" containing the actual rollover request and associated conditions.
"""
await es.indices.rollover(alias=params["alias"], body=params["body"])
return 1, "ops"


def rollover(es, params):
"""
Runs a rollover operation against Elasticsearch.
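
As an illustration (the alias name and conditions are made up), the expected
parameter shape is roughly:

    params = {
        "alias": "elasticlogs_write",
        "body": {
            "conditions": {
                "max_size": "30gb"
            }
        }
    }
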
22 changes: 16 additions & 6 deletions eventdata/track.py
@@ -28,12 +28,22 @@


def register(registry):
async_runner = registry.meta_data.get("async_runner", False)
if async_runner:
registry.register_runner("delete_indices", deleteindex_runner.deleteindex_async, async_runner=True)
registry.register_runner("fieldstats", fieldstats_runner.fieldstats_async, async_runner=True)
registry.register_runner("indicesstats", indicesstats_runner.indicesstats_async, async_runner=True)
registry.register_runner("kibana", kibana_runner.kibana_async, async_runner=True)
registry.register_runner("node_storage", nodestorage_runner.nodestorage_async, async_runner=True)
registry.register_runner("rollover", rollover_runner.rollover_async, async_runner=True)
else:
registry.register_runner("delete_indices", deleteindex_runner.deleteindex)
registry.register_runner("fieldstats", fieldstats_runner.fieldstats)
registry.register_runner("indicesstats", indicesstats_runner.indicesstats)
registry.register_runner("kibana", kibana_runner.kibana)
registry.register_runner("node_storage", nodestorage_runner.nodestorage)
registry.register_runner("rollover", rollover_runner.rollover)

registry.register_param_source("elasticlogs_bulk", ElasticlogsBulkSource)
registry.register_param_source("elasticlogs_kibana", ElasticlogsKibanaSource)
-    registry.register_runner("delete_indices", deleteindex_runner.deleteindex)
-    registry.register_runner("fieldstats", fieldstats_runner.fieldstats)
-    registry.register_runner("indicesstats", indicesstats_runner.indicesstats)
-    registry.register_runner("kibana", kibana_runner.kibana)
-    registry.register_runner("node_storage", nodestorage_runner.nodestorage)
-    registry.register_runner("rollover", rollover_runner.rollover)
registry.register_scheduler("utilization", utilization_scheduler.UtilizationBasedScheduler)
