Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Change implementation of streamer
Browse files Browse the repository at this point in the history
The previous streamer assumed pushing down a channel was immediate,
which may not be correct. This streamer uses a simulated clock to push
items as soon as they should have been sent. If the channel blocks for a
moment, the streamer will push a succession of items until the timestamp
catches up.
  • Loading branch information
lawrencejones committed Apr 6, 2019
1 parent 5fb8d72 commit b788c70
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions pkg/pgreplay/streamer.go
Expand Up @@ -57,19 +57,21 @@ func (s Streamer) Stream(items chan Item, rate float64) (chan Item, error) {
out := make(chan Item)

go func() {
var lastSeen time.Time
var first, start time.Time
var seenItem bool

for item := range s.Filter(items) {
if !seenItem {
lastSeen = item.GetTimestamp()
first = item.GetTimestamp()
start = time.Now()
seenItem = true
}

if diff := item.GetTimestamp().Sub(lastSeen); diff > 0 {
elapsedSinceStart := time.Duration(rate) * time.Now().Sub(start)
elapsedSinceFirst := item.GetTimestamp().Sub(first)

if diff := elapsedSinceFirst - elapsedSinceStart; diff > 0 {
time.Sleep(time.Duration(float64(diff) / rate))
lastSeen = item.GetTimestamp()
ItemsLastStreamedTimestamp.Set(float64(lastSeen.Unix()))
}

out <- item
Expand Down

0 comments on commit b788c70

Please sign in to comment.