Skip to content

Commit

Permalink
Add histogram for wait timer and nowait collect
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Jul 17, 2024
1 parent c6efec0 commit bd56772
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ type tsoDispatcher struct {

dispatcherID string

beforeHandleDurationHist *AutoDumpHistogram
beforeHandleDurationHist *AutoDumpHistogram
batchWaitTimerDuration *AutoDumpHistogram
batchNoWaitCollectDuration *AutoDumpHistogram
}

func newTSODispatcher(
Expand Down Expand Up @@ -129,7 +131,9 @@ func newTSODispatcher(

dispatcherID: id,

beforeHandleDurationHist: NewAutoDumpingHistogram("beforeHandleDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
beforeHandleDurationHist: NewAutoDumpingHistogram("beforeHandleDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
batchWaitTimerDuration: NewAutoDumpingHistogram("batchWaitTimerDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
batchNoWaitCollectDuration: NewAutoDumpingHistogram("batchNoWaitCollectDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
}
go td.watchTSDeadline()
return td
Expand Down Expand Up @@ -343,7 +347,8 @@ tsoBatchLoop:
latency := stream.EstimatedRoundTripLatency()
estimateTSOLatencyGauge.WithLabelValues(td.dispatcherID, streamURL).Set(latency.Seconds())
totalBatchTime := latency / time.Duration(concurrencyFactor)
remainingBatchTime := totalBatchTime - time.Since(currentBatchStartTime)
waitTimerStart := time.Now()
remainingBatchTime := totalBatchTime - waitTimerStart.Sub(currentBatchStartTime)
if remainingBatchTime > 0 {
if !batchingTimer.Stop() {
select {
Expand All @@ -366,8 +371,10 @@ tsoBatchLoop:
}
}
}
waitTimerEnd := time.Now()
td.batchWaitTimerDuration.Observe(waitTimerEnd.Sub(waitTimerStart).Seconds(), waitTimerEnd)

now := time.Now()
nowaitCollectStart := time.Now()
// Continue collecting as many as possible without blocking
nonWaitingBatchLoop:
for {
Expand All @@ -377,11 +384,13 @@ tsoBatchLoop:
zap.String("dc-location", dc))
return
case req := <-td.reqChan:
batchController.pushRequest(req, td.beforeHandleDurationHist, now)
batchController.pushRequest(req, td.beforeHandleDurationHist, nowaitCollectStart)
default:
break nonWaitingBatchLoop
}
}
nowaitCollectEnd := time.Now()
td.batchWaitTimerDuration.Observe(nowaitCollectEnd.Sub(nowaitCollectStart).Seconds(), nowaitCollectEnd)

//done := make(chan struct{})
//dl := newTSDeadline(option.timeout, done, cancel)
Expand Down

0 comments on commit bd56772

Please sign in to comment.