Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge remote-tracking branch 'couchbase/unstable' into HEAD
http: //ci2i-unstable.northscale.in/gsi-08.01.2021-00.52.pass.html
Change-Id: Id5a5d9e5b968279d1d7d8be4f4693df6e3dfdcbc
  • Loading branch information
jeelanp2003 committed Jan 8, 2021
2 parents 7fd56d3 + f78a05e commit 332c377
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 20 deletions.
2 changes: 2 additions & 0 deletions secondary/common/timestamp.go
Expand Up @@ -446,6 +446,8 @@ func (ts *TsVbuuid) Clone() *TsVbuuid {
other.Crc64 = ts.Crc64
other.ScopeId = ts.ScopeId
other.CollectionId = ts.CollectionId
other.SnapAligned = ts.SnapAligned
other.OpenOSOSnap = ts.OpenOSOSnap

return other
}
Expand Down
2 changes: 1 addition & 1 deletion secondary/indexer/scan_scatter.go
Expand Up @@ -371,7 +371,7 @@ func statsSingleSlice(request *ScanRequest, ctx IndexReaderContext, snap SliceSn
var err error
var cnt uint64

if request.Low.Bytes() == nil && request.Low.Bytes() == nil {
if request.Low.Bytes() == nil && request.High.Bytes() == nil {
cnt, err = snap.Snapshot().StatCountTotal()
} else {
cnt, err = snap.Snapshot().CountRange(ctx, request.Low, request.High, request.Incl, stopch)
Expand Down
4 changes: 3 additions & 1 deletion secondary/indexer/stream_reader.go
Expand Up @@ -476,6 +476,7 @@ func (r *mutationStreamReader) maybeSendSync(fastpath bool) bool {
filterOSO.Snapshots[vb][0] == 1 {
hwtOSOMap[keyspaceId].Seqnos[vb] = filterOSO.Seqnos[vb]
hwtOSOMap[keyspaceId].Vbuuids[vb] = filterOSO.Vbuuids[vb]
hwtOSOMap[keyspaceId].Snapshots[vb][0] = filterOSO.Snapshots[vb][0]
hwtOSOMap[keyspaceId].Snapshots[vb][1] = filterOSO.Snapshots[vb][1]
hasOSO = true
}
Expand Down Expand Up @@ -1158,7 +1159,7 @@ func (w *streamWorker) updateOSOMarkerInFilter(meta *MutationMeta, eventType byt
if filterOSO.Snapshots[meta.vbucket][0] == 1 &&
filterOSO.Snapshots[meta.vbucket][1] == 0 {
logging.Errorf("MutationStreamReader::updateOSOMarkerInFilter %v %v "+
"Received OSO Start For Vbucket %v without OSO End. Seqno %v. "+
"Received OSO Start For Vbucket %v without previous OSO End. Seqno %v. "+
"Count %v. OSO Start %v. OSO End %v.", w.streamId,
meta.keyspaceId, meta.vbucket, filterOSO.Seqnos[meta.vbucket],
filterOSO.Vbuuids[meta.vbucket], filterOSO.Snapshots[meta.vbucket][0],
Expand Down Expand Up @@ -1264,6 +1265,7 @@ func (w *streamWorker) updateSnapInFilter(meta *MutationMeta,
}

prevSnap := w.keyspaceIdPrevSnapMap[meta.keyspaceId]
prevSnap.Snapshots[meta.vbucket][0] = filterOSO.Snapshots[meta.vbucket][0]
prevSnap.Snapshots[meta.vbucket][1] = filterOSO.Snapshots[meta.vbucket][1]
prevSnap.Seqnos[meta.vbucket] = filterOSO.Seqnos[meta.vbucket]

Expand Down
42 changes: 32 additions & 10 deletions secondary/indexer/stream_state.go
Expand Up @@ -1278,18 +1278,24 @@ func (ss *StreamState) updateHWT(streamId common.StreamId,

//update OSO bookkeeping
if hwtOSO != nil {

tsOSO := ss.streamKeyspaceIdHWTOSO[streamId][keyspaceId]

//if mutation count has incremented
if hwtOSO.Vbuuids[i] > tsOSO.Vbuuids[i] {
tsOSO.Seqnos[i] = hwtOSO.Seqnos[i] //high seqno
tsOSO.Vbuuids[i] = hwtOSO.Vbuuids[i] //Vbuuid stores count for OSO
ss.streamKeyspaceIdNewTsReqdMap[streamId][keyspaceId] = true
}

//OSO Snap Start
tsOSO.Snapshots[i][0] = hwtOSO.Snapshots[i][0]

//OSO Snap End
if hwtOSO.Snapshots[i][1] > tsOSO.Snapshots[i][1] {
tsOSO.Snapshots[i][1] = hwtOSO.Snapshots[i][1]
ss.streamKeyspaceIdNewTsReqdMap[streamId][keyspaceId] = true
}
tsOSO.Snapshots[i][1] = hwtOSO.Snapshots[i][1]
}

if seq > ts.Seqnos[i] { //if seqno has incremented, update it
Expand Down Expand Up @@ -1421,14 +1427,16 @@ func (ss *StreamState) alignSnapBoundary(streamId common.StreamId,
if enableOSO {

//if ts has OSO snapshot, skip
if s[0] == 0 {
if tsElem.osoCount != nil &&
tsElem.osoCount[i] != 0 &&
s[0] == 1 {
continue
}

//if lastSnap OSO, skip
if lastSnap.Snapshots[i][0] == 0 &&
if lastSnap.Snapshots[i][0] == 1 &&
lastSnap.Snapshots[i][1] == 1 &&
lastSnap.Seqnos[i] != 0 {
lastSnap.Seqnos[i] > lastSnap.Snapshots[i][1] {
continue
}
}
Expand Down Expand Up @@ -1535,9 +1543,14 @@ func (ss *StreamState) computeTsChangeVec(streamId common.StreamId,
for i, s := range ts.Seqnos {

//if OSO snapshot
if enableOSO && ts.Snapshots[i][0] == 0 && s != 0 {
//if lastFlushedTs has an incomplete oso snapshot
if lts.Snapshots[i][0] == 0 &&
if enableOSO &&
tsElem.osoCount != nil &&
tsElem.osoCount[i] != 0 &&
ts.Snapshots[i][0] == 1 &&
s != 0 {
//if lastFlushedTs has an incomplete or no oso snapshot
if (lts.Snapshots[i][0] == 0 ||
lts.Snapshots[i][0] == 1) &&
lts.Snapshots[i][1] == 0 {

if tsElem.osoCount[i] > lts.Seqnos[i] {
Expand All @@ -1555,6 +1568,10 @@ func (ss *StreamState) computeTsChangeVec(streamId common.StreamId,
//update the snapshot, seqno to high seqno
ts.Snapshots[i][0] = ts.Seqnos[i]
ts.Snapshots[i][1] = ts.Seqnos[i]

//count OSO End as change. Build is only
//considered done when all OSO End markers have arrived.
noChange = false
}
} else {
//use seqno to mark complete snapshot
Expand All @@ -1581,7 +1598,12 @@ func (ss *StreamState) computeTsChangeVec(streamId common.StreamId,
//if this is the first ts, check seqno > 0
for i, s := range ts.Seqnos {
//if OSO snapshot
if enableOSO && ts.Snapshots[i][0] == 0 && s != 0 {
if enableOSO &&
tsElem.osoCount != nil &&
tsElem.osoCount[i] != 0 &&
ts.Snapshots[i][0] == 1 &&
s != 0 {

changeVec[i] = true
noChange = false
countVec[i] = tsElem.osoCount[i] //count to be flushed
Expand Down Expand Up @@ -1755,10 +1777,10 @@ func (ss *StreamState) setCountForOSOTs(streamId common.StreamId,
for i, sn := range ts.Snapshots {

//if vb got an OSO Snapshot
if hwtOSO.Snapshots[i][0] == 0 &&
if hwtOSO.Snapshots[i][0] == 1 &&
hwtOSO.Seqnos[i] != 0 {
//if there is a regular snapshot with mutations
if sn[0] != 0 && ts.Seqnos[i] != 0 {
if sn[1] != 0 && ts.Seqnos[i] != 0 {
//use latest snapshot information already in ts
} else {
//NOTE if OSO has ended and the next snapshot marker has come in
Expand Down
18 changes: 10 additions & 8 deletions secondary/indexer/timekeeper.go
Expand Up @@ -2312,6 +2312,7 @@ func (tk *timekeeper) checkInitStreamReadyToMerge(streamId common.StreamId,
if tsListMaint != nil {
lenMaintTs = tsListMaint.Len()
}

logging.Infof("Timekeeper::checkInitStreamReadyToMerge FlushTs Not Snapshot "+
"Aligned. Continue both streams for keyspaceId %v. INIT PendTsCount %v. "+
"MAINT PendTsCount %v.", keyspaceId, lenInitTs, lenMaintTs)
Expand Down Expand Up @@ -2742,7 +2743,7 @@ func (tk *timekeeper) sendNewStabilityTS(tsElem *TsListElem, keyspaceId string,
})

tk.mayBeMakeSnapAligned(streamId, keyspaceId, flushTs)
tk.ensureMonotonicTs(streamId, keyspaceId, flushTs)
tk.ensureMonotonicTs(streamId, keyspaceId, tsElem)

var changeVec []bool
var countVec []uint64
Expand Down Expand Up @@ -3013,7 +3014,9 @@ func (tk *timekeeper) mayBeMakeSnapAligned(streamId common.StreamId,
}

func (tk *timekeeper) ensureMonotonicTs(streamId common.StreamId, keyspaceId string,
flushTs *common.TsVbuuid) {
tsElem *TsListElem) {

flushTs := tsElem.ts

// Seqno should be monotonically increasing when it comes to mutation queue.
// For pre-caution, if we detect a flushTS that is smaller than LastFlushTS,
Expand All @@ -3024,12 +3027,11 @@ func (tk *timekeeper) ensureMonotonicTs(streamId common.StreamId, keyspaceId str
for i, s := range flushTs.Seqnos {

enableOSO := tk.ss.streamKeyspaceIdEnableOSO[streamId][keyspaceId]
if enableOSO {
//oso can be non-monotonic
if flushTs.Snapshots[i][0] == 0 &&
s != 0 {
continue
}
//oso can be non-monotonic
if enableOSO &&
tsElem.osoCount != nil &&
tsElem.osoCount[i] != 0 {
continue
}

//if flushTs has a smaller seqno than lastFlushTs
Expand Down

0 comments on commit 332c377

Please sign in to comment.