Commit 0a94e0f

fix(import): merge events into existing bucket instead of failing (#575)
* fix(import): merge events into existing bucket instead of failing

  When importing a bucket that already exists, insert the events into the
  existing bucket instead of returning a 500 error. This enables the common
  workflow of re-importing data (e.g. from Android) without having to
  manually delete existing buckets first.

  Closes ActivityWatch/activitywatch#1213

* fix(import): deduplicate ID-less events on re-import to ensure idempotency

  When importing into an existing bucket, events without an explicit ID were
  silently duplicated on each re-import because SQLite AUTOINCREMENT always
  assigns a new rowid for NULL-id inserts.

  Fix: fetch existing events in the import time range before inserting, then
  filter out events already present (matched by timestamp, duration, and
  data). This makes re-import idempotent for the common Android/JSON export
  case where events lack explicit IDs.

  Add idempotency test that re-imports the same event and asserts the event
  count stays at 2 (not 3).

  Co-authored-by: Bob <bob@superuserlabs.org>

* perf(import): use HashSet for merge dedup

* fix(import): deduplicate against unclipped overlap events

* docs(import): document dedup assumptions per Greptile review

  Add inline comments addressing Greptile review feedback:
  - event_identity: note insertion-order-dependent JSON serialization and
    that it's correct for primary use case (Android re-import)
  - get_events call: note memory assumption (few thousand events typical)
    and potential mitigations for pathological cases

* fix(import): use canonical JSON for order-independent event dedup

  Sort event data keys via BTreeMap before serializing for the dedup
  identity hash. This prevents missed duplicates when events from different
  clients serialize the same key-value pairs in different orders (e.g., one
  client puts 'app' before 'title', another reverses).

  Addresses Greptile review P1 finding on PR #575.
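The canonical-JSON idea from the last bullet fits in a few lines. The sketch below is illustrative only (the real implementation is `event_identity` in aw-server/src/endpoints/import.rs, shown in the diff further down); `canonical` is a hypothetical helper, and the demo assumes an order-preserving JSON map, e.g. serde_json built with the `preserve_order` feature, which is what makes naive serialization order-dependent in the first place.

use std::collections::BTreeMap;

// Hypothetical helper mirroring the dedup-identity idea from this commit.
fn canonical(data: &serde_json::Map<String, serde_json::Value>) -> String {
    // Collecting into a BTreeMap sorts the keys, so logically equal payloads
    // serialize to byte-identical strings regardless of insertion order.
    let sorted: BTreeMap<_, _> = data.iter().collect();
    serde_json::to_string(&sorted).expect("JSON values always serialize")
}

fn main() {
    let mut a = serde_json::Map::new();
    a.insert("app".to_string(), "Firefox".into());
    a.insert("title".to_string(), "Docs".into());

    let mut b = serde_json::Map::new();
    b.insert("title".to_string(), "Docs".into());
    b.insert("app".to_string(), "Firefox".into());

    // With an order-preserving map, to_string(&a) and to_string(&b) can
    // differ even though a == b; that is exactly the missed-duplicate bug.
    // The canonical forms always match:
    assert_eq!(canonical(&a), canonical(&b));
}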
1 parent ce8afc0 commit 0a94e0f

4 files changed

Lines changed: 262 additions & 18 deletions

File tree

aw-datastore/src/datastore.rs
aw-datastore/src/worker.rs
aw-server/src/endpoints/import.rs
aw-server/tests/api.rs

aw-datastore/src/datastore.rs

Lines changed: 38 additions & 6 deletions
@@ -713,13 +713,14 @@ impl DatastoreInstance {
         Ok(row)
     }

-    pub fn get_events(
+    fn get_events_inner(
         &mut self,
         conn: &Connection,
         bucket_id: &str,
         starttime_opt: Option<DateTime<Utc>>,
         endtime_opt: Option<DateTime<Utc>>,
         limit_opt: Option<u64>,
+        clip_to_query_range: bool,
     ) -> Result<Vec<Event>, DatastoreError> {
         let bucket = self.get_bucket(bucket_id)?;

@@ -774,11 +775,13 @@ impl DatastoreInstance {
             let mut endtime_ns: i64 = row.get(2)?;
             let data_str: String = row.get(3)?;

-            if starttime_ns < starttime_filter_ns {
-                starttime_ns = starttime_filter_ns
-            }
-            if endtime_ns > endtime_filter_ns {
-                endtime_ns = endtime_filter_ns
+            if clip_to_query_range {
+                if starttime_ns < starttime_filter_ns {
+                    starttime_ns = starttime_filter_ns
+                }
+                if endtime_ns > endtime_filter_ns {
+                    endtime_ns = endtime_filter_ns
+                }
             }
             let duration_ns = endtime_ns - starttime_ns;

@@ -812,6 +815,35 @@ impl DatastoreInstance {
         Ok(list)
     }

+    pub fn get_events(
+        &mut self,
+        conn: &Connection,
+        bucket_id: &str,
+        starttime_opt: Option<DateTime<Utc>>,
+        endtime_opt: Option<DateTime<Utc>>,
+        limit_opt: Option<u64>,
+    ) -> Result<Vec<Event>, DatastoreError> {
+        self.get_events_inner(conn, bucket_id, starttime_opt, endtime_opt, limit_opt, true)
+    }
+
+    pub fn get_events_unclipped(
+        &mut self,
+        conn: &Connection,
+        bucket_id: &str,
+        starttime_opt: Option<DateTime<Utc>>,
+        endtime_opt: Option<DateTime<Utc>>,
+        limit_opt: Option<u64>,
+    ) -> Result<Vec<Event>, DatastoreError> {
+        self.get_events_inner(
+            conn,
+            bucket_id,
+            starttime_opt,
+            endtime_opt,
+            limit_opt,
+            false,
+        )
+    }
+
     pub fn get_event_count(
         &self,
         conn: &Connection,
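Why the unclipped variant matters for the import dedup: `get_events` clips returned intervals to the query range, so an existing long event can come back looking exactly like a shorter incoming event it contains. A worked example with hypothetical timestamps (mirroring the "contained event" test further down; assumes the chrono crate):

use chrono::{Duration, TimeZone, Utc};

fn main() {
    // Hypothetical numbers: an existing 60 s event fully contains an
    // incoming 30 s event that should be imported as a distinct row.
    let existing_start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
    let existing_end = existing_start + Duration::seconds(60);
    let incoming_start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 30).unwrap();
    let incoming_dur = Duration::seconds(30);

    // The import dedup queries exactly the incoming events' time range.
    let (range_start, range_end) = (incoming_start, incoming_start + incoming_dur);

    // A clipped read truncates the existing event to that range...
    let clipped_start = existing_start.max(range_start);
    let clipped_dur = existing_end.min(range_end) - clipped_start;

    // ...making it identical to the incoming event by (timestamp, duration),
    // so dedup would wrongly drop the contained event. The unclipped read
    // returns (existing_start, 60 s) instead, and the identities differ.
    assert_eq!((clipped_start, clipped_dur), (incoming_start, incoming_dur));
}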

aw-datastore/src/worker.rs

Lines changed: 39 additions & 3 deletions
@@ -69,6 +69,7 @@ pub enum Command {
         Option<DateTime<Utc>>,
         Option<DateTime<Utc>>,
         Option<u64>,
+        bool,
     ),
     GetEventCount(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>),
     DeleteEventsById(String, Vec<i64>),
@@ -267,8 +268,13 @@ impl DatastoreWorker {
                     Err(e) => Err(e),
                 }
             }
-            Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
-                match ds.get_events(tx, &bucketname, starttime_opt, endtime_opt, limit_opt) {
+            Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt, unclipped) => {
+                let result = if unclipped {
+                    ds.get_events_unclipped(tx, &bucketname, starttime_opt, endtime_opt, limit_opt)
+                } else {
+                    ds.get_events(tx, &bucketname, starttime_opt, endtime_opt, limit_opt)
+                };
+                match result {
                     Ok(el) => Ok(Response::EventList(el)),
                     Err(e) => Err(e),
                 }
@@ -433,7 +439,37 @@ impl Datastore {
         endtime_opt: Option<DateTime<Utc>>,
         limit_opt: Option<u64>,
     ) -> Result<Vec<Event>, DatastoreError> {
-        let cmd = Command::GetEvents(bucket_id.to_string(), starttime_opt, endtime_opt, limit_opt);
+        let cmd = Command::GetEvents(
+            bucket_id.to_string(),
+            starttime_opt,
+            endtime_opt,
+            limit_opt,
+            false,
+        );
+        let receiver = self.requester.request(cmd).unwrap();
+        match receiver.collect().unwrap() {
+            Ok(r) => match r {
+                Response::EventList(el) => Ok(el),
+                _ => panic!("Invalid response"),
+            },
+            Err(e) => Err(e),
+        }
+    }
+
+    pub fn get_events_unclipped(
+        &self,
+        bucket_id: &str,
+        starttime_opt: Option<DateTime<Utc>>,
+        endtime_opt: Option<DateTime<Utc>>,
+        limit_opt: Option<u64>,
+    ) -> Result<Vec<Event>, DatastoreError> {
+        let cmd = Command::GetEvents(
+            bucket_id.to_string(),
+            starttime_opt,
+            endtime_opt,
+            limit_opt,
+            true,
+        );
         let receiver = self.requester.request(cmd).unwrap();
         match receiver.collect().unwrap() {
             Ok(r) => match r {

aw-server/src/endpoints/import.rs

Lines changed: 98 additions & 3 deletions
@@ -3,19 +3,114 @@ use rocket::http::Status;
 use rocket::serde::json::Json;
 use rocket::State;

+use std::collections::{BTreeMap, HashSet};
 use std::sync::Mutex;

-use aw_models::BucketsExport;
+use aw_models::{BucketsExport, Event};

-use aw_datastore::Datastore;
+use aw_datastore::{Datastore, DatastoreError};

 use crate::endpoints::{HttpErrorJson, ServerState};

+/// Computes a dedup identity tuple for an event.
+///
+/// Uses canonical JSON serialization (sorted keys via `BTreeMap`) so that
+/// events with identical key-value pairs but different insertion order
+/// (e.g., from different clients) are correctly identified as duplicates.
+fn event_identity(
+    event: &Event,
+) -> Result<(chrono::DateTime<chrono::Utc>, i64, String), HttpErrorJson> {
+    let duration_ns = event.duration.num_nanoseconds().ok_or_else(|| {
+        HttpErrorJson::new(
+            Status::InternalServerError,
+            "Failed to encode event duration for dedup".to_string(),
+        )
+    })?;
+    // Sort keys before serializing for canonical, order-independent dedup.
+    // This prevents missed duplicates when events from different clients
+    // serialize the same data with different key orderings.
+    let sorted: BTreeMap<_, _> = event.data.iter().collect();
+    let data_json = serde_json::to_string(&sorted).map_err(|e| {
+        HttpErrorJson::new(
+            Status::InternalServerError,
+            format!("Failed to encode event data for dedup: {e}"),
+        )
+    })?;
+    Ok((event.timestamp, duration_ns, data_json))
+}
+
 fn import(datastore_mutex: &Mutex<Datastore>, import: BucketsExport) -> Result<(), HttpErrorJson> {
     let datastore = endpoints_get_lock!(datastore_mutex);
-    for (_bucketname, bucket) in import.buckets {
+    for (_bucketname, mut bucket) in import.buckets {
         match datastore.create_bucket(&bucket) {
             Ok(_) => (),
+            Err(DatastoreError::BucketAlreadyExists(_)) => {
+                // Bucket already exists — merge events, skipping duplicates
+                info!("Bucket '{}' already exists, merging events", bucket.id);
+                if let Some(events) = bucket.events.take() {
+                    let events_vec = events.take_inner();
+                    if !events_vec.is_empty() {
+                        // Determine time range of events to import
+                        let start = events_vec.iter().map(|e| e.timestamp).min().unwrap();
+                        let end = events_vec
+                            .iter()
+                            .map(|e| e.calculate_endtime())
+                            .max()
+                            .unwrap();
+
+                        // Fetch existing events in that range to detect duplicates.
+                        // Events without an explicit ID would otherwise be inserted as new rows
+                        // via AUTOINCREMENT, silently creating duplicates on re-import.
+                        //
+                        // **Memory note**: This loads all events in the import time range into
+                        // memory for O(1) dedup lookups. Typical Android re-imports involve a
+                        // few thousand events (~1-2 MB), which is well within server bounds.
+                        // Pathological cases (years of data) could be mitigated with pagination
+                        // or a bloom filter if OOM issues arise in practice.
+                        let existing = datastore
+                            .get_events_unclipped(&bucket.id, Some(start), Some(end), None)
+                            .map_err(|e| {
+                                HttpErrorJson::new(
+                                    Status::InternalServerError,
+                                    format!(
+                                        "Failed to fetch existing events for dedup in '{}': {e:?}",
+                                        bucket.id
+                                    ),
+                                )
+                            })?;
+
+                        let existing_identities: HashSet<_> = existing
+                            .iter()
+                            .map(event_identity)
+                            .collect::<Result<_, _>>()?;
+
+                        // Filter out events already present (matched by timestamp, duration, data)
+                        let new_events: Vec<_> = events_vec
+                            .into_iter()
+                            .map(|event| Ok((event_identity(&event)?, event)))
+                            .collect::<Result<Vec<_>, HttpErrorJson>>()?
+                            .into_iter()
+                            .filter_map(|(identity, event)| {
+                                (!existing_identities.contains(&identity)).then_some(event)
+                            })
+                            .collect();
+
+                        if !new_events.is_empty() {
+                            if let Err(e) = datastore.insert_events(&bucket.id, &new_events) {
+                                let err_msg = format!(
+                                    "Failed to merge events into existing bucket '{}': {e:?}",
+                                    bucket.id
+                                );
+                                warn!("{}", err_msg);
+                                return Err(HttpErrorJson::new(
+                                    Status::InternalServerError,
+                                    err_msg,
+                                ));
+                            }
+                        }
+                    }
+                }
+            }
             Err(e) => {
                 let err_msg = format!("Failed to import bucket: {e:?}");
                 warn!("{}", err_msg);
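End to end, this is the workflow the change unblocks. A hypothetical client-side sketch (not part of this commit), assuming the reqwest crate with its `blocking` and `json` features and a local aw-server exposing the usual export/import endpoints:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();

    // Grab a full export (same JSON shape as the BucketsExport bodies in the tests).
    let export: serde_json::Value = client
        .get("http://127.0.0.1:5600/api/0/export")
        .send()?
        .json()?;

    // Importing it twice used to 500 with BucketAlreadyExists; with this
    // commit the second import merges and dedups, so both calls succeed
    // and the stored event counts are unchanged.
    for _ in 0..2 {
        let res = client
            .post("http://127.0.0.1:5600/api/0/import")
            .json(&export)
            .send()?;
        assert!(res.status().is_success());
    }
    Ok(())
}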

aw-server/tests/api.rs

Lines changed: 87 additions & 6 deletions
@@ -296,8 +296,39 @@ mod api_tests {
             .dispatch();
         assert_eq!(res.status(), rocket::http::Status::Ok);

-        // TODO: test more error cases
-        // Import already existing bucket
+        // Import already existing bucket with a new event — should merge instead of fail
+        let res = client
+            .post("/api/0/import")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .body(
+                r#"{"buckets":
+                {"id1": {
+                    "id": "id1",
+                    "type": "type",
+                    "client": "client",
+                    "hostname": "hostname",
+                    "events": [{
+                        "timestamp":"2000-01-02T00:00:00Z",
+                        "duration":1.0,
+                        "data": {}
+                    }]
+                }}}"#,
+            )
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+
+        // Verify events were merged — bucket should now have 2 events
+        let res = client
+            .get("/api/0/buckets/id1/events")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+        let events: serde_json::Value = serde_json::from_str(&res.into_string().unwrap()).unwrap();
+        assert_eq!(events.as_array().unwrap().len(), 2);
+
+        // Re-import the first event again — should be idempotent (no duplicate created)
         let res = client
             .post("/api/0/import")
             .header(ContentType::JSON)
@@ -317,10 +348,56 @@ mod api_tests {
                 }}}"#,
             )
             .dispatch();
-        assert_eq!(res.status(), rocket::http::Status::InternalServerError);
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+
+        // Count should still be 2, not 3 — re-import is idempotent
+        let res = client
+            .get("/api/0/buckets/id1/events")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+        let events: serde_json::Value = serde_json::from_str(&res.into_string().unwrap()).unwrap();
         assert_eq!(
-            res.into_string().unwrap(),
-            r#"{"message":"Failed to import bucket: BucketAlreadyExists(\"id1\")"}"#
+            events.as_array().unwrap().len(),
+            2,
+            "Re-importing the same event should be idempotent"
+        );
+
+        // Import a narrower event fully contained within an existing longer event.
+        // This should be preserved as a distinct event, not dropped by clipped dedup.
+        let res = client
+            .post("/api/0/import")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .body(
+                r#"{"buckets":
+                {"id1": {
+                    "id": "id1",
+                    "type": "type",
+                    "client": "client",
+                    "hostname": "hostname",
+                    "events": [{
+                        "timestamp":"2000-01-01T00:00:30Z",
+                        "duration":30.0,
+                        "data": {}
+                    }]
+                }}}"#,
+            )
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+
+        let res = client
+            .get("/api/0/buckets/id1/events")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+        let events: serde_json::Value = serde_json::from_str(&res.into_string().unwrap()).unwrap();
+        assert_eq!(
+            events.as_array().unwrap().len(),
+            3,
+            "Contained event should not be dropped by clipped dedup"
         );

         // Export single created bucket
@@ -388,7 +465,11 @@ mod api_tests {
         let mut buckets = export.buckets;
         assert_eq!(buckets.len(), 1);
         let b = buckets.remove("id1").unwrap();
-        assert_eq!(b.events.unwrap().take_inner().len(), 1);
+        assert_eq!(
+            b.events.unwrap().take_inner().len(),
+            3,
+            "Export should preserve the contained event added during merge testing"
+        );

         assert_eq!(buckets.len(), 0);
     }
