Skip to content

Commit

Permalink
Merge pull request #12246 from petrosagg/validate-append-batches
Browse files Browse the repository at this point in the history
storage: validate append batches are well formed
  • Loading branch information
petrosagg committed May 5, 2022
2 parents 6b4e249 + e6604d8 commit 95171c5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 34 deletions.
56 changes: 22 additions & 34 deletions src/coord/src/coord.rs
Expand Up @@ -224,12 +224,6 @@ pub struct SinkConnectorReady {
pub compute_instance: ComputeInstanceId,
}

#[derive(Debug)]
/// A set of builtin-table updates that should all take effect at the same
/// logical time, expressed as an offset from a base write timestamp chosen
/// by the caller (see `send_builtin_table_updates_at_offset`).
pub struct TimestampedUpdate {
    // The rows to apply; each update carries its own target table id.
    pub updates: Vec<BuiltinTableUpdate>,
    // Offset added to the base local write timestamp to produce the
    // effective timestamp for every update in `updates`.
    pub timestamp_offset: u64,
}

/// Configures a coordinator.
pub struct Config {
pub dataflow_client: mz_dataflow_types::client::Controller,
Expand Down Expand Up @@ -807,9 +801,21 @@ impl Coordinator {
appends.push((id, vec![], advance_to));
}
}

// Scratch vector holding all updates that happen at `t < advance_to`
let mut scratch = vec![];
// Then drain all pending writes and prepare append commands
for (id, updates) in self.volatile_updates.drain() {
appends.push((id, updates, advance_to));
for (id, updates) in self.volatile_updates.iter_mut() {
// TODO(petrosagg): replace with `drain_filter` once it stabilizes
let mut cursor = 0;
while let Some(update) = updates.get(cursor) {
if update.timestamp < advance_to {
scratch.push(updates.swap_remove(cursor));
} else {
cursor += 1;
}
}
appends.push((*id, scratch.split_off(0), advance_to));
}
self.dataflow_client
.storage_mut()
Expand Down Expand Up @@ -4389,33 +4395,15 @@ impl Coordinator {
Ok(result)
}

/// Stages builtin-table updates as volatile updates, stamping each one with
/// the current local write timestamp plus its group's offset.
///
/// Updates are first grouped by destination id so that every record bound
/// for the same collection is appended to that collection's pending queue
/// together (i.e. in the same message).
async fn send_builtin_table_updates_at_offset(&mut self, updates: Vec<TimestampedUpdate>) {
    let base_ts = self.get_local_write_ts();

    // Bucket all rows by their destination id, resolving each group's
    // effective timestamp from the shared base.
    let mut grouped = HashMap::<GlobalId, Vec<Update>>::new();
    for timestamped in updates {
        let effective_ts = base_ts + timestamped.timestamp_offset;
        for row_update in timestamped.updates {
            let staged = Update {
                row: row_update.row,
                diff: row_update.diff,
                timestamp: effective_ts,
            };
            grouped.entry(row_update.id).or_default().push(staged);
        }
    }

    // Drain the buckets into the pending volatile updates, one id at a time.
    for (id, batch) in grouped {
        self.volatile_updates.entry(id).or_default().extend(batch);
    }
}

async fn send_builtin_table_updates(&mut self, updates: Vec<BuiltinTableUpdate>) {
let timestamped = TimestampedUpdate {
updates,
timestamp_offset: 0,
};
self.send_builtin_table_updates_at_offset(vec![timestamped])
.await
let timestamp = self.get_local_write_ts();
for u in updates {
self.volatile_updates.entry(u.id).or_default().push(Update {
row: u.row,
diff: u.diff,
timestamp,
});
}
}

async fn drop_sinks(&mut self, sinks: Vec<(ComputeInstanceId, GlobalId)>) {
Expand Down
13 changes: 13 additions & 0 deletions src/dataflow-types/src/client/controller/storage.rs
Expand Up @@ -170,6 +170,8 @@ pub enum StorageError {
SourceIdReused(GlobalId),
/// The source identifier is not present.
IdentifierMissing(GlobalId),
/// The update contained in the appended batch was at a timestamp beyond the batch's upper
UpdateBeyondUpper(GlobalId),
/// An error from the underlying client.
ClientError(anyhow::Error),
/// An operation failed to read or write state
Expand All @@ -181,6 +183,7 @@ impl Error for StorageError {
match self {
Self::SourceIdReused(_) => None,
Self::IdentifierMissing(_) => None,
Self::UpdateBeyondUpper(_) => None,
Self::ClientError(_) => None,
Self::IOError(err) => Some(err),
}
Expand All @@ -196,6 +199,9 @@ impl fmt::Display for StorageError {
"source identifier was re-created after having been dropped: {id}"
),
Self::IdentifierMissing(id) => write!(f, "source identifier is not present: {id}"),
Self::UpdateBeyondUpper(id) => {
write!(f, "append batch for {id} contained update beyond its upper")
}
Self::ClientError(err) => write!(f, "underlying client error: {err}"),
Self::IOError(err) => write!(f, "failed to read or write state: {err}"),
}
Expand Down Expand Up @@ -363,6 +369,13 @@ where
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>,
) -> Result<(), StorageError> {
for (id, updates, upper) in commands.iter() {
for update in updates {
if !update.timestamp.less_than(upper) {
return Err(StorageError::UpdateBeyondUpper(*id));
}
}
}
self.state
.client
.send(StorageCommand::Append(commands))
Expand Down

0 comments on commit 95171c5

Please sign in to comment.