Skip to content

Commit 5f488b3

Browse files
committed
fix: very much WIP stuff, needs review
1 parent d9fbe29 commit 5f488b3

File tree

5 files changed

+114
-98
lines changed

5 files changed

+114
-98
lines changed

aw-datastore/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod worker;
2222
pub use self::datastore::DatastoreInstance;
2323
pub use self::worker::Datastore;
2424

25+
#[derive(Debug, Clone)]
2526
pub enum DatastoreMethod {
2627
Memory(),
2728
File(String),

aw-datastore/src/worker.rs

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl DatastoreWorker {
118118

119119
fn work_loop(&mut self, method: DatastoreMethod) {
120120
// Open SQLite connection
121-
let mut conn = match method {
121+
let mut conn = match &method {
122122
DatastoreMethod::Memory() => {
123123
Connection::open_in_memory().expect("Failed to create in-memory datastore")
124124
}
@@ -149,13 +149,26 @@ impl DatastoreWorker {
149149

150150
// Start handling and respond to requests
151151
loop {
152+
if self.quit {
153+
break;
154+
};
155+
152156
let last_commit_time: DateTime<Utc> = Utc::now();
153-
let mut transaction = conn
154-
.transaction_with_behavior(TransactionBehavior::Immediate)
155-
.unwrap();
157+
info!("Method '{:?}'", &method);
158+
let mut transaction: Transaction =
159+
match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
160+
Ok(transaction) => transaction,
161+
Err(err) => {
162+
error!("Unable to start transaction! {:?}", err);
163+
// Wait 1s before retrying
164+
std::thread::sleep(std::time::Duration::from_millis(1000));
165+
continue;
166+
}
167+
};
168+
transaction.set_drop_behavior(DropBehavior::Commit);
169+
156170
self.uncommited_events = 0;
157171
self.commit = false;
158-
transaction.set_drop_behavior(DropBehavior::Commit);
159172
loop {
160173
let (request, response_sender) = match self.responder.poll() {
161174
Ok((req, res_sender)) => (req, res_sender),
@@ -182,9 +195,6 @@ impl DatastoreWorker {
182195
Ok(_) => (),
183196
Err(err) => panic!("Failed to commit datastore transaction! {}", err),
184197
}
185-
if self.quit {
186-
break;
187-
};
188198
}
189199
info!("DB Worker thread finished");
190200
}
@@ -196,29 +206,27 @@ impl DatastoreWorker {
196206
transaction: &Transaction,
197207
) -> Result<Response, DatastoreError> {
198208
match request {
199-
Command::CreateBucket(bucket) => match ds.create_bucket(&transaction, bucket) {
209+
Command::CreateBucket(bucket) => match ds.create_bucket(transaction, bucket) {
200210
Ok(_) => {
201211
self.commit = true;
202212
Ok(Response::Empty())
203213
}
204214
Err(e) => Err(e),
205215
},
206-
Command::DeleteBucket(bucketname) => {
207-
match ds.delete_bucket(&transaction, &bucketname) {
208-
Ok(_) => {
209-
self.commit = true;
210-
Ok(Response::Empty())
211-
}
212-
Err(e) => Err(e),
216+
Command::DeleteBucket(bucketname) => match ds.delete_bucket(transaction, &bucketname) {
217+
Ok(_) => {
218+
self.commit = true;
219+
Ok(Response::Empty())
213220
}
214-
}
221+
Err(e) => Err(e),
222+
},
215223
Command::GetBucket(bucketname) => match ds.get_bucket(&bucketname) {
216224
Ok(b) => Ok(Response::Bucket(b)),
217225
Err(e) => Err(e),
218226
},
219227
Command::GetBuckets() => Ok(Response::BucketMap(ds.get_buckets())),
220228
Command::InsertEvents(bucketname, events) => {
221-
match ds.insert_events(&transaction, &bucketname, events) {
229+
match ds.insert_events(transaction, &bucketname, events) {
222230
Ok(events) => {
223231
self.uncommited_events += events.len();
224232
self.last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache
@@ -229,7 +237,7 @@ impl DatastoreWorker {
229237
}
230238
Command::Heartbeat(bucketname, event, pulsetime) => {
231239
match ds.heartbeat(
232-
&transaction,
240+
transaction,
233241
&bucketname,
234242
event,
235243
pulsetime,
@@ -243,14 +251,14 @@ impl DatastoreWorker {
243251
}
244252
}
245253
Command::GetEvent(bucketname, event_id) => {
246-
match ds.get_event(&transaction, &bucketname, event_id) {
254+
match ds.get_event(transaction, &bucketname, event_id) {
247255
Ok(el) => Ok(Response::Event(el)),
248256
Err(e) => Err(e),
249257
}
250258
}
251259
Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
252260
match ds.get_events(
253-
&transaction,
261+
transaction,
254262
&bucketname,
255263
starttime_opt,
256264
endtime_opt,
@@ -261,13 +269,13 @@ impl DatastoreWorker {
261269
}
262270
}
263271
Command::GetEventCount(bucketname, starttime_opt, endtime_opt) => {
264-
match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) {
272+
match ds.get_event_count(transaction, &bucketname, starttime_opt, endtime_opt) {
265273
Ok(n) => Ok(Response::Count(n)),
266274
Err(e) => Err(e),
267275
}
268276
}
269277
Command::DeleteEventsById(bucketname, event_ids) => {
270-
match ds.delete_events_by_id(&transaction, &bucketname, event_ids) {
278+
match ds.delete_events_by_id(transaction, &bucketname, event_ids) {
271279
Ok(()) => Ok(Response::Empty()),
272280
Err(e) => Err(e),
273281
}
@@ -277,22 +285,22 @@ impl DatastoreWorker {
277285
Ok(Response::Empty())
278286
}
279287
Command::InsertKeyValue(key, data) => {
280-
match ds.insert_key_value(&transaction, &key, &data) {
288+
match ds.insert_key_value(transaction, &key, &data) {
281289
Ok(()) => Ok(Response::Empty()),
282290
Err(e) => Err(e),
283291
}
284292
}
285-
Command::GetKeyValue(key) => match ds.get_key_value(&transaction, &key) {
293+
Command::GetKeyValue(key) => match ds.get_key_value(transaction, &key) {
286294
Ok(result) => Ok(Response::KeyValue(result)),
287295
Err(e) => Err(e),
288296
},
289297
Command::GetKeysStarting(pattern) => {
290-
match ds.get_keys_starting(&transaction, &pattern) {
298+
match ds.get_keys_starting(transaction, &pattern) {
291299
Ok(result) => Ok(Response::StringVec(result)),
292300
Err(e) => Err(e),
293301
}
294302
}
295-
Command::DeleteKeyValue(key) => match ds.delete_key_value(&transaction, &key) {
303+
Command::DeleteKeyValue(key) => match ds.delete_key_value(transaction, &key) {
296304
Ok(()) => Ok(Response::Empty()),
297305
Err(e) => Err(e),
298306
},

aw-sync/src/main.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ extern crate serde_json;
1515

1616
use std::path::Path;
1717

18-
use chrono::{DateTime, Utc};
18+
use chrono::{DateTime, Datelike, TimeZone, Utc};
1919
use clap::{Parser, Subcommand};
2020

2121
use aw_client_rust::AwClient;
@@ -31,38 +31,42 @@ struct Opts {
3131
#[clap(subcommand)]
3232
command: Commands,
3333

34-
/// Host of instance to connect to
34+
/// Host of instance to connect to.
3535
#[clap(long, default_value = "127.0.0.1")]
3636
host: String,
37-
/// Port of instance to connect to
37+
/// Port of instance to connect to.
3838
#[clap(long, default_value = DEFAULT_PORT)]
3939
port: String,
4040
/// Convenience option for using the default testing host and port.
4141
#[clap(long)]
4242
testing: bool,
43-
/// Path to sync directory
44-
/// If not specified, exit
43+
/// Path to sync directory.
44+
/// If not specified, exit.
4545
#[clap(long)]
4646
sync_dir: String,
4747
}
4848

4949
#[derive(Subcommand)]
5050
enum Commands {
51-
/// Clones repos
51+
/// Sync subcommand.
52+
///
53+
/// Pulls remote buckets then pushes local buckets.
54+
/// First pulls remote buckets in the sync directory to the local aw-server.
55+
/// Then pushes local buckets from the aw-server to the local sync directory.
5256
#[clap(arg_required_else_help = true)]
5357
Sync {
54-
/// Date to start syncing from
55-
/// If not specified, start from beginning
58+
/// Date to start syncing from.
59+
/// If not specified, start from beginning.
5660
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
5761
/// Format: YYYY-MM-DD
5862
#[clap(long)]
5963
start_date: Option<String>,
60-
/// Specify buckets to sync
61-
/// If not specified, all buckets will be synced
64+
/// Specify buckets to sync.
65+
/// If not specified, all buckets will be synced.
6266
#[clap(long)]
6367
buckets: Option<Vec<String>>,
6468
},
65-
/// List buckets and their sync status
69+
/// List buckets and their sync status.
6670
List {},
6771
}
6872

@@ -95,10 +99,13 @@ fn main() -> std::io::Result<()> {
9599
buckets,
96100
} => {
97101
let start: Option<DateTime<Utc>> = start_date.as_ref().map(|date| {
98-
let date_copy = date.clone();
99-
chrono::DateTime::parse_from_rfc3339(&date_copy)
100-
.unwrap()
101-
.with_timezone(&chrono::Utc)
102+
println!("{}", date.clone());
103+
chrono::NaiveDate::parse_from_str(&date.clone(), "%Y-%m-%d")
104+
.map(|nd| {
105+
let dt = Utc.ymd(nd.year(), nd.month(), nd.day());
106+
dt.and_hms(0, 0, 0)
107+
})
108+
.expect("Date was not on the format YYYY-MM-DD")
102109
});
103110
let sync_spec = sync::SyncSpec {
104111
path: sync_directory.to_path_buf(),

aw-sync/src/sync.rs

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::fs;
1313
use std::path::{Path, PathBuf};
1414

1515
use aw_client_rust::AwClient;
16-
use chrono::{DateTime, Duration, Utc};
16+
use chrono::{DateTime, Utc};
1717

1818
use aw_datastore::{Datastore, DatastoreError};
1919
use aw_models::{Bucket, Event};
@@ -159,61 +159,6 @@ fn create_datastore(path: &Path) -> Datastore {
159159
Datastore::new(pathstr.to_string(), false)
160160
}
161161

162-
// TODO: Move into tests
163-
fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> {
164-
let mut datastores: Vec<Datastore> = Vec::new();
165-
for n in 0..2 {
166-
let dspath = sync_directory.join(format!("test-remote-{}.db", n));
167-
let ds_ = create_datastore(&dspath);
168-
let ds = &ds_ as &dyn AccessMethod;
169-
170-
// Create a bucket
171-
// NOTE: Created with duplicate name to make sure it still works under such conditions
172-
let bucket_jsonstr = format!(
173-
r#"{{
174-
"id": "bucket",
175-
"type": "test",
176-
"hostname": "device-{}",
177-
"client": "test"
178-
}}"#,
179-
n
180-
);
181-
let bucket: Bucket = serde_json::from_str(&bucket_jsonstr)?;
182-
match ds.create_bucket(&bucket) {
183-
Ok(()) => (),
184-
Err(e) => match e {
185-
DatastoreError::BucketAlreadyExists(_) => {
186-
debug!("bucket already exists, skipping");
187-
}
188-
e => panic!("woops! {:?}", e),
189-
},
190-
};
191-
192-
// Insert some testing events into the bucket
193-
let events: Vec<Event> = (0..3)
194-
.map(|i| {
195-
let timestamp: DateTime<Utc> = Utc::now() + Duration::milliseconds(i * 10);
196-
let event_jsonstr = format!(
197-
r#"{{
198-
"timestamp": "{}",
199-
"duration": 0,
200-
"data": {{"test": {} }}
201-
}}"#,
202-
timestamp.to_rfc3339(),
203-
i
204-
);
205-
serde_json::from_str(&event_jsonstr).unwrap()
206-
})
207-
.collect::<Vec<Event>>();
208-
209-
ds.insert_events(bucket.id.as_str(), events).unwrap();
210-
//let new_eventcount = ds.get_event_count(bucket.id.as_str(), None, None).unwrap();
211-
//info!("Eventcount: {:?} ({} new)", new_eventcount, events.len());
212-
datastores.push(ds_);
213-
}
214-
Ok(datastores)
215-
}
216-
217162
/// Returns the sync-destination bucket for a given bucket, creates it if it doesn't exist.
218163
fn get_or_create_sync_bucket(
219164
bucket_from: &Bucket,

aw-sync/tests/sync.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,59 @@ mod sync_tests {
210210
// Check again that new events were indeed synced
211211
check_synced_buckets_equal_to_src(&all_buckets_map);
212212
}
213+
214+
// TODO: Move into tests
215+
fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> {
216+
let mut datastores: Vec<Datastore> = Vec::new();
217+
for n in 0..2 {
218+
let dspath = sync_directory.join(format!("test-remote-{}.db", n));
219+
let ds_ = create_datastore(&dspath);
220+
let ds = &ds_ as &dyn AccessMethod;
221+
222+
// Create a bucket
223+
// NOTE: Created with duplicate name to make sure it still works under such conditions
224+
let bucket_jsonstr = format!(
225+
r#"{{
226+
"id": "bucket",
227+
"type": "test",
228+
"hostname": "device-{}",
229+
"client": "test"
230+
}}"#,
231+
n
232+
);
233+
let bucket: Bucket = serde_json::from_str(&bucket_jsonstr)?;
234+
match ds.create_bucket(&bucket) {
235+
Ok(()) => (),
236+
Err(e) => match e {
237+
DatastoreError::BucketAlreadyExists(_) => {
238+
debug!("bucket already exists, skipping");
239+
}
240+
e => panic!("woops! {:?}", e),
241+
},
242+
};
243+
244+
// Insert some testing events into the bucket
245+
let events: Vec<Event> = (0..3)
246+
.map(|i| {
247+
let timestamp: DateTime<Utc> = Utc::now() + Duration::milliseconds(i * 10);
248+
let event_jsonstr = format!(
249+
r#"{{
250+
"timestamp": "{}",
251+
"duration": 0,
252+
"data": {{"test": {} }}
253+
}}"#,
254+
timestamp.to_rfc3339(),
255+
i
256+
);
257+
serde_json::from_str(&event_jsonstr).unwrap()
258+
})
259+
.collect::<Vec<Event>>();
260+
261+
ds.insert_events(bucket.id.as_str(), events).unwrap();
262+
//let new_eventcount = ds.get_event_count(bucket.id.as_str(), None, None).unwrap();
263+
//info!("Eventcount: {:?} ({} new)", new_eventcount, events.len());
264+
datastores.push(ds_);
265+
}
266+
Ok(datastores)
267+
}
213268
}

0 commit comments

Comments
 (0)