Skip to content

Commit

Permalink
fix: minor fixes and added trimming for events that extend out of the…
Browse files Browse the repository at this point in the history
… query range
  • Loading branch information
ErikBjare committed Jun 21, 2021
1 parent e06b6eb commit 7a934a5
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 26 deletions.
33 changes: 18 additions & 15 deletions aw_datastore/benchmark.py
Expand Up @@ -11,47 +11,42 @@
from aw_datastore import get_storage_methods, Datastore
from aw_datastore.storages import AbstractStorage

td1s = timedelta(seconds=1)


def create_test_events(n):
now = datetime.now(timezone.utc) - timedelta(days=1000)

events = []
for i in range(n):
events.append(
Event(timestamp=now + i * timedelta(seconds=1), data={"label": "asd"})
Event(timestamp=now + i * td1s, duration=td1s, data={"label": "asd"})
)

return events


def create_tmpbucket(ds, num):
bucket_id = "benchmark_test_bucket_{}".format(str(num))
try:
ds.delete_bucket(bucket_id)
except KeyError:
pass
ds.create_bucket(bucket_id, "testingtype", "test-client", "testing-box")
return bucket_id


@contextmanager
def temporary_bucket(ds):
bucket_id = "test_bucket"
try:
ds.delete_bucket(bucket_id)
except KeyError:
except Exception:
pass
bucket = ds.create_bucket(bucket_id, "testingtype", "test-client", "testing-box")
yield bucket
ds.delete_bucket(bucket_id)


def benchmark(storage: Callable[..., AbstractStorage]):
ds = Datastore(storage, testing=True)
if storage.__name__ == "PeeweeStorage":
ds = Datastore(storage, testing=True, filepath="test.db")
else:
ds = Datastore(storage, testing=True)

num_single_events = 50
num_replace_events = 50
num_bulk_events = 2 * 10 ** 3
num_bulk_events = 20_000
num_events = num_single_events + num_replace_events + num_bulk_events + 1
num_final_events = num_single_events + num_bulk_events + 1

Expand Down Expand Up @@ -82,9 +77,17 @@ def benchmark(storage: Callable[..., AbstractStorage]):
events_tmp = bucket.get(limit=1)

with ttt(" get all"):
events_tmp = bucket.get()
events_tmp = bucket.get(limit=-1)
assert len(events_tmp) == num_final_events

with ttt(" get range"):
events_tmp = bucket.get(
limit=-1,
starttime=events[1].timestamp + 0.01 * td1s,
endtime=events[-1].timestamp + events[-1].duration,
)
assert len(events_tmp) == num_final_events - 1


if __name__ == "__main__":
for storage in get_storage_methods().values():
Expand Down
7 changes: 5 additions & 2 deletions aw_datastore/datastore.py
Expand Up @@ -11,12 +11,15 @@

class Datastore:
def __init__(
self, storage_strategy: Callable[..., AbstractStorage], testing=False
self,
storage_strategy: Callable[..., AbstractStorage],
testing=False,
**kwargs,
) -> None:
self.logger = logger.getChild("Datastore")
self.bucket_instances: Dict[str, Bucket] = dict()

self.storage_strategy = storage_strategy(testing=testing)
self.storage_strategy = storage_strategy(testing=testing, **kwargs)

def __repr__(self):
return "<Datastore object using {}>".format(
Expand Down
32 changes: 23 additions & 9 deletions aw_datastore/storages/peewee.py
Expand Up @@ -38,14 +38,15 @@
LATEST_VERSION = 2


def chunks(l, n):
"""Yield successive n-sized chunks from l.
def chunks(ls, n):
"""Yield successive n-sized chunks from ls.
From: https://stackoverflow.com/a/312464/965332"""
for i in range(0, len(l), n):
yield l[i : i + n]
for i in range(0, len(ls), n):
yield ls[i : i + n]


def dt_plus_duration(dt, duration):
# See peewee docs on datemath: https://docs.peewee-orm.com/en/latest/peewee/hacks.html#date-math
return peewee.fn.strftime(
"%Y-%m-%d %H:%M:%f+00:00",
(peewee.fn.julianday(dt) - 2440587.5) * 86400.0 + duration,
Expand Down Expand Up @@ -255,7 +256,7 @@ def get_events(
Example raw query:
SELECT strftime(
"%Y-%m-%d %H:%M:%f",
"%Y-%m-%d %H:%M:%f+00:00",
((julianday(timestamp) - 2440587.5) * 86400),
'unixepoch'
)
Expand All @@ -273,11 +274,22 @@ def get_events(
.limit(limit)
)

# See peewee docs on datemath: https://docs.peewee-orm.com/en/latest/peewee/hacks.html#date-math
logging.getLogger("peewee").setLevel(logging.DEBUG)

q = self._where_range(q, starttime, endtime)
return [Event(**e) for e in list(map(EventModel.json, q.execute()))]

res = q.execute()
events = [Event(**e) for e in list(map(EventModel.json, res))]

# Trim events that are out of range (as done in aw-server-rust)
# TODO: Do the same for the other storage methods
for e in events:
if starttime:
if e.timestamp < starttime:
e.timestamp = starttime
if endtime:
if e.timestamp + e.duration > endtime:
e.duration = endtime - e.timestamp

return events

def get_eventcount(
self,
Expand All @@ -302,6 +314,8 @@ def _where_range(
endtime = endtime.astimezone(timezone.utc)

if starttime:
# This can be slow on large databases... not sure if the unlikely here makes a differnce.
# Tried creating various indexes and using SQLite's unlikely() function, but it had no effect
q = q.where(
starttime <= dt_plus_duration(EventModel.timestamp, EventModel.duration)
)
Expand Down

0 comments on commit 7a934a5

Please sign in to comment.