Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

persist: introduce filter pushdown audit #18648

Merged
merged 1 commit into from
Apr 21, 2023

Conversation

danhhz
Copy link
Contributor

@danhhz danhhz commented Apr 6, 2023

This is a way to verify the ongoing end-to-end correctness of filter pushdown via "audit". When the filter rejects a part as completely unnecessary, we sometimes mark it with an audit bit. This means we fetch the part like normal and if the MFP keeps anything from it, then something has gone horribly wrong.

Touches #12684

Motivation

  • This PR adds a known-desirable feature.

Tips for reviewer

I manually tested this with bin/environmentd and a bug in the impl of should_fetch

diff --git a/src/storage-client/src/source/persist_source.rs b/src/storage-client/src/source/persist_source.rs
index f518cd5bb..b8164dae7 100644
--- a/src/storage-client/src/source/persist_source.rs
+++ b/src/storage-client/src/source/persist_source.rs
@@ -140,7 +140,7 @@ where
         move |stats| {
             mfp_pushdown.as_ref().map_or(true, |x| {
                 x.should_fetch(&PersistSourceDataStatsImpl { desc: &desc, stats })
-            })
+            }) && false
         },
     );
     let rows = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn);
thread 'timely:work-0' panicked at 'persist filter pushdown correctness violation! u1 val=Ok((Row{[String("b"), UInt64(3), String("c")]}, 1680806140498, 1)) mfp=Some(MfpPlan { mfp: SafeMfpPlan { mfp: MapFilterProject { expressions: [], predicates: [(2, CallBinary { func: Gt, expr1: CallUnary { func: CastUint64ToNumeric(CastUint64ToNumeric(None)), expr: Column(1) }, expr2: Literal(Ok(Row{[Numeric(OrderedDecimal(2))]}), ColumnType { scalar_type: Numeric { max_scale: None }, nullable: false }) })], projection: [0, 1, 2], input_arity: 3 } }, lower_bounds: [], upper_bounds: [] })', ...

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:

@danhhz danhhz requested review from bkirwi and a team April 6, 2023 18:39
@danhhz danhhz requested a review from a team as a code owner April 6, 2023 18:39
Copy link
Contributor

@bkirwi bkirwi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wise! All comments minor and nonblocking.

@@ -348,6 +349,7 @@ where
/// long as necessary to ensure the `SeqNo` isn't garbage collected while a
/// read still depends on it.
pub(crate) leased_seqno: Option<SeqNo>,
pub(crate) filter_pushdown_audit: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mild preference for this to turn into a wrapping struct in shard_source instead of pushing it in here. (Since it's a source-specific concern, and I doubt this is the last metadata we'll want to pass along.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same idea when I was typing this up, but that would mean a second copy of the Serde part, the Encoded part, and the Fetched part and I don't think the verbosity is worth it. gonna push back on this one, if that's okay

@@ -405,15 +407,26 @@ where
// atomically emit all parts here (e.g. no awaits).
let bytes_emitted = {
let mut bytes_emitted = 0;
for part_desc in std::mem::take(&mut batch_parts) {
for mut part_desc in std::mem::take(&mut batch_parts) {
// TODO(mfp): Push the filter down into the Subscribe?
if cfg.dynamic.stats_filter_enabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set the audit bit even if filtering's not enabled? (Might be overkill to assert on it, but a warning would be cool.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not run the logic if the feature flag is off. that would give us an out if e.g. the filtering took more cpu than we expect or something

if is_filter_pushdown_audit {
// Ideally we'd be able to include the part stats here, but that
// would require us to exchange them around. It's unclear if that's
// worth it for work that's already known to be unnecessary.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threading around / logging the PartialBatchKey might be enough to be useful, but I agree this is fine the way it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a great idea! no guarantee that the part sticks around long enough for us to look at it, but definitely better than nothing

This is a way to verify the ongoing end-to-end correctness of filter
pushdown via "audit". When the filter rejects a part as completely
unnecessary, we sometimes mark it with an audit bit. This means we fetch
the part like normal and if the MFP keeps anything from it, then
something has gone horribly wrong.
@danhhz
Copy link
Contributor Author

danhhz commented Apr 21, 2023

TFTR!

@danhhz danhhz enabled auto-merge April 21, 2023 14:49
@danhhz danhhz merged commit 06ea75c into MaterializeInc:main Apr 21, 2023
@danhhz danhhz deleted the persist_mfp_audit branch April 21, 2023 15:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants