Skip to content

Commit

Permalink
feat: progress on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed Jun 8, 2021
1 parent 27a8eb1 commit dbe96b1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 44 deletions.
2 changes: 1 addition & 1 deletion aw-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ extern crate serde;
extern crate serde_json;

mod sync;
pub use sync::sync_datastores;
pub use sync::sync_run;
18 changes: 15 additions & 3 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@ extern crate chrono;
extern crate serde;
extern crate serde_json;

use std::path::Path;

use aw_client_rust::AwClient;

mod sync;

fn main() {
// What needs to be done:
// - [x] Setup local sync bucket
// - Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
// - Import buckets and sync events from remotes
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
// - [x] Import buckets and sync events from remotes
// - [ ] Add CLI arguments
// - [ ] For which local server to use
// - [ ] For which sync dir to use

println!("Started aw-sync-rust...");
aw_server::logging::setup_logger(true).expect("Failed to setup logging");

sync::sync_run();
// TODO: Get path using dirs module
let sync_directory = Path::new("sync-testing");

let client = AwClient::new("127.0.0.1", "5667", "aw-sync-rust");

sync::sync_run(sync_directory, client);
info!("Finished successfully, exiting...");

// Needed to give the datastores some time to commit before program is shut down.
Expand Down
101 changes: 61 additions & 40 deletions aw-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub trait AccessMethod: std::fmt::Debug {
) -> Result<Vec<Event>, String>;
fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String>;
fn get_event_count(&self, bucket_id: &str) -> Result<i64, String>;
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<Event, String>;
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String>;
}

impl AccessMethod for Datastore {
Expand All @@ -58,10 +58,10 @@ impl AccessMethod for Datastore {
) -> Result<Vec<Event>, String> {
Ok(self.get_events(bucket_id, start, end, limit).unwrap())
}
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<Event, String> {
let res = self.heartbeat(bucket_id, event, duration).unwrap();
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String> {
self.heartbeat(bucket_id, event, duration).unwrap();
self.force_commit().unwrap();
Ok(res)
Ok(())
}
fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String> {
let res = self.insert_events(bucket_id, &events[..]).unwrap();
Expand Down Expand Up @@ -114,39 +114,44 @@ impl AccessMethod for AwClient {
Ok(())
//Err(DatastoreError::InternalError("Not implemented".to_string()))
}
fn heartbeat(&self, _bucket_id: &str, _event: Event, _duration: f64) -> Result<Event, String> {
Err("Not implemented".to_string())
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String> {
self.heartbeat(bucket_id, &event, duration)
.map_err(|e| format!("{:?}", e))
}
}

/// Performs a single sync pass
pub fn sync_run() {
// TODO: Get path using dirs module
let sync_directory = Path::new("/tmp/aw-sync-rust/testing");
pub fn sync_run(sync_directory: &Path, client: AwClient) {
fs::create_dir_all(sync_directory).unwrap();

// TODO: Use the local datastore here, preferably passed from main
let ds_local = AwClient::new("127.0.0.1", "5666", "aw-sync-rust");
//let ds_local = Datastore::new(
// sync_directory
// .join("test-local.db")
// .into_os_string()
// .into_string()
// .unwrap(),
// false,
//);
info!("Set up local datastore");
//log_buckets(&ds_local)?;
// TODO: Get device id and use to name db file

let info = client.get_info().unwrap();
let ds_localremote = Datastore::new(
sync_directory
.join(format!("test-{}.db", info.device_id))
.into_os_string()
.into_string()
.unwrap(),
false,
);
info!("Set up remote for local device");

let ds_remotes = setup_test(sync_directory).unwrap();
info!("Set up remote datastores");
info!("Set up remotes for testing");

// FIXME: These are not the datastores that should actually be synced, I'm just testing
// Pull
info!("Pulling...");
for ds_from in &ds_remotes {
sync_datastores(ds_from, &ds_local);
sync_datastores(ds_from, &client, false);
}

log_buckets(&ds_local);
// Push local server buckets to sync folder
info!("Pushing...");
sync_datastores(&client, &ds_localremote, true);

log_buckets(&client);
log_buckets(&ds_localremote);
for ds_from in &ds_remotes {
log_buckets(ds_from);
}
Expand All @@ -166,14 +171,15 @@ fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> {
let ds = &ds_ as &dyn AccessMethod;

// Create a bucket
// NOTE: Created with duplicate name to make sure it still works under such conditions
let bucket_jsonstr = format!(
r#"{{
"id": "bucket-{}",
"type": "test",
"hostname": "device-{}",
"client": "test"
}}"#,
n, n
"id": "bucket",
"type": "test",
"hostname": "device-{}",
"client": "test"
}}"#,
n
);
let bucket: Bucket = serde_json::from_str(&bucket_jsonstr)?;
match ds.create_bucket(&bucket) {
Expand Down Expand Up @@ -212,9 +218,18 @@ fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> {
}

/// Returns the sync-destination bucket for a given bucket, creates it if it doesn't exist.
fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &dyn AccessMethod) -> Bucket {
// Ensure the bucket ID ends in "-synced"
let new_id = format!("{}-synced", bucket_from.id.replace("-synced", ""));
fn get_or_create_sync_bucket(
bucket_from: &Bucket,
ds_to: &dyn AccessMethod,
is_push: bool,
) -> Bucket {
let new_id = if is_push {
bucket_from.id.clone()
} else {
// Ensure the bucket ID ends in "-synced"
let orig_bucketid = bucket_from.id.split("-synced-from-").next().unwrap();
format!("{}-synced-from-{}", orig_bucketid, bucket_from.hostname)
};

match ds_to.get_bucket(new_id.as_str()) {
Ok(bucket) => bucket,
Expand All @@ -223,9 +238,10 @@ fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &dyn AccessMethod) ->
bucket_new.id = new_id.clone();
// TODO: Replace sync origin with hostname/GUID and discuss how we will treat the data
// attributes for internal use.
bucket_new
.data
.insert("$aw.sync.origin".to_string(), serde_json::json!("test"));
bucket_new.data.insert(
"$aw.sync.origin".to_string(),
serde_json::json!(bucket_from.hostname),
);
ds_to.create_bucket(&bucket_new).unwrap();
ds_to.get_bucket(new_id.as_str()).unwrap()
}
Expand All @@ -234,16 +250,21 @@ fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &dyn AccessMethod) ->
}

/// Syncs all buckets from `ds_from` to `ds_to` with `-synced` appended to the ID of the destination bucket.
pub fn sync_datastores(ds_from: &dyn AccessMethod, ds_to: &dyn AccessMethod) {
fn sync_datastores(ds_from: &dyn AccessMethod, ds_to: &dyn AccessMethod, is_push: bool) {
// FIXME: "-synced" should only be appended when synced to the local database, not to the
// staging area for local buckets.
info!("Syncing {:?} to {:?}", ds_from, ds_to);

let buckets_from = ds_from.get_buckets().unwrap();
for bucket_from in buckets_from.values() {
let bucket_to = get_or_create_sync_bucket(bucket_from, ds_to);
// TODO: Refuse to sync buckets without hostname/device ID set, or if set to 'unknown'
if bucket_from.hostname == "unknown" {
warn!("Bucket hostname/device ID was invalid, skipping");
continue;
}
let bucket_to = get_or_create_sync_bucket(bucket_from, ds_to, is_push);
let eventcount_to_old = ds_to.get_event_count(bucket_to.id.as_str()).unwrap();
//info!("{:?}", bucket_to);
info!("Bucket: {:?}", bucket_to.id);

// Sync events
// FIXME: This should use bucket_to.metadata.end, but it doesn't because it doesn't work
Expand Down

0 comments on commit dbe96b1

Please sign in to comment.