
Commit 3398993

feat: even more sync progress, including verified working testing scripts!
1 parent aebed23 commit 3398993

6 files changed: +131, -27 lines

aw-client-rust/src/lib.rs

Lines changed: 24 additions & 6 deletions

@@ -29,7 +29,7 @@ impl AwClient
     pub fn new(ip: &str, port: &str, name: &str) -> AwClient {
         let baseurl = format!("http://{}:{}", ip, port);
         let client = reqwest::blocking::Client::builder()
-            .timeout(std::time::Duration::from_secs(60))
+            .timeout(std::time::Duration::from_secs(120))
            .build()
            .unwrap();
        let hostname = gethostname::gethostname().into_string().unwrap();
@@ -52,9 +52,18 @@ impl AwClient
        self.client.get(&url).send()?.json()
    }

-    pub fn create_bucket(&self, bucketname: &str, buckettype: &str) -> Result<(), reqwest::Error> {
-        let url = format!("{}/api/0/buckets/{}", self.baseurl, bucketname);
-        let data = Bucket {
+    pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), reqwest::Error> {
+        let url = format!("{}/api/0/buckets/{}", self.baseurl, bucket.id);
+        self.client.post(&url).json(bucket).send()?;
+        Ok(())
+    }
+
+    pub fn create_bucket_simple(
+        &self,
+        bucketname: &str,
+        buckettype: &str,
+    ) -> Result<(), reqwest::Error> {
+        let bucket = Bucket {
            bid: None,
            id: bucketname.to_string(),
            client: self.name.clone(),
@@ -66,8 +75,7 @@ impl AwClient
            created: None,
            last_updated: None,
        };
-        self.client.post(&url).json(&data).send()?;
-        Ok(())
+        self.create_bucket(&bucket)
    }

    pub fn delete_bucket(&self, bucketname: &str) -> Result<(), reqwest::Error> {
@@ -111,6 +119,16 @@ impl AwClient
        Ok(())
    }

+    pub fn insert_events(
+        &self,
+        bucketname: &str,
+        events: Vec<Event>,
+    ) -> Result<(), reqwest::Error> {
+        let url = format!("{}/api/0/buckets/{}/events", self.baseurl, bucketname);
+        self.client.post(&url).json(&events).send()?;
+        Ok(())
+    }
+
    pub fn heartbeat(
        &self,
        bucketname: &str,
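
This splits bucket creation into create_bucket (taking a full &Bucket) and create_bucket_simple, and adds a bulk insert_events method. A minimal usage sketch of the new client API; the import paths, function name, and values below are illustrative assumptions, only the AwClient methods themselves come from this commit:

// Sketch: pushing a batch of events through the reworked AwClient API.
// Assumes aw-client-rust and aw-models are available as in this repo;
// the bucket/event values are examples, not part of the commit.
use aw_client_rust::AwClient;
use aw_models::Event;

fn push_to_server(events: Vec<Event>) -> Result<(), reqwest::Error> {
    // Connect to a local server (e.g. the testing instance started by test-server.sh).
    let client = AwClient::new("127.0.0.1", "5667", "example-client");

    // The old create_bucket(name, type) behaviour now lives in create_bucket_simple;
    // create_bucket itself takes a full &Bucket.
    client.create_bucket_simple("example-bucket", "example-type")?;

    // New in this commit: one POST per batch instead of one request per event.
    client.insert_events("example-bucket", events)?;
    Ok(())
}

Bulk insertion is what the sync changes further down rely on to avoid sending one heartbeat request per event.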

aw-client-rust/tests/test.rs

Lines changed: 3 additions & 1 deletion

@@ -76,7 +76,9 @@ mod test {

        let bucketname = format!("aw-client-rust-test_{}", client.hostname);
        let buckettype = "test-type";
-        client.create_bucket(&bucketname, &buckettype).unwrap();
+        client
+            .create_bucket_simple(&bucketname, &buckettype)
+            .unwrap();

        let bucket = client.get_bucket(&bucketname).unwrap();
        assert!(bucket.id == bucketname);

aw-sync/src/accessmethod.rs

Lines changed: 13 additions & 16 deletions

@@ -19,7 +19,7 @@ pub trait AccessMethod: std::fmt::Debug {
        end: Option<DateTime<Utc>>,
        limit: Option<u64>,
    ) -> Result<Vec<Event>, String>;
-    fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String>;
+    fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<(), String>;
    fn get_event_count(&self, bucket_id: &str) -> Result<i64, String>;
    fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String>;
    fn close(&self);
@@ -30,10 +30,10 @@ impl AccessMethod for Datastore {
        Ok(self.get_buckets().unwrap())
    }
    fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
-        self.get_bucket(bucket_id)
+        Datastore::get_bucket(self, bucket_id)
    }
    fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> {
-        self.create_bucket(bucket)?;
+        Datastore::create_bucket(self, bucket)?;
        self.force_commit().unwrap();
        Ok(())
    }
@@ -44,23 +44,23 @@ impl AccessMethod for Datastore {
        end: Option<DateTime<Utc>>,
        limit: Option<u64>,
    ) -> Result<Vec<Event>, String> {
-        Ok(self.get_events(bucket_id, start, end, limit).unwrap())
+        Ok(Datastore::get_events(self, bucket_id, start, end, limit).unwrap())
    }
    fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String> {
-        self.heartbeat(bucket_id, event, duration).unwrap();
+        Datastore::heartbeat(self, bucket_id, event, duration).unwrap();
        self.force_commit().unwrap();
        Ok(())
    }
-    fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String> {
-        let res = self.insert_events(bucket_id, &events[..]).unwrap();
+    fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<(), String> {
+        Datastore::insert_events(self, bucket_id, &events[..]).unwrap();
        self.force_commit().unwrap();
-        Ok(res)
+        Ok(())
    }
    fn get_event_count(&self, bucket_id: &str) -> Result<i64, String> {
-        Ok(self.get_event_count(bucket_id, None, None).unwrap())
+        Ok(Datastore::get_event_count(self, bucket_id, None, None).unwrap())
    }
    fn close(&self) {
-        self.close();
+        Datastore::close(self);
    }
 }

@@ -92,18 +92,15 @@ impl AccessMethod for AwClient {
    ) -> Result<Vec<Event>, String> {
        Ok(self.get_events(bucket_id, start, end, limit).unwrap())
    }
-    fn insert_events(&self, _bucket_id: &str, _events: Vec<Event>) -> Result<Vec<Event>, String> {
-        //Ok(self.insert_events(bucket_id, &events[..]).unwrap())
-        Err("Not implemented".to_string())
+    fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<(), String> {
+        AwClient::insert_events(self, bucket_id, events).map_err(|e| e.to_string())
    }
    fn get_event_count(&self, bucket_id: &str) -> Result<i64, String> {
        Ok(self.get_event_count(bucket_id).unwrap())
    }
    fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> {
-        self.create_bucket(bucket.id.as_str(), bucket._type.as_str())
-            .unwrap();
+        AwClient::create_bucket(self, bucket).unwrap();
        Ok(())
-        //Err(DatastoreError::InternalError("Not implemented".to_string()))
    }
    fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String> {
        self.heartbeat(bucket_id, &event, duration)
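
With insert_events now returning Result<(), String> for both implementations, sync code can push events through any AccessMethod, whether it is a local Datastore or a remote AwClient. A sketch of that idea, assuming the AccessMethod trait from aw-sync/src/accessmethod.rs and aw_models::Event are in scope; the function itself is illustrative, not part of the commit:

// Sketch: the same call works for a Datastore-backed or AwClient-backed target.
use aw_models::Event;

fn push_batch(
    ds_to: &dyn AccessMethod,
    bucket_id: &str,
    batch: Vec<Event>,
) -> Result<(), String> {
    // Previously the AwClient implementation returned "Not implemented" here.
    ds_to.insert_events(bucket_id, batch)
}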

aw-sync/src/sync.rs

Lines changed: 42 additions & 4 deletions

@@ -317,6 +317,10 @@ fn sync_one(
    } else {
        info!(" + Starting from beginning");
    }
+
+    // Fetch events
+    // Unset ID on events, as they are not globally unique
+    // TODO: Fetch at most ~5,000 events at a time (or so, to avoid timeout from huge buckets)
    let mut events: Vec<Event> = ds_from
        .get_events(bucket_from.id.as_str(), resume_sync_at, None, None)
        .unwrap()
@@ -329,13 +333,47 @@ fn sync_one(
        .collect();

    // Sort ascending
+    // FIXME: What happens here if two events have the same timestamp?
    events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
-    //info!("{:?}", events);

    // TODO: Do bulk insert using insert_events instead? (for performance)
-    for event in events {
-        //print!("\r{}", event.timestamp);
-        ds_to.heartbeat(bucket_to.id.as_str(), event, 0.0).unwrap();
+    // Client-side heartbeat queueing should keep things somewhat performant though?
+    // NOTE: First event needs to be inserted with heartbeat, to ensure appropriate
+    // merging/updating of pulsed events.
+    let events_total = events.len();
+    let mut events_sent = 0;
+    let mut events_iter = events.into_iter();
+    if let Some(e) = events_iter.next() {
+        ds_to.heartbeat(bucket_to.id.as_str(), e, 0.0).unwrap();
+        events_sent += 1;
+    }
+
+    const BATCH_SIZE: usize = 5000;
+    if BATCH_SIZE == 1 {
+        for event in events_iter {
+            print!("{} ({}/{})\r", &event.timestamp, events_sent, events_total);
+            ds_to.heartbeat(bucket_to.id.as_str(), event, 0.0).unwrap();
+            events_sent += 1;
+        }
+    } else {
+        let mut batch_events = Vec::with_capacity(BATCH_SIZE);
+        for e in events_iter {
+            print!("{} ({}/{})\r", e.timestamp, events_sent, events_total);
+            batch_events.push(e);
+            events_sent += 1;
+            if batch_events.len() >= BATCH_SIZE {
+                ds_to
+                    .insert_events(bucket_to.id.as_str(), batch_events.clone())
+                    .unwrap();
+                batch_events.clear();
+            }
+        }
+
+        if !batch_events.is_empty() {
+            ds_to
+                .insert_events(bucket_to.id.as_str(), batch_events)
+                .unwrap();
+        }
    }

    let eventcount_to_new = ds_to.get_event_count(bucket_to.id.as_str()).unwrap();
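
The new loop sends the first event as a heartbeat (so it merges with any previously synced pulse) and then flushes the rest in batches of up to 5000 via insert_events. The grouping itself can be expressed as a small standalone helper; this is a sketch with an assumed send callback standing in for ds_to.insert_events, not code from the commit:

// Sketch: slice::chunks gives the same grouping as the manual Vec-based
// batching in the diff (batch_size must be > 0; the last chunk may be smaller).
fn send_in_batches<E: Clone>(
    events: &[E],
    batch_size: usize,
    mut send: impl FnMut(Vec<E>) -> Result<(), String>,
) -> Result<(), String> {
    for chunk in events.chunks(batch_size) {
        // One call per batch instead of one per event.
        send(chunk.to_vec())?;
    }
    Ok(())
}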

aw-sync/test-import-sync.sh

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+#!/bin/bash
+
+# get script path
+SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"
+pushd $SCRIPTPATH
+
+# port used for testing instance
+PORT=5667
+
+# if server already running on port 5667, don't start again
+if [ "$(lsof -i:$PORT -sTCP:LISTEN -t)" ]; then
+    echo "ActivityWatch server already running on port $PORT, using that."
+else
+    # Set up an isolated ActivityWatch instance
+    ./test-server.sh $PORT &
+fi
+
+
+sleep 1;
+SYNCROOTDIR="$HOME/ActivityWatchSync"
+
+# For each host in the sync directory, pull the data from each database file using aw-sync
+for host in $(ls $SYNCROOTDIR); do
+    SYNCDIR="$SYNCROOTDIR/$host"
+    for db in $(ls $SYNCDIR/*/*.db); do
+        AWSYNCPARAMS="--port $PORT --sync-dir $SYNCDIR"
+        BUCKETS="aw-watcher-window_$host,aw-watcher-afk_$host"
+
+        echo "Syncing $db to $host"
+        cargo run --bin aw-sync -- $AWSYNCPARAMS sync --mode pull --buckets $BUCKETS
+    done
+done
+
+# kill aw-server-rust
+#kill %1
+fg

aw-sync/test-server.sh

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+#!/bin/bash
+
+# port and db used for testing instance
+
+# If port already set, use that, otherwise, use 5667
+PORT=${PORT:-5667}
+
+DBPATH=/tmp/aw-server-rust-sync-testing/
+mkdir -p $DBPATH
+
+# Set up an isolated ActivityWatch instance
+pushd ..
+cargo run --bin aw-server -- --testing --port $PORT --dbpath $DBPATH/data.db --no-legacy-import
