Skip to content

Commit d9fbe29

Browse files
committed
fix: more work on syncing
1 parent a8287c5 commit d9fbe29

File tree

4 files changed

+103
-42
lines changed

4 files changed

+103
-42
lines changed

aw-sync/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ extern crate serde_json;
77
mod sync;
88
pub use sync::sync_datastores;
99
pub use sync::sync_run;
10+
pub use sync::SyncSpec;
1011

1112
mod accessmethod;
1213
pub use accessmethod::AccessMethod;

aw-sync/src/main.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ enum Commands {
6060
/// Specify buckets to sync
6161
/// If not specified, all buckets will be synced
6262
#[clap(long)]
63-
buckets: Vec<String>,
63+
buckets: Option<Vec<String>>,
6464
},
6565
/// List buckets and their sync status
6666
List {},
@@ -100,8 +100,13 @@ fn main() -> std::io::Result<()> {
100100
.unwrap()
101101
.with_timezone(&chrono::Utc)
102102
});
103+
let sync_spec = sync::SyncSpec {
104+
path: sync_directory.to_path_buf(),
105+
buckets: buckets.clone(),
106+
start,
107+
};
103108

104-
sync::sync_run(sync_directory, client, buckets, start).map_err(|e| {
109+
sync::sync_run(client, &sync_spec).map_err(|e| {
105110
println!("Error: {}", e);
106111
std::io::Error::new(std::io::ErrorKind::Other, e)
107112
})?;

aw-sync/src/sync.rs

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,49 +20,49 @@ use aw_models::{Bucket, Event};
2020

2121
use crate::accessmethod::AccessMethod;
2222

23-
fn setup_local_remote(client: &AwClient, sync_directory: &Path) -> Result<Datastore, String> {
24-
// FIXME: Don't run twice if already exists
25-
fs::create_dir_all(sync_directory).unwrap();
26-
27-
let info = client.get_info().unwrap();
28-
let remotedir = sync_directory.join(info.device_id.as_str());
29-
fs::create_dir_all(&remotedir).unwrap();
30-
31-
let dbfile = remotedir
32-
.join("test.db")
33-
.into_os_string()
34-
.into_string()
35-
.unwrap();
36-
37-
let ds_localremote = Datastore::new(dbfile, false);
38-
info!("Set up remote for local device");
23+
pub struct SyncSpec {
24+
/// Path of sync folder
25+
pub path: PathBuf,
26+
/// Bucket IDs to sync
27+
pub buckets: Option<Vec<String>>,
28+
/// Start of time range to sync
29+
pub start: Option<DateTime<Utc>>,
30+
}
3931

40-
Ok(ds_localremote)
32+
impl Default for SyncSpec {
33+
fn default() -> Self {
34+
// TODO: Better default path
35+
let path = Path::new("/tmp/aw-sync").to_path_buf();
36+
SyncSpec {
37+
path,
38+
buckets: None,
39+
start: None,
40+
}
41+
}
4142
}
4243

4344
/// Performs a single sync pass
44-
pub fn sync_run(
45-
sync_directory: &Path,
46-
client: AwClient,
47-
buckets: &Vec<String>,
48-
start: Option<DateTime<Utc>>,
49-
) -> Result<(), String> {
50-
let ds_localremote = setup_local_remote(&client, sync_directory)?;
45+
pub fn sync_run(client: AwClient, sync_spec: &SyncSpec) -> Result<(), String> {
46+
let ds_localremote = setup_local_remote(&client, sync_spec.path.as_path())?;
5147

5248
//let ds_remotes = setup_test(sync_directory).unwrap();
5349
//info!("Set up remotes for testing");
5450

5551
let info = client.get_info().unwrap();
56-
let remote_dbfiles = find_remotes_nonlocal(sync_directory, info.device_id.as_str());
52+
let remote_dbfiles = find_remotes_nonlocal(sync_spec.path.as_path(), info.device_id.as_str());
5753
info!("Found remotes: {:?}", remote_dbfiles);
5854

5955
// TODO: Check for compatible remote db version before opening
60-
let ds_remotes: Vec<Datastore> = remote_dbfiles.iter().map(create_datastore).collect();
56+
let ds_remotes: Vec<Datastore> = remote_dbfiles
57+
.iter()
58+
.map(|p| p.as_path())
59+
.map(create_datastore)
60+
.collect();
6161

6262
// Pull
6363
info!("Pulling...");
6464
for ds_from in &ds_remotes {
65-
sync_datastores(ds_from, &client, false, None, buckets);
65+
sync_datastores(ds_from, &client, false, None, sync_spec);
6666
}
6767

6868
// Push local server buckets to sync folder
@@ -72,10 +72,10 @@ pub fn sync_run(
7272
&ds_localremote,
7373
true,
7474
Some(info.device_id.as_str()),
75-
buckets,
75+
sync_spec,
7676
);
7777

78-
list_buckets(&client, sync_directory);
78+
list_buckets(&client, sync_spec.path.as_path());
7979

8080
Ok(())
8181
}
@@ -88,7 +88,11 @@ pub fn list_buckets(client: &AwClient, sync_directory: &Path) {
8888
info!("Found remotes: {:?}", remote_dbfiles);
8989

9090
// TODO: Check for compatible remote db version before opening
91-
let ds_remotes: Vec<Datastore> = remote_dbfiles.iter().map(create_datastore).collect();
91+
let ds_remotes: Vec<Datastore> = remote_dbfiles
92+
.iter()
93+
.map(|p| p.as_path())
94+
.map(create_datastore)
95+
.collect();
9296

9397
log_buckets(client);
9498
log_buckets(&ds_localremote);
@@ -97,6 +101,26 @@ pub fn list_buckets(client: &AwClient, sync_directory: &Path) {
97101
}
98102
}
99103

104+
fn setup_local_remote(client: &AwClient, path: &Path) -> Result<Datastore, String> {
105+
// FIXME: Don't run twice if already exists
106+
fs::create_dir_all(path).unwrap();
107+
108+
let info = client.get_info().unwrap();
109+
let remotedir = path.join(info.device_id.as_str());
110+
fs::create_dir_all(&remotedir).unwrap();
111+
112+
let dbfile = remotedir
113+
.join("test.db")
114+
.into_os_string()
115+
.into_string()
116+
.unwrap();
117+
118+
let ds_localremote = Datastore::new(dbfile, false);
119+
info!("Set up remote for local device");
120+
121+
Ok(ds_localremote)
122+
}
123+
100124
/// Returns a list of all remote dbs
101125
fn find_remotes(sync_directory: &Path) -> std::io::Result<Vec<PathBuf>> {
102126
println!("{}", sync_directory.display());
@@ -130,9 +154,9 @@ fn find_remotes_nonlocal(sync_directory: &Path, device_id: &str) -> Vec<PathBuf>
130154
.collect()
131155
}
132156

133-
fn create_datastore(dspath: &PathBuf) -> Datastore {
134-
let pathstr = dspath.clone().into_os_string().into_string().unwrap();
135-
Datastore::new(pathstr, false)
157+
fn create_datastore(path: &Path) -> Datastore {
158+
let pathstr = path.as_os_str().to_str().unwrap();
159+
Datastore::new(pathstr.to_string(), false)
136160
}
137161

138162
// TODO: Move into tests
@@ -239,7 +263,7 @@ pub fn sync_datastores(
239263
ds_to: &dyn AccessMethod,
240264
is_push: bool,
241265
src_did: Option<&str>,
242-
buckets: &[String],
266+
sync_spec: &SyncSpec,
243267
) {
244268
// FIXME: "-synced" should only be appended when synced to the local database, not to the
245269
// staging area for local buckets.
@@ -259,7 +283,7 @@ pub fn sync_datastores(
259283
})
260284
// If buckets vec isn't empty, filter out buckets not in the buckets vec
261285
.filter(|bucket| {
262-
if buckets.is_empty() {
286+
if let Some(buckets) = &sync_spec.buckets {
263287
buckets.iter().any(|b_id| b_id == &bucket.id)
264288
} else {
265289
true

aw-sync/tests/sync.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod sync_tests {
1010
use aw_datastore::{Datastore, DatastoreError};
1111
use aw_models::{Bucket, Event};
1212
use aw_sync;
13+
use aw_sync::SyncSpec;
1314

1415
struct TestState {
1516
ds_src: Datastore,
@@ -101,7 +102,13 @@ mod sync_tests {
101102
let state = init_teststate();
102103
create_bucket(&state.ds_src, 0);
103104

104-
aw_sync::sync_datastores(&state.ds_src, &state.ds_dest, false, None, &[]);
105+
aw_sync::sync_datastores(
106+
&state.ds_src,
107+
&state.ds_dest,
108+
false,
109+
None,
110+
&SyncSpec::default(),
111+
);
105112

106113
let buckets_src: HashMap<String, Bucket> = state.ds_src.get_buckets().unwrap();
107114
let buckets_dest: HashMap<String, Bucket> = state.ds_dest.get_buckets().unwrap();
@@ -136,7 +143,13 @@ mod sync_tests {
136143
.heartbeat(bucket_id.as_str(), create_event("1"), 1.0)
137144
.unwrap();
138145

139-
aw_sync::sync_datastores(&state.ds_src, &state.ds_dest, false, None, &[]);
146+
aw_sync::sync_datastores(
147+
&state.ds_src,
148+
&state.ds_dest,
149+
false,
150+
None,
151+
&SyncSpec::default(),
152+
);
140153

141154
let all_datastores: Vec<&Datastore> =
142155
[&state.ds_src, &state.ds_dest].iter().cloned().collect();
@@ -150,7 +163,13 @@ mod sync_tests {
150163
.ds_src
151164
.heartbeat(bucket_id.as_str(), create_event("1"), 1.0)
152165
.unwrap();
153-
aw_sync::sync_datastores(&state.ds_src, &state.ds_dest, false, None, &[]);
166+
aw_sync::sync_datastores(
167+
&state.ds_src,
168+
&state.ds_dest,
169+
false,
170+
None,
171+
&SyncSpec::default(),
172+
);
154173

155174
// Check again that new events were indeed synced
156175
check_synced_buckets_equal_to_src(&all_buckets_map);
@@ -163,7 +182,13 @@ mod sync_tests {
163182
let bucket_id = create_bucket(&state.ds_src, 0);
164183
create_events(&state.ds_src, bucket_id.as_str(), 10);
165184

166-
aw_sync::sync_datastores(&state.ds_src, &state.ds_dest, false, None, &[]);
185+
aw_sync::sync_datastores(
186+
&state.ds_src,
187+
&state.ds_dest,
188+
false,
189+
None,
190+
&SyncSpec::default(),
191+
);
167192

168193
let all_datastores: Vec<&Datastore> =
169194
[&state.ds_src, &state.ds_dest].iter().cloned().collect();
@@ -174,7 +199,13 @@ mod sync_tests {
174199

175200
// Add some more events
176201
create_events(&state.ds_src, bucket_id.as_str(), 10);
177-
aw_sync::sync_datastores(&state.ds_src, &state.ds_dest, false, None, &[]);
202+
aw_sync::sync_datastores(
203+
&state.ds_src,
204+
&state.ds_dest,
205+
false,
206+
None,
207+
&SyncSpec::default(),
208+
);
178209

179210
// Check again that new events were indeed synced
180211
check_synced_buckets_equal_to_src(&all_buckets_map);

0 commit comments

Comments
 (0)