Skip to content

Commit

Permalink
Live: check schema equality between push messages (#34548)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantxu committed May 27, 2021
1 parent c59a2e1 commit 67028af
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 101 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/gosimple/slug v1.9.0
github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.6
github.com/grafana/grafana-plugin-sdk-go v0.101.0
github.com/grafana/grafana-plugin-sdk-go v0.102.0
github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/hashicorp/go-hclog v0.16.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,8 @@ github.com/grafana/grafana-live-sdk v0.0.6 h1:P1QFn0ZradOJp3zVpfG0STZMP+pgZrW0e0
github.com/grafana/grafana-live-sdk v0.0.6/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=
github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU=
github.com/grafana/grafana-plugin-sdk-go v0.101.0 h1:QyXMkgwZXUX9EQjLv5S5uDcvYjwsntqFV/dCC49Fn+w=
github.com/grafana/grafana-plugin-sdk-go v0.101.0/go.mod h1:D7x3ah+1d4phNXpbnOaxa/osSaZlwh9/ZUnGGzegRbk=
github.com/grafana/grafana-plugin-sdk-go v0.102.0 h1:Pknh7mlOaJvdhPgKHxcimDOSd9h29eSpA34W0/sOF6c=
github.com/grafana/grafana-plugin-sdk-go v0.102.0/go.mod h1:D7x3ah+1d4phNXpbnOaxa/osSaZlwh9/ZUnGGzegRbk=
github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103 h1:qCmofFVwQR9QnsinstVqI1NPLMVl33jNCnOCXEAVn6E=
github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103/go.mod h1:GHIsn+EohCChsdu5YouNZewqLeV9L2FNw4DEJU3P9qE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
Expand Down
27 changes: 15 additions & 12 deletions pkg/services/live/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package live

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -621,22 +622,24 @@ func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response {
}

// Hardcode sample streams
frame := data.NewFrame("testdata",
frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
data.NewField("Time", nil, make([]time.Time, 0)),
data.NewField("Value", nil, make([]float64, 0)),
data.NewField("Min", nil, make([]float64, 0)),
data.NewField("Max", nil, make([]float64, 0)),
)
channels = append(channels, util.DynMap{
"channel": "plugin/testdata/random-2s-stream",
"data": frame,
}, util.DynMap{
"channel": "plugin/testdata/random-flakey-stream",
"data": frame,
}, util.DynMap{
"channel": "plugin/testdata/random-20Hz-stream",
"data": frame,
})
), data.IncludeSchemaOnly)
if err == nil {
channels = append(channels, util.DynMap{
"channel": "plugin/testdata/random-2s-stream",
"data": json.RawMessage(frameJSON),
}, util.DynMap{
"channel": "plugin/testdata/random-flakey-stream",
"data": json.RawMessage(frameJSON),
}, util.DynMap{
"channel": "plugin/testdata/random-20Hz-stream",
"data": json.RawMessage(frameJSON),
})
}

info["channels"] = channels
return response.JSONStreaming(200, info)
Expand Down
72 changes: 28 additions & 44 deletions pkg/services/live/managedstream/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type ManagedStream struct {
mu sync.RWMutex
id string
start time.Time
last map[int64]map[string]json.RawMessage
last map[int64]map[string]data.FrameJSONCache
publisher models.ChannelPublisher
}

Expand All @@ -78,7 +78,7 @@ func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStre
return &ManagedStream{
id: id,
start: time.Now(),
last: map[int64]map[string]json.RawMessage{},
last: map[int64]map[string]data.FrameJSONCache{},
publisher: publisher,
}
}
Expand All @@ -96,72 +96,56 @@ func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap {
for k, v := range s.last[orgID] {
ch := util.DynMap{}
ch["channel"] = prefix + k
ch["data"] = v
ch["data"] = json.RawMessage(v.Bytes(data.IncludeSchemaOnly))
info = append(info, ch)
}
return info
}

// Push sends frame to the stream and saves it for later retrieval by subscribers.
// unstableSchema flag can be set to disable schema caching for a path.
func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame, unstableSchema bool) error {
func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame) error {
// Keep schema + data for last packet.
frameJSONWrapper, err := data.FrameToJSON(frame)
msg, err := data.FrameToJSONCache(frame)
if err != nil {
logger.Error("Error marshaling frame with Schema", "error", err)
logger.Error("Error marshaling frame with data", "error", err)
return err
}
frameJSON := frameJSONWrapper.Bytes(data.IncludeAll)

if !unstableSchema {
// If schema is stable we can safely cache it, and only send values if
// stream already has schema cached.
s.mu.Lock()
if _, ok := s.last[orgID]; !ok {
s.last[orgID] = map[string]json.RawMessage{}
}
_, exists := s.last[orgID][path]
s.last[orgID][path] = frameJSON
s.mu.Unlock()

// When the packet already exits, only send the data.
// TODO: maybe a good idea would be MarshalJSON function of
// frame to keep Schema JSON and Values JSON in frame object
// to avoid encoding twice.
if exists {
frameJSONWrapper, err = data.FrameToJSON(frame)
if err != nil {
logger.Error("Error marshaling Frame to JSON", "error", err)
return err
}
frameJSON = frameJSONWrapper.Bytes(data.IncludeDataOnly)
}
} else {
// For unstable schema we always need to send everything to a connection.
// And we don't want to cache schema for unstable case. But we still need to
// set path to a map to make stream visible in UI stream select widget.
s.mu.Lock()
if _, ok := s.last[orgID]; ok {
s.last[orgID][path] = nil
}
s.mu.Unlock()

s.mu.Lock()
if _, ok := s.last[orgID]; !ok {
s.last[orgID] = map[string]data.FrameJSONCache{}
}
last, exists := s.last[orgID][path]
s.last[orgID][path] = msg
s.mu.Unlock()

include := data.IncludeAll
if exists && last.SameSchema(&msg) {
// When the schema has not changed, just send the data.
include = data.IncludeDataOnly
}
frameJSON := msg.Bytes(include)

// The channel this will be posted into.
channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String()
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
return s.publisher(orgID, channel, frameJSON)
}

// getLastPacket retrieves schema for a channel.
// getLastPacket retrieves last packet channel.
func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
_, ok := s.last[orgId]
if !ok {
return nil, false
}
schema, ok := s.last[orgId][path]
return schema, ok && schema != nil
msg, ok := s.last[orgId][path]
if ok {
return msg.Bytes(data.IncludeAll), ok
}
return nil, ok
}

func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
Expand All @@ -184,7 +168,7 @@ func (s *ManagedStream) OnPublish(_ context.Context, u *models.SignedInUser, evt
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
err = s.Push(u.OrgId, evt.Path, &frame, true)
err = s.Push(u.OrgId, evt.Path, &frame)
if err != nil {
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
Expand Down
18 changes: 2 additions & 16 deletions pkg/services/live/managedstream/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,17 @@ func TestNewManagedStream(t *testing.T) {
require.NotNil(t, c)
}

func TestManagedStream_GetLastPacket_UnstableSchema(t *testing.T) {
var orgID int64 = 1
publisher := &testPublisher{orgID: orgID, t: t}
c := NewManagedStream("a", publisher.publish)
_, ok := c.getLastPacket(orgID, "test")
require.False(t, ok)
err := c.Push(orgID, "test", data.NewFrame("hello"), true)
require.NoError(t, err)

_, ok = c.getLastPacket(orgID, "test")
require.NoError(t, err)
require.False(t, ok)
}

func TestManagedStream_GetLastPacket(t *testing.T) {
var orgID int64 = 1
publisher := &testPublisher{orgID: orgID, t: t}
c := NewManagedStream("a", publisher.publish)
_, ok := c.getLastPacket(orgID, "test")
require.False(t, ok)
err := c.Push(orgID, "test", data.NewFrame("hello"), false)
err := c.Push(orgID, "test", data.NewFrame("hello"))
require.NoError(t, err)

s, ok := c.getLastPacket(orgID, "test")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s))
require.JSONEq(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s))
}
4 changes: 1 addition & 3 deletions pkg/services/live/pushhttp/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := ctx.Req.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)

body, err := ctx.Req.Body().Bytes()
if err != nil {
Expand All @@ -69,7 +68,6 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)

Expand All @@ -88,7 +86,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// interval = "1s" vs flush_interval = "5s"

for _, mf := range metricFrames {
err := stream.Push(ctx.SignedInUser.OrgId, mf.Key(), mf.Frame(), unstableSchema)
err := stream.Push(ctx.SignedInUser.OrgId, mf.Key(), mf.Frame())
if err != nil {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return
Expand Down
8 changes: 1 addition & 7 deletions pkg/services/live/pushurl/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@ import (
)

const (
unstableSchemaParam = "gf_live_unstable_schema"
frameFormatParam = "gf_live_frame_format"
frameFormatParam = "gf_live_frame_format"
)

// UnstableSchemaFromValues extracts unstable schema tip from url values.
func UnstableSchemaFromValues(values url.Values) bool {
return strings.ToLower(values.Get(unstableSchemaParam)) == "true" || values.Get(unstableSchemaParam) == "1"
}

// FrameFormatFromValues extracts frame format tip from url values.
func FrameFormatFromValues(values url.Values) string {
frameFormat := strings.ToLower(values.Get(frameFormatParam))
Expand Down
11 changes: 0 additions & 11 deletions pkg/services/live/pushurl/values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestUnstableSchemaFromValues(t *testing.T) {
values := url.Values{}
require.False(t, UnstableSchemaFromValues(values))
values.Set(unstableSchemaParam, "yes")
require.False(t, UnstableSchemaFromValues(values))
values.Set(unstableSchemaParam, "true")
require.True(t, UnstableSchemaFromValues(values))
values.Set(unstableSchemaParam, "True")
require.True(t, UnstableSchemaFromValues(values))
}

func TestFrameFormatFromValues(t *testing.T) {
values := url.Values{}
require.Equal(t, "labels_column", FrameFormatFromValues(values))
Expand Down
4 changes: 1 addition & 3 deletions pkg/services/live/pushws/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,11 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := r.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)

logger.Debug("Live Push request",
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)

Expand All @@ -191,7 +189,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}

for _, mf := range metricFrames {
err := stream.Push(user.OrgId, mf.Key(), mf.Frame(), unstableSchema)
err := stream.Push(user.OrgId, mf.Key(), mf.Frame())
if err != nil {
return
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/tsdb/testdatasource/csv_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func TestReadCSV(t *testing.T) {
require.NoError(t, err)

frame := data.NewFrame("", fBool, fBool2, fNum, fStr)
frameToJSON, err := data.FrameToJSON(frame)
out, err := data.FrameToJSON(frame, data.IncludeAll)
require.NoError(t, err)
out := frameToJSON.Bytes(data.IncludeAll)

require.JSONEq(t, `{"schema":{
"fields":[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ export class QueryEditor extends PureComponent<Props, State> {
const { onChange, query, onRunQuery } = this.props;
onChange({ ...query, queryType: sel.value! });
onRunQuery();

// Reload the channel list
this.loadChannelInfo();
};

onChannelChange = (sel: SelectableValue<string>) => {
Expand Down

0 comments on commit 67028af

Please sign in to comment.