Skip to content

Commit

Permalink
fix: fixed upserting events using insert_many (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed Oct 14, 2021
1 parent 5dbb610 commit d3e31f7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 7 deletions.
26 changes: 22 additions & 4 deletions aw_datastore/storages/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,16 @@ def get_metadata(self, bucket_id: str):
raise Exception("Bucket did not exist, could not get metadata")

def insert_one(self, bucket: str, event: Event) -> Event:
self.db[bucket].append(Event(**event))
event.id = len(self.db[bucket]) - 1
if event.id is not None:
self.replace(bucket, event.id, event)
else:
# We need to copy the event to avoid setting the ID on the passed event
event = copy.copy(event)
if self.db[bucket]:
event.id = max(int(e.id or 0) for e in self.db[bucket]) + 1
else:
event.id = 0
self.db[bucket].append(event)
return event

def delete(self, bucket_id, event_id):
Expand All @@ -110,7 +118,17 @@ def delete(self, bucket_id, event_id):
return False

def replace(self, bucket_id, event_id, event):
self.db[bucket_id][event_id] = event
for idx in (
idx
for idx, event in reversed(list(enumerate(self.db[bucket_id])))
if event.id == event_id
):
# We need to copy the event to avoid setting the ID on the passed event
event = copy.copy(event)
event.id = event_id
self.db[bucket_id][idx] = event

def replace_last(self, bucket_id, event):
self.db[bucket_id][-1] = event
# NOTE: This does not actually get the most recent event, only the last inserted
last = sorted(self.db[bucket_id], key=lambda e: e.timestamp)[-1]
self.replace(bucket_id, last.id, event)
13 changes: 12 additions & 1 deletion aw_datastore/storages/peewee.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,16 @@ def insert_one(self, bucket_id: str, event: Event) -> Event:
event.id = e.id
return event

def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
def insert_many(self, bucket_id, events: List[Event]) -> None:
# NOTE: Events need to be handled differently depending on
# if they're upserts or inserts (have id's or not).

# These events are updates which need to be applied one by one
events_updates = [e for e in events if e.id is not None]
for e in events_updates:
self.insert_one(bucket_id, e)

# These events can be inserted with insert_many
events_dictlist = [
{
"bucket": self.bucket_keys[bucket_id],
Expand All @@ -194,7 +203,9 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
"datastr": json.dumps(event.data),
}
for event in events
if event.id is None
]

# Chunking into lists of length 100 is needed here due to SQLITE_MAX_COMPOUND_SELECT
# and SQLITE_LIMIT_VARIABLE_NUMBER under Windows.
# See: https://github.com/coleifer/peewee/issues/948
Expand Down
12 changes: 10 additions & 2 deletions aw_datastore/storages/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,21 @@ def insert_one(self, bucket_id: str, event: Event) -> Event:
self.conditional_commit(1)
return event

def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
def insert_many(self, bucket_id, events: List[Event]) -> None:
# FIXME: Is this true not only for peewee but sqlite aswell?
# Chunking into lists of length 100 is needed here due to SQLITE_MAX_COMPOUND_SELECT
# and SQLITE_LIMIT_VARIABLE_NUMBER under Windows.
# See: https://github.com/coleifer/peewee/issues/948

# First, upsert events with id's set
events_upsert = [e for e in events if e.id is not None]
for e in events_upsert:
self.replace(bucket_id, e.id, e)

# Then insert events without id's set
events_insert = [e for e in events if e.id is None]
event_rows = []
for event in events:
for event in events_insert:
starttime = event.timestamp.timestamp() * 1000000
endtime = starttime + (event.duration.total_seconds() * 1000000)
datastr = json.dumps(event.data)
Expand Down
26 changes: 26 additions & 0 deletions tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ def test_insert_many(bucket_cm):
assert e == fe


@pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm())
def test_insert_many_upsert(bucket_cm):
"""
Tests that you can update/upsert many events at the same time to a bucket
"""
num_events = 10
with bucket_cm as bucket:
events = num_events * [Event(timestamp=now, duration=td1s, data={"key": "val"})]
# insert events to get IDs assigned
bucket.insert(events)

events = bucket.get(limit=-1)
assert num_events == len(events)
for e in events:
assert e.id is not None
e.data["key"] = "new val"

# Upsert the events
bucket.insert(events)

events = bucket.get(limit=-1)
assert num_events == len(events)
for e in events:
assert e.data["key"] == "new val"


@pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm())
def test_delete(bucket_cm):
"""
Expand Down

0 comments on commit d3e31f7

Please sign in to comment.