
persist: hook up user tables to persistence core behind flag #7283

Merged: 1 commit merged into MaterializeInc:main on Jul 29, 2021

Conversation

@danhhz danhhz commented Jul 1, 2021

Revive the --persistent_tables flag which enables persistence of user
tables. This is nowhere near ready for even the most experimental of
users, so hide the flag. Really only doing this now to establish the
necessary plumbing and to make sure the interface of the persist crate
plays well with what coord/dataflow need.

@danhhz danhhz requested review from benesch and ruchirK July 1, 2021 17:31
danhhz commented Jul 1, 2021

Opening as a draft as there are still a few open questions I'd like feedback on (look for "WIP"). Also, this is currently untestable: FileBuffer and FileBlob don't clean up LOCK files after themselves, which prevents materialize from restarting unless the LOCK files are manually cleaned up. I'm looking to get this in sooner rather than later, but at the very least it will have to wait for an end-to-end test of materialize restarting and correctly keeping data in a persistent user table.

@ruchirK ruchirK left a comment

LGTM! Did a quick review of everything and put more thought into the requested WIPs. I have to admit the callback-chaining parts kind of lost me; I'll need to reread that later to make sure I grok it.

} else {
false
};
// WIP what else should go in here? hostname would be nice
Contributor:

imo: start time, number of workers / invocation if it's easier, mz_cluster_id

danhhz (author):

Any other opinions here before I add these?

benesch (Member):

Is the lock info used for anything in particular, or just displayed to the user when the lock hasn't been released? If the latter, then Ruchir's suggestion SGTM!

danhhz (author):

Just the latter. Done, except for mz_cluster_id. It looks like we need to get that one from the catalog, but the catalog isn't started yet where we're doing this.
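
For illustration only (these are not the persist crate's actual types; the field set just mirrors the suggestions in this thread), the LOCK-file metadata might look something like this, with mz_cluster_id left out until the catalog is available at that point:

use std::time::SystemTime;

// Hypothetical sketch of the lock metadata discussed above. It is only ever
// shown to a human who finds a stale LOCK file, so it just needs to identify
// the process that took the lock.
#[derive(Debug)]
struct LockInfo {
    hostname: String,
    start_time: SystemTime,
    num_workers: usize,
}

impl LockInfo {
    fn new(hostname: String, num_workers: usize) -> Self {
        LockInfo {
            hostname,
            start_time: SystemTime::now(),
            num_workers,
        }
    }
}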

render_state
.local_inputs
.insert(src_id, LocalInput { handle, capability });
// WIP is src_id or orig_id the right thing to pass in here?
Contributor:

I think orig_id is the one you want

danhhz (author):

No longer relevant with new approach

let token = persist
.persister
.create_or_load(&stream_name)
.expect("WIP this could be an error if the persistence runtime has unexpectedly shut down, what do we do with this?");
Contributor:

Spitballing: maybe create a non-persistent, transient source instead in this case? Not sure I really believe in that, because if the feature's behind a hidden flag we don't need to keep tables running at all costs if there's a runtime error.

danhhz (author):

Ended up hooking this up to the err_collection

scope.new_persistent_unordered_input(token);
let stream = stream.map(|(row, ts, diff)| {
let row = persistcfg::string_to_row(&row)
.expect("WIP this should be emitted in err_collection");
Contributor:

Got nerd-sniped into making this work. I think this is close to right (only tested that it compiled):

diff --git a/src/dataflow-types/src/errors.rs b/src/dataflow-types/src/errors.rs
index f0b137e72..60e8e0f5c 100644
--- a/src/dataflow-types/src/errors.rs
+++ b/src/dataflow-types/src/errors.rs
@@ -49,6 +49,7 @@ impl Display for SourceError {
 pub enum SourceErrorDetails {
     Initialization(String),
     FileIO(String),
+    Persistence(String),
 }

 impl Display for SourceErrorDetails {
@@ -62,6 +63,7 @@ impl Display for SourceErrorDetails {
                 )
             }
             SourceErrorDetails::FileIO(e) => write!(f, "file IO: {}", e),
+            SourceErrorDetails::Persistence(e) => write!(f, "persistence: {}", e),
         }
     }
 }
diff --git a/src/dataflow/src/render/sources.rs b/src/dataflow/src/render/sources.rs
index 340b08efd..7e053b136 100644
--- a/src/dataflow/src/render/sources.rs
+++ b/src/dataflow/src/render/sources.rs
@@ -97,22 +97,40 @@ where
                             .expect("WIP this could be an error if the persistence runtime has unexpectedly shut down, what do we do with this?");
                         let ((handle, capability), stream) =
                             scope.new_persistent_unordered_input(token);
-                        let stream = stream.map(|(row, ts, diff)| {
-                            let row = persistcfg::string_to_row(&row)
-                                .expect("WIP this should be emitted in err_collection");
-                            (row, ts, diff)
-                        });
-                        Some(((LocalInputHandle::Persistent(handle), capability), stream))
+                        let (ok_stream, err_stream) =
+                            stream.map_fallible(move |(row, ts, diff)| {
+                                persistcfg::string_to_row(&row)
+                                    .map(|row| (row, ts, diff))
+                                    .map_err(|e| {
+                                        SourceError::new(
+                                            stream_name.clone(),
+                                            SourceErrorDetails::Persistence(e),
+                                        )
+                                    })
+                            });
+                        Some((
+                            (LocalInputHandle::Persistent(handle), capability),
+                            ok_stream,
+                            err_stream
+                                .map(DataflowError::SourceError)
+                                .pass_through("table-errors")
+                                .as_collection(),
+                        ))
                     } else {
                         None
                     }
                 } else {
                     None
                 };
-                let ((handle, capability), stream) = persisted_source.unwrap_or_else(|| {
-                    let ((handle, capability), stream) = scope.new_unordered_input();
-                    ((LocalInputHandle::Transient(handle), capability), stream)
-                });
+                let ((handle, capability), ok_stream, err_collection) = persisted_source
+                    .unwrap_or_else(|| {
+                        let ((handle, capability), stream) = scope.new_unordered_input();
+                        (
+                            (LocalInputHandle::Transient(handle), capability),
+                            stream,
+                            Collection::empty(scope),
+                        )
+                    });

                 render_state.local_inputs.insert(
                     src_id,
@@ -122,12 +140,11 @@ where
                     },
                 );
                 let as_of_frontier = self.as_of_frontier.clone();
-                let ok_collection = stream
+                let ok_collection = ok_stream
                     .map_in_place(move |(_, mut time, _)| {
                         time.advance_by(as_of_frontier.borrow());
                     })
                     .as_collection();
-                let err_collection = Collection::empty(scope);
                 self.insert_id(
                     Id::Global(src_id),
                     crate::render::CollectionBundle::from_collections(

danhhz (author):

Thank you! Done (except for .pass_through("table-errors"), which I didn't recognize; what does that do?)

@danhhz danhhz requested a review from maddyblue July 1, 2021 21:22
@@ -153,6 +153,11 @@ pub enum ExecuteResponse {
TransactionExited {
was_implicit: bool,
tag: &'static str,
/// If Some, a channel containing the results of durably storing the
Contributor:

Does a single failure mean any other transaction operations were rolled back? I kind of wonder if it'd be better for this message to be the final result of the transaction (failed or succeeded) instead of having an additional "still need to process more to find out if it succeeded" step. Can you add a comment either way?

danhhz (author):

Yeah, that's fair. I had been vaguely planning to deal with this as a follow-up, but it's probably better to bite the bullet and do it now. Lemme think through the details of what the story should be here and get back to this. Persistence already multiplexes multiple "streams" (tables, sources, etc.), and individual writes are atomic, so given that we already just batch up the whole txn in memory, it's definitely easiest to expose some API for an atomic write to multiple tables and use that.

We could also build support for "rollbacks" (presumably a -1 diff in dd, but the restart recovery part gets tricky here), but I'm hoping to punt on that until at least after 1.0. Do you expect the "txn batches everything up and doesn't do anything with it until commit" model to change anytime soon? Nikhil's instinct here was no.

Contributor:

No, that's not going to change anytime soon.

danhhz (author):

Perfect. Makes this much easier

danhhz (author):

Okay this is now a oneshot that returns success or failure of the entire transaction's writes
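
A minimal sketch of that shape (the names and types here are assumptions, not the persist crate's real API): the whole transaction's updates, grouped by stream, are handed to persistence in one call, and a single oneshot reports success or failure for the entire batch.

use std::collections::HashMap;
use tokio::sync::oneshot;

/// One (row, timestamp, diff) update, as in the surrounding dataflow code.
type Update = (String, u64, i64);

/// A single atomic write spanning multiple streams (tables).
struct AtomicWrite {
    updates: HashMap<String, Vec<Update>>,
    /// Resolves once the whole batch is durable (Ok) or has failed (Err).
    ack: oneshot::Sender<Result<(), String>>,
}

/// Coordinator side: submit the batched transaction and get back a receiver
/// to await before reporting COMMIT to the client.
fn write_txn(
    tx: &std::sync::mpsc::Sender<AtomicWrite>,
    updates: HashMap<String, Vec<Update>>,
) -> oneshot::Receiver<Result<(), String>> {
    let (ack, rx) = oneshot::channel();
    // If the persistence runtime has shut down, the send fails, the ack is
    // dropped, and the returned receiver resolves with an error; callers can
    // treat that as a failed write.
    let _ = tx.send(AtomicWrite { updates, ack });
    rx
}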

/// that executed the INSERT is severed before the write finishes
/// persisting) and errors resulting from that are ignored by the
/// sender.
tx: Option<mpsc::UnboundedSender<Result<(), String>>>,
Contributor:

Frank has banned this on Slack: "Now hear this: no new mpsc queues in the dataflow crate!"

Contributor:

However I'm not sure what they're supposed to be replaced with -_- so 🤷

danhhz (author):

Yup! I'm waiting to see what the blessed alternative is

danhhz (author):

Okay, this is fixed as described in my top-level comment.

// Make sure any writes have been persisted as necessary.
if let Some(mut rx) = rx {
loop {
match rx.recv().await {
Contributor:

As I stated above, I think this is the wrong design. Should the presence of an error change the tag? Did the transaction partially commit? Are there atomicity guarantees provided or not provided by persistence? If persistence isn't atomic then we may have to rethink it or what transactions allow.

danhhz (author):

I've fixed this to make the transaction's writes atomic.

> Should the presence of an error change the tag?

I don't know a ton about sql/pgwire. Can you expand on this?

danhhz added a commit to danhhz/materialize that referenced this pull request Jul 12, 2021
This is in prep for MaterializeInc#7283. The implementation of the method is sadly
still blocking, but after attempting a few fixes, I don't have anything
workable. If possible, would like to merge this as-is and fix the
blocking bit in a followup.
danhhz added a commit to danhhz/materialize that referenced this pull request Jul 12, 2021
This is in prep for MaterializeInc#7283. The implementation of the method is sadly
still blocking, but after attempting a few fixes, I don't have anything
workable. If possible, would like to merge this as-is and fix the
blocking bit in a followup.
danhhz added a commit to danhhz/materialize that referenced this pull request Jul 12, 2021
This is in prep for MaterializeInc#7283. The implementation of the method is sadly
still blocking, but after attempting a few fixes, I don't have anything
workable. If possible, would like to merge this as-is and fix the
blocking bit in a followup.
danhhz added a commit to danhhz/materialize that referenced this pull request Jul 13, 2021
Still TODO is to figure out the hard part of making these retractable
for situations where we can recover, but this does the initial plumbing
so we can hook things up correctly in MaterializeInc#7283.
danhhz added a commit to danhhz/materialize that referenced this pull request Jul 13, 2021
Still TODO is to figure out the hard part of making these retractable
for situations where we can recover, but this does the initial plumbing
so we can hook things up correctly in MaterializeInc#7283.
danhhz added a commit to danhhz/materialize that referenced this pull request Jul 14, 2021
Still TODO is to figure out the hard part of making these retractable
for situations where we can recover, but this does the initial plumbing
so we can hook things up correctly in MaterializeInc#7283.
aljoscha pushed a commit to aljoscha/materialize that referenced this pull request Jul 21, 2021
This is in prep for MaterializeInc#7283. The implementation of the method is sadly
still blocking, but after attempting a few fixes, I don't have anything
workable. If possible, would like to merge this as-is and fix the
blocking bit in a followup.
aljoscha pushed a commit to aljoscha/materialize that referenced this pull request Jul 21, 2021
Still TODO is to figure out the hard part of making these retractable
for situations where we can recover, but this does the initial plumbing
so we can hook things up correctly in MaterializeInc#7283.
danhhz added a commit that referenced this pull request Jul 26, 2021
@danhhz danhhz left a comment

Okay finally figured out what to do here. It's fundamentally different than what we were doing before, so best to review this as an entirely new PR rather than an update (sorry!).

Before, there was a single place in dataflow that wrote down its input and passed it through to the output once that had finished.

Now, the coordinator is entirely responsible for persisting the table updates. Then in dataflow, there's a new(ly used in this PR) persisted_source operator that on startup emits everything that had previously been written to disk and then registers to listen for new successful writes and emits them. This means that we no longer emit SequencedCommand::Inserts for persisted writes, as they'd end up duplicated in the dataflow.

(Internally this listener currently only works if both halves are in the same process, but there's no reason we couldn't make that work over the network in the future. This sets us up for making persistence a nice boundary between coord and dataflow.)

Down to 2 WIPs if anyone has opinions on them. Also, I guessed at how I should be doing the error handling in most of the coord changes, so please pay special attention to that.
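
As a toy illustration of the split described above (this is not the actual persist API; every name here is made up), the coordinator-side writer and a dataflow-side reader can be modeled like this: readers first replay everything already written, then subscribe for writes that land afterwards.

// Toy model only: an in-memory stand-in for the persistence runtime. The
// coordinator appends updates via `write`; a dataflow-side reader calls
// `read` to get a snapshot of everything written so far plus a subscription
// for later writes, mirroring the replay-then-listen behavior.
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

#[derive(Clone)]
struct Update {
    row: String,
    ts: u64,
    diff: i64,
}

#[derive(Clone)]
struct PersistRuntime {
    log: Arc<Mutex<Vec<Update>>>,
    tx: broadcast::Sender<Update>,
}

impl PersistRuntime {
    fn new() -> Self {
        let (tx, _rx) = broadcast::channel(1024);
        PersistRuntime { log: Arc::new(Mutex::new(Vec::new())), tx }
    }

    /// Coordinator side: record an update "durably" (here, in memory) and
    /// then notify any listening readers. Holding the lock across both steps
    /// keeps snapshots and notifications consistent with each other.
    fn write(&self, update: Update) {
        let mut log = self.log.lock().unwrap();
        log.push(update.clone());
        let _ = self.tx.send(update);
    }

    /// Dataflow side: everything written so far, plus a receiver that will
    /// yield updates written after this snapshot was taken.
    fn read(&self) -> (Vec<Update>, broadcast::Receiver<Update>) {
        let log = self.log.lock().unwrap();
        (log.clone(), self.tx.subscribe())
    }
}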


@danhhz danhhz marked this pull request as ready for review July 27, 2021 21:20
@@ -1076,6 +1080,27 @@ where
);
self.conn.send(msg).await?;
}

// Make sure any writes have been persisted as necessary.
if let Some(rx) = rx {
@maddyblue maddyblue Jul 27, 2021

I think rx should not go onto the TransactionExited struct. This whole block should be handled in sequence_end_transaction instead, which can then present the final results to the pgwire crate. sequence_end_transaction's job is to do the hard work and figure out if a transaction committed or not. I think this means that sequence_end_transaction needs to spin up a tokio task to wait for the rx, and then send back the result to pgwire, so sequence_end_transaction may also need to get passed the client tx.

This makes me worried about table compaction. Immediately after a table write txn is committed, coord sends a message to dataflow to advance all table frontiers. If persistence is happening in another task, how does it guarantee that dataflow diffs are sent to dataflow before that happens?

danhhz (author):

I got this approach from @benesch (it's meant to mirror how tail works). What you're suggesting also makes sense to me, but I have no opinion here; happy to do whatever the two of you agree on.

Addressed the table compaction comment below.

danhhz (author):

Okay, pushed a new commit (which I'll squash before merging) that does this the way we discussed offline. Thanks for the handholding through this!
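
A rough sketch of the flow discussed above (function and type names are stand-ins, not the actual coord or pgwire types): the transaction's persisted writes are kicked off first, and a spawned task waits for the durability result before reporting the transaction's final outcome to the client.

use tokio::sync::oneshot;

// Stand-in for the final answer pgwire needs: did the txn commit or abort?
enum TxnOutcome {
    Committed,
    Aborted(String),
}

// If the transaction wrote to persisted tables, `persist_rx` carries the
// single success-or-failure result for all of its writes; otherwise it is
// None and we can answer the client immediately.
fn finish_transaction(
    client_tx: oneshot::Sender<TxnOutcome>,
    persist_rx: Option<oneshot::Receiver<Result<(), String>>>,
) {
    match persist_rx {
        None => {
            let _ = client_tx.send(TxnOutcome::Committed);
        }
        Some(rx) => {
            // Wait off the coordinator's main loop so it can keep serving
            // other sessions while this transaction's writes become durable.
            tokio::spawn(async move {
                let outcome = match rx.await {
                    Ok(Ok(())) => TxnOutcome::Committed,
                    Ok(Err(e)) => TxnOutcome::Aborted(e),
                    Err(_) => TxnOutcome::Aborted(
                        "persistence runtime shut down".to_string(),
                    ),
                };
                let _ = client_tx.send(outcome);
            });
        }
    }
}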

//
// TODO: Allow sealing multiple streams at once to reduce the
// overhead.
let (tx, rx) = std::sync::mpsc::channel();
Contributor:

This function is called about 5x per second. Is the performance of the code below OK?

danhhz (author):

Not even close, but it only does anything if you specify --experimental and --persistent-tables. Even just --experimental ends up being a no-op here (the persist field on Table is false).

Meta question I have: the persistence usage in coord is necessarily split up a bit and really wants a big comment somewhere about how it all fits together; any opinions on what the best place for that is?

Contributor:

Ok. Does it make sense to put this new code behind that flag then? It would prevent needlessly creating a channel and iterating over the entire catalog many times per second.

danhhz (author):

I wasn't worried about the channel, but I hadn't been thinking about the cost of iterating the catalog. Great point! Done
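
A tiny sketch of the gating agreed on here (names are assumptions, not the real coord code): return before building the channel or walking the catalog whenever table persistence is off.

struct PersistConfig {
    user_table_enabled: bool,
}

// Called on every timestamp tick; returns immediately unless the hidden
// --persistent_tables flag turned table persistence on.
fn maybe_seal_persisted_tables(cfg: &PersistConfig, next_ts: u64) {
    if !cfg.user_table_enabled {
        return;
    }
    // ...create the completion channel, walk the catalog for persisted
    // tables, and seal each one up to next_ts, as in the surrounding code...
    let _ = next_ts;
}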

..
}) = entry.item()
{
persist.write_handle.seal(next_ts, tx.clone().into());
Contributor:

This might address my concern about table compaction, and this thing here makes sure persistence writes are complete before closing the ts?

danhhz (author):

The write call and this call both enqueue something in a channel before returning, similar to how SequencedCommand works. The channel is then processed in order, so as long as the calls are serialized, I think this is okay?
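
A minimal sketch of that ordering argument (not the persist crate's real runtime): write and seal only enqueue commands, and a single worker drains the queue in order, so a seal issued after a write can never be applied ahead of it.

use std::sync::mpsc;
use std::thread;

enum Cmd {
    Write { stream: String, updates: Vec<(String, u64, i64)> },
    Seal { stream: String, ts: u64 },
}

#[derive(Clone)]
struct WriteHandle {
    tx: mpsc::Sender<Cmd>,
}

impl WriteHandle {
    fn write(&self, stream: &str, updates: Vec<(String, u64, i64)>) {
        let _ = self.tx.send(Cmd::Write { stream: stream.to_string(), updates });
    }
    fn seal(&self, stream: &str, ts: u64) {
        let _ = self.tx.send(Cmd::Seal { stream: stream.to_string(), ts });
    }
}

fn start_runtime() -> WriteHandle {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Commands are applied strictly in enqueue order, so a seal can never
        // overtake a write that was issued before it.
        for cmd in rx {
            match cmd {
                Cmd::Write { stream, updates } => {
                    // ...append `updates` durably to `stream`...
                    let _ = (stream, updates);
                }
                Cmd::Seal { stream, ts } => {
                    // ...advance `stream`'s sealed frontier to `ts`...
                    let _ = (stream, ts);
                }
            }
        }
    });
    WriteHandle { tx }
}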

ruchirK commented Jul 28, 2021

looking again now!

/// interacting with it. Returns None and does not start the runtime if all
/// persistence features are disabled.
pub fn init(&self) -> Result<PersisterWithConfig, anyhow::Error> {
let persister = if self.user_table_enabled {
Contributor:

Haven't read through the whole thing yet, but this condition on user_table_enabled is confusing to me: it seems like if user_table_enabled is false then no tables (user or system) will be persisted (this could be a misread). I'm not sure whether that's the current intent (in which case perhaps the flag should be renamed) or whether the intent is to always persist system tables and only conditionally persist user tables.

danhhz (author):

System tables are unconditionally disabled elsewhere right now (the persisted field is false for the definition of each). When we turn them on by default, we'll have to add an opt-out escape hatch flag, so I think this is fine for now?


@benesch benesch left a comment

I only looked at the parts that I knew how to review (didn't look under the covers of the persist API, for example, or at the stuff in dataflow/server.rs in too much detail), but the structure all seems great to me!

let persist = if table.persistent {
catalog
.persist_details(table.id)
.expect("WIP how do I turn this into a catalog::Error?")
benesch (Member):

Just add a new variant that embeds persist::Error or whatever you have!
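
A hedged sketch of that suggestion (CatalogError and PersistError below are illustrative stand-ins for the real catalog::Error and persist::Error): embed the persistence error as a new variant and add a From impl so ? can do the conversion.

use std::fmt;

// Stand-in for persist::Error so the sketch is self-contained.
#[derive(Debug)]
struct PersistError(String);

impl fmt::Display for PersistError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

// Stand-in for catalog::Error with the new variant added.
#[derive(Debug)]
enum CatalogError {
    UnknownItem(String),
    Persistence(PersistError),
}

impl fmt::Display for CatalogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CatalogError::UnknownItem(name) => write!(f, "unknown catalog item: {}", name),
            CatalogError::Persistence(e) => write!(f, "persistence: {}", e),
        }
    }
}

// With this, a call like `catalog.persist_details(table.id)?` can replace
// the WIP expect above.
impl From<PersistError> for CatalogError {
    fn from(e: PersistError) -> Self {
        CatalogError::Persistence(e)
    }
}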

danhhz (author):

Done


@danhhz danhhz left a comment

Okay I think I hit everything. Thanks for the reviews everyone!





async fn sequence_end_transaction_inner(
Contributor:

#7593 got merged, so this can be made not async now.

danhhz (author):

👍 Working through the rebase now and I've made it not async

@ruchirK ruchirK left a comment

LGTM! I looked at all the places where this hooks into the dataflow layer and that all seemed reasonable to me. I didn't look super closely at the coordinator changes, but if it works for Matt, it works for me.

@danhhz danhhz merged commit 9346641 into MaterializeInc:main Jul 29, 2021
@danhhz danhhz deleted the persist_usertables_flag branch July 29, 2021 15:51