Skip to content

Commit adb08c3

Browse files
committed
fix: even more very much WIP, needs even more testing
1 parent 5f488b3 commit adb08c3

File tree

5 files changed

+40
-10
lines changed

5 files changed

+40
-10
lines changed

aw-datastore/src/datastore.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use chrono::NaiveDateTime;
66
use chrono::Utc;
77

88
use rusqlite::Connection;
9+
use rusqlite::Transaction;
10+
use rusqlite::TransactionBehavior;
911

1012
use serde_json::value::Value;
1113

aw-datastore/src/worker.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ pub enum Response {
5353
Count(i64),
5454
KeyValue(KeyValue),
5555
StringVec(Vec<String>),
56+
// Used to indicate that no response should occur at all (not even an empty one)
57+
NoResponse(),
5658
}
5759

5860
#[allow(clippy::large_enum_variant)]
@@ -78,6 +80,7 @@ pub enum Command {
7880
GetKeyValue(String),
7981
GetKeysStarting(String),
8082
DeleteKeyValue(String),
83+
Close(),
8184
}
8285

8386
fn _unwrap_response(
@@ -149,12 +152,7 @@ impl DatastoreWorker {
149152

150153
// Start handling and respond to requests
151154
loop {
152-
if self.quit {
153-
break;
154-
};
155-
156155
let last_commit_time: DateTime<Utc> = Utc::now();
157-
info!("Method '{:?}'", &method);
158156
let mut transaction: Transaction =
159157
match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
160158
Ok(transaction) => transaction,
@@ -179,8 +177,13 @@ impl DatastoreWorker {
179177
break;
180178
}
181179
};
182-
let response = self.handle_request(request, &mut ds, &transaction);
183-
response_sender.respond(response);
180+
let response = self.handle_request(request, &mut ds);
181+
match response {
182+
// The empty response is given by commands like close(), which should
183+
// not be responded to, as the requester might have disappeared.
184+
Ok(Response::NoResponse()) => (),
185+
_ => response_sender.respond(response),
186+
}
184187
let now: DateTime<Utc> = Utc::now();
185188
let commit_interval_passed: bool = (now - last_commit_time) > Duration::seconds(15);
186189
if self.commit || commit_interval_passed || self.uncommited_events > 100 {
@@ -195,6 +198,9 @@ impl DatastoreWorker {
195198
Ok(_) => (),
196199
Err(err) => panic!("Failed to commit datastore transaction! {}", err),
197200
}
201+
if self.quit {
202+
break;
203+
};
198204
}
199205
info!("DB Worker thread finished");
200206
}
@@ -524,4 +530,10 @@ impl Datastore {
524530
Err(e) => Err(e),
525531
}
526532
}
533+
534+
// TODO: Should this block until worker has stopped?
535+
pub fn close(&self) {
536+
info!("Sending close request to database");
537+
self.requester.request(Command::Close()).unwrap();
538+
}
527539
}

aw-sync/src/accessmethod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub trait AccessMethod: std::fmt::Debug {
2222
fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String>;
2323
fn get_event_count(&self, bucket_id: &str) -> Result<i64, String>;
2424
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String>;
25+
fn close(&self);
2526
}
2627

2728
impl AccessMethod for Datastore {
@@ -58,6 +59,9 @@ impl AccessMethod for Datastore {
5859
fn get_event_count(&self, bucket_id: &str) -> Result<i64, String> {
5960
Ok(self.get_event_count(bucket_id, None, None).unwrap())
6061
}
62+
fn close(&self) {
63+
self.close();
64+
}
6165
}
6266

6367
impl AccessMethod for AwClient {
@@ -105,4 +109,7 @@ impl AccessMethod for AwClient {
105109
self.heartbeat(bucket_id, &event, duration)
106110
.map_err(|e| format!("{:?}", e))
107111
}
112+
fn close(&self) {
113+
// NOP
114+
}
108115
}

aw-sync/src/sync.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub fn sync_run(client: AwClient, sync_spec: &SyncSpec) -> Result<(), String> {
5858
.map(|p| p.as_path())
5959
.map(create_datastore)
6060
.collect();
61+
info!("Remote datastores: {:?}", ds_remotes);
6162

6263
// Pull
6364
info!("Pulling...");
@@ -75,11 +76,19 @@ pub fn sync_run(client: AwClient, sync_spec: &SyncSpec) -> Result<(), String> {
7576
sync_spec,
7677
);
7778

79+
// Close open database connections
80+
for ds_from in &ds_remotes {
81+
ds_from.close();
82+
}
83+
ds_localremote.close();
84+
85+
// Will fail if db connections not closed (as it will open them again)
7886
list_buckets(&client, sync_spec.path.as_path());
7987

8088
Ok(())
8189
}
8290

91+
#[allow(dead_code)]
8392
pub fn list_buckets(client: &AwClient, sync_directory: &Path) {
8493
let ds_localremote = setup_local_remote(client, sync_directory).unwrap();
8594

@@ -144,12 +153,12 @@ fn find_remotes_nonlocal(sync_directory: &Path, device_id: &str) -> Vec<PathBuf>
144153
remotes_all
145154
.into_iter()
146155
.filter(|path| {
147-
!path
156+
!(path
148157
.clone()
149158
.into_os_string()
150159
.into_string()
151160
.unwrap()
152-
.contains(device_id)
161+
.contains(device_id))
153162
})
154163
.collect()
155164
}

aw-sync/tests/sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ mod sync_tests {
211211
check_synced_buckets_equal_to_src(&all_buckets_map);
212212
}
213213

214-
// TODO: Move into tests
214+
// TODO: Find a way to reuse this (previously used in an integration test)
215215
fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> {
216216
let mut datastores: Vec<Datastore> = Vec::new();
217217
for n in 0..2 {

0 commit comments

Comments
 (0)