From 01db6876d8b120509a8d3bafc87bc361b600a94b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 14 Jun 2021 12:16:58 -0400 Subject: [PATCH] inmem: reduce the interface to Stream --- inmem_endpoint.go | 32 ++++++-------------------------- inmem_endpoint_test.go | 23 +++++++++++++++++------ 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/inmem_endpoint.go b/inmem_endpoint.go index fb7b04e..24eefa9 100644 --- a/inmem_endpoint.go +++ b/inmem_endpoint.go @@ -2,7 +2,6 @@ package metrics import ( "context" - "encoding/json" "fmt" "net/http" "sort" @@ -136,38 +135,23 @@ func formatSamples(source map[string]SampledValue) []SampledValue { return output } -type Logger interface { - Warn(msg string, args ...interface{}) +type Encoder interface { + Encode(interface{}) error } -// Stream writes metrics to resp each time an interval ends. Runs until the -// request context is cancelled. -func (i *InmemSink) Stream(ctx context.Context, logger Logger, resp http.ResponseWriter) { +// Stream writes metrics using encoder.Encode each time an interval ends. Runs +// until the request context is cancelled, or the encoder returns an error. +// The caller is responsible for logging any errors from encoder. +func (i *InmemSink) Stream(ctx context.Context, encoder Encoder) { interval := i.getInterval() - resp.WriteHeader(http.StatusOK) - flusher, ok := resp.(http.Flusher) - if ok { - // call Write with 0 bytes before a flush, so that GzipResponseWriter - // can write response headers. - resp.Write([]byte("")) - flusher.Flush() - } else { - flusher = noopFlusher{} - } - - encoder := json.NewEncoder(resp) - for { select { case <-interval.done: summary := newMetricSummaryFromInterval(interval) - if err := encoder.Encode(summary); err != nil { - logger.Warn("failed to encode metrics summary", "error", err) return } - flusher.Flush() // update interval to the next one interval = i.getInterval() @@ -176,7 +160,3 @@ func (i *InmemSink) Stream(ctx context.Context, logger Logger, resp http.Respons } } } - -type noopFlusher struct{} - -func (noopFlusher) Flush() {} diff --git a/inmem_endpoint_test.go b/inmem_endpoint_test.go index 846ff30..baccd8b 100644 --- a/inmem_endpoint_test.go +++ b/inmem_endpoint_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "net/http/httptest" "testing" "time" @@ -301,8 +302,11 @@ func TestInmemSink_Stream(t *testing.T) { }() resp := httptest.NewRecorder() - logger := stdoutLogger{} - inm.Stream(ctx, logger, resp) + enc := encoder{ + encoder: json.NewEncoder(resp), + flusher: resp, + } + inm.Stream(ctx, enc) <-chDone @@ -330,9 +334,16 @@ func TestInmemSink_Stream(t *testing.T) { } } -type stdoutLogger struct{} +type encoder struct { + flusher http.Flusher + encoder *json.Encoder +} -func (stdoutLogger) Warn(msg string, args ...interface{}) { - fmt.Print(msg) - fmt.Println(args...) +func (e encoder) Encode(metrics interface{}) error { + if err := e.encoder.Encode(metrics); err != nil { + fmt.Println("failed to encode metrics summary", "error", err) + return err + } + e.flusher.Flush() + return nil }