Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aw-sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ aw-sync daemon
# Sync daemon with specific buckets only
aw-sync daemon --buckets "aw-watcher-window,aw-watcher-afk" --start-date "2024-01-01"

# Sync daemon in push-only or pull-only mode
aw-sync daemon --mode push
aw-sync daemon --mode pull

# Sync all buckets once and exit
aw-sync sync --start-date "2024-01-01"
```
Expand All @@ -35,6 +39,7 @@ For more options, see `aw-sync --help`. Some notable options:
- `--start-date`: Only sync events after this date (YYYY-MM-DD)
- `--sync-db`: Specify a specific database file in the sync directory
- `--mode`: Choose sync mode: "push", "pull", or "both" (default: "both")
- For `aw-sync sync`, passing `--mode` (with no other options) is enough to opt into the per-bucket sync path where it's respected; without it, or any of `--buckets`/`--start-date`/`--sync-db`, `aw-sync sync` falls back to the legacy host-based mode, which always does both a pull and a push

### Setting up sync

Expand Down
27 changes: 20 additions & 7 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ enum Commands {
#[clap(long, value_parser=parse_list)]
buckets: Option<Vec<String>>,

/// Mode to sync in. Can be "push", "pull", or "both".
/// Defaults to "both".
#[clap(long, default_value = "both")]
mode: sync::SyncMode,

/// Full path to sync db file
/// Useful for syncing buckets from a specific db file in the sync directory.
/// Must be a valid absolute path to a file in the sync directory.
Expand Down Expand Up @@ -103,9 +108,10 @@ enum Commands {
buckets: Option<Vec<String>>,

/// Mode to sync in. Can be "push", "pull", or "both".
/// Defaults to "both".
#[clap(long, default_value = "both")]
mode: sync::SyncMode,
/// Defaults to "both". Implies advanced sync mode (see below),
/// since the simple host-based sync mode always does both.
#[clap(long)]
mode: Option<sync::SyncMode>,

/// Full path to sync db file
/// Useful for syncing buckets from a specific db file in the sync directory.
Expand Down Expand Up @@ -166,19 +172,21 @@ fn main() -> Result<(), Box<dyn Error>> {
match opts.command.unwrap_or(Commands::Daemon {
start_date: None,
buckets: None,
mode: sync::SyncMode::Both,
sync_db: None,
}) {
// Start daemon
Commands::Daemon {
start_date,
buckets,
mode,
sync_db,
} => {
info!("Starting daemon...");

let effective_buckets = buckets;

daemon(&client, start_date, effective_buckets, sync_db)?;
daemon(&client, start_date, effective_buckets, sync_db, mode)?;
}
// Perform sync
Commands::Sync {
Expand All @@ -191,7 +199,11 @@ fn main() -> Result<(), Box<dyn Error>> {
let effective_buckets = buckets;

// If advanced options are provided, use advanced sync mode
if start_date.is_some() || effective_buckets.is_some() || sync_db.is_some() {
if start_date.is_some()
|| effective_buckets.is_some()
|| sync_db.is_some()
|| mode.is_some()
{
let sync_dir = dirs::get_sync_dir()?;
if let Some(db_path) = &sync_db {
info!("Using sync db: {}", &db_path.display());
Expand All @@ -211,7 +223,7 @@ fn main() -> Result<(), Box<dyn Error>> {
start: start_date,
};

sync::sync_run(&client, &sync_spec, mode)?
sync::sync_run(&client, &sync_spec, mode.unwrap_or(sync::SyncMode::Both))?
} else {
// Simple host-based sync mode (backwards compatibility)
// Pull
Expand Down Expand Up @@ -251,6 +263,7 @@ fn daemon(
start_date: Option<DateTime<Utc>>,
buckets: Option<Vec<String>>,
sync_db: Option<PathBuf>,
mode: sync::SyncMode,
) -> Result<(), Box<dyn Error>> {
let (tx, rx) = channel();

Expand Down Expand Up @@ -278,7 +291,7 @@ fn daemon(
};

loop {
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
if let Err(e) = sync::sync_run(client, &sync_spec, mode) {
error!("Error during sync cycle: {}", e);
return Err(e);
}
Expand Down
10 changes: 8 additions & 2 deletions aw-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ pub fn sync_datastores(

for bucket_from in buckets_from {
let bucket_to = get_or_create_sync_bucket(&bucket_from, ds_to, is_push);
sync_one(ds_from, ds_to, bucket_from, bucket_to);
sync_one(ds_from, ds_to, bucket_from, bucket_to, sync_spec);
}
}

Expand All @@ -299,6 +299,7 @@ fn sync_one(
ds_to: &dyn AccessMethod,
bucket_from: Bucket,
bucket_to: Bucket,
sync_spec: &SyncSpec,
) {
let eventcount_to_old = ds_to.get_event_count(bucket_to.id.as_str()).unwrap();
info!(" ⟳ Syncing bucket '{}'", bucket_to.id);
Expand All @@ -310,7 +311,12 @@ fn sync_one(
let most_recent_events = ds_to
.get_events(bucket_to.id.as_str(), None, None, Some(1))
.unwrap();
let resume_sync_at = most_recent_events.first().map(|e| e.timestamp + e.duration);
// If the destination bucket already has events, resume from where it left off.
// Otherwise (first sync of this bucket), fall back to sync_spec.start, if specified.
let resume_sync_at = most_recent_events
.first()
.map(|e| e.timestamp + e.duration)
.or(sync_spec.start);

if let Some(resume_time) = resume_sync_at {
info!(" + Resuming at {:?}", resume_time);
Expand Down
Loading