Skip to content

Commit

Permalink
MB-42270 - DCP to check for vbHighSeqno when requesting streams with …
Browse files Browse the repository at this point in the history
…end seqnos

When DCP requests an end seqno, if OSO mode is turned on and the end seqno is higher
than the VB's highSeqno in KV, then the stream will not end.

Also make OSO mode opt-in by introducing a new replication setting

Change-Id: I2b2e67e883f688485e00eb6857106d331278c1f1
Reviewed-on: http://review.couchbase.org/c/goxdcr/+/139326
Tested-by: Neil Huang <neil.huang@couchbase.com>
Reviewed-by: Neil Huang <neil.huang@couchbase.com>
  • Loading branch information
nelio2k committed Nov 2, 2020
1 parent 4040e1b commit c603b62
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 9 deletions.
19 changes: 17 additions & 2 deletions base/types.go
Expand Up @@ -733,6 +733,7 @@ const (
CollectionsMappingKey = "collectionsExplicitMapping"
CollectionsMirrorKey = "collectionsMirroringMode"
CollectionsMigrateKey = "collectionsMigrationMode"
CollectionsOsoKey = "collectionsOSOMode"
CollectionsMappingRulesKey = "colMappingRules"
CollectionsSkipSourceCheckKey = "collectionsSkipSrcValidation"
)
Expand All @@ -741,14 +742,16 @@ const (
colMgtMappingN = 0 // Non-Set bit if implicit, Set bit if explicit
colMgtMirroringN = 1 // Non-Set bit if mirroring off, set bit if mirroring on
colMgtMigrateN = 2 // Non-set if traditional, set bit if migration mode
colMgtOsoN = 3 // Non-set if traditional, set bit to opt into OSO mode
)

var CollectionsMgtDefault CollectionsMgtType = 0 // Implicit, no mirror, no migration
var CollectionsMgtDefault CollectionsMgtType = 0 // Implicit, no mirror, no migration, no OSO
var CollectionsExplicitBit CollectionsMgtType = 1 << colMgtMappingN
var CollectionsMirroringBit CollectionsMgtType = 1 << colMgtMirroringN
var CollectionsMigrationBit CollectionsMgtType = 1 << colMgtMigrateN
var CollectionsOSOBit CollectionsMgtType = 1 << colMgtOsoN

var CollectionsMgtMax = CollectionsExplicitBit | CollectionsMirroringBit | CollectionsMigrationBit
var CollectionsMgtMax = CollectionsExplicitBit | CollectionsMirroringBit | CollectionsMigrationBit | CollectionsOSOBit

func (c CollectionsMgtType) String() string {
var output []string
Expand All @@ -758,6 +761,8 @@ func (c CollectionsMgtType) String() string {
output = append(output, fmt.Sprintf("%v", c.IsMirroringOn()))
output = append(output, "Migration:")
output = append(output, fmt.Sprintf("%v", c.IsMigrationOn()))
output = append(output, "OSO:")
output = append(output, fmt.Sprintf("%v", c.IsOsoOn()))
return strings.Join(output, " ")
}

Expand Down Expand Up @@ -791,6 +796,16 @@ func (c *CollectionsMgtType) SetMigration(val bool) {
}
}

// IsOsoOn reports whether the OSO opt-in bit (CollectionsOSOBit) is set in
// this collections management bitmask.
func (c *CollectionsMgtType) IsOsoOn() bool {
	osoBits := *c & CollectionsOSOBit
	return osoBits > 0
}

// SetOSO makes the OSO opt-in bit match val, leaving it untouched when it is
// already in the requested state.
func (c *CollectionsMgtType) SetOSO(val bool) {
	if c.IsOsoOn() == val {
		// Nothing to do; toggling here would move the bit away from val.
		return
	}
	// XOR flips only the OSO bit; the other mode bits are not affected.
	*c ^= 1 << colMgtOsoN
}

type MergeFunctionMappingType map[string]string

func (mf MergeFunctionMappingType) SameAs(otherRaw interface{}) bool {
Expand Down
7 changes: 5 additions & 2 deletions factory/xdcr_factory.go
Expand Up @@ -1031,14 +1031,17 @@ func constructSharedSettingsForDcpNozzle(settings metadata.ReplicationSettingsMa
osoCheckMap = vbTasksMap.(metadata.VBTasksMapType)
}

if !modes.IsOsoOn() {
checkIfNeedOso = false
}

if checkIfNeedOso {
// OSO only works if DCP is serving a single collection and starts from seqno 0.
// Thus, check the backfill tasks and ensure that they contain only one source collection
shaToCollectionsMap := osoCheckMap.GetAllCollectionNamespaceMappings()
vbTasksMap := osoCheckMap.GetTopTasksOnly()
if len(shaToCollectionsMap) == 1 && vbTasksMap.AllStartsWithSeqno0() {
// MB-42273 - OSO not sending streamend
//dcpNozzleSettings[parts.DCP_EnableOSO] = true
dcpNozzleSettings[parts.DCP_EnableOSO] = true
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions metadata/backfill_replication.go
Expand Up @@ -402,6 +402,33 @@ func (v VBTasksMapType) AllStartsWithSeqno0() bool {
return true
}

// GetDeduplicatedSourceNamespaces walks every non-nil backfill task of every
// VB in the map and collects the source namespaces found in the tasks'
// requested collections.
//
// Each source namespace is added to a scratch CollectionNamespaceMapping with
// a placeholder target, and the keys of that mapping form the result
// (presumably AddSingleSourceNsMapping collapses equal namespaces - confirm
// against its implementation). The result is built by ranging over that
// mapping. A nil slice is returned when no source namespace is found.
func (v VBTasksMapType) GetDeduplicatedSourceNamespaces() []*SourceNamespace {
	seen := make(CollectionNamespaceMapping)

	for _, tasksForVb := range v {
		if tasksForVb == nil {
			continue
		}
		for _, task := range *tasksForVb {
			if task == nil {
				continue
			}

			for _, mapping := range task.RequestedCollections() {
				for src := range mapping {
					// Only the source side matters here, so the target is an empty placeholder
					seen.AddSingleSourceNsMapping(src, &base.CollectionNamespace{})
				}
			}
		}
	}

	var result []*SourceNamespace
	for src := range seen {
		result = append(result, src)
	}
	return result
}

// Backfill tasks are ordered list of backfill jobs, and to be handled in sequence
type BackfillTasks []*BackfillTask

Expand Down
37 changes: 37 additions & 0 deletions metadata/collections_test.go
Expand Up @@ -1142,3 +1142,40 @@ func TestPerfWeirdMappingChange(t *testing.T) {
assert.False(filter.UseManifestUid)
assert.Equal(1, len(filter.CollectionsList))
}

// TestGetDeduplicatedSourceNamespaces builds a VBTasksMapType for VBs 1..9 in
// which VB i holds i backfill tasks requesting S1.Col0 through S1.Col(i-1).
// Across all VBs only Col0..Col8 are referenced, so the deduplicated source
// namespace list is expected to contain exactly 9 entries.
func TestGetDeduplicatedSourceNamespaces(t *testing.T) {
	fmt.Println("============== Test case start: TestGetDeduplicatedSourceNamespaces =================")
	defer fmt.Println("============== Test case end: TestGetDeduplicatedSourceNamespaces =================")
	assert := assert.New(t)

	testMap := make(VBTasksMapType)
	for vbno := uint16(1); vbno < uint16(10); vbno++ {
		tasksForVb := &BackfillTasks{}
		for colIdx := uint16(0); colIdx < vbno; colIdx++ {
			// One mapping per task, holding a single source collection
			srcNs := NewSourceCollectionNamespace(&base.CollectionNamespace{
				ScopeName:      "S1",
				CollectionName: fmt.Sprintf("Col%v", colIdx),
			})
			mapping := make(CollectionNamespaceMapping)
			mapping.AddSingleSourceNsMapping(srcNs, &base.CollectionNamespace{})

			task := &BackfillTask{
				Timestamps: &BackfillVBTimestamps{
					StartingTimestamp: &base.VBTimestamp{},
					EndingTimestamp:   &base.VBTimestamp{},
				},
				requestedCollections_:    []CollectionNamespaceMapping{mapping},
				RequestedCollectionsShas: []string{"test"},
			}
			*tasksForVb = append(*tasksForVb, task)
		}
		testMap[vbno] = tasksForVb
	}

	assert.NotNil(testMap)
	dedupedSources := testMap.GetDeduplicatedSourceNamespaces()
	assert.Equal(9, len(dedupedSources))
}
10 changes: 9 additions & 1 deletion metadata/replication_settings.go
Expand Up @@ -56,6 +56,7 @@ const (
CollectionsMgtMappingKey = base.CollectionsMappingKey
CollectionsMgtMirrorKey = base.CollectionsMirrorKey
CollectionsMgtMigrateKey = base.CollectionsMigrateKey
CollectionsMgtOsoKey = base.CollectionsOsoKey

CollectionsMappingRulesKey = base.CollectionsMappingRulesKey
CollectionsSkipSourceCheckKey = base.CollectionsSkipSourceCheckKey
Expand All @@ -77,7 +78,7 @@ const (
// settings whose default values cannot be viewed or changed through rest apis
var ImmutableDefaultSettings = []string{ReplicationTypeKey, FilterExpressionKey, ActiveKey, FilterVersionKey,
CollectionsMgtMultiKey, CollectionsSkipSourceCheckKey, CollectionsMappingRulesKey, CollectionsMgtMirrorKey,
CollectionsMgtMappingKey, CollectionsMgtMigrateKey, CollectionsManualBackfillKey, CollectionsDelAllBackfillKey, CollectionsDelVbBackfillKey}
CollectionsMgtMappingKey, CollectionsMgtMigrateKey, CollectionsMgtOsoKey, CollectionsManualBackfillKey, CollectionsDelAllBackfillKey, CollectionsDelVbBackfillKey}

// settings whose values cannot be changed after replication is created
var ImmutableSettings = []string{}
Expand All @@ -96,6 +97,7 @@ var MultiValueMap map[string]string = map[string]string{
CollectionsMgtMappingKey: CollectionsMgtMultiKey,
CollectionsMgtMirrorKey: CollectionsMgtMultiKey,
CollectionsMgtMigrateKey: CollectionsMgtMultiKey,
CollectionsMgtOsoKey: CollectionsMgtMultiKey,
}

var MaxBatchCount = 10000
Expand Down Expand Up @@ -321,6 +323,8 @@ func (r *ReplicationMultiValueHelper) handleCollectionsMgtKey(curConfig base.Col
curConfig.SetMirroring(boolVal)
case CollectionsMgtMigrateKey:
curConfig.SetMigration(boolVal)
case CollectionsMgtOsoKey:
curConfig.SetOSO(boolVal)
}
retVal = curConfig
return
Expand Down Expand Up @@ -400,6 +404,9 @@ func (r *ReplicationMultiValueHelper) handleCollectionsMgtKeyImport(s *Replicati
if val, ok := r.flagKeyIssued[CollectionsMgtMappingKey]; ok {
curVal.SetExplicitMapping(val)
}
if val, ok := r.flagKeyIssued[CollectionsMgtOsoKey]; ok {
curVal.SetOSO(val)
}

sm[CollectionsMgtMultiKey] = curVal
}
Expand Down Expand Up @@ -588,6 +595,7 @@ func (s *ReplicationSettings) exportFlagTypeValues() {
s.Values[CollectionsMgtMappingKey] = collectionModes.IsExplicitMapping()
s.Values[CollectionsMgtMirrorKey] = collectionModes.IsMirroringOn()
s.Values[CollectionsMgtMigrateKey] = collectionModes.IsMigrationOn()
s.Values[CollectionsMgtOsoKey] = collectionModes.IsOsoOn()
}

func (s *ReplicationSettings) PreprocessReplMultiValues(settingsMap map[string]interface{}) error {
Expand Down
107 changes: 106 additions & 1 deletion parts/dcp_nozzle.go
Expand Up @@ -367,7 +367,10 @@ type DcpNozzle struct {

endSeqnoForDcp map[uint16]*base.SeqnoWithLock

osoRequested bool
osoRequested bool
getHighSeqnoOneAtATime chan bool
vbHighSeqnoMap map[uint16]*base.SeqnoWithLock
savedMcReqFeatures utilities.HELOFeatures
}

func NewDcpNozzle(id string,
Expand Down Expand Up @@ -404,8 +407,13 @@ func NewDcpNozzle(id string,
setTsCheckedManifests: make(map[uint64]bool),
specificManifestGetter: specificManifestGetter,
endSeqnoForDcp: make(map[uint16]*base.SeqnoWithLock),
getHighSeqnoOneAtATime: make(chan bool, 1),
vbHighSeqnoMap: make(map[uint16]*base.SeqnoWithLock),
}

// Allow one caller the ability to execute
dcp.getHighSeqnoOneAtATime <- true

for _, vbno := range vbnos {
dcp.cur_ts[vbno] = &vbtsWithLock{lock: &sync.RWMutex{}, ts: nil}
dcp.vb_stream_status[vbno] = &streamStatusWithLock{lock: &sync.RWMutex{}, state: Dcp_Stream_NonInit}
Expand All @@ -414,6 +422,7 @@ func NewDcpNozzle(id string,
dcp.vb_xattr_seqno_map[vbno] = &xattr_seqno
}
dcp.endSeqnoForDcp[vbno] = base.NewSeqnoWithLock()
dcp.vbHighSeqnoMap[vbno] = base.NewSeqnoWithLock()
}

dcp.composeUserAgent()
Expand Down Expand Up @@ -474,6 +483,7 @@ func (dcp *DcpNozzle) initializeMemcachedClient(settings metadata.ReplicationSet
}

dcpMcReqFeatures.CompressionType = dcp.memcachedCompressionSetting
dcp.savedMcReqFeatures = dcpMcReqFeatures

dcp.client, respondedFeatures, err = dcp.utils.GetMemcachedConnectionWFeatures(addr, dcp.sourceBucketName, dcp.user_agent, base.KeepAlivePeriod, dcpMcReqFeatures, dcp.Logger())

Expand Down Expand Up @@ -1324,6 +1334,12 @@ func (dcp *DcpNozzle) startUprStreams_internal(streams_to_start []uint16) error
// randomizes the sequence of vbs to start, so that each outnozzle gets roughly even initial load
base.ShuffleVbList(streams_to_start)

err := dcp.getHighSeqnosIfNecessary(streams_to_start)
if err != nil {
dcp.Logger().Errorf("Getting HighSeqno for %v resulted in %v", streams_to_start, err)
return err
}

for _, vbno := range streams_to_start {
vbts, err := dcp.getTS(vbno, true)
if err == nil && vbts != nil {
Expand Down Expand Up @@ -1373,6 +1389,13 @@ func (dcp *DcpNozzle) startUprStreamInner(vbno uint16, vbts *base.VBTimestamp, v
err = fmt.Errorf("Error converting VBTask to DCP Nozzle Task %v", err)
return err
}

// Given a task, we should technically only end at a correct seqno. Check the vbSeqnos to ensure that those seqnos are correct
checkSeqEnd := dcp.vbHighSeqnoMap[vbno].GetSeqno()
if checkSeqEnd > 0 && checkSeqEnd < seqEnd {
seqEnd = checkSeqEnd
}

// In a corner case where startSeq == endSeqno, DCP will not send down a streamEnd and instead just
// close the connection. Mark the endSeqno here first to check if this is the case
dcp.endSeqnoForDcp[vbno].SetSeqno(seqEnd)
Expand Down Expand Up @@ -1950,3 +1973,85 @@ func (dcp *DcpNozzle) GetOSOSeqnoRaiser() func(vbno uint16, seqno uint64) {
dcp.RaiseEvent(common.NewEvent(common.SnapshotMarkerReceived, m, dcp, nil /*derivedItems*/, nil /*otherInfos*/))
}
}

// getHighSeqnosIfNecessary refreshes dcp.vbHighSeqnoMap with the highest
// sequence number reported per VB across the source collections referenced by
// the nozzle's top backfill tasks.
//
// When the nozzle has no specific VB tasks (the main pipeline) it does nothing
// and returns nil. Otherwise it:
//  1. verifies that every VB in vbnos has an entry in the top tasks,
//  2. resolves the tasks' source namespaces to collection IDs using the
//     manifest returned by specificManifestGetter(math.MaxUint64), logging a
//     warning for namespaces that cannot be resolved,
//  3. takes the single token from getHighSeqnoOneAtATime so that only one
//     caller runs the query section at a time,
//  4. opens a dedicated memcached connection, resets the stored high seqno of
//     every VB in dcp.vbnos to 0, and stores for each VB the largest value
//     returned by GetAllVbSeqnos over all resolved collection IDs.
//
// An error is returned if a requested VB is missing from the top tasks, no
// source namespace or collection ID can be determined, the manifest or the
// memcached connection cannot be obtained, or GetAllVbSeqnos fails.
func (dcp *DcpNozzle) getHighSeqnosIfNecessary(vbnos []uint16) error {
	if len(dcp.specificVBTasks) == 0 {
		// Main pipeline, no need to get high Seqno
		return nil
	}

	topTasks := dcp.specificVBTasks.GetTopTasksOnly()
	for _, requestedVb := range vbnos {
		if _, found := topTasks[requestedVb]; !found {
			return fmt.Errorf("Requesting vb %v but specific VBTasks does not contain such VB", requestedVb)
		}
	}

	srcNamespaces := topTasks.GetDeduplicatedSourceNamespaces()
	if len(srcNamespaces) == 0 {
		return fmt.Errorf("Toptasks has no source collection namespaces")
	}

	latestManifest, err := dcp.specificManifestGetter(math.MaxUint64)
	if err != nil {
		return fmt.Errorf("Unable to get latest manifest %v", err)
	}

	var collectionIds []uint32
	for _, ns := range srcNamespaces {
		colId, lookupErr := latestManifest.GetCollectionId(ns.ScopeName, ns.CollectionName)
		if lookupErr != nil {
			// Best effort: skip namespaces the manifest does not know about
			dcp.Logger().Warnf("getHighSeqno could not find collection ID for %v", ns.String())
			continue
		}
		collectionIds = append(collectionIds, colId)
	}
	if len(collectionIds) == 0 {
		return fmt.Errorf("Unable to find any collection ID for getHighSeqno")
	}

	// Wait for the single token, and hand it back once this call is finished
	<-dcp.getHighSeqnoOneAtATime
	defer func() { dcp.getHighSeqnoOneAtATime <- true }()

	addr, err := dcp.xdcr_topology_svc.MyMemcachedAddr()
	if err != nil {
		return err
	}

	client, _, err := dcp.utils.GetMemcachedConnectionWFeatures(addr, dcp.sourceBucketName, dcp.user_agent, base.KeepAlivePeriod, dcp.savedMcReqFeatures, dcp.Logger())
	if err != nil {
		return err
	}
	defer client.Close()

	// Start from a clean slate so values from a previous call do not linger
	for _, vb := range dcp.vbnos {
		dcp.vbHighSeqnoMap[vb].SetSeqno(0)
	}

	var vbSeqnoMap map[uint16]uint64
	mccContext := &mcc.ClientContext{}
	for _, collectionId := range collectionIds {
		mccContext.CollId = collectionId
		vbSeqnoMap, err = client.GetAllVbSeqnos(vbSeqnoMap, mccContext)
		if err != nil {
			return fmt.Errorf("Unable to GetAllVbSeqnos %v\n", err)
		}

		for _, vb := range dcp.vbnos {
			highSeqno, found := vbSeqnoMap[vb]
			if !found {
				dcp.Logger().Warnf("DCP VBHighSeqno for colID %v did not return vb %v", collectionId, vb)
				continue
			}
			// Keep the maximum seen across all collections for this VB
			if stored := dcp.vbHighSeqnoMap[vb]; stored.GetSeqno() < highSeqno {
				stored.SetSeqno(highSeqno)
			}
		}
	}
	return nil
}
3 changes: 3 additions & 0 deletions replication_manager/msg_utils.go
Expand Up @@ -84,6 +84,7 @@ const (
CollectionsMappingKey = base.CollectionsMappingKey
CollectionsMirrorKey = base.CollectionsMirrorKey
CollectionsMigrateKey = base.CollectionsMigrateKey
CollectionsOsoKey = base.CollectionsOsoKey
CollectionsMappingRulesKey = base.CollectionsMappingRulesKey
MergeFunctionMappingKey = base.MergeFunctionMappingKey
ManualBackfillRequested = base.ManualBackfillKey
Expand Down Expand Up @@ -171,6 +172,7 @@ var RestKeyToSettingsKeyMap = map[string]string{
CollectionsMappingKey: metadata.CollectionsMgtMappingKey,
CollectionsMigrateKey: metadata.CollectionsMgtMigrateKey,
CollectionsMirrorKey: metadata.CollectionsMgtMirrorKey,
CollectionsOsoKey: metadata.CollectionsMgtOsoKey,
CollectionsMappingRulesKey: metadata.CollectionsMappingRulesKey,
MergeFunctionMappingKey: metadata.MergeFunctionMappingKey,
ManualBackfillRequested: metadata.CollectionsManualBackfillKey,
Expand Down Expand Up @@ -206,6 +208,7 @@ var SettingsKeyToRestKeyMap = map[string]string{
metadata.CollectionsMgtMappingKey: CollectionsMappingKey,
metadata.CollectionsMgtMigrateKey: CollectionsMigrateKey,
metadata.CollectionsMgtMirrorKey: CollectionsMirrorKey,
metadata.CollectionsMgtOsoKey: CollectionsOsoKey,
metadata.CollectionsMappingRulesKey: CollectionsMappingRulesKey,
metadata.MergeFunctionMappingKey: MergeFunctionMappingKey,
metadata.CollectionsManualBackfillKey: ManualBackfillRequested,
Expand Down
Expand Up @@ -54,7 +54,7 @@ declare -i ORIG_TARGET_MAN_PULL_INTERVAL

function runTestCase {
echo "============================================================================"
echo "Running basic explicit mapping test case - new explicit map replication"
echo "Running basic explicit mapping test case - new explicit map replication without OSO"
echo "============================================================================"
testForClusterRun
if (( $? != 0 ));then
Expand Down Expand Up @@ -121,8 +121,8 @@ function runTestCase {
# Backfill should have been raised
validateInternalLogWithInstance "C1" "$BACKFILL_MSG" $(( $currentBackfillInstanceCnt + 1 ))
validateInternalLogWithInstance "C1" "$VBTASKS_DONE_MSG" $(( $currentVBTasksDoneInstanceCnt + 1 ))
# 2 oso mode because one for each DCP nozzle
#validateInternalLogWithInstance "C1" "$OSO_MODE_MSG" $(( $currentOsoModeCnt + 2 ))
# oso mode is disabled
validateInternalLogWithInstance "C1" "$OSO_MODE_MSG" $(( $currentOsoModeCnt + 0 ))
grepForPanics
validateXDCRCheckpoints "C1"
validateXDCRCheckpoints "C2"
Expand Down

0 comments on commit c603b62

Please sign in to comment.