Skip to content

Commit

Permalink
feat: made all queries range inclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed Jun 21, 2021
1 parent f04a450 commit e06b6eb
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 47 deletions.
15 changes: 5 additions & 10 deletions aw_datastore/storages/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,16 @@ def get_events(
endtime: datetime = None,
) -> List[Event]:
events = self.db[bucket]

# Sort by timestamp
events = sorted(events, key=lambda k: k["timestamp"])[::-1]

# Filter by date
if starttime:
e = []
for event in events:
if event.timestamp >= starttime:
e.append(event)
events = e
events = [e for e in events if starttime <= (e.timestamp + e.duration)]
if endtime:
e = []
for event in events:
if event.timestamp <= endtime:
e.append(event)
events = e
events = [e for e in events if e.timestamp <= endtime]

# Limit
if limit == 0:
return []
Expand Down
59 changes: 48 additions & 11 deletions aw_datastore/storages/peewee.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import iso8601

import peewee
from peewee import (
Model,
CharField,
Expand Down Expand Up @@ -44,6 +45,14 @@ def chunks(l, n):
yield l[i : i + n]


def dt_plus_duration(dt, duration):
return peewee.fn.strftime(
"%Y-%m-%d %H:%M:%f+00:00",
(peewee.fn.julianday(dt) - 2440587.5) * 86400.0 + duration,
"unixepoch",
)


class BaseModel(Model):
class Meta:
database = _db
Expand Down Expand Up @@ -240,6 +249,21 @@ def get_events(
starttime: Optional[datetime] = None,
endtime: Optional[datetime] = None,
):
"""
Fetch events from a certain bucket, optionally from a given range of time.
Example raw query:
SELECT strftime(
"%Y-%m-%d %H:%M:%f",
((julianday(timestamp) - 2440587.5) * 86400),
'unixepoch'
)
FROM eventmodel
WHERE eventmodel.timestamp > '2021-06-20'
LIMIT 10;
"""
if limit == 0:
return []
q = (
Expand All @@ -248,27 +272,40 @@ def get_events(
.order_by(EventModel.timestamp.desc())
.limit(limit)
)
if starttime:
# Important to normalize datetimes to UTC, otherwise any UTC offset will be ignored
starttime = starttime.astimezone(timezone.utc)
q = q.where(starttime <= EventModel.timestamp)
if endtime:
endtime = endtime.astimezone(timezone.utc)
q = q.where(EventModel.timestamp <= endtime)

# See peewee docs on datemath: https://docs.peewee-orm.com/en/latest/peewee/hacks.html#date-math
logging.getLogger("peewee").setLevel(logging.DEBUG)

q = self._where_range(q, starttime, endtime)
return [Event(**e) for e in list(map(EventModel.json, q.execute()))]

def get_eventcount(
self,
bucket_id: str,
starttime: Optional[datetime] = None,
endtime: Optional[datetime] = None,
):
) -> int:
q = EventModel.select().where(EventModel.bucket == self.bucket_keys[bucket_id])
q = self._where_range(q, starttime, endtime)
return q.count()

def _where_range(
self,
q,
starttime: Optional[datetime] = None,
endtime: Optional[datetime] = None,
):
# Important to normalize datetimes to UTC, otherwise any UTC offset will be ignored
if starttime:
# Important to normalize datetimes to UTC, otherwise any UTC offset will be ignored
starttime = starttime.astimezone(timezone.utc)
q = q.where(starttime <= EventModel.timestamp)
if endtime:
endtime = endtime.astimezone(timezone.utc)

if starttime:
q = q.where(
starttime <= dt_plus_duration(EventModel.timestamp, EventModel.duration)
)
if endtime:
q = q.where(EventModel.timestamp <= endtime)
return q.count()

return q
14 changes: 7 additions & 7 deletions aw_datastore/storages/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,13 @@ def get_events(
c = self.conn.cursor()
starttime_i = starttime.timestamp() * 1000000 if starttime else 0
endtime_i = endtime.timestamp() * 1000000 if endtime else MAX_TIMESTAMP
query = (
"SELECT id, starttime, endtime, datastr "
+ "FROM events "
+ "WHERE bucketrow = (SELECT rowid FROM buckets WHERE id = ?) "
+ "AND starttime >= ? AND endtime <= ? "
+ "ORDER BY endtime DESC LIMIT ?"
)
query = """
SELECT id, starttime, endtime, datastr
FROM events
WHERE bucketrow = (SELECT rowid FROM buckets WHERE id = ?)
AND endtime >= ? AND starttime <= ?
ORDER BY endtime DESC LIMIT ?
"""
rows = c.execute(query, [bucket_id, starttime_i, endtime_i, limit])
events = []
for row in rows:
Expand Down
122 changes: 103 additions & 19 deletions tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
logging.basicConfig(level=logging.DEBUG)

# Useful when you just want some placeholder time in your events, saves typing
now = datetime.now(timezone.utc)
now = datetime.now(tz=timezone.utc)
td1s = timedelta(seconds=1)


def test_get_storage_methods():
Expand Down Expand Up @@ -84,11 +85,11 @@ def test_insert_one(bucket_cm):
Tests inserting one event into a bucket
"""
with bucket_cm as bucket:
l = len(bucket.get())
event = Event(timestamp=now, duration=timedelta(seconds=1), data={"key": "val"})
n_events = len(bucket.get())
event = Event(timestamp=now, duration=td1s, data={"key": "val"})
bucket.insert(event)
fetched_events = bucket.get()
assert l + 1 == len(fetched_events)
assert n_events + 1 == len(fetched_events)
assert isinstance(fetched_events[0], Event)
assert event == fetched_events[0]
logging.info(event)
Expand All @@ -111,9 +112,7 @@ def test_insert_many(bucket_cm):
"""
num_events = 5000
with bucket_cm as bucket:
events = num_events * [
Event(timestamp=now, duration=timedelta(seconds=1), data={"key": "val"})
]
events = num_events * [Event(timestamp=now, duration=td1s, data={"key": "val"})]
bucket.insert(events)
fetched_events = bucket.get(limit=-1)
assert num_events == len(fetched_events)
Expand All @@ -128,9 +127,7 @@ def test_delete(bucket_cm):
"""
num_events = 10
with bucket_cm as bucket:
events = num_events * [
Event(timestamp=now, duration=timedelta(seconds=1), data={"key": "val"})
]
events = num_events * [Event(timestamp=now, duration=td1s, data={"key": "val"})]
bucket.insert(events)

fetched_events = bucket.get(limit=-1)
Expand Down Expand Up @@ -170,7 +167,7 @@ def test_get_ordered(bucket_cm):
eventcount = 10
events = []
for i in range(10):
events.append(Event(timestamp=now + timedelta(seconds=i)))
events.append(Event(timestamp=now + i * td1s, duration=td1s))
random.shuffle(events)
print(events)
bucket.insert(events)
Expand Down Expand Up @@ -205,32 +202,119 @@ def test_get_event_with_timezone(bucket_cm):


@pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm())
def test_get_datefilter(bucket_cm):
def test_get_datefilter_simple(bucket_cm):
with bucket_cm as bucket:
eventcount = 3
events = [
Event(timestamp=now + i * td1s, duration=td1s) for i in range(eventcount)
]
bucket.insert(events)

# Get first event, but expect only half the event to match the interval
fetched_events = bucket.get(
-1,
starttime=now - 0.5 * td1s,
endtime=now + 0.5 * td1s,
)
assert 1 == len(fetched_events)

# Get first two events, but expect only half of each to match the interval
fetched_events = bucket.get(
-1,
starttime=now + 0.5 * td1s,
endtime=now + 1.5 * td1s,
)
assert 2 == len(fetched_events)

# Get last event, but expect only half to match the interval
fetched_events = bucket.get(
-1,
starttime=now + 2.5 * td1s,
endtime=now + 3.5 * td1s,
)
assert 1 == len(fetched_events)

# Check approx precision
fetched_events = bucket.get(
-1,
starttime=now - 0.01 * td1s,
endtime=now + 0.01 * td1s,
)
assert 1 == len(fetched_events)

# Check precision of start
fetched_events = bucket.get(
-1,
starttime=now,
endtime=now,
)
assert 1 == len(fetched_events)

# Check approx precision of end
fetched_events = bucket.get(
-1,
starttime=now + 2.99 * td1s,
endtime=now + 3.01 * td1s,
)
assert 1 == len(fetched_events)


@pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm())
def test_get_datefilter_start(bucket_cm):
"""
Tests the datetimefilter when fetching events
"""
with bucket_cm as bucket:
eventcount = 10
events = []
for i in range(10):
events.append(Event(timestamp=now + timedelta(seconds=i)))
events = [
Event(timestamp=now + i * td1s, duration=td1s) for i in range(eventcount)
]
bucket.insert(events)

# Starttime
for i in range(eventcount):
fetched_events = bucket.get(-1, starttime=events[i].timestamp)
fetched_events = bucket.get(-1, starttime=events[i].timestamp + 0.01 * td1s)
assert eventcount - i == len(fetched_events)


@pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm())
def test_get_datefilter_end(bucket_cm):
"""
Tests the datetimefilter when fetching events
"""
with bucket_cm as bucket:
eventcount = 10
events = [
Event(timestamp=now + i * td1s, duration=td1s) for i in range(eventcount)
]
bucket.insert(events)

# Endtime
for i in range(eventcount):
fetched_events = bucket.get(-1, endtime=events[i].timestamp)
assert i + 1 == len(fetched_events)
fetched_events = bucket.get(-1, endtime=events[i].timestamp - 0.01 * td1s)
assert i == len(fetched_events)


@pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm())
def test_get_datefilter_both(bucket_cm):
"""
Tests the datetimefilter when fetching events
"""
with bucket_cm as bucket:
eventcount = 10
events = [
Event(timestamp=now + i * td1s, duration=td1s) for i in range(eventcount)
]
bucket.insert(events)

# Both
for i in range(eventcount):
for j in range(i + 1, eventcount):
fetched_events = bucket.get(
starttime=events[i].timestamp, endtime=events[j].timestamp
starttime=events[i].timestamp + timedelta(seconds=0.01),
endtime=events[j].timestamp
+ events[j].duration
- timedelta(seconds=0.01),
)
assert j - i + 1 == len(fetched_events)

Expand Down

0 comments on commit e06b6eb

Please sign in to comment.