Skip to content

Commit

Permalink
fix: more progress on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed Apr 10, 2022
1 parent d2dc9d7 commit f3e78aa
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 35 deletions.
5 changes: 4 additions & 1 deletion aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ pub struct AwClient {
impl AwClient {
pub fn new(ip: &str, port: &str, name: &str) -> AwClient {
let baseurl = format!("http://{}:{}", ip, port);
let client = reqwest::blocking::Client::new();
let client = reqwest::blocking::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.unwrap();
let hostname = gethostname::gethostname().into_string().unwrap();
AwClient {
client,
Expand Down
17 changes: 12 additions & 5 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ enum Commands {
/// Format: YYYY-MM-DD
#[clap(long)]
start_date: Option<String>,
/// Specify buckets to sync.
/// Specify buckets to sync using a comma-separated list.
/// If not specified, all buckets will be synced.
#[clap(long)]
buckets: Option<Vec<String>>,
buckets: Option<String>,
},
/// List buckets and their sync status.
List {},
Expand All @@ -73,7 +73,7 @@ enum Commands {
fn main() -> std::io::Result<()> {
let opts: Opts = Opts::parse();

println!("Started aw-sync-rust...");
info!("Started aw-sync...");

aw_server::logging::setup_logger(true).expect("Failed to setup logging");

Expand All @@ -83,14 +83,15 @@ fn main() -> std::io::Result<()> {
} else {
Path::new(&opts.sync_dir)
};
info!("Using sync dir: {}", sync_directory.display());

let port = if opts.testing && opts.port == DEFAULT_PORT {
"5666"
} else {
&opts.port
};

let client = AwClient::new(opts.host.as_str(), port, "aw-sync-rust");
let client = AwClient::new(opts.host.as_str(), port, "aw-sync");

match &opts.command {
// Perform two-way sync
Expand All @@ -107,9 +108,15 @@ fn main() -> std::io::Result<()> {
})
.expect("Date was not on the format YYYY-MM-DD")
});

// Parse comma-separated list
let buckets_vec: Option<Vec<String>> = buckets
.as_ref()
.map(|b| b.split(',').map(|s| s.to_string()).collect());

let sync_spec = sync::SyncSpec {
path: sync_directory.to_path_buf(),
buckets: buckets.clone(),
buckets: buckets_vec,
start,
};

Expand Down
79 changes: 56 additions & 23 deletions aw-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,33 @@ impl Default for SyncSpec {
pub fn sync_run(client: AwClient, sync_spec: &SyncSpec) -> Result<(), String> {
let ds_localremote = setup_local_remote(&client, sync_spec.path.as_path())?;

//let ds_remotes = setup_test(sync_directory).unwrap();
//info!("Set up remotes for testing");

let info = client.get_info().unwrap();
let remote_dbfiles = find_remotes_nonlocal(sync_spec.path.as_path(), info.device_id.as_str());
info!("Found remotes: {:?}", remote_dbfiles);

// Log if remotes found
// TODO: Only log remotes of interest
if !remote_dbfiles.is_empty() {
println!(
"Found {} remote db files: {:?}",
remote_dbfiles.len(),
remote_dbfiles
);
}

// TODO: Check for compatible remote db version before opening
let ds_remotes: Vec<Datastore> = remote_dbfiles
.iter()
.map(|p| p.as_path())
.map(create_datastore)
.collect();
info!("Remote datastores: {:?}", ds_remotes);

if !ds_remotes.is_empty() {
println!(
"Found {} remote datastores: {:?}",
ds_remotes.len(),
ds_remotes
);
}

// Pull
info!("Pulling...");
Expand Down Expand Up @@ -89,7 +102,7 @@ pub fn sync_run(client: AwClient, sync_spec: &SyncSpec) -> Result<(), String> {
std::mem::drop(ds_localremote);

// NOTE: Will fail if db connections not closed (as it will open them again)
list_buckets(&client, sync_spec.path.as_path());
//list_buckets(&client, sync_spec.path.as_path());

Ok(())
}
Expand Down Expand Up @@ -130,15 +143,18 @@ fn setup_local_remote(client: &AwClient, path: &Path) -> Result<Datastore, Strin
.into_string()
.unwrap();

// Print a message if dbfile doesn't already exist
if !Path::new(&dbfile).exists() {
info!("Creating new database file: {}", dbfile);
}
let ds_localremote = Datastore::new(dbfile, false);
info!("Set up remote for local device");

Ok(ds_localremote)
}

/// Returns a list of all remote dbs
fn find_remotes(sync_directory: &Path) -> std::io::Result<Vec<PathBuf>> {
println!("{}", sync_directory.display());
//info!("Using sync dir: {}", sync_directory.display());
let dbs = fs::read_dir(sync_directory)?
.map(|res| res.ok().unwrap().path())
.filter(|p| p.is_dir())
Expand Down Expand Up @@ -233,6 +249,15 @@ pub fn sync_datastores(
.get_buckets()
.unwrap()
.iter_mut()
// If buckets vec isn't empty, filter out buckets not in the buckets vec
.filter(|tup| {
let bucket = &tup.1;
if let Some(buckets) = &sync_spec.buckets {
buckets.iter().any(|b_id| b_id == &bucket.id)
} else {
true
}
})
.map(|tup| {
// TODO: Refuse to sync buckets without hostname/device ID set, or if set to 'unknown'
if tup.1.hostname == "unknown" {
Expand All @@ -241,16 +266,17 @@ pub fn sync_datastores(
}
tup.1.clone()
})
// If buckets vec isn't empty, filter out buckets not in the buckets vec
.filter(|bucket| {
if let Some(buckets) = &sync_spec.buckets {
buckets.iter().any(|b_id| b_id == &bucket.id)
} else {
true
}
})
.collect();

// Log warning for buckets requested but not found
if let Some(buckets) = &sync_spec.buckets {
for b_id in buckets {
if buckets_from.iter().find(|b| b.id == *b_id).is_none() {
error!("Bucket \"{}\" not found in source datastore", b_id);
}
}
}

// Sync buckets in order of most recently updated
buckets_from.sort_by_key(|b| b.metadata.end);

Expand All @@ -268,7 +294,7 @@ fn sync_one(
bucket_to: Bucket,
) {
let eventcount_to_old = ds_to.get_event_count(bucket_to.id.as_str()).unwrap();
info!("Bucket: {:?}", bucket_to.id);
info!("Syncing bucket '{}'...", bucket_to.id);

// Sync events
// FIXME: This should use bucket_to.metadata.end, but it doesn't because it doesn't work
Expand All @@ -279,7 +305,11 @@ fn sync_one(
.unwrap();
let resume_sync_at = most_recent_events.first().map(|e| e.timestamp + e.duration);

info!("Resumed at: {:?}", resume_sync_at);
if let Some(resume_time) = resume_sync_at {
info!(" + Resuming at {:?}", resume_time);
} else {
info!(" + Starting from beginning");
}
let mut events: Vec<Event> = ds_from
.get_events(bucket_from.id.as_str(), resume_sync_at, None, None)
.unwrap()
Expand All @@ -297,15 +327,18 @@ fn sync_one(

// TODO: Do bulk insert using insert_events instead? (for performance)
for event in events {
print!("\r{}", event.timestamp);
//print!("\r{}", event.timestamp);
ds_to.heartbeat(bucket_to.id.as_str(), event, 0.0).unwrap();
}

let eventcount_to_new = ds_to.get_event_count(bucket_to.id.as_str()).unwrap();
info!(
"Synced {} new events",
eventcount_to_new - eventcount_to_old
);
let new_events_count = eventcount_to_new - eventcount_to_old;
assert!(new_events_count >= 0);
if new_events_count > 0 {
info!(" = Synced {} new events", new_events_count);
} else {
info!(" = Already up to date!");
}
}

fn log_buckets(ds: &dyn AccessMethod) {
Expand Down
17 changes: 11 additions & 6 deletions aw-sync/test-sync.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#!/bin/bash

# Helper script meant to be used to test aw-sync
# Example of a single-entry for cronjobs and the like

HOSTNAME="Teklas-Air.localdomain"
SYNCDIR="~/ActivityWatchSync/tekla-air-m1"
AWSYNCPARAMS="--port 5601 --sync-dir $SYNCDIR"
HOSTNAME=$(hostnamectl --static)
# TODO: Fetch in a cross-platform way (from aw-client command output?)
AWSERVERCONF=~/.config/activitywatch/aw-server/aw-server.toml

# trim everything in file AWSERVERCONF before '[server-testing]' section
# grep for the aw-server port in aw-server.toml
PORT=$(sed '/\[server-testing\]/,/\[.*\]/{//!d}' $AWSERVERCONF | grep -oP 'port = "\K[0-9]+')

SYNCDIR="$HOME/ActivityWatchSync/$HOSTNAME"
AWSYNCPARAMS="--port $PORT --sync-dir $SYNCDIR"

# TODO: Fix supplying multiple buckets in a single command
# NOTE: Only sync window and AFK buckets, for now
cargo run --bin aw-sync -- $AWSYNCPARAMS sync --buckets aw-watcher-window_$HOSTNAME
cargo run --bin aw-sync -- $AWSYNCPARAMS sync --buckets aw-watcher-afk_$HOSTNAME
cargo run --bin aw-sync -- $AWSYNCPARAMS sync --buckets aw-watcher-window_$HOSTNAME,aw-watcher-afk_$HOSTNAME

0 comments on commit f3e78aa

Please sign in to comment.