Skip to content

Commit

Permalink
GOCBC-954: Add internal ability to get agent from bucket via collection
Browse files Browse the repository at this point in the history
Motivation
----------
Transactions operations receive a collection from the user, however due
to us building gocbtransactions on top of gocbcore transactions we don't
actually route requests through the collection. Instead we need to get
the gocbcore agent that the collection is using.

Changes
-------
Create a BucketInternal type accessible via Bucket.
Add an internal IORouter function to BucketInternal.
Add a function to get Bucket from Collection.

Change-Id: Ife038540921ae8e1039253ef43b9b025c5c398a5
Reviewed-on: http://review.couchbase.org/c/gocb/+/132630
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Charles Dixon <chvckd@gmail.com>
  • Loading branch information
chvck committed Jul 20, 2020
1 parent 683edc2 commit 96c3928
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 47 deletions.
20 changes: 20 additions & 0 deletions bucket_internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package gocb

import gocbcore "github.com/couchbase/gocbcore/v9"

// InternalBucket is used for internal functionality.
// Internal: This should never be used and is not supported.
type InternalBucket struct {
bucket *Bucket
}

// Internal returns a CollectionInternal.
// Internal: This should never be used and is not supported.
func (b *Bucket) Internal() *InternalBucket {
return &InternalBucket{bucket: b}
}

// IORouter returns the collection's internal core router.
func (ib *InternalBucket) IORouter() (*gocbcore.Agent, error) {
return ib.bucket.connectionManager.connection(ib.bucket.Name())
}
15 changes: 14 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type connectionManager interface {
getHTTPProvider() (httpProvider, error)
getDiagnosticsProvider(bucketName string) (diagnosticsProvider, error)
getWaitUntilReadyProvider(bucketName string) (waitUntilReadyProvider, error)
connection(bucketName string) (*gocbcore.Agent, error)
close() error
}

Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *stdConnectionMgr) getKvProvider(bucketName string) (kvProvider, error)
return nil, errors.New("cluster not yet connected")
}
agent := c.agentgroup.GetAgent(bucketName)
if c.agentgroup == nil {
if agent == nil {
return nil, errors.New("bucket not yet connected")
}
return agent, nil
Expand Down Expand Up @@ -207,6 +208,18 @@ func (c *stdConnectionMgr) getWaitUntilReadyProvider(bucketName string) (waitUnt
return &waitUntilReadyProviderWrapper{provider: agent}, nil
}

func (c *stdConnectionMgr) connection(bucketName string) (*gocbcore.Agent, error) {
if c.agentgroup == nil {
return nil, errors.New("cluster not yet connected")
}

agent := c.agentgroup.GetAgent(bucketName)
if agent == nil {
return nil, errors.New("bucket not yet connected")
}
return agent, nil
}

func (c *stdConnectionMgr) close() error {
c.lock.Lock()
if c.agentgroup == nil {
Expand Down
12 changes: 8 additions & 4 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type kvTimeoutsConfig struct {
type Collection struct {
collectionName string
scope string
bucket string
bucket *Bucket

timeoutsConfig kvTimeoutsConfig

Expand All @@ -28,7 +28,7 @@ func newCollection(scope *Scope, collectionName string) *Collection {
return &Collection{
collectionName: collectionName,
scope: scope.Name(),
bucket: scope.bucketName,
bucket: scope.bucket,

timeoutsConfig: scope.timeoutsConfig,

Expand All @@ -52,9 +52,9 @@ func (c *Collection) ScopeName() string {
return c.scope
}

// BucketName returns the name of the bucket to which this collection belongs.
// Bucket returns the name of the bucket to which this collection belongs.
// UNCOMMITTED: This API may change in the future.
func (c *Collection) BucketName() string {
func (c *Collection) Bucket() *Bucket {
return c.bucket
}

Expand All @@ -69,3 +69,7 @@ func (c *Collection) startKvOpTrace(operationName string, tracectx requestSpanCo
SetTag("couchbase.collection", c.collectionName).
SetTag("couchbase.service", "kv")
}

func (c *Collection) bucketName() string {
return c.bucket.Name()
}
18 changes: 9 additions & 9 deletions collection_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (item *TouchOp) execute(tracectx requestSpanContext, c *Collection, provide
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func (item *RemoveOp) execute(tracectx requestSpanContext, c *Collection, provid
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -345,7 +345,7 @@ func (item *UpsertOp) execute(tracectx requestSpanContext, c *Collection, provid
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func (item *InsertOp) execute(tracectx requestSpanContext, c *Collection, provid
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -483,7 +483,7 @@ func (item *ReplaceOp) execute(tracectx requestSpanContext, c *Collection, provi
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -538,7 +538,7 @@ func (item *AppendOp) execute(tracectx requestSpanContext, c *Collection, provid
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -593,7 +593,7 @@ func (item *PrependOp) execute(tracectx requestSpanContext, c *Collection, provi
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func (item *IncrementOp) execute(tracectx requestSpanContext, c *Collection, pro
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down Expand Up @@ -729,7 +729,7 @@ func (item *DecrementOp) execute(tracectx requestSpanContext, c *Collection, pro
if res.MutationToken.VbUUID != 0 {
mutTok := &MutationToken{
token: res.MutationToken,
bucketName: c.bucket,
bucketName: c.bucketName(),
}
item.Result.mt = mutTok
}
Expand Down
2 changes: 1 addition & 1 deletion collection_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ func (c *Collection) GetAnyReplica(id string, opts *GetAnyReplicaOptions) (docOu
if res == nil {
return nil, &KeyValueError{
InnerError: ErrDocumentUnretrievable,
BucketName: c.bucket,
BucketName: c.bucketName(),
ScopeName: c.scope,
CollectionName: c.collectionName,
}
Expand Down
8 changes: 4 additions & 4 deletions collection_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,8 +1431,8 @@ func (suite *IntegrationTestSuite) TestDurabilityGetFromAnyReplica() {
testCases := []tCase{
{
name: "insertDurabilityMajorityDoc",
method: "Insert",
args: []interface{}{"insertDurabilityMajorityDoc", doc, &InsertOptions{
method: "Upsert",
args: []interface{}{"insertDurabilityMajorityDoc", doc, &UpsertOptions{
DurabilityLevel: DurabilityLevelMajority,
}},
expectCas: true,
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func (suite *UnitTestSuite) TestGetErrorCollectionUnknown() {
Return(pendingOp, nil)

col := &Collection{
bucket: "mock",
bucket: &Bucket{bucketName: "mock"},
collectionName: "",
scope: "",

Expand Down Expand Up @@ -1574,7 +1574,7 @@ func (suite *UnitTestSuite) TestGetErrorProperties() {
Return(pendingOp, nil)

col := &Collection{
bucket: "mock",
bucket: &Bucket{bucketName: "mock"},

collectionName: "",
scope: "",
Expand Down
6 changes: 3 additions & 3 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func (suite *UnitTestSuite) TestCollectionName() {
s := b.Scope(sName)
c := s.Collection(cName)

suite.Assert().Equal(bName, c.BucketName())
suite.Assert().Equal(bName, c.bucketName())
suite.Assert().Equal(sName, c.ScopeName())
suite.Assert().Equal(cName, c.Name())
}
Expand All @@ -21,7 +21,7 @@ func (suite *UnitTestSuite) TestDefaultScopeCollectionName() {
b := suite.bucket(bName, suite.defaultTimeoutConfig(), nil)
c := b.Collection(cName)

suite.Assert().Equal(bName, c.BucketName())
suite.Assert().Equal(bName, c.bucketName())
suite.Assert().Equal("_default", c.ScopeName())
suite.Assert().Equal(cName, c.Name())
}
Expand All @@ -32,7 +32,7 @@ func (suite *UnitTestSuite) TestDefaultScopeDefaultCollectionName() {
b := suite.bucket(bName, suite.defaultTimeoutConfig(), nil)
c := b.DefaultCollection()

suite.Assert().Equal(bName, c.BucketName())
suite.Assert().Equal(bName, c.bucketName())
suite.Assert().Equal("_default", c.ScopeName())
suite.Assert().Equal("_default", c.Name())
}
2 changes: 1 addition & 1 deletion error_wrapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func maybeEnhanceKVErr(err error, bucketName, scopeName, collName, docKey string
}

func maybeEnhanceCollKVErr(err error, bucket kvProvider, coll *Collection, docKey string) error {
return maybeEnhanceKVErr(err, coll.bucket, coll.Name(), coll.ScopeName(), docKey)
return maybeEnhanceKVErr(err, coll.bucketName(), coll.Name(), coll.ScopeName(), docKey)
}

func maybeEnhanceViewError(err error) error {
Expand Down
2 changes: 1 addition & 1 deletion kvopmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (m *kvOpManager) ScopeName() string {
}

func (m *kvOpManager) BucketName() string {
return m.parent.bucket
return m.parent.bucketName()
}

func (m *kvOpManager) ValueBytes() []byte {
Expand Down
6 changes: 4 additions & 2 deletions mock_analyticsProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 27 additions & 1 deletion mock_connectionManager_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mock_diagnosticsProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mock_httpProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mock_kvProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mock_queryProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mock_searchProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mock_viewProvider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 96c3928

Please sign in to comment.