storage: validate append batches are well formed #12246

Merged · 3 commits · May 5, 2022
56 changes: 22 additions & 34 deletions src/coord/src/coord.rs
@@ -224,12 +224,6 @@ pub struct SinkConnectorReady {
pub compute_instance: ComputeInstanceId,
}

#[derive(Debug)]
pub struct TimestampedUpdate {
pub updates: Vec<BuiltinTableUpdate>,
pub timestamp_offset: u64,
}

/// Configures a coordinator.
pub struct Config {
pub dataflow_client: mz_dataflow_types::client::Controller,
@@ -807,9 +801,21 @@ impl Coordinator {
appends.push((id, vec![], advance_to));
}
}

// Scratch vector holding all updates that happen at `t < advance_to`
let mut scratch = vec![];
// Then drain all pending writes and prepare append commands
-        for (id, updates) in self.volatile_updates.drain() {
-            appends.push((id, updates, advance_to));
+        for (id, updates) in self.volatile_updates.iter_mut() {
// TODO(petrosagg): replace with `drain_filter` once it stabilizes
let mut cursor = 0;
while let Some(update) = updates.get(cursor) {
if update.timestamp < advance_to {
@maddyblue (Contributor), May 4, 2022:

My understanding is that these updates are coming from the send_builtin_table_updates_at_offset function, where we use some future timestamp to correctly retract data without the possibility of forgetting it. This PR removes the second half of that ("correctly retract"), because now the data are split and backed by a volatile in-memory data structure. We (coord) don't have plans on making that WAL-backed for now (it's hard and we don't need it for any upcoming milestone).

Contributor Author:

These updates come from anything that writes to tables, including what you said but also user INSERT statements, which must conform to the semantics of the Append operation. Is there another way to cut the batches that makes coord happy?

Contributor:

I think that between the various needs here, send_builtin_table_updates_at_offset is on the losing end and should be changed to not do its magical timestamp retraction thing. If we commit this PR as is, I think any coord restart will always create some permanently wrong metrics.

Contributor:

Wait, how do INSERTs generate timestamps that would trigger this?

Contributor:

Does it make sense to tag in @jkosh44 here? There are various other "tables have the wrong data on restart" issues that he's looked at, where one conclusion was that it might be most ergonomic to support e.g. truncate(table_id).

But we should definitely see if we agree on the API and whether the calls are each meant to be durable (even if they are not currently so).

edit: Ignore me; I thought this was storage controller code rather than adapter code.

@maddyblue (Contributor), May 4, 2022:

My understanding is that the TimestampOracle does the right thing, such that advance_local_inputs would always append all INSERT data, even with this PR. Is that not the case?

Contributor Author:

No, that is not the case, and it's related to my own misconception of what the TimestampOracle does. We currently call advance_local_inputs whenever the oracle returns Some(ts) from its should_advance_to call. However, this returns the same timestamp that is currently used for writing if we are in writing mode.

This means that we are only able to append to tables the INSERT data that was written at strictly earlier times than the current time of the oracle. If the oracle is in reading mode then you're correct that all INSERT data is appended to the tables. If the oracle is in writing mode, however, we must exclude the timestamp that is currently being written, and therefore there might be some pending INSERT data that has to wait until the next time we decide to read.

I looked into send_builtin_table_updates_at_offset and we were only calling it with an offset of zero, so I removed it in this PR and did the work directly in send_builtin_table_updates.

Contributor:

This makes sense now. I didn't have a full understanding of how your Append change from some weeks ago interacted with the TimestampOracle. This analysis sounds correct. I'm OK with this PR now.
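The read/write-mode behavior described in this thread can be sketched roughly as follows. The names TimestampOracle and should_advance_to come from the discussion above; the struct layout, the Mode enum, and the exact values returned in each mode are invented for illustration only:

```rust
// Hypothetical sketch of the oracle behavior discussed above: in writing
// mode the current timestamp is still open for writes, so appends may only
// advance up to (not past) it, leaving INSERT data at `current` pending.
enum Mode {
    Reading,
    Writing,
}

struct TimestampOracle {
    current: u64,
    mode: Mode,
}

impl TimestampOracle {
    /// Timestamp the next round of table appends may advance the upper to.
    fn should_advance_to(&self) -> Option<u64> {
        match self.mode {
            // All writes at `current` are sealed; safe to advance past it.
            Mode::Reading => Some(self.current + 1),
            // `current` is still being written; only earlier times are sealed.
            Mode::Writing => Some(self.current),
        }
    }
}

fn main() {
    // In writing mode, advance_to equals the open write timestamp, so an
    // INSERT at that timestamp must wait for the next read transition.
    let oracle = TimestampOracle { current: 5, mode: Mode::Writing };
    assert_eq!(oracle.should_advance_to(), Some(5));

    let oracle = TimestampOracle { current: 5, mode: Mode::Reading };
    assert_eq!(oracle.should_advance_to(), Some(6));
}
```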

scratch.push(updates.swap_remove(cursor));
} else {
cursor += 1;
}
}
appends.push((*id, scratch.split_off(0), advance_to));
}
self.dataflow_client
.storage_mut()
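The cursor-plus-swap_remove loop in the hunk above is a stand-in for the then-unstable Vec::drain_filter, as the TODO notes. A minimal, self-contained sketch of the same pattern, with plain u64 timestamps in place of the real Update type:

```rust
// Move every timestamp strictly below `advance_to` out of `updates` into
// `scratch`, in place and without reallocating `updates`.
fn drain_before(updates: &mut Vec<u64>, advance_to: u64, scratch: &mut Vec<u64>) {
    let mut cursor = 0;
    while let Some(&ts) = updates.get(cursor) {
        if ts < advance_to {
            // swap_remove is O(1): it moves the last element into position
            // `cursor`, so the cursor must NOT advance on this branch.
            scratch.push(updates.swap_remove(cursor));
        } else {
            cursor += 1;
        }
    }
}

fn main() {
    let mut updates = vec![1, 5, 2, 7, 3];
    let mut scratch = Vec::new();
    drain_before(&mut updates, 4, &mut scratch);
    // swap_remove perturbs order, so sort before comparing.
    scratch.sort();
    updates.sort();
    assert_eq!(scratch, vec![1, 2, 3]);
    assert_eq!(updates, vec![5, 7]);
}
```

Note that order is not preserved, which is fine here because the batches are differential updates keyed by timestamp, not ordered logs.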
@@ -4389,33 +4395,15 @@ impl Coordinator {
Ok(result)
}

async fn send_builtin_table_updates_at_offset(&mut self, updates: Vec<TimestampedUpdate>) {
Contributor:

The thing that used this has apparently gone away, great! Looks like you can also remove the TimestampedUpdate struct now too.

Contributor Author:

Great! I removed the struct and hit auto-merge.

// NB: This makes sure to send all records for the same id in the same
// message.
let timestamp_base = self.get_local_write_ts();
let mut updates_by_id = HashMap::<GlobalId, Vec<Update>>::new();
for tu in updates.into_iter() {
let timestamp = timestamp_base + tu.timestamp_offset;
for u in tu.updates {
updates_by_id.entry(u.id).or_default().push(Update {
row: u.row,
diff: u.diff,
timestamp,
});
}
}
for (id, updates) in updates_by_id {
self.volatile_updates.entry(id).or_default().extend(updates);
}
}

async fn send_builtin_table_updates(&mut self, updates: Vec<BuiltinTableUpdate>) {
-        let timestamped = TimestampedUpdate {
-            updates,
-            timestamp_offset: 0,
-        };
-        self.send_builtin_table_updates_at_offset(vec![timestamped])
-            .await
+        let timestamp = self.get_local_write_ts();
+        for u in updates {
+            self.volatile_updates.entry(u.id).or_default().push(Update {
+                row: u.row,
+                diff: u.diff,
+                timestamp,
+            });
+        }
}

async fn drop_sinks(&mut self, sinks: Vec<(ComputeInstanceId, GlobalId)>) {
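The simplified send_builtin_table_updates boils down to buffering updates per collection id, all at a single local write timestamp. A rough sketch of that grouping, with plain u64s standing in for GlobalId, Row, and the timestamp type:

```rust
use std::collections::HashMap;

// Stand-in for the real Update type (Row replaced by a u64 payload).
#[derive(Debug, PartialEq)]
struct Update {
    row: u64,
    diff: i64,
    timestamp: u64,
}

// Buffer `(id, row, diff)` triples into a per-id volatile map, stamping
// every update with the same write timestamp, as the new code does.
fn buffer_updates(
    volatile: &mut HashMap<u64, Vec<Update>>,
    updates: Vec<(u64, u64, i64)>,
    timestamp: u64,
) {
    for (id, row, diff) in updates {
        volatile
            .entry(id)
            .or_default()
            .push(Update { row, diff, timestamp });
    }
}

fn main() {
    let mut volatile = HashMap::new();
    buffer_updates(&mut volatile, vec![(1, 10, 1), (1, 11, -1), (2, 20, 1)], 9);
    // Updates for the same id end up in the same vector, same timestamp.
    assert_eq!(volatile[&1].len(), 2);
    assert_eq!(volatile[&2], vec![Update { row: 20, diff: 1, timestamp: 9 }]);
}
```

Keeping all records for an id together is what lets advance_local_inputs later emit one append command per collection.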
13 changes: 13 additions & 0 deletions src/dataflow-types/src/client/controller/storage.rs
@@ -170,6 +170,8 @@ pub enum StorageError {
SourceIdReused(GlobalId),
/// The source identifier is not present.
IdentifierMissing(GlobalId),
/// The update contained in the appended batch was at a timestamp beyond the batch's upper
UpdateBeyondUpper(GlobalId),
/// An error from the underlying client.
ClientError(anyhow::Error),
/// An operation failed to read or write state
@@ -181,6 +183,7 @@ impl Error for StorageError {
match self {
Self::SourceIdReused(_) => None,
Self::IdentifierMissing(_) => None,
Self::UpdateBeyondUpper(_) => None,
Self::ClientError(_) => None,
Self::IOError(err) => Some(err),
}
@@ -196,6 +199,9 @@ impl fmt::Display for StorageError {
"source identifier was re-created after having been dropped: {id}"
),
Self::IdentifierMissing(id) => write!(f, "source identifier is not present: {id}"),
Self::UpdateBeyondUpper(id) => {
write!(f, "append batch for {id} contained update beyond its upper")
}
Self::ClientError(err) => write!(f, "underlying client error: {err}"),
Self::IOError(err) => write!(f, "failed to read or write state: {err}"),
}
@@ -363,6 +369,13 @@ where
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>,
) -> Result<(), StorageError> {
for (id, updates, upper) in commands.iter() {
for update in updates {
if !update.timestamp.less_than(upper) {
return Err(StorageError::UpdateBeyondUpper(*id));
}
}
}
self.state
.client
.send(StorageCommand::Append(commands))
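The validation loop added here can be condensed to the following sketch. It assumes totally ordered u64 timestamps, whereas the real code compares via timely's PartialOrder::less_than so partially ordered timestamps also work; the error type is a cut-down stand-in for StorageError:

```rust
// Cut-down stand-in for the real StorageError.
#[derive(Debug, PartialEq)]
enum StorageError {
    UpdateBeyondUpper(u64), // carries the offending collection id
}

// A batch is well formed only if every update timestamp is strictly below
// the batch's upper; an update at `upper` itself is rejected.
fn validate_appends(
    commands: &[(u64 /* id */, Vec<u64> /* update timestamps */, u64 /* upper */)],
) -> Result<(), StorageError> {
    for (id, timestamps, upper) in commands {
        for ts in timestamps {
            if !(ts < upper) {
                return Err(StorageError::UpdateBeyondUpper(*id));
            }
        }
    }
    Ok(())
}

fn main() {
    // Updates at times 1 and 2 with upper 3: well formed.
    assert_eq!(validate_appends(&[(7, vec![1, 2], 3)]), Ok(()));
    // An update at time 3 is NOT strictly below upper 3: rejected.
    assert_eq!(
        validate_appends(&[(7, vec![1, 3], 3)]),
        Err(StorageError::UpdateBeyondUpper(7))
    );
    // An empty batch (used above just to advance the upper) is always fine.
    assert_eq!(validate_appends(&[(7, vec![], 1)]), Ok(()));
}
```

Rejecting `ts == upper` matters because the upper is exclusive: once a batch with that upper is appended, time `upper` is still open for future writes.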