Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-16.12.2020-05.30.pass.html
Change-Id: I9b16dc559ba66056fc19789f2e4be6ea529e199c
  • Loading branch information
jeelanp2003 committed Dec 16, 2020
2 parents 72f477a + 223bf37 commit 6bd20ff
Showing 1 changed file with 92 additions and 21 deletions.
113 changes: 92 additions & 21 deletions secondary/projector/feed.go
@@ -1,8 +1,10 @@
package projector

import (
"errors"
"fmt"
"strconv"
"strings"
"time"

c "github.com/couchbase/indexing/secondary/common"
Expand All @@ -15,6 +17,9 @@ import (
"github.com/golang/protobuf/proto"
)

var errNilTimestamp = errors.New("Feed's book-keeping timestamp (actTss/rollTss/reqTss) is nil")
var errInconsistentTimestampMapping = errors.New("Not all keyspaceID's are present in all of feed's book-keeping timestamps")

// NOTE1:
// https://github.com/couchbase/indexing/commit/
// 9a2ea0f0ffaf9f103ace6ffe54e253001b28b9a8
Expand Down Expand Up @@ -709,14 +714,24 @@ func (feed *Feed) handleCommand(msg []interface{}) (status string) {
req := msg[1].(*protobuf.MutationTopicRequest)
opaque, respch := msg[2].(uint16), msg[3].(chan []interface{})
err := feed.start(req, opaque)
response := feed.topicResponse()
var response *protobuf.TopicResponse
if err != nil { // feed error takes priority in response
response, _ = feed.topicResponse()
} else {
response, err = feed.topicResponse()
}
respch <- []interface{}{response, err}

case fCmdRestartVbuckets:
req := msg[1].(*protobuf.RestartVbucketsRequest)
opaque, respch := msg[2].(uint16), msg[3].(chan []interface{})
err := feed.restartVbuckets(req, opaque)
response := feed.topicResponse()
var response *protobuf.TopicResponse
if err != nil { // feed error takes priority in response
response, _ = feed.topicResponse()
} else {
response, err = feed.topicResponse()
}
respch <- []interface{}{response, err}

case fCmdShutdownVbuckets:
Expand All @@ -728,7 +743,12 @@ func (feed *Feed) handleCommand(msg []interface{}) (status string) {
req := msg[1].(*protobuf.AddBucketsRequest)
opaque, respch := msg[2].(uint16), msg[3].(chan []interface{})
err := feed.addBuckets(req, opaque)
response := feed.topicResponse()
var response *protobuf.TopicResponse
if err != nil { // feed error takes priority in response
response, _ = feed.topicResponse()
} else {
response, err = feed.topicResponse()
}
respch <- []interface{}{response, err}

case fCmdDelBuckets:
Expand Down Expand Up @@ -774,7 +794,8 @@ func (feed *Feed) handleCommand(msg []interface{}) (status string) {

case fCmdGetTopicResponse:
respch := msg[1].(chan []interface{})
respch <- []interface{}{feed.topicResponse()}
response, _ := feed.topicResponse()
respch <- []interface{}{response}

case fCmdGetStatistics:
respch := msg[1].(chan []interface{})
Expand Down Expand Up @@ -2097,41 +2118,91 @@ loop:
}

// compose topic-response for caller
func (feed *Feed) topicResponse() *protobuf.TopicResponse {
func (feed *Feed) topicResponse() (*protobuf.TopicResponse, error) {
uuids := make([]uint64, 0)
for _, engines := range feed.engines {
for uuid := range engines {
uuids = append(uuids, uuid)
}
}
xs := make([]*protobuf.TsVbuuid, 0, len(feed.actTss))
keyspaceIds := make([]string, 0)
for keyspaceId, ts := range feed.actTss {
if ts != nil {
xs = append(xs, ts.Clone())
keyspaceIds = append(keyspaceIds, keyspaceId)
}

var err error
// One of the invariants in the feed's code is
// len(feed.actTss) == len(feed.rollTss) == len(feed.reqTss)
// and a keyspaceId will be present in all these timestamps i.e.
// there should not be a case where it is present in one timestamp
// and not present in another
if len(feed.actTss) != len(feed.rollTss) ||
len(feed.rollTss) != len(feed.reqTss) {
logging.Fatalf("%v Inconsistent book-keeping in feed. feed.actTss: %s, feed.rollTss: %s, feed.reqTss: %s",
feed.logPrefix, getTssAsStr(feed.actTss), getTssAsStr(feed.rollTss), getTssAsStr(feed.reqTss))
err = errInconsistentTimestampMapping
}

xs := make([]*protobuf.TsVbuuid, 0, len(feed.actTss))
ys := make([]*protobuf.TsVbuuid, 0, len(feed.rollTss))
for _, ts := range feed.rollTss {
if ts != nil {
ys = append(ys, ts.Clone())
}
}
zs := make([]*protobuf.TsVbuuid, 0, len(feed.reqTss))
for _, ts := range feed.reqTss {
if ts != nil {
zs = append(zs, ts.Clone())

keyspaceIds := make([]string, 0)
for keyspaceId, actTs := range feed.actTss {
if actTs != nil {
xs = append(xs, actTs.Clone())
keyspaceIds = append(keyspaceIds, keyspaceId)
} else {
// Ideally, this condition should never be executed
logging.Fatalf("%v ##%x actTss is nil for keyspaceId: %v, feed.actTss: %s",
feed.logPrefix, feed.opaque, getTssAsStr(feed.actTss))
err = errNilTimestamp
}

if rollTs, ok := feed.rollTss[keyspaceId]; !ok {
// Ideally, this condition should never be executed
logging.Fatalf("%v ##%x keyspaceId: %v present in actTss but not in feed.rollTss. feed.actTss: %v, feed.rollTss: %v",
feed.logPrefix, feed.opaque, keyspaceId, getTssAsStr(feed.actTss), getTssAsStr(feed.rollTss))
err = errInconsistentTimestampMapping
} else if rollTs == nil {
// Ideally, this condition should never be executed
logging.Fatalf("%v ##%x rollTs is nil for keyspaceId: %v, feed.actTss: %s, feed.rollTss: %s",
feed.logPrefix, feed.opaque, getTssAsStr(feed.actTss), getTssAsStr(feed.rollTss))
err = errNilTimestamp
} else {
ys = append(ys, rollTs.Clone())
}

if reqTs, ok := feed.reqTss[keyspaceId]; !ok {
// Ideally, this condition should never be executed
logging.Fatalf("%v ##%x keyspaceId: %v present in actTss but not in feed.reqTss. feed.actTss: %s, feed.reqTss: %s",
feed.logPrefix, feed.opaque, keyspaceId, getTssAsStr(feed.actTss), getTssAsStr(feed.reqTss))
err = errInconsistentTimestampMapping
} else if reqTs == nil {
// Ideally, this condition should never be executed
logging.Fatalf("%v ##%x reqTs is nil for keyspaceId: %v, feed.actTss: %s, feed.reqTss: %s",
feed.logPrefix, feed.opaque, keyspaceId, getTssAsStr(feed.actTss), getTssAsStr(feed.reqTss))
err = errNilTimestamp
} else {
zs = append(zs, reqTs.Clone())
}
}
return &protobuf.TopicResponse{

topicResp := &protobuf.TopicResponse{
Topic: proto.String(feed.topic),
InstanceIds: uuids,
ActiveTimestamps: xs,
RollbackTimestamps: ys,
PendingTimestamps: zs,
KeyspaceIds: keyspaceIds,
}
return topicResp, err
}

func getTssAsStr(ts map[string]*protobuf.TsVbuuid) string {
tsList := make([]string, 0)
if ts != nil {
for key, value := range ts {
tsList = append(tsList, fmt.Sprintf("%v:%v,", key, value))
}
}
return strings.Join(tsList, ",")
}

// generate a unique opaque identifier.
Expand Down

0 comments on commit 6bd20ff

Please sign in to comment.