Avoid more time-consuming calls (elastic#70)
With this commit we reduce the use of two time-consuming operations on the hot code
path: generating random numbers and determining the current timestamp. This also
changes behavior slightly, as illustrated by the sketch after the following list:

* The `hostname` field now varies the host number from 1 to 3 in a regular pattern
instead of randomly (however, as the host name is e.g. `web-EU-1.elastic.co` and
only the trailing number changes non-randomly, we deem this change acceptable).
* The `offset` field is more realistic now: previously it changed randomly; now it
increases by the average event size up to a maximum of 4GB.
* The current `@timestamp` will be retrieved only once per bulk. For documents
within a bulk we advance the microsecond portion by `1 / bulk_size` microseconds
per document.
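
To illustrate, here is a minimal self-contained sketch of the three techniques (class and
method names are hypothetical stand-ins, not the track's actual implementation):

import datetime
import itertools

class SketchedEventSource:
    # Hypothetical stand-in for the track's event generator; it only illustrates
    # the three optimizations, not the real event fields.
    def __init__(self):
        self._web_host = itertools.cycle([1, 2, 3])  # host number varies 1 - 3 deterministically
        self._offset = 0
        self._timestamp_prefix = None
        self._simulated_micros = 0.0
        self._tick = 0.0

    def start_bulk(self, bulk_size):
        # hit the (comparatively expensive) wall clock only once per bulk
        now = datetime.datetime.utcnow()
        self._timestamp_prefix = now.strftime("%Y-%m-%dT%H:%M:%S")
        self._simulated_micros = 0.0
        self._tick = 1 / bulk_size

    def generate_event(self):
        # cheap per-document work: advance only the microsecond counter ...
        self._simulated_micros += self._tick
        timestamp = "%s.%03dZ" % (self._timestamp_prefix, self._simulated_micros)
        # ... grow the offset by an assumed average event size of 263 bytes,
        # wrapping at a 4GB "file size" ...
        self._offset = (self._offset + 263) % (4 * 1024 * 1024 * 1024)
        # ... and take the next host number from a fixed cycle instead of
        # calling random.randrange() for every event.
        hostname = "web-EU-%d.elastic.co" % next(self._web_host)
        return {"@timestamp": timestamp, "offset": self._offset, "hostname": hostname}

source = SketchedEventSource()
source.start_bulk(bulk_size=20000)
docs = [source.generate_event() for _ in range(3)]

With `bulk_size: 20000` each document advances the simulated clock by 1/20000 of a
microsecond, so timestamps within one bulk stay practically identical while the wall clock
is consulted only once per 20000 documents.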

We've measured the performance impact of this change by stubbing out
Elasticsearch with nginx and running the `index-logs-fixed-daily-volume`
challenge with the following track parameters (a stub sketch and a sample
invocation follow the list):

* `bulk_size`: 20000
* `bulk_indexing_clients`: 16
* `number_of_days`: 1
* `daily_logging_volume`: "20GB"
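
Such an Elasticsearch stub can be as small as the following nginx server block (an
illustrative sketch, not necessarily the exact configuration we used; it merely
acknowledges every request with a canned bulk-style success body):

# nginx stub: answer every request with a minimal success body instead of indexing
server {
    listen 8080;
    location / {
        default_type application/json;
        return 200 '{"took": 1, "errors": false, "items": []}';
    }
}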
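
A matching Rally invocation could then look roughly like this (a hypothetical command
line; it assumes the eventdata track is registered as a custom track repository and the
stub listens on port 8080):

esrally --pipeline=benchmark-only --target-hosts=127.0.0.1:8080 \
        --track=eventdata --challenge=index-logs-fixed-daily-volume \
        --track-params="bulk_size:20000,bulk_indexing_clients:16,number_of_days:1,daily_logging_volume:20GB"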

We have measured the following median indexing throughput in our test
environment:

* baseline (master): 153476 docs/s
* Using a deterministic `hostname` and `offset`: 174371 docs/s
* All three measures together: 222611 docs/s

This means we improve the maximum achievable indexing throughput by 45% in this
configuration (222611 / 153476 ≈ 1.45).
danielmitterdorfer committed Jan 23, 2020
1 parent 146a372 commit 8efa5cd
Showing 6 changed files with 119 additions and 17 deletions.
1 change: 1 addition & 0 deletions eventdata/parameter_sources/elasticlogs_bulk_source.py
@@ -144,6 +144,7 @@ def percent_completed(self):
def params(self):
# Build bulk array
bulk_array = []
self._randomevent.start_bulk(self._bulk_size)
for x in range(0, self._bulk_size):
try:
evt, idx, typ = self._randomevent.generate_event()
32 changes: 24 additions & 8 deletions eventdata/parameter_sources/randomevent.py
@@ -18,6 +18,7 @@

import datetime
import gzip
import itertools
import json
import os
import random
@@ -295,6 +296,11 @@ def __init__(self, params, agent=Agent, client_ip=ClientIp, referrer=Referrer, r
self.total_days = params.get("number_of_days")
self.remaining_days = self.total_days
self.record_raw_event_size = params.get("record_raw_event_size", False)
self._offset = 0
self._web_host = itertools.cycle([1, 2, 3])
self._timestruct = None
self._index_name = None
self._time_interval_current_bulk = 0

@property
def percent_completed(self):
@@ -306,25 +312,31 @@ def percent_completed(self):
total = self.total_days * self.daily_logging_volume
return already_generated / total

def start_bulk(self, bulk_size):
self._time_interval_current_bulk = 1 / bulk_size
self._timestruct = self._timestamp_generator.next_timestamp()
self._index_name = self.__generate_index_pattern(self._timestruct)

def generate_event(self):
if self.remaining_days == 0:
raise StopIteration()
timestruct = self._timestamp_generator.next_timestamp()
index_name = self.__generate_index_pattern(timestruct)

# advance time by a few micros
self._timestruct = self._timestamp_generator.simulate_tick(self._time_interval_current_bulk)
# index for the current line - we may cross a date boundary later if we're above the daily logging volume
index = self._index_name
event = self._event
event["@timestamp"] = timestruct["iso"]
event["@timestamp"] = self._timestruct["iso"]

# set random offset
event["offset"] = random.randrange(0, 10000000)
# assume a typical event size of 263 bytes but limit the file size to 4GB
event["offset"] = (self._offset + 263) % (4 * 1024 * 1024 * 1024)

self._agent.add_fields(event)
self._clientip.add_fields(event)
self._referrer.add_fields(event)
self._request.add_fields(event)

# set host name
event["hostname"] = "web-{}-{}.elastic.co".format(event["geoip_continent_code"], random.randrange(1, 3))
event["hostname"] = "web-%s-%s.elastic.co" % (event["geoip_continent_code"], next(self._web_host))

if self.record_raw_event_size or self.daily_logging_volume:
# determine the raw event size (as if this were contained in nginx log file). We do not bother to
@@ -340,6 +352,10 @@ def generate_event(self):
if self.remaining_days is not None:
self.remaining_days -= 1
self._timestamp_generator.skip(datetime.timedelta(days=1))
# advance time now for real (we usually use #simulate_tick() which keeps everything except the
# microseconds constant)
self._timestruct = self._timestamp_generator.next_timestamp()
self._index_name = self.__generate_index_pattern(self._timestruct)
self.current_logging_volume = 0

if self.record_raw_event_size:
@@ -387,7 +403,7 @@ def generate_event(self):
event["referrer"],
event["request"], event["bytes"], event["verb"], event["response"], event["httpversion"])

return line, index_name, self._type
return line, index, self._type

def __generate_index_pattern(self, timestruct):
if self._index_pattern:
33 changes: 24 additions & 9 deletions eventdata/parameter_sources/timeutils.py
@@ -42,10 +42,26 @@ def __init__(self, starting_point, offset=None, acceleration_factor=1.0, utcnow=
self._acceleration_factor = acceleration_factor
# reuse to reduce object churn
self._ts = {}
self._simulated_micros = 0.0

def next_timestamp(self):
self._simulated_micros = 0.0
delta = (self._utcnow() - self._start) * self._acceleration_factor
return self.__to_struct(self._starting_point + delta)
self.__to_struct(self._starting_point + delta)
return self.simulate_tick(0)

def simulate_tick(self, micros):
"""
Advances the current timestamp by the given number of microseconds but keeps all other time components constant. This
can be used to avoid retrieving the current timestamp too often while still simulating the passing of time.
:param micros: A positive number of microseconds to add.
:return: The current (formatted) timestamp structure as a dict.
"""
self._simulated_micros += micros
self._ts["iso"] = "%s.%03dZ" % (self._ts["iso_prefix"], self._simulated_micros)
return self._ts

def skip(self, delta):
# advance the generated timestamp by delta
@@ -55,14 +71,13 @@ def skip(self, delta):

def __to_struct(self, dt):
# string formatting is about 4 times faster than strftime.
iso = "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ" % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
self._ts["iso"] = iso
self._ts["yyyy"] = iso[:4]
self._ts["yy"] = iso[2:4]
self._ts["mm"] = iso[5:7]
self._ts["dd"] = iso[8:10]
self._ts["hh"] = iso[11:13]
return self._ts
iso_prefix = "%04d-%02d-%02dT%02d:%02d:%02d" % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
self._ts["iso_prefix"] = iso_prefix
self._ts["yyyy"] = iso_prefix[:4]
self._ts["yy"] = iso_prefix[2:4]
self._ts["mm"] = iso_prefix[5:7]
self._ts["dd"] = iso_prefix[8:10]
self._ts["hh"] = iso_prefix[11:13]

def __parse_starting_point(self, point):
if point == "now":
3 changes: 3 additions & 0 deletions tests/parameter_sources/elasticlogs_bulk_source_test.py
@@ -26,6 +26,9 @@ def __init__(self, index, type, doc, at_most=-1):
self.doc = doc
self.at_most = at_most

def start_bulk(self, bulk_size):
pass

def generate_event(self):
if self.at_most == 0:
raise StopIteration()
5 changes: 5 additions & 0 deletions tests/parameter_sources/randomevent_test.py
@@ -72,6 +72,7 @@ def test_random_event_no_event_size_by_default():
referrer=StaticReferrer,
request=StaticRequest)

e.start_bulk(1)
raw_doc, index, doc_type = e.generate_event()

doc = json.loads(raw_doc)
@@ -93,6 +94,7 @@ def test_random_event_with_event_size():
referrer=StaticReferrer,
request=StaticRequest)

e.start_bulk(1)
raw_doc, index, doc_type = e.generate_event()

doc = json.loads(raw_doc)
@@ -118,6 +120,7 @@ def test_random_events_with_daily_logging_volume():

assert e.percent_completed is None

e.start_bulk(15)
# 5 events fit into one kilobyte
for i in range(5):
doc, index, _ = e.generate_event()
@@ -150,13 +153,15 @@ def test_random_events_with_daily_logging_volume_and_maximum_days():

assert e.percent_completed == 0.0

e.start_bulk(5)
# 5 events fit into one kilobyte
for i in range(5):
doc, index, _ = e.generate_event()
assert index == "logs-20190105"

assert e.percent_completed == 0.5

e.start_bulk(6)
for i in range(5):
doc, index, _ = e.generate_event()
assert index == "logs-20190106"
62 changes: 62 additions & 0 deletions tests/parameter_sources/timeutils_test.py
@@ -41,6 +41,7 @@ def test_generate_interval_from_now():
# first generated timestamp will be one (clock) invocation after the original start
assert g.next_timestamp() == {
"iso": "2019-01-05T15:00:05.000Z",
"iso_prefix": "2019-01-05T15:00:05",
"yyyy": "2019",
"yy": "19",
"mm": "01",
@@ -50,6 +51,7 @@

assert g.next_timestamp() == {
"iso": "2019-01-05T15:00:10.000Z",
"iso_prefix": "2019-01-05T15:00:10",
"yyyy": "2019",
"yy": "19",
"mm": "01",
@@ -59,6 +61,7 @@

assert g.next_timestamp() == {
"iso": "2019-01-05T15:00:15.000Z",
"iso_prefix": "2019-01-05T15:00:15",
"yyyy": "2019",
"yy": "19",
"mm": "01",
@@ -77,6 +80,7 @@ def test_generate_interval_from_fixed_starting_point():

assert g.next_timestamp() == {
"iso": "2018-05-01T00:59:59.000Z",
"iso_prefix": "2018-05-01T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -86,6 +90,7 @@

assert g.next_timestamp() == {
"iso": "2018-05-01T01:00:02.000Z",
"iso_prefix": "2018-05-01T01:00:02",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -94,6 +99,7 @@
}
assert g.next_timestamp() == {
"iso": "2018-05-01T01:00:05.000Z",
"iso_prefix": "2018-05-01T01:00:05",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -113,6 +119,7 @@ def test_generate_interval_from_fixed_starting_point_with_offset():

assert g.next_timestamp() == {
"iso": "2018-05-11T00:59:59.000Z",
"iso_prefix": "2018-05-11T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -122,6 +129,7 @@

assert g.next_timestamp() == {
"iso": "2018-05-11T01:00:02.000Z",
"iso_prefix": "2018-05-11T01:00:02",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -130,6 +138,7 @@
}
assert g.next_timestamp() == {
"iso": "2018-05-11T01:00:05.000Z",
"iso_prefix": "2018-05-11T01:00:05",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -148,6 +157,7 @@ def test_generate_interval_and_skip():

assert g.next_timestamp() == {
"iso": "2018-05-01T00:59:59.000Z",
"iso_prefix": "2018-05-01T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -157,6 +167,7 @@

assert g.next_timestamp() == {
"iso": "2018-05-01T01:00:02.000Z",
"iso_prefix": "2018-05-01T01:00:02",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -168,6 +179,7 @@

assert g.next_timestamp() == {
"iso": "2018-05-02T00:59:59.000Z",
"iso_prefix": "2018-05-02T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -177,6 +189,7 @@

assert g.next_timestamp() == {
"iso": "2018-05-02T01:00:02.000Z",
"iso_prefix": "2018-05-02T01:00:02",
"yyyy": "2018",
"yy": "18",
"mm": "05",
@@ -185,6 +198,55 @@
}


def test_simulate_ticks():
clock = ReproducibleClock(start=datetime.datetime(year=2019, month=1, day=5, hour=15),
delta=datetime.timedelta(seconds=1))

g = TimestampStructGenerator(starting_point="2018-05-01:00:59:56",
acceleration_factor=3.0,
utcnow=clock)

assert g.next_timestamp() == {
"iso": "2018-05-01T00:59:59.000Z",
"iso_prefix": "2018-05-01T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
"dd": "01",
"hh": "00"
}

assert g.simulate_tick(micros=1.0) == {
"iso": "2018-05-01T00:59:59.001Z",
"iso_prefix": "2018-05-01T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
"dd": "01",
"hh": "00"
}

assert g.simulate_tick(micros=0.1) == {
"iso": "2018-05-01T00:59:59.001Z",
"iso_prefix": "2018-05-01T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
"dd": "01",
"hh": "00"
}

assert g.simulate_tick(micros=10.0) == {
"iso": "2018-05-01T00:59:59.011Z",
"iso_prefix": "2018-05-01T00:59:59",
"yyyy": "2018",
"yy": "18",
"mm": "05",
"dd": "01",
"hh": "00"
}


def test_generate_invalid_time_interval():
# "w" is unsupported
with pytest.raises(TimeParsingError) as ex:
