Skip to content

Commit

Permalink
coord: send builtin table updates directly
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
  • Loading branch information
petrosagg committed May 5, 2022
1 parent aa51619 commit 49ecbb6
Showing 1 changed file with 8 additions and 26 deletions.
34 changes: 8 additions & 26 deletions src/coord/src/coord.rs
Expand Up @@ -4401,33 +4401,15 @@ impl Coordinator {
Ok(result)
}

async fn send_builtin_table_updates_at_offset(&mut self, updates: Vec<TimestampedUpdate>) {
// NB: This makes sure to send all records for the same id in the same
// message.
let timestamp_base = self.get_local_write_ts();
let mut updates_by_id = HashMap::<GlobalId, Vec<Update>>::new();
for tu in updates.into_iter() {
let timestamp = timestamp_base + tu.timestamp_offset;
for u in tu.updates {
updates_by_id.entry(u.id).or_default().push(Update {
row: u.row,
diff: u.diff,
timestamp,
});
}
}
for (id, updates) in updates_by_id {
self.volatile_updates.entry(id).or_default().extend(updates);
}
}

async fn send_builtin_table_updates(&mut self, updates: Vec<BuiltinTableUpdate>) {
let timestamped = TimestampedUpdate {
updates,
timestamp_offset: 0,
};
self.send_builtin_table_updates_at_offset(vec![timestamped])
.await
let timestamp = self.get_local_write_ts();
for u in updates {
self.volatile_updates.entry(u.id).or_default().push(Update {
row: u.row,
diff: u.diff,
timestamp,
});
}
}

async fn drop_sinks(&mut self, sinks: Vec<(ComputeInstanceId, GlobalId)>) {
Expand Down

0 comments on commit 49ecbb6

Please sign in to comment.