
rowexec: make lookup joins with no required ordering more efficient #48439

Merged
merged 2 commits on May 12, 2020

Conversation

asubiotto
Contributor

This is achieved by abstracting the order-maintaining code out into a joinReaderStrategy and using a strategy that doesn't preserve the input ordering. The results of this are here. The true improvement, though, is being able to use a much larger lookup batch size in these cases, since looked-up rows don't need to be buffered. That will be added in a future PR.

@sumeerbhola could you take a look at the refactor of the joinReader into separate strategies? Ideally, you would add your invertedJoiner behavior as a new joinReaderStrategy. Let me know what you think of this.

Release note (performance improvement): lookup joins with no required ordering are now more efficient.

Closes #48117

@asubiotto asubiotto requested a review from yuzefovich May 5, 2020 15:33
@cockroach-teamcity
Member

This change is Reviewable

@blathers-crl

blathers-crl bot commented May 5, 2020

❌ The GitHub CI (Cockroach) build has failed on 86229add.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.

Member

@yuzefovich yuzefovich left a comment

Nice improvement!

Reviewed 3 of 3 files at r1, 3 of 3 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

	jr.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
	drc := rowcontainer.NewDiskBackedIndexedRowContainer(
		nil, /* ordering */

Hm, I wonder whether this should be non-nil?


pkg/sql/rowexec/joinreader_strategies.go, line 25 at r2 (raw file):

	processLookupRows(rows []sqlbase.EncDatumRow)
	// processLookedUpRow processes a looked up row. A joinReaderState is returned
	// to indicate the next state to transition to. If jrPerformingLookup,

nit: I'm a little confused by "If jrPerformingLookup" - feels like a few words are missing.


pkg/sql/rowexec/joinreader_strategies.go, line 60 at r2 (raw file):

		// to be emitted.
		processingLookupRow     bool
		curUnmatchedInputRowIdx int

nit: maybe rename curUnmatchedInputRowIdx to unmatchedInputRowIndicesCursor (and the same for matchingInputRowIndices) so that there are fewer "indices" flying around?


pkg/sql/rowexec/joinreader_strategies.go, line 79 at r2 (raw file):

		s.matched = s.matched[:len(s.inputRows)]
		for i := range s.matched {
			s.matched[i] = false

Do you think it makes sense here to use something like zeroBoolVector to zero out matched in chunks?


pkg/sql/rowexec/joinreader_strategies.go, line 87 at r2 (raw file):

	_ context.Context, row sqlbase.EncDatumRow, matchingInputRowIndices []int,
) (joinReaderState, error) {
	if s.isPartialJoin {

Hm, we probably could be smarter here. IIUC "partial" joins are LEFT SEMI and LEFT ANTI, and it seems to me that all unmatched rows can be ignored in LEFT SEMI case, right?


pkg/sql/rowexec/joinreader_strategies.go, line 91 at r2 (raw file):

		// matched yet. Make a copy of the matching input row indices to avoid
		// overwriting the caller's slice.
		if len(s.scratchMatchingInputRowIndices) != 0 {

Could there be a NPE if we remove the if condition and always slice up to :0?


pkg/sql/rowexec/joinreader_strategies.go, line 113 at r2 (raw file):

	if !s.emitState.processingLookupRow {
		// processLookedUpRow was not called before nextRowToEmit, which means that
		// the next unmatched row needs to be emitted.

nit: maybe s/to be emitted/to be processed/g because it won't always be emitted?


pkg/sql/rowexec/joinreader_strategies.go, line 157 at r2 (raw file):

		}
		if outputRow == nil {
			// This row failed the on condition, so remains unmatched.

nit: s/on/ON/g, s/so remains/so it remains/g.


pkg/sql/rowexec/joinreader_strategies.go, line 158 at r2 (raw file):

		if outputRow == nil {
			// This row failed the on condition, so remains unmatched.
			s.matched[inputRowIdx] = false

Do we always want to unset this? What if we have multiple looked up rows, and some of them match whereas others do not? I think we want to remove s.matched[inputRowIdx] = true above as well as this unsetting and simply set matched to true after the if outputRow == nil block. (I might be off here though.)

Update: Or I guess we always process a single lookup row at a time?


pkg/sql/rowexec/joinreader_strategies.go, line 162 at r2 (raw file):

		}

		if !shouldIncludeRightColsInOutput(s.joinType) {

I feel like this outer if block should not be necessary - we evaluated the ON condition and rendered the correct columns, right? So we could only check whether we have LEFT ANTI join, and if not, we could always do return outputRow, jrEmittingRows, nil.


pkg/sql/rowexec/joinreader_strategies.go, line 176 at r2 (raw file):

	// nextRowToEmit, the strategy knows that no more lookup rows were processed
	// and should proceed to emit unmatched rows.
	s.emitState.processingLookupRow = false

This knob seems a little sketchy to me. Do you think it would make sense to introduce an additional join reader state?


pkg/sql/rowexec/joinreader_strategies.go, line 200 at r2 (raw file):

	// indicating a matching lookup if it's present, since the right side of a
	// semi/anti join is not used.
	inputRowIdxToLookedUpRowIdx [][]int

nit: it probably makes sense to s/inputRowIdxToLookedUpRowIdx/inputRowIdxToLookedUpRowIndices/g.

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/rowexec/joinreader.go, line 95 at r2 (raw file):

	// State variables for each batch of input rows.
	scratchInputRows     sqlbase.EncDatumRows
	keyToInputRowIndices map[string][]int

btw, this is an example of how what joinReader is doing is quite different from invertedJoiner -- here we are matching on equality, hence the map. generateSpan is different too.
Also, the specs are different, and invertedJoiner can't avoid storing the looked up inverted index rows in a row container (which will have a different setting since it will be deduping). So the joinReaderStrategy is not sufficient to unify the two.

My opinion is that crucial optimizations, like the one in this PR, are more important than code unification. Also we will probably need to aggressively optimize the inverted joiner once we start experimenting with geospatial data -- it would be a pity if it was harder to iterate because of premature unification. And long term, if we are serious about optimizing inverted indexes, the divergence will further increase.


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

Previously, yuzefovich wrote…

Hm, I wonder whether this should be non-nil?

This gives me an opportunity to ask a question -- I am not clear on how DiskBackedIndexedRowContainer is achieving the index based retrieval. It doesn't look like the index is the first element of the key, since it puts the index int at the end of scratchEncRow.

Member

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @sumeerbhola)


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

Previously, sumeerbhola wrote…

This gives me an opportunity to ask a question -- I am not clear on how DiskBackedIndexedRowContainer is achieving the index based retrieval. It doesn't look like the index is the first element of the key, since it puts the index int at the end of scratchEncRow.

Hm, I wrote the code, and it is quite confusing to me as well :/

I think the difficulty here is that word "index" is overloaded in regards to DiskBackedIndexedRowContainer. It was designed so that whenever we add a row into the container, an ordinal is appended to the row, and the row with the ordinal is stored. That "ordinal" is what "Indexed" refers to in the name of the container. This container can be "reordered" (meaning that rows will be exported from a container with old ordering to a newly-created container with new ordering), so "positions" of the rows can change, yet the "ordinals" will be kept the same. After reordering of the container, original "ordinal" of a row might not be the same as current "position" of the row in the container.

To answer your question, there is no magic in this container to retrieve a row by its index (I think here "index" means current "position") - we simply create an iterator and advance it as many times as necessary. There is an optimization that we're reusing the same iterator when possible, but if we advanced too far - we have to rewind and start from scratch.

This discussion points out that joinReader's use case is different from what DiskBackedIndexedRowContainer was designed for (window functions), and lookup join's performance when spilled to disk should improve if we create a separate container (or play around with disabling the cache / changing the cache size), as #48118 outlines.

Contributor Author

@asubiotto asubiotto left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @sumeerbhola)


pkg/sql/rowexec/joinreader.go, line 95 at r2 (raw file):

Previously, sumeerbhola wrote…

btw, this is an example of how what joinReader is doing is quite different from invertedJoiner -- here we are matching on equality, hence the map. generateSpan is different too.
Also, the specs are different, and invertedJoiner can't avoid storing the looked up inverted index rows in a row container (which will have a different setting since it will be deduping). So the joinReaderStrategy is not sufficient to unify the two.

My opinion is that crucial optimizations, like the one in this PR, are more important than code unification. Also we will probably need to aggressively optimize the inverted joiner once we start experimenting with geospatial data -- it would be a pity if it was harder to iterate because of premature unification. And long term, if we are serious about optimizing inverted indexes, the divergence will further increase.

The equality matching can be moved out of here into the strategy. The joinReader could simply pass in a lookedUpRow and it would be up to the strategy to decide what to do with it. Span generation is also something that can easily be moved into the strategies (as a return value from processLookupRows). Regarding the row container, that is set up when instantiating the strategy so that shouldn't be a problem.

I agree that optimizations are more important than code unification, but I think in this case the invertedJoiner is redoing a lot of things that it shouldn't have to worry about. Examples of this are setting up memory and disk monitoring, creating and setting up a row fetcher, outputting stats, and fetching rows based on a set of spans. I can't really speak to the strategy approach making it harder to implement further optimizations. It's definitely possible, but the joinReader behavior should be generic enough that any optimization to the shared code (which should only concern itself with fetching rows based on some generated spans) would apply to all strategies. If not, the performance optimization would likely be strategy-specific anyway. The interface might be a limitation to implementing this, but hopefully changing that shouldn't be difficult.

You have more context on what you need, so if you believe a new processor will suit your needs better, that's fine. I just want to make sure you've considered the alternative.

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @sumeerbhola)


pkg/sql/rowexec/joinreader.go, line 95 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

The equality matching can be moved out of here into the strategy. The joinReader could simply pass in a lookedUpRow and it would be up to the strategy to decide what to do with it. Span generation is also something that can easily be moved into the strategies (as a return value from processLookupRows). Regarding the row container, that is set up when instantiating the strategy so that shouldn't be a problem.

I agree that optimizations are more important than code unification, but I think in this case the invertedJoiner is redoing a lot of things that it shouldn't have to worry about. Examples of this are setting up memory and disk monitoring, creating and setting up a row fetcher, outputting stats, and fetching rows based on a set of spans. I can't really speak to the strategy approach making it harder to implement further optimizations. It's definitely possible, but the joinReader behavior should be generic enough that any optimization to the shared code (which should only concern itself with fetching rows based on some generated spans) would apply to all strategies. If not, the performance optimization would likely be strategy-specific anyway. The interface might be a limitation to implementing this, but hopefully changing that shouldn't be difficult.

You have more context on what you need, so if you believe a new processor will suit your needs better, that's fine. I just want to make sure you've considered the alternative.

Thanks -- I'll mull over this some more when redoing the inverted joiner code into a separate PR for review.

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

Previously, yuzefovich wrote…

Hm, I wrote the code, and it is quite confusing to me as well :/

I think the difficulty here is that word "index" is overloaded in regards to DiskBackedIndexedRowContainer. It was designed so that whenever we add a row into the container, an ordinal is appended to the row, and the row with the ordinal is stored. That "ordinal" is what "Indexed" refers to in the name of the container. This container can be "reordered" (meaning that rows will be exported from a container with old ordering to a newly-created container with new ordering), so "positions" of the rows can change, yet the "ordinals" will be kept the same. After reordering of the container, original "ordinal" of a row might not be the same as current "position" of the row in the container.

To answer your question, there is no magic in this container to retrieve a row by its index (I think here "index" means current "position") - we simply create an iterator and advance it as many times as necessary. There is an optimization that we're reusing the same iterator when possible, but if we advanced too far - we have to rewind and start from scratch.

This discussion points out that joinReader's use case is different from what DiskBackedIndexedRowContainer was designed for (window functions), and lookup join's performance when spilled to disk should improve if we create a separate container (or play around with disabling the cache / changing the cache size), as #48118 outlines.

Thanks. I reread the code this morning, and together with your comment it helped clarify what is going on.

  • I see that DiskRowContainer adds a RowID at the end of the key to prevent collision -- I guess when it is used in the context of an indexed row container it isn't necessary, though harmless.

  • The lookup join code never reorders, and the index is the order in which the rows were added. The code in DiskBackedIndexedRowContainer.getRowWithoutCache seems inefficient for this case (I didn't look at the cache path, but I presume it does something similar on a cache miss), since it iterates over every row between the current index and the desired index. For this case one could make the key in the engine simply the index and, depending on the difference between the current index and the desired index, either iterate (if the difference is small) or seek (when iterating would skip sstable blocks -- this can be based on the row container keeping track of the mean row size). This is similar to what is discussed in rowexec: improve disk spilling in the lookup joiner #48118, except for the addition of the seek here.

  • In [DNM] rowexec, rowcontainer: add an invertedJoiner, for #48019 I had added a new row container, to do deduping and to read with an index. It didn't have a disk implementation yet, and the efficient one would be to maintain two on-disk maps, since writing to the engine is cheap in terms of seek costs. That is, the DiskBackedInvertedIndexRowContainer would write twice:

    • deduping map would be key => index. This would need to be read for each row to see if it is a duplicate.
    • reading by index map would be index => key

@asubiotto Do you still think I should add the above behavior into DiskBackedIndexedRowContainer, as you were suggesting on #48019? It seems to me that the optimizations to do seeking motivate a new row container for joinReader with a narrower interface. Adding the deduping behavior to that new row container, for additional use by the inverted joiner, seems far less confusing. Writing the container behavior for the inverted joiner was the next thing I planned to do, so if you'd like I am happy to write a new one for use by both joinReader and invertedJoiner.

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

Previously, sumeerbhola wrote…

Thanks. I reread the code this morning, and together with your comment it helped clarify what is going on.

  • I see that DiskRowContainer adds a RowID at the end of the key to prevent collision -- I guess when it is used in the context of an indexed row container it isn't necessary, though harmless.

  • The lookup join code never reorders, and the index is the order in which the rows were added. The code in DiskBackedIndexedRowContainer.getRowWithoutCache seems inefficient for this case (I didn't look at the cache path, but I presume it does something similar on a cache miss), since it iterates over every row between the current index and the desired index. For this case one could make the key in the engine simply the index and, depending on the difference between the current index and the desired index, either iterate (if the difference is small) or seek (when iterating would skip sstable blocks -- this can be based on the row container keeping track of the mean row size). This is similar to what is discussed in rowexec: improve disk spilling in the lookup joiner #48118, except for the addition of the seek here.

  • In [DNM] rowexec, rowcontainer: add an invertedJoiner, for #48019 I had added a new row container, to do deduping and to read with an index. It didn't have a disk implementation yet, and the efficient one would be to maintain two on-disk maps, since writing to the engine is cheap in terms of seek costs. That is, the DiskBackedInvertedIndexRowContainer would write twice:

    • deduping map would be key => index. This would need to be read for each row to see if it is a duplicate.
    • reading by index map would be index => key

@asubiotto Do you still think I should add the above behavior into DiskBackedIndexedRowContainer, as you were suggesting on #48019? It seems to me that the optimizations to do seeking motivate a new row container for joinReader with a narrower interface. Adding the deduping behavior to that new row container, for additional use by the inverted joiner, seems far less confusing. Writing the container behavior for the inverted joiner was the next thing I planned to do, so if you'd like I am happy to write a new one for use by both joinReader and invertedJoiner.

Hmm, I also added a question here #48118 (comment)
It seems to me that the best we can do for joinReader when it needs to maintain ordering is to use seeks on the engine iterator. And if there are some hot indexes, the row container can do some explicit caching for those (the same thing could work for the inverted index). Do we have any microbenchmarks that demonstrate the slowness of joinReader when spilling?

Contributor Author

@asubiotto asubiotto left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

Do you still think I should add the above behavior into DiskBackedIndexedRowContainer, as you were suggesting on #48019? It seems to me that the optimizations to do seeking motivate a new row container for joinReader with a narrower interface.

Hmm, agreed. I think the seeking and deduping by a separate diskmap instance justifies a separate row container.

Do we have any microbenchmarks that demonstrate the slowness of joinReader when spilling?

BenchmarkJoinReader prints a log message when it spills; with the default 64 MiB of work memory it rarely does, and with the current low batch size it does not spill at all. Adding another loop with flowCtx.Cfg.TestingKnobs.ForceDiskSpill = true will force the join to eagerly spill to disk in all cases.


pkg/sql/rowexec/joinreader_strategies.go, line 79 at r2 (raw file):

Previously, yuzefovich wrote…

Do you think it makes sense here to use something like zeroBoolVector to zero out matched in chunks?

I think that might be an overoptimization at this point but


pkg/sql/rowexec/joinreader_strategies.go, line 87 at r2 (raw file):

Previously, yuzefovich wrote…

Hm, we probably could be smarter here. IIUC "partial" joins are LEFT SEMI and LEFT ANTI, and it seems to me that all unmatched rows can be ignored in LEFT SEMI case, right?

Could you expand on what you would do here? Unmatched rows can be ignored, but we need to move to nextRowToEmit to mark whether or not the row was matched based on the ON expression and emit it if it is matched.


pkg/sql/rowexec/joinreader_strategies.go, line 91 at r2 (raw file):

Previously, yuzefovich wrote…

Could there be a NPE if we remove the if condition and always slice up to :0?

No, that surprised me.


pkg/sql/rowexec/joinreader_strategies.go, line 158 at r2 (raw file):

Previously, yuzefovich wrote…

Do we always want to unset this? What if we have multiple looked up rows, and some of them match whereas other do not. I think we want to remove s.matched[inputRowIdx] = true above as well as this unsetting and simply set matched to true after if outputRow == nil block. (I might be off here though.)

Update: Or I guess we always process a single lookup row at a time?

Good point, I think we could have had cases where a single lookup row passes the ON condition but the next one does not, and we'd incorrectly mark the input row as unmatched.


pkg/sql/rowexec/joinreader_strategies.go, line 162 at r2 (raw file):

Previously, yuzefovich wrote…

I feel like this outer if block should not be necessary - we evaluated the ON condition and rendered the correct columns, right? So we could only check whether we have LEFT ANTI join, and if not, we could always do return outputRow, jrEmittingRows, nil.

The thing is that the ON condition is evaluated on an outputRow with all columns, so it wouldn't correspond to the expected schema.


pkg/sql/rowexec/joinreader_strategies.go, line 176 at r2 (raw file):

Previously, yuzefovich wrote…

This knob seems a little sketchy to me. Do you think it would make sense to introduce an additional join reader state?

The joinReaderOrderingStrategy currently does its own check of whether it should move to emit unmatched rows based on a cursor position which is more or less the equivalent of having this knob in joinReaderNoOrderingStrategy. I'm not sure how to move a transition to emitting unmatched rows in the joinReader. What should be the signal? I think this state belongs in the strategies, but can be convinced otherwise.

@blathers-crl

blathers-crl bot commented May 6, 2020

❌ The GitHub CI (Cockroach) build has failed on 4f946cb9.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.

Member

@yuzefovich yuzefovich left a comment

:lgtm:

Reviewed 6 of 6 files at r3, 3 of 3 files at r4.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto and @sumeerbhola)


pkg/sql/rowexec/joinreader.go, line 230 at r2 (raw file):

I guess when it is used in the context of an indexed row container it isn't necessary, though harmless

Yep. I needed to introduce an extra "index" as the last column for DiskBackedIndexedRowContainer because MemRowContainer doesn't store this RowID, and it was messy to get that working.

I didn't look at the cache path, but I presume it does something similar for a cache miss

The cache in DiskBackedIndexedRowContainer simply keeps copies of the last N rows seen by the iterator (by default, N = 4096), so it maintains a "window" into the rows up to the iterator's current row. It is likely not very useful for lookup join's use case.

iterate (if the difference is small) or seek

I agree, good idea.

Hmm, agreed. I think the seeking and deduping by a separate diskmap instance justifies a separate row container.

+1

One note here is that having DiskBackedIndexedRowContainer and DiskBackedInvertedIndexedRowContainer might be confusing, as per my comment about the word "index" being overloaded. Maybe we should rename the former? To something like DiskBackedOrdinaledRowContainer?


pkg/sql/rowexec/joinreader_strategies.go, line 79 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I think that might be an overoptimization at this point but

Probably, up to you.


pkg/sql/rowexec/joinreader_strategies.go, line 87 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Could you expand on what you would do here? Unmatched rows can be ignored, but we need to move to nextRowToEmit to mark whether or not the row was matched based on the on expression and emit it if it is matched.

Maybe I'm confused by the comment, but here is my thinking.

We only "process input rows that have not been matched yet", but do we already know if those rows are definitely unmatched? If so, it seems to me we could ignore them in the case of LEFT SEMI, but for LEFT ANTI we would need to check the ON expression and emit only those that don't pass it.

Possibly I just don't have a good mental model of transitions between states here, so feel free to ignore this comment if it doesn't make sense.


pkg/sql/rowexec/joinreader_strategies.go, line 162 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

The thing is that the ON condition is evaluated on an outputRow with all columns, so it wouldn't correspond to the expected schema.

I see. I thought s.render behaves similar to ProcOutputHelper.Render (or something like that) that does populate the row with the desired output schema, but I was wrong, never mind then.


pkg/sql/rowexec/joinreader_strategies.go, line 176 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

The joinReaderOrderingStrategy currently does its own check of whether it should move to emit unmatched rows based on a cursor position which is more or less the equivalent of having this knob in joinReaderNoOrderingStrategy. I'm not sure how to move a transition to emitting unmatched rows in the joinReader. What should be the signal? I think this state belongs in the strategies, but can be convinced otherwise.

Ok, I see that it is specific to this strategy. Then the knob probably makes sense.

Contributor Author

@asubiotto asubiotto left a comment

Added span generation to the strategies.

The fk_opt logictest failure can be reproduced in a smaller test:

# LogicTest: local

statement ok
CREATE TABLE parent (a INT PRIMARY KEY, b INT, UNIQUE (b))

statement ok
CREATE TABLE child (a INT PRIMARY KEY, b INT REFERENCES parent (b))

statement ok
INSERT INTO parent VALUES (1, 3), (2, 2)

statement ok
INSERT INTO child VALUES (10, 2)

query II
SELECT * FROM child
----
10  2

query II rowsort
SELECT * FROM parent
----
1  3
2  2

# This mutation *also* removes that row, but also via an update, introduces a
# new one, making it acceptable.
statement ok
INSERT INTO parent VALUES (2, 2), (1, 3) ON CONFLICT (a) DO UPDATE SET b = parent.b - 1

This test passes if we manually set MaintainOrdering. The only difference is the ordering of the rows:

Maintain ordering:
emitting row [2 2 2 2 1 2]
emitting row [1 3 1 3 2 1]

Without maintain:
emitting row [1 3 1 3 2 1]
emitting row [2 2 2 2 1 2]

So it seems like there's a required ordering that isn't plumbed down in this case? cc @RaduBerinde

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @sumeerbhola, and @yuzefovich)


pkg/sql/rowexec/joinreader_strategies.go, line 87 at r2 (raw file):

Previously, yuzefovich wrote…

Maybe I'm confused by the comment, but here is my thinking.

We only "process input rows that have not been matched yet", but do we already know if those rows are definitely unmatched? If so, it seems to me we could ignore them in case of LEFT SEMI, but for LEFT ANTI we would need check the ON expression and emit only those that don't pass the ON expression.

Possibly I just don't have a good mental model of transitions between states here, so feel free to ignore this comment if it doesn't make sense.

I think what you're referring to is what happens in nextRowToEmit. In the case of LEFT SEMI we don't know whether the input row is definitely unmatched until the end of the batch, since we could have multiple "matches" where only the last one actually passes the ON condition. The matchingInputRowIndices are candidates to emit, and the emit stage will run the ON expression and emit them only if they pass. If a row is emitted, then s.matched = true, so we'll never add the same row as a "candidate" again.

In the case of LEFT ANTI, it's sort of the same. We pass in the candidates to the emit stage which takes care of marking rows that do pass the ON expression.

So this stage is really just to prepare "candidates" for further evaluation in nextRowToEmit.

@RaduBerinde
Member

This is fantastic! Once the other improvements are done, we should use the benchmark results to update the cost in the optimizer.

@asubiotto
Contributor Author

Definitely! It should be easy to add other cases we're interested in benchmarking as well.

@RaduBerinde do you know what's going on re the logictest failure in the above comment? To me it looks like there's a required ordering for fk checks that we're not plumbing down.

@RaduBerinde
Member

The query error is duplicate key value (b)=(2) violates unique constraint "parent_b_key". This has nothing to do with the FK check. The problem is that this statement only works if we update the rows in a certain order. If we first update (1, 3), we hit a unique index violation for b=2. We (like postgres) don't make any guarantee as to the order in which the row mutations are processed. In this case we are using lookup join to get the list of rows to modify (but in principle we could have used a hash join which also doesn't guarantee an ordering).

So I think it's just a bad test. In general, any mutation which removes a unique value and adds it back can only work if it's executed in a particular order, so I'm not sure it is possible to come up with a reliable test for this condition. I would just remove it.
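The order dependence can be simulated with a toy model of row-at-a-time updates against an immediately-checked unique index on b (all names here are illustrative, not database code):

```go
package main

import "fmt"

// row mirrors the parent table: a is the primary key, b has a unique index.
type row struct{ a, b int }

// applyUpdates applies "b = b - 1" to each row in order, checking the unique
// constraint on b immediately after each update (as a non-deferred unique
// index would). Whether a transient duplicate appears depends on the order.
func applyUpdates(rows []row) error {
	unique := make(map[int]bool)
	for _, r := range rows {
		unique[r.b] = true
	}
	for i := range rows {
		newB := rows[i].b - 1
		if unique[newB] {
			return fmt.Errorf("duplicate key value (b)=(%d)", newB)
		}
		delete(unique, rows[i].b)
		unique[newB] = true
		rows[i].b = newB
	}
	return nil
}

func main() {
	// Updating (2, 2) first frees b=2, so (1, 3) -> (1, 2) then succeeds.
	fmt.Println(applyUpdates([]row{{2, 2}, {1, 3}})) // <nil>
	// Updating (1, 3) first hits b=2, which (2, 2) still holds.
	fmt.Println(applyUpdates([]row{{1, 3}, {2, 2}})) // duplicate key value (b)=(2)
}
```

This reproduces the error from the logictest failure in one order and succeeds in the other, which is why the test can't be made reliable without an ordering guarantee.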

Contributor Author

@asubiotto asubiotto left a comment


Removed, thanks for checking.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto, @sumeerbhola, and @yuzefovich)

@blathers-crl

blathers-crl bot commented May 8, 2020

❌ The GitHub CI (Cockroach) build has failed on 1a64bab5.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.

@blathers-crl

blathers-crl bot commented May 8, 2020

❌ The GitHub CI (Cockroach) build has failed on 37f17e52.


@asubiotto
Contributor Author

@yuzefovich PTAL at the new span generation code

Member

@yuzefovich yuzefovich left a comment


Note that CLA check didn't trigger.

Reviewed 4 of 4 files at r5, 4 of 4 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto)


pkg/sql/rowexec/joinreader.go, line 236 at r5 (raw file):

	jr.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
	drc := rowcontainer.NewDiskBackedIndexedRowContainer(
		nil, /* ordering */

Could you add a TODO to figure out whether a nil ordering here makes sense in all cases? It's likely this will be addressed by the disk spilling improvements issue, but I don't want this thought to get lost if not.


pkg/sql/rowexec/joinreader_strategies.go, line 25 at r6 (raw file):

)

type defaultSpanGenerator struct {

Is the idea that we will have another implementation of the span generator later? Or is it just a cleanup of joinReader?

This commit introduces the joinReaderStrategy abstraction and two
implementations: joinReaderOrderingStrategy which maintains the ordering of
the input, and joinReaderNoOrderingStrategy, which is more efficient, but does
not maintain the ordering of the input.

Release note (performance improvement): lookup joins with no required ordering
are now more efficient.
Contributor Author

@asubiotto asubiotto left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @yuzefovich)


pkg/sql/rowexec/joinreader.go, line 236 at r5 (raw file):

Previously, yuzefovich wrote…

Could you add a TODO to figure out whether a nil ordering here makes sense in all cases? It's likely this will be addressed by the disk spilling improvements issue, but I don't want this thought to get lost if not.

Done.


pkg/sql/rowexec/joinreader_strategies.go, line 25 at r6 (raw file):

Previously, yuzefovich wrote…

Is the idea that we will have another implementation of the span generator later? Or is it just a cleanup of joinReader?

Both: if we add an invertedJoinerStrategy, it will need to generate spans in a different way.

@asubiotto
Contributor Author

bors r=yuzefovich

@craig
Contributor

craig bot commented May 12, 2020

Build succeeded

@craig craig bot merged commit eb2bd4a into cockroachdb:master May 12, 2020
@asubiotto asubiotto deleted the ljro branch May 14, 2020 09:19
Successfully merging this pull request may close these issues:
rowexec: speed up lookup joins when no ordering is required