Skip to content

Commit

Permalink
inmem: reduce the interface to Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dnephin committed Jun 14, 2021
1 parent 056fff3 commit 01db687
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 32 deletions.
32 changes: 6 additions & 26 deletions inmem_endpoint.go
Expand Up @@ -2,7 +2,6 @@ package metrics

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
Expand Down Expand Up @@ -136,38 +135,23 @@ func formatSamples(source map[string]SampledValue) []SampledValue {
return output
}

type Logger interface {
Warn(msg string, args ...interface{})
type Encoder interface {
Encode(interface{}) error
}

// Stream writes metrics to resp each time an interval ends. Runs until the
// request context is cancelled.
func (i *InmemSink) Stream(ctx context.Context, logger Logger, resp http.ResponseWriter) {
// Stream writes metrics using encoder.Encode each time an interval ends. Runs
// until the request context is cancelled, or the encoder returns an error.
// The caller is responsible for logging any errors from encoder.
func (i *InmemSink) Stream(ctx context.Context, encoder Encoder) {
interval := i.getInterval()

resp.WriteHeader(http.StatusOK)
flusher, ok := resp.(http.Flusher)
if ok {
// call Write with 0 bytes before a flush, so that GzipResponseWriter
// can write response headers.
resp.Write([]byte(""))
flusher.Flush()
} else {
flusher = noopFlusher{}
}

encoder := json.NewEncoder(resp)

for {
select {
case <-interval.done:
summary := newMetricSummaryFromInterval(interval)

if err := encoder.Encode(summary); err != nil {
logger.Warn("failed to encode metrics summary", "error", err)
return
}
flusher.Flush()

// update interval to the next one
interval = i.getInterval()
Expand All @@ -176,7 +160,3 @@ func (i *InmemSink) Stream(ctx context.Context, logger Logger, resp http.Respons
}
}
}

type noopFlusher struct{}

func (noopFlusher) Flush() {}
23 changes: 17 additions & 6 deletions inmem_endpoint_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
Expand Down Expand Up @@ -301,8 +302,11 @@ func TestInmemSink_Stream(t *testing.T) {
}()

resp := httptest.NewRecorder()
logger := stdoutLogger{}
inm.Stream(ctx, logger, resp)
enc := encoder{
encoder: json.NewEncoder(resp),
flusher: resp,
}
inm.Stream(ctx, enc)

<-chDone

Expand Down Expand Up @@ -330,9 +334,16 @@ func TestInmemSink_Stream(t *testing.T) {
}
}

type stdoutLogger struct{}
type encoder struct {
flusher http.Flusher
encoder *json.Encoder
}

func (stdoutLogger) Warn(msg string, args ...interface{}) {
fmt.Print(msg)
fmt.Println(args...)
func (e encoder) Encode(metrics interface{}) error {
if err := e.encoder.Encode(metrics); err != nil {
fmt.Println("failed to encode metrics summary", "error", err)
return err
}
e.flusher.Flush()
return nil
}

0 comments on commit 01db687

Please sign in to comment.