Skip to content

Commit

Permalink
Unmarshal document bodies with decoder.UseNumber() (#3801)
Browse files Browse the repository at this point in the history
* Working

* Don't reuse Body type for revisions

* Add constants, test fixes

* Test and shadower cleanup

* Tweak test assert and improve test failure messages

* Improve unit test to validate values passed to sync function

Updates manifest to point to go-blip PR (for integration test).  Needs to be bumped again when go-blip PR is merged

* Fix handling for null/empty json

* Update go-blip to merged commit

* Fix TestImportWithStaleBucketDocCorrectExpiry

This test had three problems:
- Previously wasn't changing the docBody across subtests
- The previous 30s expiry was too short - in local testing, an attempted Get immediately after an add with expiry was reporting not found.  The test doesn't wait for expiry, so bumped to 30m.
- In the rawDocWithSyncMeta case, import is triggering two generations (instead of one).  This looks like a separate bug, filed #3804
  • Loading branch information
adamcfraser authored and bbrks committed Oct 19, 2018
1 parent f9425d4 commit 9c28192
Show file tree
Hide file tree
Showing 27 changed files with 438 additions and 138 deletions.
14 changes: 11 additions & 3 deletions base/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,23 @@ func CbsExpiryToTime(expiry uint32) time.Time {

// ReflectExpiry attempts to convert expiry from one of the following formats to a Couchbase Server expiry value:
// 1. Numeric JSON values are converted to uint32 and returned as-is
// 2. String JSON values that are numbers are converted to int32 and returned as-is
// 3. String JSON values that are ISO-8601 dates are converted to UNIX time and returned
// 4. Null JSON values return 0
// 2. JSON numbers are converted to uint32 and returned as-is
// 3. String JSON values that are numbers are converted to int32 and returned as-is
// 4. String JSON values that are ISO-8601 dates are converted to UNIX time and returned
// 5. Null JSON values return 0
func ReflectExpiry(rawExpiry interface{}) (*uint32, error) {
switch expiry := rawExpiry.(type) {
case int64:
return ValidateUint32Expiry(expiry)
case float64:
return ValidateUint32Expiry(int64(expiry))
case json.Number:
// Attempt to convert to int
expInt, err := expiry.Int64()
if err != nil {
return nil, err
}
return ValidateUint32Expiry(expInt)
case string:
// First check if it's a numeric string
expInt, err := strconv.ParseInt(expiry, 10, 32)
Expand Down
1 change: 1 addition & 0 deletions db/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func ReadJSONFromMIME(headers http.Header, input io.Reader, into interface{}) er
}

decoder := json.NewDecoder(input)
decoder.UseNumber()
if err := decoder.Decode(into); err != nil {
base.Warnf(base.KeyAll, "Couldn't parse JSON in HTTP request: %v", err)
return base.HTTPErrorf(http.StatusBadRequest, "Bad JSON")
Expand Down
2 changes: 1 addition & 1 deletion db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func TestLowSequenceHandlingWithAccessGrant(t *testing.T) {
assert.Equals(t, len(changes), 3)
assert.True(t, verifyChangesFullSequences(changes, []string{"1", "2", "2::6"}))

_, incrErr := db.Bucket.Incr("_sync:seq", 7, 7, 0)
_, incrErr := db.Bucket.Incr(SyncSeqKey, 7, 7, 0)
assert.True(t, incrErr == nil)

// Modify user to have access to both channels (sequence 2):
Expand Down
11 changes: 5 additions & 6 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (db *DatabaseContext) OnDemandImportForGet(docid string, rawDoc []byte, raw

// This is the RevisionCacheLoaderFunc callback for the context's RevisionCache.
// Its job is to load a revision from the bucket when there's a cache miss.
func (context *DatabaseContext) revCacheLoader(id IDAndRev) (body Body, history Body, channels base.Set, err error) {
func (context *DatabaseContext) revCacheLoader(id IDAndRev) (body Body, history Revisions, channels base.Set, err error) {
var doc *document
if doc, err = context.GetDocument(id.DocID, DocUnmarshalAll); doc == nil {
return body, history, channels, err
Expand All @@ -183,7 +183,7 @@ func (context *DatabaseContext) revCacheLoader(id IDAndRev) (body Body, history
}

// Common revCacheLoader functionality used either during a cache miss (from revCacheLoader), or directly when retrieving current rev from cache
func (context *DatabaseContext) revCacheLoaderForDocument(doc *document, revid string) (body Body, history Body, channels base.Set, err error) {
func (context *DatabaseContext) revCacheLoaderForDocument(doc *document, revid string) (body Body, history Revisions, channels base.Set, err error) {

if body, err = context.getRevision(doc, revid); err != nil {
// If we can't find the revision (either as active or conflicted body from the document, or as old revision body backup), check whether
Expand Down Expand Up @@ -280,7 +280,7 @@ func (db *Database) GetRevWithHistory(docid, revid string, maxHistory int, histo
redactedBody["_removed"] = true
}
if revisions != nil {
redactedBody["_revisions"] = revisions
redactedBody[BodyRevisions] = revisions
}
return redactedBody, nil
}
Expand All @@ -294,7 +294,7 @@ func (db *Database) GetRevWithHistory(docid, revid string, maxHistory int, histo

// Add revision metadata:
if revisions != nil {
body["_revisions"] = revisions
body[BodyRevisions] = revisions
}

// If doc is nil (we got the rev from the rev cache)
Expand Down Expand Up @@ -414,7 +414,6 @@ func (db *DatabaseContext) getRevision(doc *document, revid string) (Body, error
return nil, err
}
}
body.FixJSONNumbers() // Make sure big ints won't get output in scientific notation
body[BodyId] = doc.ID
body[BodyRev] = revid

Expand Down Expand Up @@ -489,7 +488,7 @@ func (db *Database) getRevFromDoc(doc *document, revid string, listRevisions boo
if getHistoryErr != nil {
return nil, getHistoryErr
}
body["_revisions"] = encodeRevisions(validatedHistory)
body[BodyRevisions] = encodeRevisions(validatedHistory)
}
return body, nil
}
Expand Down
18 changes: 18 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,21 @@ func TestOldRevisionStorageError(t *testing.T) {
assertNoError(t, db.PutExistingRev("doc1", rev3c_body, []string{"3-c", "2-b", "1-a"}, false), "add 3-c")

}

// Validate JSON number handling for large sequence values
func TestLargeSequence(t *testing.T) {

	// Seed _sync:seq at max int64 - the doc write below allocates the next sequence,
	// which only round-trips correctly when bodies are decoded with UseNumber()
	const initialSeq = uint64(9223372036854775807)

	db, bucket := setupTestDBWithCustomSyncSeq(t, initialSeq)
	defer tearDownTestDB(t, db)
	defer bucket.Close()

	db.ChannelMapper = channels.NewDefaultChannelMapper()

	// Write a doc via SG
	docBody := Body{"key1": "largeSeqTest"}
	assertNoError(t, db.PutExistingRev("largeSeqDoc", docBody, []string{"1-a"}, false), "add largeSeqDoc")

	// The allocated sequence should be exactly one past the seeded value
	syncData, err := db.GetDocSyncData("largeSeqDoc")
	assertNoError(t, err, "Error retrieving document sync data")
	assert.Equals(t, syncData.Sequence, initialSeq+1)
}
69 changes: 44 additions & 25 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ func setupTestDBWithCacheOptions(t testing.TB, options CacheOptions) (*Database,
return db, tBucket
}

// Sets up a test bucket with _sync:seq initialized to a high value prior to database creation. Used to test
// issues with custom _sync:seq values without triggering skipped sequences between 0 and customSeq
func setupTestDBWithCustomSyncSeq(t testing.TB, customSeq uint64) (*Database, base.TestBucket) {

	options := DatabaseContextOptions{}
	AddOptionsFromEnvironmentVariables(&options)

	bucket := testBucket()

	// Seed the sequence counter before the database context exists, so the first
	// allocated sequence starts from customSeq instead of zero
	log.Printf("Initializing test _sync:seq to %d", customSeq)
	_, err := bucket.Incr(SyncSeqKey, customSeq, customSeq, 0)
	assertNoError(t, err, fmt.Sprintf("Couldn't increment _sync:seq by %d", customSeq))

	dbContext, err := NewDatabaseContext("db", bucket.Bucket, false, options)
	assertNoError(t, err, "Couldn't create context for database 'db'")

	database, err := CreateDatabase(dbContext)
	assertNoError(t, err, "Couldn't create database 'db'")

	return database, bucket
}

func testBucket() base.TestBucket {

// Retry loop in case the GSI indexes don't handle the flush and we need to drop them and retry
Expand Down Expand Up @@ -212,9 +231,9 @@ func TestDatabase(t *testing.T) {
// Test the _revisions property:
log.Printf("Check _revisions...")
gotbody, err = db.GetRev("doc1", rev2id, true, nil)
revisions := gotbody["_revisions"].(map[string]interface{})
assert.Equals(t, revisions["start"], 2)
assert.DeepEquals(t, revisions["ids"],
revisions := gotbody[BodyRevisions].(map[string]interface{})
assert.Equals(t, revisions[RevisionsStart], 2)
assert.DeepEquals(t, revisions[RevisionsIds],
[]string{"488724414d0ed6b398d6d2aeb228d797",
"cb0c9a22be0e5a1b01084ec019defa81"})

Expand Down Expand Up @@ -274,10 +293,10 @@ func TestGetDeleted(t *testing.T) {
body, err = db.GetRev("doc1", rev2id, true, nil)
assertNoError(t, err, "GetRev")
expectedResult := Body{
BodyId: "doc1",
BodyRev: rev2id,
BodyDeleted: true,
"_revisions": map[string]interface{}{"start": 2, "ids": []string{"bc6d97f6e97c0d034a34f8aac2bf8b44", "dfd5e19813767eeddd08270fc5f385cd"}},
BodyId: "doc1",
BodyRev: rev2id,
BodyDeleted: true,
BodyRevisions: map[string]interface{}{RevisionsStart: 2, RevisionsIds: []string{"bc6d97f6e97c0d034a34f8aac2bf8b44", "dfd5e19813767eeddd08270fc5f385cd"}},
}
assert.DeepEquals(t, body, expectedResult)

Expand Down Expand Up @@ -336,9 +355,9 @@ func TestGetRemovedAsUser(t *testing.T) {
expectedResult := Body{
"key1": 1234,
"channels": []string{"NBC"},
"_revisions": map[string]interface{}{
"start": 2,
"ids": []string{rev2digest, rev1digest}},
BodyRevisions: map[string]interface{}{
RevisionsStart: 2,
RevisionsIds: []string{rev2digest, rev1digest}},
BodyId: "doc1",
BodyRev: rev2id,
}
Expand Down Expand Up @@ -367,9 +386,9 @@ func TestGetRemovedAsUser(t *testing.T) {
BodyId: "doc1",
BodyRev: rev2id,
"_removed": true,
"_revisions": map[string]interface{}{
"start": 2,
"ids": []string{rev2digest, rev1digest}},
BodyRevisions: map[string]interface{}{
RevisionsStart: 2,
RevisionsIds: []string{rev2digest, rev1digest}},
}
assert.DeepEquals(t, body, expectedResult)

Expand Down Expand Up @@ -420,9 +439,9 @@ func TestGetRemoved(t *testing.T) {
expectedResult := Body{
"key1": 1234,
"channels": []string{"NBC"},
"_revisions": map[string]interface{}{
"start": 2,
"ids": []string{rev2digest, rev1digest}},
BodyRevisions: map[string]interface{}{
RevisionsStart: 2,
RevisionsIds: []string{rev2digest, rev1digest}},
BodyId: "doc1",
BodyRev: rev2id,
}
Expand All @@ -442,9 +461,9 @@ func TestGetRemoved(t *testing.T) {
BodyId: "doc1",
BodyRev: rev2id,
"_removed": true,
"_revisions": map[string]interface{}{
"start": 2,
"ids": []string{rev2digest, rev1digest}},
BodyRevisions: map[string]interface{}{
RevisionsStart: 2,
RevisionsIds: []string{rev2digest, rev1digest}},
}
assert.DeepEquals(t, body, expectedResult)

Expand Down Expand Up @@ -495,9 +514,9 @@ func TestGetRemovedAndDeleted(t *testing.T) {
expectedResult := Body{
"key1": 1234,
BodyDeleted: true,
"_revisions": map[string]interface{}{
"start": 2,
"ids": []string{rev2digest, rev1digest}},
BodyRevisions: map[string]interface{}{
RevisionsStart: 2,
RevisionsIds: []string{rev2digest, rev1digest}},
BodyId: "doc1",
BodyRev: rev2id,
}
Expand All @@ -518,9 +537,9 @@ func TestGetRemovedAndDeleted(t *testing.T) {
BodyRev: rev2id,
"_removed": true,
BodyDeleted: true,
"_revisions": map[string]interface{}{
"start": 2,
"ids": []string{rev2digest, rev1digest}},
BodyRevisions: map[string]interface{}{
RevisionsStart: 2,
RevisionsIds: []string{rev2digest, rev1digest}},
}
assert.DeepEquals(t, body, expectedResult)

Expand Down
14 changes: 8 additions & 6 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newDocument(docid string) *document {
// Accessors for document properties. To support lazy unmarshalling of document contents, all access should be done through accessors
func (doc *document) Body() Body {
if doc._body == nil && doc.rawBody != nil {
err := json.Unmarshal(doc.rawBody, &doc._body)
err := doc._body.Unmarshal(doc.rawBody)
if err != nil {
base.Warnf(base.KeyAll, "Unable to unmarshal document body from raw body : %s", err)
return nil
Expand Down Expand Up @@ -132,7 +132,9 @@ func (doc *document) MarshalBody() ([]byte, error) {
func unmarshalDocument(docid string, data []byte) (*document, error) {
doc := newDocument(docid)
if len(data) > 0 {
if err := json.Unmarshal(data, doc); err != nil {
decoder := json.NewDecoder(bytes.NewReader(data))
decoder.UseNumber()
if err := decoder.Decode(doc); err != nil {
return nil, pkgerrors.Wrapf(err, "Error unmarshalling doc.")
}
if doc != nil && doc.Deleted_OLD {
Expand Down Expand Up @@ -398,7 +400,7 @@ func (doc *document) getNonWinningRevisionBody(revid string, loader RevLoaderFun
return nil
}

if err := json.Unmarshal(bodyBytes, &body); err != nil {
if err := body.Unmarshal(bodyBytes); err != nil {
base.Warnf(base.KeyAll, "Unexpected error parsing body of rev %q: %v", revid, err)
return nil
}
Expand Down Expand Up @@ -606,7 +608,7 @@ func (doc *document) updateChannels(newChannels base.Set) (changedChannels base.

// Determine whether the specified revision was a channel removal, based on doc.Channels. If so, construct the standard document body for a
// removal notification (_removed=true)
func (doc *document) IsChannelRemoval(revID string) (body Body, history Body, channels base.Set, isRemoval bool, err error) {
func (doc *document) IsChannelRemoval(revID string) (body Body, history Revisions, channels base.Set, isRemoval bool, err error) {

channels = make(base.Set)

Expand Down Expand Up @@ -702,7 +704,7 @@ func (doc *document) UnmarshalJSON(data []byte) error {
doc.syncData = *root.SyncData
}

if err := json.Unmarshal(data, &doc._body); err != nil {
if err := doc._body.Unmarshal(data); err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err))
}

Expand Down Expand Up @@ -744,7 +746,7 @@ func (doc *document) UnmarshalWithXattr(data []byte, xdata []byte, unmarshalLeve
}
// Unmarshal body if requested and present
if unmarshalLevel == DocUnmarshalAll && len(data) > 0 {
return json.Unmarshal(data, &doc._body)
return doc._body.Unmarshal(data)
} else {
doc.rawBody = data
}
Expand Down
8 changes: 5 additions & 3 deletions db/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,23 @@ func BenchmarkUnmarshalBody(b *testing.B) {
useDecode bool
fixJSONNumbers bool
}{
{"UnmarshalAndFixNumbers", false, true},
{"Unmarshal", false, false},
{"Decode", true, false},
{"DecodeUseNumber", true, true},
{"UnmarshalAndFixNumbers", false, true},
{"Unmarshal", false, false},
}

for _, bm := range unmarshalBenchmarks {
b.Run(bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
doc := newDocument("testDocID")
docReader := bytes.NewReader(doc1k_body)
b.StartTimer()
var err error
if bm.useDecode {
decoder := json.NewDecoder(bytes.NewBuffer(doc1k_body))
//decoder := json.NewDecoder(bytes.NewReader(doc1k_body))
decoder := json.NewDecoder(docReader)
if bm.fixJSONNumbers {
decoder.UseNumber()
}
Expand Down
Loading

0 comments on commit 9c28192

Please sign in to comment.