diff --git a/secondary/common/timestamp.go b/secondary/common/timestamp.go index 1c14f73ce..4a7592f9a 100644 --- a/secondary/common/timestamp.go +++ b/secondary/common/timestamp.go @@ -446,6 +446,8 @@ func (ts *TsVbuuid) Clone() *TsVbuuid { other.Crc64 = ts.Crc64 other.ScopeId = ts.ScopeId other.CollectionId = ts.CollectionId + other.SnapAligned = ts.SnapAligned + other.OpenOSOSnap = ts.OpenOSOSnap return other } diff --git a/secondary/indexer/scan_scatter.go b/secondary/indexer/scan_scatter.go index b733730b6..b2c25eebb 100644 --- a/secondary/indexer/scan_scatter.go +++ b/secondary/indexer/scan_scatter.go @@ -371,7 +371,7 @@ func statsSingleSlice(request *ScanRequest, ctx IndexReaderContext, snap SliceSn var err error var cnt uint64 - if request.Low.Bytes() == nil && request.Low.Bytes() == nil { + if request.Low.Bytes() == nil && request.High.Bytes() == nil { cnt, err = snap.Snapshot().StatCountTotal() } else { cnt, err = snap.Snapshot().CountRange(ctx, request.Low, request.High, request.Incl, stopch) diff --git a/secondary/indexer/stream_reader.go b/secondary/indexer/stream_reader.go index 3b08064c8..24f175c5f 100644 --- a/secondary/indexer/stream_reader.go +++ b/secondary/indexer/stream_reader.go @@ -476,6 +476,7 @@ func (r *mutationStreamReader) maybeSendSync(fastpath bool) bool { filterOSO.Snapshots[vb][0] == 1 { hwtOSOMap[keyspaceId].Seqnos[vb] = filterOSO.Seqnos[vb] hwtOSOMap[keyspaceId].Vbuuids[vb] = filterOSO.Vbuuids[vb] + hwtOSOMap[keyspaceId].Snapshots[vb][0] = filterOSO.Snapshots[vb][0] hwtOSOMap[keyspaceId].Snapshots[vb][1] = filterOSO.Snapshots[vb][1] hasOSO = true } @@ -1158,7 +1159,7 @@ func (w *streamWorker) updateOSOMarkerInFilter(meta *MutationMeta, eventType byt if filterOSO.Snapshots[meta.vbucket][0] == 1 && filterOSO.Snapshots[meta.vbucket][1] == 0 { logging.Errorf("MutationStreamReader::updateOSOMarkerInFilter %v %v "+ - "Received OSO Start For Vbucket %v without OSO End. Seqno %v. "+ + "Received OSO Start For Vbucket %v without previous OSO End. Seqno %v. "+ "Count %v. OSO Start %v. OSO End %v.", w.streamId, meta.keyspaceId, meta.vbucket, filterOSO.Seqnos[meta.vbucket], filterOSO.Vbuuids[meta.vbucket], filterOSO.Snapshots[meta.vbucket][0], @@ -1264,6 +1265,7 @@ func (w *streamWorker) updateSnapInFilter(meta *MutationMeta, } prevSnap := w.keyspaceIdPrevSnapMap[meta.keyspaceId] + prevSnap.Snapshots[meta.vbucket][0] = filterOSO.Snapshots[meta.vbucket][0] prevSnap.Snapshots[meta.vbucket][1] = filterOSO.Snapshots[meta.vbucket][1] prevSnap.Seqnos[meta.vbucket] = filterOSO.Seqnos[meta.vbucket] diff --git a/secondary/indexer/stream_state.go b/secondary/indexer/stream_state.go index 86f0db715..f17a71fc0 100644 --- a/secondary/indexer/stream_state.go +++ b/secondary/indexer/stream_state.go @@ -1278,18 +1278,24 @@ func (ss *StreamState) updateHWT(streamId common.StreamId, //update OSO bookkeeping if hwtOSO != nil { + tsOSO := ss.streamKeyspaceIdHWTOSO[streamId][keyspaceId] + //if mutation count has incremented if hwtOSO.Vbuuids[i] > tsOSO.Vbuuids[i] { tsOSO.Seqnos[i] = hwtOSO.Seqnos[i] //high seqno tsOSO.Vbuuids[i] = hwtOSO.Vbuuids[i] //Vbuuid stores count for OSO ss.streamKeyspaceIdNewTsReqdMap[streamId][keyspaceId] = true } + + //OSO Snap Start + tsOSO.Snapshots[i][0] = hwtOSO.Snapshots[i][0] + //OSO Snap End if hwtOSO.Snapshots[i][1] > tsOSO.Snapshots[i][1] { - tsOSO.Snapshots[i][1] = hwtOSO.Snapshots[i][1] ss.streamKeyspaceIdNewTsReqdMap[streamId][keyspaceId] = true } + tsOSO.Snapshots[i][1] = hwtOSO.Snapshots[i][1] } if seq > ts.Seqnos[i] { //if seqno has incremented, update it @@ -1421,14 +1427,16 @@ func (ss *StreamState) alignSnapBoundary(streamId common.StreamId, if enableOSO { //if ts has OSO snapshot, skip - if s[0] == 0 { + if tsElem.osoCount != nil && + tsElem.osoCount[i] != 0 && + s[0] == 1 { continue } //if lastSnap OSO, skip - if lastSnap.Snapshots[i][0] == 0 && + if lastSnap.Snapshots[i][0] == 1 && lastSnap.Snapshots[i][1] == 1 && - lastSnap.Seqnos[i] != 0 { + lastSnap.Seqnos[i] > lastSnap.Snapshots[i][1] { continue } } @@ -1535,9 +1543,14 @@ func (ss *StreamState) computeTsChangeVec(streamId common.StreamId, for i, s := range ts.Seqnos { //if OSO snapshot - if enableOSO && ts.Snapshots[i][0] == 0 && s != 0 { - //if lastFlushedTs has an incomplete oso snapshot - if lts.Snapshots[i][0] == 0 && + if enableOSO && + tsElem.osoCount != nil && + tsElem.osoCount[i] != 0 && + ts.Snapshots[i][0] == 1 && + s != 0 { + //if lastFlushedTs has an incomplete or no oso snapshot + if (lts.Snapshots[i][0] == 0 || + lts.Snapshots[i][0] == 1) && lts.Snapshots[i][1] == 0 { if tsElem.osoCount[i] > lts.Seqnos[i] { @@ -1555,6 +1568,10 @@ func (ss *StreamState) computeTsChangeVec(streamId common.StreamId, //update the snapshot, seqno to high seqno ts.Snapshots[i][0] = ts.Seqnos[i] ts.Snapshots[i][1] = ts.Seqnos[i] + + //count OSO End as change. Build is only + //considered done when all OSO End markers have arrived. + noChange = false } } else { //use seqno to mark complete snapshot @@ -1581,7 +1598,12 @@ func (ss *StreamState) computeTsChangeVec(streamId common.StreamId, //if this is the first ts, check seqno > 0 for i, s := range ts.Seqnos { //if OSO snapshot - if enableOSO && ts.Snapshots[i][0] == 0 && s != 0 { + if enableOSO && + tsElem.osoCount != nil && + tsElem.osoCount[i] != 0 && + ts.Snapshots[i][0] == 1 && + s != 0 { + changeVec[i] = true noChange = false countVec[i] = tsElem.osoCount[i] //count to be flushed @@ -1755,10 +1777,10 @@ func (ss *StreamState) setCountForOSOTs(streamId common.StreamId, for i, sn := range ts.Snapshots { //if vb got an OSO Snapshot - if hwtOSO.Snapshots[i][0] == 0 && + if hwtOSO.Snapshots[i][0] == 1 && hwtOSO.Seqnos[i] != 0 { //if there is a regular snapshot with mutations - if sn[0] != 0 && ts.Seqnos[i] != 0 { + if sn[1] != 0 && ts.Seqnos[i] != 0 { //use latest snapshot information already in ts } else { //NOTE if OSO has ended and the next snapshot marker has come in diff --git a/secondary/indexer/timekeeper.go b/secondary/indexer/timekeeper.go index f10ea75ea..f4eea3a42 100644 --- a/secondary/indexer/timekeeper.go +++ b/secondary/indexer/timekeeper.go @@ -2312,6 +2312,7 @@ func (tk *timekeeper) checkInitStreamReadyToMerge(streamId common.StreamId, if tsListMaint != nil { lenMaintTs = tsListMaint.Len() } + logging.Infof("Timekeeper::checkInitStreamReadyToMerge FlushTs Not Snapshot "+ "Aligned. Continue both streams for keyspaceId %v. INIT PendTsCount %v. "+ "MAINT PendTsCount %v.", keyspaceId, lenInitTs, lenMaintTs) @@ -2742,7 +2743,7 @@ func (tk *timekeeper) sendNewStabilityTS(tsElem *TsListElem, keyspaceId string, }) tk.mayBeMakeSnapAligned(streamId, keyspaceId, flushTs) - tk.ensureMonotonicTs(streamId, keyspaceId, flushTs) + tk.ensureMonotonicTs(streamId, keyspaceId, tsElem) var changeVec []bool var countVec []uint64 @@ -3013,7 +3014,9 @@ func (tk *timekeeper) mayBeMakeSnapAligned(streamId common.StreamId, } func (tk *timekeeper) ensureMonotonicTs(streamId common.StreamId, keyspaceId string, - flushTs *common.TsVbuuid) { + tsElem *TsListElem) { + + flushTs := tsElem.ts // Seqno should be monotonically increasing when it comes to mutation queue. // For pre-caution, if we detect a flushTS that is smaller than LastFlushTS, @@ -3024,12 +3027,11 @@ func (tk *timekeeper) ensureMonotonicTs(streamId common.StreamId, keyspaceId str for i, s := range flushTs.Seqnos { enableOSO := tk.ss.streamKeyspaceIdEnableOSO[streamId][keyspaceId] - if enableOSO { - //oso can be non-monotonic - if flushTs.Snapshots[i][0] == 0 && - s != 0 { - continue - } + //oso can be non-monotonic + if enableOSO && + tsElem.osoCount != nil && + tsElem.osoCount[i] != 0 { + continue } //if flushTs has a smaller seqno than lastFlushTs