Skip to content

Commit

Permalink
Send changes only when Unison is waiting for them
Browse files Browse the repository at this point in the history
Closes: #17
  • Loading branch information
thomas-ross-aws authored and autozimu committed May 27, 2024
1 parent 810322d commit 6fa200f
Showing 1 changed file with 100 additions and 3 deletions.
103 changes: 100 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ struct Replica {
pub paths: HashSet<PathBuf>,
/// Paths of pending changes. Paths are relative as required by unison.
pub pending_changes: HashSet<PathBuf>,
/// Whether or not unison is waiting for this replica.
pub waited_on: bool,
}

impl Replica {
Expand All @@ -76,6 +78,7 @@ impl Replica {
root,
paths: HashSet::new(),
pending_changes: HashSet::new(),
waited_on: false,
}
}

Expand Down Expand Up @@ -117,7 +120,15 @@ impl<WATCH: Watch, WRITE: Write> Monitor<WATCH, WRITE> {
Event::Input(input) => {
let (cmd, args) = parse_input(&input)?;

match cmd.as_str() {
let cmd_str = cmd.as_str();

if cmd_str != "WAIT" {
for replica in self.replicas.values_mut() {
replica.waited_on = false;
}
}

match cmd_str {
"VERSION" => {
let version = &args[0];
if version != "1" {
Expand Down Expand Up @@ -176,7 +187,12 @@ impl<WATCH: Watch, WRITE: Write> Monitor<WATCH, WRITE> {
"WAIT" => {
// Start waiting replica.
let replica_id = &args[0];
if !self.replicas.contains_key(replica_id) {
if let Some(replica) = self.replicas.get_mut(replica_id) {
replica.waited_on = true;
if !replica.pending_changes.is_empty() {
self.send_changes(replica_id);
}
} else {
self.send_error(&format!("Unknown replica: {}", replica_id));
}
}
Expand Down Expand Up @@ -242,7 +258,11 @@ impl<WATCH: Watch, WRITE: Write> Monitor<WATCH, WRITE> {
}

for id in &matched_replica_ids {
self.send_changes(id);
if let Some(replica) = self.replicas.get(id) {
if replica.waited_on {
self.send_changes(id);
}
}
}
}
}
Expand Down Expand Up @@ -459,6 +479,49 @@ mod test {
cookie: None,
}))
.unwrap();
monitor
.handle_event(Event::Input(format!("WAIT {}\n", id)))
.unwrap();
monitor
.handle_event(Event::Input(format!("CHANGES {}\n", id)))
.unwrap();

monitor.writer.set_position(0);
assert_eq!(
monitor
.writer
.lines()
.collect::<Result<Vec<String>, _>>()
.unwrap(),
vec![
"OK",
&format!("CHANGES {}", id),
&format!("RECURSIVE {}", filename),
"DONE"
]
);
}

#[test]
fn test_changes_after_wait() {
let mut monitor = Monitor::new(Watcher {}, Cursor::new(vec![]));
let id = "123";
let root = "/tmp/sample";
let filename = "filename";

monitor
.handle_event(Event::Input(format!("START {} {}\n", id, root)))
.unwrap();
monitor
.handle_event(Event::Input(format!("WAIT {}\n", id)))
.unwrap();
monitor
.handle_event(Event::FSEvent(RawEvent {
path: Option::Some(PathBuf::from(root).join(filename)),
op: Result::Ok(Op::CREATE),
cookie: None,
}))
.unwrap();
monitor
.handle_event(Event::Input(format!("CHANGES {}\n", id)))
.unwrap();
Expand Down Expand Up @@ -497,6 +560,9 @@ mod test {
cookie: None,
}))
.unwrap();
monitor
.handle_event(Event::Input(format!("WAIT {}\n", id)))
.unwrap();
monitor
.handle_event(Event::Input(format!("CHANGES {}\n", id)))
.unwrap();
Expand All @@ -516,6 +582,37 @@ mod test {
]
);
}

#[test]
fn test_changes_no_wait() {
let mut monitor = Monitor::new(Watcher {}, Cursor::new(vec![]));
let id = "123";
let root = "/tmp/sample";
let filename = "filename";

monitor
.handle_event(Event::Input(format!("START {} {}\n", id, root)))
.unwrap();
monitor
.handle_event(Event::FSEvent(RawEvent {
path: Option::Some(PathBuf::from(root).join(filename)),
op: Result::Ok(Op::CREATE),
cookie: None,
}))
.unwrap();

monitor.writer.set_position(0);
assert_eq!(
monitor
.writer
.lines()
.collect::<Result<Vec<String>, _>>()
.unwrap(),
vec![
"OK",
]
);
}
}

fn main() -> Fallible<()> {
Expand Down

0 comments on commit 6fa200f

Please sign in to comment.