Skip to content

Commit 396bff8

Browse files
committed
feat: added subcommands to aw-sync, support selectively syncing buckets
1 parent e741278 commit 396bff8

File tree

4 files changed

+202
-131
lines changed

4 files changed

+202
-131
lines changed

aw-sync/src/accessmethod.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use std::collections::HashMap;
2+
3+
use aw_client_rust::AwClient;
4+
use chrono::{DateTime, Utc};
5+
use reqwest::StatusCode;
6+
7+
use aw_datastore::{Datastore, DatastoreError};
8+
use aw_models::{Bucket, Event};
9+
10+
// This trait should be implemented by both AwClient and Datastore, unifying them under a single API
11+
pub trait AccessMethod: std::fmt::Debug {
12+
fn get_buckets(&self) -> Result<HashMap<String, Bucket>, String>;
13+
fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError>;
14+
fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError>;
15+
fn get_events(
16+
&self,
17+
bucket_id: &str,
18+
start: Option<DateTime<Utc>>,
19+
end: Option<DateTime<Utc>>,
20+
limit: Option<u64>,
21+
) -> Result<Vec<Event>, String>;
22+
fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String>;
23+
fn get_event_count(&self, bucket_id: &str) -> Result<i64, String>;
24+
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String>;
25+
}
26+
27+
impl AccessMethod for Datastore {
28+
fn get_buckets(&self) -> Result<HashMap<String, Bucket>, String> {
29+
Ok(self.get_buckets().unwrap())
30+
}
31+
fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
32+
self.get_bucket(bucket_id)
33+
}
34+
fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> {
35+
self.create_bucket(bucket)?;
36+
self.force_commit().unwrap();
37+
Ok(())
38+
}
39+
fn get_events(
40+
&self,
41+
bucket_id: &str,
42+
start: Option<DateTime<Utc>>,
43+
end: Option<DateTime<Utc>>,
44+
limit: Option<u64>,
45+
) -> Result<Vec<Event>, String> {
46+
Ok(self.get_events(bucket_id, start, end, limit).unwrap())
47+
}
48+
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String> {
49+
self.heartbeat(bucket_id, event, duration).unwrap();
50+
self.force_commit().unwrap();
51+
Ok(())
52+
}
53+
fn insert_events(&self, bucket_id: &str, events: Vec<Event>) -> Result<Vec<Event>, String> {
54+
let res = self.insert_events(bucket_id, &events[..]).unwrap();
55+
self.force_commit().unwrap();
56+
Ok(res)
57+
}
58+
fn get_event_count(&self, bucket_id: &str) -> Result<i64, String> {
59+
Ok(self.get_event_count(bucket_id, None, None).unwrap())
60+
}
61+
}
62+
63+
impl AccessMethod for AwClient {
64+
fn get_buckets(&self) -> Result<HashMap<String, Bucket>, String> {
65+
Ok(self.get_buckets().unwrap())
66+
}
67+
fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
68+
let bucket = self.get_bucket(bucket_id);
69+
match bucket {
70+
Ok(bucket) => Ok(bucket),
71+
Err(e) => {
72+
warn!("{:?}", e);
73+
let code = e.status().unwrap();
74+
if code == StatusCode::NOT_FOUND {
75+
Err(DatastoreError::NoSuchBucket(bucket_id.into()))
76+
} else {
77+
panic!("Unexpected error");
78+
}
79+
}
80+
}
81+
}
82+
fn get_events(
83+
&self,
84+
bucket_id: &str,
85+
start: Option<DateTime<Utc>>,
86+
end: Option<DateTime<Utc>>,
87+
limit: Option<u64>,
88+
) -> Result<Vec<Event>, String> {
89+
Ok(self.get_events(bucket_id, start, end, limit).unwrap())
90+
}
91+
fn insert_events(&self, _bucket_id: &str, _events: Vec<Event>) -> Result<Vec<Event>, String> {
92+
//Ok(self.insert_events(bucket_id, &events[..]).unwrap())
93+
Err("Not implemented".to_string())
94+
}
95+
fn get_event_count(&self, bucket_id: &str) -> Result<i64, String> {
96+
Ok(self.get_event_count(bucket_id).unwrap())
97+
}
98+
fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> {
99+
self.create_bucket(bucket.id.as_str(), bucket._type.as_str())
100+
.unwrap();
101+
Ok(())
102+
//Err(DatastoreError::InternalError("Not implemented".to_string()))
103+
}
104+
fn heartbeat(&self, bucket_id: &str, event: Event, duration: f64) -> Result<(), String> {
105+
self.heartbeat(bucket_id, &event, duration)
106+
.map_err(|e| format!("{:?}", e))
107+
}
108+
}

aw-sync/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ extern crate serde_json;
77
mod sync;
88
pub use sync::sync_datastores;
99
pub use sync::sync_run;
10+
11+
mod accessmethod;
12+
pub use accessmethod::AccessMethod;

aw-sync/src/main.rs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@ extern crate serde_json;
1616
use std::path::Path;
1717

1818
use chrono::{DateTime, Utc};
19-
use clap::Parser;
19+
use clap::{Parser, Subcommand};
2020

2121
use aw_client_rust::AwClient;
2222

23+
mod accessmethod;
2324
mod sync;
2425

2526
const DEFAULT_PORT: &str = "5600";
2627

2728
#[derive(Parser)]
2829
#[clap(version = "0.1", author = "Erik Bjäreholt")]
2930
struct Opts {
31+
#[clap(subcommand)]
32+
command: Commands,
33+
3034
/// Host of instance to connect to
3135
#[clap(long, default_value = "127.0.0.1")]
3236
host: String,
@@ -40,16 +44,26 @@ struct Opts {
4044
/// If not specified, exit
4145
#[clap(long)]
4246
sync_dir: String,
43-
/// Date to start syncing from
44-
/// If not specified, start from beginning
45-
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
46-
/// Format: YYYY-MM-DD
47-
#[clap(long)]
48-
start_date: Option<String>,
49-
/// Specify buckets to sync
50-
/// If not specified, all buckets will be synced
51-
#[clap(long)]
52-
buckets: Vec<String>,
47+
}
48+
49+
#[derive(Subcommand)]
50+
enum Commands {
51+
/// Clones repos
52+
#[clap(arg_required_else_help = true)]
53+
Sync {
54+
/// Date to start syncing from
55+
/// If not specified, start from beginning
56+
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
57+
/// Format: YYYY-MM-DD
58+
#[clap(long)]
59+
start_date: Option<String>,
60+
/// Specify buckets to sync
61+
/// If not specified, all buckets will be synced
62+
#[clap(long)]
63+
buckets: Vec<String>,
64+
},
65+
/// List buckets and their sync status
66+
List {},
5367
}
5468

5569
fn main() -> std::io::Result<()> {
@@ -74,16 +88,37 @@ fn main() -> std::io::Result<()> {
7488

7589
let client = AwClient::new(opts.host.as_str(), port, "aw-sync-rust");
7690

77-
let start: Option<DateTime<Utc>> = opts.start_date.map(|date| {
78-
chrono::DateTime::parse_from_rfc3339(&date)
79-
.unwrap()
80-
.with_timezone(&chrono::Utc)
81-
});
82-
sync::sync_run(sync_directory, client, opts.buckets, start).map_err(|e| {
91+
match &opts.command {
92+
// Perform two-way sync
93+
Commands::Sync {
94+
start_date,
95+
buckets,
96+
} => {
97+
let start: Option<DateTime<Utc>> = start_date.as_ref().map(|date| {
98+
let date_copy = date.clone();
99+
chrono::DateTime::parse_from_rfc3339(&date_copy)
100+
.unwrap()
101+
.with_timezone(&chrono::Utc)
102+
});
103+
104+
sync::sync_run(sync_directory, client, buckets, start).map_err(|e| {
105+
println!("Error: {}", e);
106+
std::io::Error::new(std::io::ErrorKind::Other, e)
107+
})?;
108+
info!("Finished successfully, exiting...");
109+
Ok(())
110+
}
111+
112+
// List all buckets
113+
Commands::List {} => {
114+
sync::list_buckets(&client, sync_directory);
115+
Ok(())
116+
}
117+
}
118+
.map_err(|e: String| {
83119
println!("Error: {}", e);
84120
std::io::Error::new(std::io::ErrorKind::Other, e)
85121
})?;
86-
info!("Finished successfully, exiting...");
87122

88123
// Needed to give the datastores some time to commit before program is shut down.
89124
// 100ms isn't actually needed, seemed to work fine with as little as 10ms, but I'd rather give

0 commit comments

Comments
 (0)