-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
catalog/descs: use btrees to store leased descriptors #51997
Open
thoszhang
wants to merge
2
commits into
cockroachdb:master
Choose a base branch
from
thoszhang:leased-descriptors-btree
base: master
Could not load branches
Branch not found: {{ refName }}
Could not load tags
Nothing to show
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+148
−42
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ package descs | |
|
||
import ( | ||
"context" | ||
"sort" | ||
"sync" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/keys" | ||
|
@@ -32,6 +31,7 @@ import ( | |
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/protoutil" | ||
"github.com/cockroachdb/errors" | ||
"github.com/google/btree" | ||
) | ||
|
||
// UncommittedDatabase is a database that has been created/dropped | ||
|
@@ -50,6 +50,129 @@ type uncommittedDescriptor struct { | |
immutable catalog.Descriptor | ||
} | ||
|
||
// leasedDescriptors holds references to all the descriptors leased in the | ||
// transaction, and supports access by name and by ID. | ||
type leasedDescriptors struct { | ||
byName *btree.BTree | ||
byID *btree.BTree | ||
} | ||
|
||
type nameKey struct { | ||
parentID sqlbase.ID | ||
parentSchemaID sqlbase.ID | ||
name string | ||
} | ||
|
||
type idKey sqlbase.ID | ||
|
||
type descriptorWithName struct { | ||
nameKey | ||
catalog.Descriptor | ||
} | ||
|
||
type descriptorWithID struct { | ||
idKey | ||
catalog.Descriptor | ||
} | ||
|
||
func lessNameKey(this nameKey, other nameKey) bool { | ||
if this.parentID != other.parentID { | ||
return this.parentID < other.parentID | ||
} | ||
if this.parentSchemaID != other.parentSchemaID { | ||
return this.parentSchemaID < other.parentSchemaID | ||
} | ||
return this.name < other.name | ||
} | ||
|
||
func (k nameKey) Less(than btree.Item) bool { | ||
switch other := than.(type) { | ||
case nameKey: | ||
return lessNameKey(k, other) | ||
case descriptorWithName: | ||
return lessNameKey(k, other.nameKey) | ||
default: | ||
panic(errors.AssertionFailedf("unexpected type %T", other)) | ||
} | ||
} | ||
|
||
func (k idKey) Less(than btree.Item) bool { | ||
switch other := than.(type) { | ||
case idKey: | ||
return k < other | ||
case descriptorWithID: | ||
return k < other.idKey | ||
default: | ||
panic(errors.AssertionFailedf("unexpected type %T", other)) | ||
} | ||
} | ||
|
||
func (ld *leasedDescriptors) assertConsistency() { | ||
if l1, l2 := ld.byName.Len(), ld.byID.Len(); l1 != l2 { | ||
panic(errors.AssertionFailedf("mismatched leased descriptors: %d, %d", l1, l2)) | ||
} | ||
} | ||
|
||
func (ld *leasedDescriptors) add(desc catalog.Descriptor) { | ||
defer ld.assertConsistency() | ||
ld.byName.ReplaceOrInsert(descriptorWithName{ | ||
nameKey: nameKey{desc.GetParentID(), desc.GetParentSchemaID(), desc.GetName()}, | ||
Descriptor: desc, | ||
}) | ||
ld.byID.ReplaceOrInsert(descriptorWithID{ | ||
idKey: idKey(desc.GetID()), | ||
Descriptor: desc, | ||
}) | ||
} | ||
|
||
func (ld *leasedDescriptors) releaseAll() (toRelease []catalog.Descriptor) { | ||
defer ld.assertConsistency() | ||
ld.byID.Ascend(func(i btree.Item) bool { | ||
toRelease = append(toRelease, i.(descriptorWithID).Descriptor) | ||
return true | ||
}) | ||
ld.byName.Clear(true) | ||
ld.byID.Clear(true) | ||
return toRelease | ||
} | ||
|
||
func (ld *leasedDescriptors) release(ids []sqlbase.ID) (toRelease []catalog.Descriptor) { | ||
defer ld.assertConsistency() | ||
for _, id := range ids { | ||
item := ld.byID.Delete(idKey(id)) | ||
if item == nil { | ||
continue | ||
} | ||
desc := item.(descriptorWithID).Descriptor | ||
ld.byName.Delete(nameKey{desc.GetParentID(), desc.GetParentSchemaID(), desc.GetName()}) | ||
toRelease = append(toRelease, desc) | ||
} | ||
return toRelease | ||
} | ||
|
||
func (ld *leasedDescriptors) getByID(id sqlbase.ID) catalog.Descriptor { | ||
defer ld.assertConsistency() | ||
if result := ld.byID.Get(idKey(id)); result != nil { | ||
return result.(descriptorWithID).Descriptor | ||
} | ||
return nil | ||
} | ||
|
||
func (ld *leasedDescriptors) getByName( | ||
dbID sqlbase.ID, schemaID sqlbase.ID, name string, | ||
) catalog.Descriptor { | ||
defer ld.assertConsistency() | ||
if result := ld.byName.Get(nameKey{dbID, schemaID, name}); result != nil { | ||
return result.(descriptorWithName).Descriptor | ||
} | ||
return nil | ||
} | ||
|
||
func (ld *leasedDescriptors) numDescriptors() int { | ||
defer ld.assertConsistency() | ||
return ld.byName.Len() | ||
} | ||
|
||
// MakeCollection constructs a Collection. | ||
func MakeCollection( | ||
leaseMgr *lease.Manager, | ||
|
@@ -58,6 +181,10 @@ func MakeCollection( | |
dbCacheSubscriber DatabaseCacheSubscriber, | ||
) Collection { | ||
return Collection{ | ||
leasedDescriptors: leasedDescriptors{ | ||
byName: btree.New(2), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How did you pick this degree (not that I have a suggestion for another value) |
||
byID: btree.New(2), | ||
}, | ||
leaseMgr: leaseMgr, | ||
settings: settings, | ||
databaseCache: dbCache, | ||
|
@@ -83,7 +210,7 @@ type Collection struct { | |
// the transaction using them is complete. If the transaction gets pushed and | ||
// the timestamp changes, the descriptors are released. | ||
// TODO (lucy): Use something other than an unsorted slice for faster lookups. | ||
leasedDescriptors []catalog.Descriptor | ||
leasedDescriptors leasedDescriptors | ||
// Descriptors modified by the uncommitted transaction affiliated with this | ||
// Collection. This allows a transaction to see its own modifications while | ||
// bypassing the descriptor lease mechanism. The lease mechanism will have its | ||
|
@@ -376,11 +503,9 @@ func (tc *Collection) getObjectVersion( | |
// This ensures that, once a SQL transaction resolved name N to id X, it will | ||
// continue to use N to refer to X even if N is renamed during the | ||
// transaction. | ||
for _, table := range tc.leasedDescriptors { | ||
if lease.NameMatchesDescriptor(table, dbID, schemaID, name.Object()) { | ||
log.VEventf(ctx, 2, "found descriptor in collection for '%s'", name) | ||
return table, nil | ||
} | ||
if desc := tc.leasedDescriptors.getByName(dbID, schemaID, name.Object()); desc != nil { | ||
log.VEventf(ctx, 2, "found descriptor in collection for '%s'", name) | ||
return desc, nil | ||
} | ||
|
||
readTimestamp := txn.ReadTimestamp() | ||
|
@@ -402,7 +527,7 @@ func (tc *Collection) getObjectVersion( | |
log.Fatalf(ctx, "bad descriptor for T=%s, expiration=%s", readTimestamp, expiration) | ||
} | ||
|
||
tc.leasedDescriptors = append(tc.leasedDescriptors, desc) | ||
tc.leasedDescriptors.add(desc) | ||
log.VEventf(ctx, 2, "added descriptor '%s' to collection", name) | ||
|
||
// If the descriptor we just acquired expires before the txn's deadline, | ||
|
@@ -469,11 +594,9 @@ func (tc *Collection) getDescriptorVersionByID( | |
} | ||
|
||
// First, look to see if we already have the table in the shared cache. | ||
for _, desc := range tc.leasedDescriptors { | ||
if desc.GetID() == id { | ||
log.VEventf(ctx, 2, "found descriptor %d in cache", id) | ||
return desc, nil | ||
} | ||
if desc := tc.leasedDescriptors.getByID(id); desc != nil { | ||
log.VEventf(ctx, 2, "found descriptor %d in cache", id) | ||
return desc, nil | ||
} | ||
|
||
readTimestamp := txn.ReadTimestamp() | ||
|
@@ -486,7 +609,7 @@ func (tc *Collection) getDescriptorVersionByID( | |
log.Fatalf(ctx, "bad descriptor for T=%s, expiration=%s", readTimestamp, expiration) | ||
} | ||
|
||
tc.leasedDescriptors = append(tc.leasedDescriptors, desc) | ||
tc.leasedDescriptors.add(desc) | ||
log.VEventf(ctx, 2, "added descriptor %q to collection", desc.GetName()) | ||
|
||
// If the descriptor we just acquired expires before the txn's deadline, | ||
|
@@ -601,43 +724,26 @@ func (tc *Collection) hydrateTypesInTableDesc( | |
// ReleaseSpecifiedLeases releases the leases for the descriptors with ids in | ||
// the passed slice. Errors are logged but ignored. | ||
func (tc *Collection) ReleaseSpecifiedLeases(ctx context.Context, descs []lease.IDVersion) { | ||
// Sort the descriptors and leases to make it easy to find the leases to release. | ||
leasedDescs := tc.leasedDescriptors | ||
sort.Slice(descs, func(i, j int) bool { | ||
return descs[i].ID < descs[j].ID | ||
}) | ||
sort.Slice(leasedDescs, func(i, j int) bool { | ||
return leasedDescs[i].GetID() < leasedDescs[j].GetID() | ||
}) | ||
|
||
filteredLeases := leasedDescs[:0] // will store the remaining leases | ||
descsToConsider := descs | ||
shouldRelease := func(id sqlbase.ID) (found bool) { | ||
for len(descsToConsider) > 0 && descsToConsider[0].ID < id { | ||
descsToConsider = descsToConsider[1:] | ||
} | ||
return len(descsToConsider) > 0 && descsToConsider[0].ID == id | ||
ids := make([]sqlbase.ID, len(descs)) | ||
for i := range descs { | ||
ids[i] = descs[i].ID | ||
} | ||
for _, l := range leasedDescs { | ||
if !shouldRelease(l.GetID()) { | ||
filteredLeases = append(filteredLeases, l) | ||
} else if err := tc.leaseMgr.Release(l); err != nil { | ||
toRelease := tc.leasedDescriptors.release(ids) | ||
for _, desc := range toRelease { | ||
if err := tc.leaseMgr.Release(desc); err != nil { | ||
log.Warningf(ctx, "%v", err) | ||
} | ||
} | ||
tc.leasedDescriptors = filteredLeases | ||
} | ||
|
||
// ReleaseLeases releases all leases. Errors are logged but ignored. | ||
func (tc *Collection) ReleaseLeases(ctx context.Context) { | ||
if len(tc.leasedDescriptors) > 0 { | ||
log.VEventf(ctx, 2, "releasing %d descriptors", len(tc.leasedDescriptors)) | ||
for _, desc := range tc.leasedDescriptors { | ||
if err := tc.leaseMgr.Release(desc); err != nil { | ||
log.Warningf(ctx, "%v", err) | ||
} | ||
log.VEventf(ctx, 2, "releasing %d descriptors", tc.leasedDescriptors.numDescriptors()) | ||
toRelease := tc.leasedDescriptors.releaseAll() | ||
for _, desc := range toRelease { | ||
if err := tc.leaseMgr.Release(desc); err != nil { | ||
log.Warningf(ctx, "%v", err) | ||
} | ||
tc.leasedDescriptors = tc.leasedDescriptors[:0] | ||
} | ||
} | ||
|
||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] add a comment for these true arguments