Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: support lookup join on secondary index #25628

Merged
merged 1 commit into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-4</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-5</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/server/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestReportUsage(t *testing.T) {
"diagnostics.reporting.send_crash_reports": "false",
"server.time_until_store_dead": "1m30s",
"trace.debug.enable": "false",
"version": "2.0-4",
"version": "2.0-5",
"cluster.secret": "<redacted>",
} {
if got, ok := r.last.AlteredSettings[key]; !ok {
Expand Down
6 changes: 6 additions & 0 deletions pkg/settings/cluster/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
VersionProposedTSLeaseRequest
VersionRangeAppliedStateKey
VersionImportFormats
VersionSecondaryLookupJoins

// Add new versions here (step one of two).

Expand Down Expand Up @@ -212,6 +213,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionImportFormats,
Version: roachpb.Version{Major: 2, Minor: 0, Unstable: 4},
},
{
// VersionSecondaryLookupJoins is https://github.com/cockroachdb/cockroach/pull/25628.
Key: VersionSecondaryLookupJoins,
Version: roachpb.Version{Major: 2, Minor: 0, Unstable: 5},
},

// Add new versions here (step two of two).

Expand Down
7 changes: 1 addition & 6 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,10 @@ func (cb *ColumnBackfiller) Init(evalCtx *tree.EvalContext, desc sqlbase.TableDe
var valNeededForCol util.FastIntSet
valNeededForCol.AddRange(0, len(desc.Columns)-1)

colIdxMap := make(map[sqlbase.ColumnID]int, len(desc.Columns))
for i, c := range desc.Columns {
colIdxMap[c.ID] = i
}

tableArgs := sqlbase.RowFetcherTableArgs{
Desc: &desc,
Index: &desc.PrimaryIndex,
ColIdxMap: colIdxMap,
ColIdxMap: desc.ColumnIdxMap(),
Cols: desc.Columns,
ValNeededForCol: valNeededForCol,
}
Expand Down
57 changes: 34 additions & 23 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,19 @@ func (dsp *DistSQLPlanner) nodeVersionIsCompatible(
return distsqlrun.FlowVerIsCompatible(dsp.planVersion, v.MinAcceptedVersion, v.Version)
}

func getIndexIdx(n *scanNode) (uint32, error) {
if n.index == &n.desc.PrimaryIndex {
return 0, nil
}
for i := range n.desc.Indexes {
if n.index == &n.desc.Indexes[i] {
// IndexIdx is 1 based (0 means primary index).
return uint32(i + 1), nil
}
}
return 0, errors.Errorf("invalid scanNode index %v (table %s)", n.index, n.desc.Name)
}

// initTableReaderSpec initializes a TableReaderSpec/PostProcessSpec that
// corresponds to a scanNode, except for the Spans and OutputColumns.
func initTableReaderSpec(
Expand All @@ -782,19 +795,11 @@ func initTableReaderSpec(
Reverse: n.reverse,
IsCheck: n.run.isCheck,
}
if n.index != &n.desc.PrimaryIndex {
for i := range n.desc.Indexes {
if n.index == &n.desc.Indexes[i] {
// IndexIdx is 1 based (0 means primary index).
s.IndexIdx = uint32(i + 1)
break
}
}
if s.IndexIdx == 0 {
err := errors.Errorf("invalid scanNode index %v (table %s)", n.index, n.desc.Name)
return distsqlrun.TableReaderSpec{}, distsqlrun.PostProcessSpec{}, err
}
indexIdx, err := getIndexIdx(n)
if err != nil {
return distsqlrun.TableReaderSpec{}, distsqlrun.PostProcessSpec{}, err
}
s.IndexIdx = indexIdx

// When a TableReader is running scrub checks, do not allow a
// post-processor. This is because the outgoing stream is a fixed
Expand Down Expand Up @@ -1820,9 +1825,10 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
rightEqCols = eqCols(n.pred.rightEqualityIndices, rightPlan.planToStreamColMap)
}

// Can use a lookupJoiner if there is a scan node on the right that uses the
// table's primary index.
// TODO(pbardea): Loosen restriction when joinReader takes secondary indexes.
// A lookup join can be performed if the right node is a scan or index join,
// and the right equality columns are a prefix of that node's index. In the
// index join case, joinReader will first perform the secondary index lookup
// and then do a primary index lookup on the resulting rows.
lookupJoinEnabled := planCtx.EvalContext().SessionData.LookupJoinEnabled
isLookupJoin, lookupJoinScan, lookupFailReason := verifyLookupJoin(leftEqCols, rightEqCols, n, joinType)

Expand Down Expand Up @@ -1926,9 +1932,13 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
return physicalPlan{}, err
}

indexIdx, err := getIndexIdx(lookupJoinScan)
if err != nil {
return physicalPlan{}, err
}
core.JoinReader = &distsqlrun.JoinReaderSpec{
Table: *(lookupJoinScan.desc),
IndexIdx: 0,
IndexIdx: indexIdx,
LookupColumns: lookupCols,
OnExpr: onExpr,
}
Expand Down Expand Up @@ -1974,13 +1984,14 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
func verifyLookupJoin(
leftEqCols, rightEqCols []uint32, n *joinNode, joinType sqlbase.JoinType,
) (bool, *scanNode, string) {
lookupJoinScan, ok := n.right.plan.(*scanNode)
if !ok {
return false, nil, "lookup join's right side must be a scan"
}

if lookupJoinScan.index != &lookupJoinScan.desc.PrimaryIndex {
return false, nil, "lookup joins can only perform lookups through the primary index"
var lookupJoinScan *scanNode
switch p := n.right.plan.(type) {
case *scanNode:
lookupJoinScan = p
case *indexJoinNode:
lookupJoinScan = p.index
default:
return false, nil, "lookup join's right side must be a scan or index join"
}

if joinType != sqlbase.InnerJoin {
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/distsqlrun/interleaved_reader_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ func (irj *interleavedReaderJoiner) initRowFetcher(
// since we do not expect any projections or rendering
// on a scan before a join.
args[i].ValNeededForCol.AddRange(0, len(desc.Columns)-1)
args[i].ColIdxMap = make(map[sqlbase.ColumnID]int, len(desc.Columns))
for j, c := range desc.Columns {
args[i].ColIdxMap[c.ID] = j
}
args[i].ColIdxMap = desc.ColumnIdxMap()
args[i].Desc = &desc
args[i].Cols = desc.Columns
args[i].Spans = make(roachpb.Spans, len(table.Spans))
Expand Down
Loading