Skip to content

Commit

Permalink
CBG-1341 Backport CBG-1339 to 2.8.2 (#4970)
Browse files Browse the repository at this point in the history
Fix for infinite loop when tombstoning a server tombstone.
  • Loading branch information
adamcfraser committed Feb 26, 2021
1 parent 0be9ae8 commit 4df7a2d
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 5 deletions.
8 changes: 4 additions & 4 deletions base/bucket_gocb.go
Expand Up @@ -1329,11 +1329,12 @@ func (bucket *CouchbaseBucketGoCB) GetWithXattr(k string, xattrKey string, rv in
case gocbcore.ErrSubDocMultiPathFailureDeleted:
// ErrSubDocMultiPathFailureDeleted - one of the subdoc operations failed, and the doc is deleted. Occurs when xattr may exist but doc is deleted (tombstone)
xattrContentErr := res.Content(xattrKey, xv)
cas = uint64(res.Cas())
if xattrContentErr != nil {
// No doc, no xattr means the doc isn't found
return false, gocb.ErrKeyNotFound, uint64(0)
Debugf(KeyCRUD, "No xattr content found for key=%s, xattrKey=%s: %v", UD(k), UD(xattrKey), xattrContentErr)
return false, gocb.ErrKeyNotFound, cas
}
cas = uint64(res.Cas())
return false, nil, cas

default:
Expand Down Expand Up @@ -1729,8 +1730,7 @@ func (bucket *CouchbaseBucketGoCB) WriteUpdateWithXattr(k string, xattrKey strin
Debugf(KeyCRUD, "Retrieval of existing doc failed during WriteUpdateWithXattr for key=%s, xattrKey=%s: %v", UD(k), UD(xattrKey), err)
return emptyCas, err
}
// Key not found - initialize cas and values
cas = 0
// Key not found - initialize values
value = nil
xattrValue = nil
}
Expand Down
2 changes: 1 addition & 1 deletion base/util.go
Expand Up @@ -438,7 +438,7 @@ func RetryLoopCas(description string, worker RetryCasWorker, sleeper RetrySleepe
shouldRetry, err, value := worker()
if !shouldRetry {
if err != nil {
return err, 0
return err, value
}
return nil, value
}
Expand Down
77 changes: 77 additions & 0 deletions rest/api_test.go
Expand Up @@ -5473,6 +5473,83 @@ func TestTombstonedBulkDocs(t *testing.T) {
}
}

func TestTombstonedBulkDocsWithPriorPurge(t *testing.T) {
if !base.TestUseXattrs() {
t.Skip("Test requires xattrs to be enabled")
}

defer base.SetUpTestLogging(base.LevelDebug, base.KeyAll)()
rt := NewRestTester(t, &RestTesterConfig{
SyncFn: `function(doc,oldDoc){
console.log("doc:"+JSON.stringify(doc))
console.log("oldDoc:"+JSON.stringify(oldDoc))
}`,
})
defer rt.Close()

gocbBucket, ok := base.AsGoCBBucket(rt.Bucket())
if !ok {
t.Skip("Requires Couchbase bucket")
}

_, err := gocbBucket.Bucket.Insert(t.Name(), map[string]interface{}{"val": "val"}, 0)
require.NoError(t, err)

resp := rt.SendAdminRequest("POST", "/db/_purge", `{"`+t.Name()+`": ["*"]}`)
assertStatus(t, resp, http.StatusOK)

response := rt.SendAdminRequest("POST", "/db/_bulk_docs", `{"new_edits": false, "docs": [{"_id":"`+t.Name()+`", "_deleted": true, "_revisions":{"start":9, "ids":["c45c049b7fe6cf64cd8595c1990f6504", "6e01ac52ffd5ce6a4f7f4024c08d296f"]}}]}`)
assertStatus(t, response, http.StatusCreated)

var body map[string]interface{}
_, err = rt.GetDatabase().Bucket.Get(t.Name(), &body)

assert.Error(t, err)
assert.True(t, base.IsDocNotFoundError(err))
assert.Nil(t, body)

}

func TestTombstonedBulkDocsWithExistingTombstone(t *testing.T) {
if !base.TestUseXattrs() {
t.Skip("Test requires xattrs to be enabled")
}

defer base.SetUpTestLogging(base.LevelDebug, base.KeyAll)()
rt := NewRestTester(t, &RestTesterConfig{
SyncFn: `function(doc,oldDoc){
console.log("doc:"+JSON.stringify(doc))
console.log("oldDoc:"+JSON.stringify(oldDoc))
}`,
})
defer rt.Close()

bucket := rt.Bucket()
gocbBucket, ok := base.AsGoCBBucket(bucket)
if !ok {
t.Skip("Requires Couchbase bucket")
}

// Create a server tombstone with no mobile xattr
value := make(map[string]interface{})
value["foo"] = "bar"
insCas, err := gocbBucket.Bucket.Insert(t.Name(), value, 0)
require.NoError(t, err)

_, err = gocbBucket.Bucket.Remove(t.Name(), insCas)
require.NoError(t, err)

response := rt.SendAdminRequest("POST", "/db/_bulk_docs", `{"new_edits": false, "docs": [{"_id":"`+t.Name()+`", "_deleted": true, "_revisions":{"start":9, "ids":["c45c049b7fe6cf64cd8595c1990f6504", "6e01ac52ffd5ce6a4f7f4024c08d296f"]}}]}`)
assertStatus(t, response, http.StatusCreated)

var body map[string]interface{}
_, err = rt.GetDatabase().Bucket.Get(t.Name(), &body)

assert.Error(t, err)
assert.True(t, base.IsDocNotFoundError(err))
assert.Nil(t, body)
}

// This test is skipped usually as it requires code to be manually injected into bucket_gocb.go
func TestPutTombstoneWithoutCreateAsDeletedFlagCasFailure(t *testing.T) {
t.Skip("Requires manual intervention to run")
Expand Down

0 comments on commit 4df7a2d

Please sign in to comment.