Skip to content

Commit

Permalink
Add Delta Sync Pull support, and add new BlipTesterClient for emulating
Browse files Browse the repository at this point in the history
a connecting CBL client replicating over blip.

- Fix BLIP logging

- Initial delta sync pull implementation

- Move deltaSrc to blip message property

- Bump manifest for fleece, improve bliptest client WaitForRev

- Assert that the blip message sent was using deltas in EE and full-body in CE

- Split fleecedelta invocations out into files with EE/CE build flags

- Add notdefault to EE deps in manifest
  • Loading branch information
bbrks committed Dec 4, 2018
1 parent c15c066 commit 846859e
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 8 deletions.
20 changes: 20 additions & 0 deletions base/deltas_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// +build !cb_sg_enterprise

package base

import (
"fmt"
)

// ErrDeltasNotSupported is returned when these functions are called in CE
var ErrDeltasNotSupported = fmt.Errorf("Deltas not supported in CE")

// Diff is only implemented in EE, the CE stub always returns an error.
func Diff(old, new map[string]interface{}) (delta []byte, err error) {
return nil, ErrDeltasNotSupported
}

// Patch is only implemented in EE, the CE stub always returns an error.
func Patch(old *map[string]interface{}, delta []byte) (err error) {
return ErrDeltasNotSupported
}
17 changes: 17 additions & 0 deletions base/deltas_ee.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// +build cb_sg_enterprise

package base

import fleecedelta "github.com/couchbaselabs/go-fleecedelta"

// The reason for these wrappers is to keep go-fleecedelta out of the compiled CE builds by providing no-op versions in deltas_ce.go

// Diff will return the fleece delta between old and new.
func Diff(old, new map[string]interface{}) (delta []byte, err error) {
return fleecedelta.DiffJSON(old, new)
}

// Patch will patch old with the given delta.
func Patch(old *map[string]interface{}, delta []byte) (err error) {
return fleecedelta.PatchJSON(old, delta)
}
4 changes: 4 additions & 0 deletions db/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func ParseRevID(revid string) (int, string) {
return generation, id
}

// compareRevIDs compares the two rev IDs and returns:
// 1 if id1 is 'greater' than id2
// -1 if id1 is 'less' than id2
// 0 if the two are equal.
func compareRevIDs(id1, id2 string) int {
gen1, sha1 := ParseRevID(id1)
gen2, sha2 := ParseRevID(id2)
Expand Down
4 changes: 4 additions & 0 deletions manifest/default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@

<project name="testify" path="godeps/src/github.com/stretchr/testify" remote="couchbasedeps" revision="04af85275a5c7ac09d16bb3b9b2e751ed45154e5"/>

<!-- Enterprise edition dependencies -->
<project groups="notdefault,cb_sg_enterprise" name="go-fleecedelta" path="godeps/src/github.com/couchbaselabs/go-fleecedelta" remote="couchbaselabs_private" revision="1277ab2af4e7b861a66aadca51a7f49679f9522f"/>
<project groups="notdefault,cb_sg_enterprise" name="go-diff" path="godeps/src/github.com/sergi/go-diff" remote="couchbasedeps" revision="da645544ed44df016359bd4c0e3dc60ee3a0da43"/>

<!-- gozip tools -->
<project name="ns_server" path="godeps/src/github.com/couchbase/ns_server" remote="couchbase" revision="6d835931f574f25e3781192c09e45a3ee30deb51"/>

Expand Down
71 changes: 67 additions & 4 deletions rest/blip_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -749,19 +750,22 @@ func AssertChangeEquals(t *testing.T, change []interface{}, expectedChange Expec

// Test adding / retrieving attachments
func TestAttachments(t *testing.T) {

// TODO: Write tests to cover scenario
t.Skip("not tested")
}

// Make sure it's not possible to have two outstanding subChanges w/ continuous=true.
// Expected behavior is that the first continous change subscription should get discarded in favor of 2nd.
func TestConcurrentChangesSubscriptions(t *testing.T) {

// TODO: Write tests to cover scenario
t.Skip("not tested")
}

// Create a continous changes subscription that has docs in multiple channels, and make sure
// all docs are received
func TestMultiChannelContinousChangesSubscription(t *testing.T) {

// TODO: Write tests to cover scenario
t.Skip("not tested")
}

// Test setting and getting checkpoints
Expand Down Expand Up @@ -811,7 +815,8 @@ func TestBlipSetCheckpoint(t *testing.T) {

// Test no-conflicts mode replication (proposeChanges endpoint)
func TestNoConflictsModeReplication(t *testing.T) {

// TODO: Write tests to cover scenario
t.Skip("not tested")
}

// Reproduce issue where ReloadUser was not being called, and so it was
Expand Down Expand Up @@ -1475,3 +1480,61 @@ func TestMissingNoRev(t *testing.T) {
goassert.True(t, len(docs) == 4)

}

// TestBlipDeltaSyncPull tests that a simple pull replication using deltas in EE,
// and checks that full body replicaiton still happens in CE.
func TestBlipDeltaSyncPull(t *testing.T) {

defer base.SetUpTestLogging(base.LevelTrace, base.KeyAll)()

bt, err := NewBlipTesterFromSpec(BlipTesterSpec{
restTester: &RestTester{
DeltaSyncEnabled: base.IsEnterpriseEdition(),
},
})
assert.NoError(t, err, "Error creating BlipTester")
defer bt.Close()

client, err := NewBlipTesterClient(bt)
assert.NoError(t, err)

client.Deltas = true
client.Start()

// create doc1 rev 1-0335a345b6ffed05707ccc4cbc1b67f4
resp := bt.restTester.SendAdminRequest(http.MethodPut, "/db/doc1", `{"greetings": [{"hello": "world!"}, {"hi": "alice"}]}`)
assert.Equal(t, http.StatusCreated, resp.Code)

data, ok := client.WaitForRev("doc1", "1-0335a345b6ffed05707ccc4cbc1b67f4")
assert.True(t, ok)
assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"}]}`, string(data))

// create doc1 rev 2-959f0e9ad32d84ff652fb91d8d0caa7e
resp = bt.restTester.SendAdminRequest(http.MethodPut, "/db/doc1?rev=1-0335a345b6ffed05707ccc4cbc1b67f4", `{"greetings": [{"hello": "world!"}, {"hi": "alice"}, {"howdy": "bob"}]}`)
assert.Equal(t, http.StatusCreated, resp.Code)

data, ok = client.WaitForRev("doc1", "2-959f0e9ad32d84ff652fb91d8d0caa7e")
assert.True(t, ok)
assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`, string(data))

msg, ok := client.WaitForMessage(5)
assert.True(t, ok)

// Check EE is delta, and CE is full-body replication
if base.IsEnterpriseEdition() {
// Check the request was sent with the correct deltaSrc property
assert.Equal(t, "1-0335a345b6ffed05707ccc4cbc1b67f4", msg.Properties[revMessageDeltaSrc])
// Check the request body was the actual delta
msgBody, err := msg.Body()
assert.NoError(t, err)
assert.Equal(t, `{"greetings":{"2-":[{"howdy":"bob"}]}}`, string(msgBody))
} else {
// Check the request was NOT sent with a deltaSrc property
assert.Equal(t, "", msg.Properties[revMessageDeltaSrc])
// Check the request body was NOT the delta
msgBody, err := msg.Body()
assert.NoError(t, err)
assert.NotEqual(t, `{"greetings":{"2-":[{"howdy":"bob"}]}}`, string(msgBody))
assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`, string(msgBody))
}
}
Loading

0 comments on commit 846859e

Please sign in to comment.