diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index aef0cb8b95b3..6dfccd6fa864 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -36,7 +36,7 @@ export interface LiveChannelConfig { /** * The channel keeps track of who else is connected to the same channel */ - hasPresense?: boolean; + hasPresence?: boolean; /** * This method will be defined if it is possible to publish in this channel. @@ -61,10 +61,19 @@ export enum LiveChannelConnectionState { Invalid = 'invalid', } +export enum LiveChannelEventType { + Status = 'status', + Join = 'join', + Leave = 'leave', + Message = 'message', +} + /** * @experimental */ -export interface LiveChannelStatus { +export interface LiveChannelStatusEvent { + type: LiveChannelEventType.Status; + /** * {scope}/{namespace}/{path} */ @@ -85,28 +94,53 @@ export interface LiveChannelStatus { /** * The last error. * - * This will remain in the status until a new message is succesfully recieved from the channel + * This will remain in the status until a new message is succesfully received from the channel */ error?: any; } -/** - * @experimental - */ -export interface LiveChannelJoinLeave { - user: any; +export interface LiveChannelJoinEvent { + type: LiveChannelEventType.Join; + user: any; // @experimental -- will be filled in when we improve the UI +} + +export interface LiveChannelLeaveEvent { + type: LiveChannelEventType.Leave; + user: any; // @experimental -- will be filled in when we improve the UI +} + +export interface LiveChannelMessageEvent { + type: LiveChannelEventType.Message; + message: T; +} + +export type LiveChannelEvent = + | LiveChannelStatusEvent + | LiveChannelJoinEvent + | LiveChannelLeaveEvent + | LiveChannelMessageEvent; + +export function isLiveChannelStatusEvent(evt: LiveChannelEvent): evt is LiveChannelStatusEvent { + return evt.type === LiveChannelEventType.Status; +} + +export function isLiveChannelJoinEvent(evt: LiveChannelEvent): evt is LiveChannelJoinEvent { + return evt.type === LiveChannelEventType.Join; +} + +export function isLiveChannelLeaveEvent(evt: LiveChannelEvent): evt is LiveChannelLeaveEvent { + return evt.type === LiveChannelEventType.Leave; +} + +export function isLiveChannelMessageEvent(evt: LiveChannelEvent): evt is LiveChannelMessageEvent { + return evt.type === LiveChannelEventType.Message; } /** * @experimental */ -export interface LiveChannelPresense { - users: any; -} - -export interface LiveChannelMessage { - type: 'status' | 'message' | 'join' | 'leave'; - message: TMessage | LiveChannelStatus | LiveChannelJoinLeave; +export interface LiveChannelPresenceStatus { + users: any; // @experimental -- will be filled in when we improve the UI } /** @@ -134,14 +168,14 @@ export interface LiveChannel { /** * Watch all events in this channel */ - getStream: () => Observable>; + getStream: () => Observable>; /** - * For channels that support presense, this will request the current state from the server. + * For channels that support presence, this will request the current state from the server. * * Join and leave messages will be sent to the open stream */ - getPresense?: () => Promise; + getPresence?: () => Promise; /** * Write a message into the channel diff --git a/pkg/api/dashboard.go b/pkg/api/dashboard.go index 65e58545e724..554a2549b7a8 100644 --- a/pkg/api/dashboard.go +++ b/pkg/api/dashboard.go @@ -262,6 +262,17 @@ func (hs *HTTPServer) PostDashboard(c *models.ReqContext, cmd models.SaveDashboa } } + // Tell everyone listening that the dashboard changed + if hs.Live != nil { + err := hs.Live.GrafanaScope.Dashboards.DashboardSaved( + dashboard.Uid, + c.UserId, + ) + if err != nil { + hs.log.Warn("unable to broadcast save event", "uid", dashboard.Uid, "error", err) + } + } + c.TimeRequest(metrics.MApiDashboardSave) return JSON(200, util.DynMap{ "status": "success", diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index 237467b52bc9..b7c6665672f2 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -79,15 +79,11 @@ func (hs *HTTPServer) Init() error { // Set up a websocket broker if hs.Cfg.IsLiveEnabled() { // feature flag - node, err := live.InitalizeBroker() + node, err := live.InitializeBroker() if err != nil { return err } hs.Live = node - - // Spit random walk to example - go live.RunRandomCSV(hs.Live, "grafana/testdata/random-2s-stream", 2000, 0) - go live.RunRandomCSV(hs.Live, "grafana/testdata/random-flakey-stream", 400, .6) } hs.macaron = hs.newMacaron() diff --git a/pkg/models/live.go b/pkg/models/live.go new file mode 100644 index 000000000000..9042968b56ed --- /dev/null +++ b/pkg/models/live.go @@ -0,0 +1,30 @@ +package models + +import "github.com/centrifugal/centrifuge" + +// ChannelPublisher writes data into a channel +type ChannelPublisher func(channel string, data []byte) error + +// ChannelHandler defines the core channel behavior +type ChannelHandler interface { + // This is called fast and often -- it must be synchrnozed + GetChannelOptions(id string) centrifuge.ChannelOptions + + // Called when a client wants to subscribe to a channel + OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error + + // Called when something writes into the channel. The returned value will be broadcast if len() > 0 + OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) +} + +// ChannelHandlerProvider -- this should be implemented by any core feature +type ChannelHandlerProvider interface { + // This is called fast and often -- it must be synchrnozed + GetHandlerForPath(path string) (ChannelHandler, error) +} + +// DashboardActivityChannel is a service to advertise dashboard activity +type DashboardActivityChannel interface { + DashboardSaved(uid string, userID int64) error + DashboardDeleted(uid string, userID int64) error +} diff --git a/pkg/services/live/channel.go b/pkg/services/live/channel.go new file mode 100644 index 000000000000..c901438e456e --- /dev/null +++ b/pkg/services/live/channel.go @@ -0,0 +1,27 @@ +package live + +import ( + "fmt" + "strings" +) + +// ChannelIdentifier is the channel id split by parts +type ChannelIdentifier struct { + Scope string // grafana, ds, or plugin + Namespace string // feature, id, or name + Path string // path within the channel handler +} + +// ParseChannelIdentifier parses the parts from a channel id: +// ${scope} / ${namespace} / ${path} +func ParseChannelIdentifier(id string) (ChannelIdentifier, error) { + parts := strings.SplitN(id, "/", 3) + if len(parts) == 3 { + return ChannelIdentifier{ + Scope: parts[0], + Namespace: parts[1], + Path: parts[2], + }, nil + } + return ChannelIdentifier{}, fmt.Errorf("Invalid channel id: %s", id) +} diff --git a/pkg/services/live/channel_test.go b/pkg/services/live/channel_test.go new file mode 100644 index 000000000000..7d6600ddd828 --- /dev/null +++ b/pkg/services/live/channel_test.go @@ -0,0 +1,30 @@ +package live + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestParseChannelIdentifier(t *testing.T) { + ident, err := ParseChannelIdentifier("aaa/bbb/ccc/ddd") + if err != nil { + t.FailNow() + } + + ex := ChannelIdentifier{ + Scope: "aaa", + Namespace: "bbb", + Path: "ccc/ddd", + } + + if diff := cmp.Diff(ident, ex); diff != "" { + t.Fatalf("Result mismatch (-want +got):\n%s", diff) + } + + // Check an invalid identifier + _, err = ParseChannelIdentifier("aaa/bbb") + if err == nil { + t.FailNow() + } +} diff --git a/pkg/services/live/channels.go b/pkg/services/live/channels.go deleted file mode 100644 index 80a9eebcd134..000000000000 --- a/pkg/services/live/channels.go +++ /dev/null @@ -1,54 +0,0 @@ -package live - -import ( - "encoding/json" - "math/rand" - "time" -) - -// channelInfo holds metadata about each channel and is returned on connection. -// Eventually each plugin should control exactly what is in this structure. -type channelInfo struct { - Description string -} - -type randomWalkMessage struct { - Time int64 - Value float64 - Min float64 - Max float64 -} - -// RunRandomCSV just for an example -func RunRandomCSV(broker *GrafanaLive, channel string, speedMillis int, dropPercent float64) { - spread := 50.0 - - walker := rand.Float64() * 100 - ticker := time.NewTicker(time.Duration(speedMillis) * time.Millisecond) - - line := randomWalkMessage{} - - for t := range ticker.C { - if rand.Float64() <= dropPercent { - continue // - } - delta := rand.Float64() - 0.5 - walker += delta - - line.Time = t.UnixNano() / int64(time.Millisecond) - line.Value = walker - line.Min = walker - ((rand.Float64() * spread) + 0.01) - line.Max = walker + ((rand.Float64() * spread) + 0.01) - - bytes, err := json.Marshal(&line) - if err != nil { - logger.Warn("unable to marshal line", "error", err) - continue - } - - v := broker.Publish(channel, bytes) - if !v { - logger.Warn("write", "channel", channel, "line", line, "ok", v) - } - } -} diff --git a/pkg/services/live/features/broadcast.go b/pkg/services/live/features/broadcast.go new file mode 100644 index 000000000000..7359b0e6947d --- /dev/null +++ b/pkg/services/live/features/broadcast.go @@ -0,0 +1,32 @@ +package features + +import ( + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/models" +) + +// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels +// This makes no assumptions about the shape of the data and will broadcast it to anyone listening +type BroadcastRunner struct{} + +// GetHandlerForPath called on init +func (g *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { + return g, nil // for now all channels share config +} + +// GetChannelOptions called fast and often +func (g *BroadcastRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { + return centrifuge.ChannelOptions{} +} + +// OnSubscribe for now allows anyone to subscribe to any dashboard +func (g *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { + // anyone can subscribe + return nil +} + +// OnPublish called when an event is received from the websocket +func (g *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { + // expect the data to be the right shape? + return e.Data, nil +} diff --git a/pkg/services/live/features/dashboard.go b/pkg/services/live/features/dashboard.go new file mode 100644 index 000000000000..819b66640304 --- /dev/null +++ b/pkg/services/live/features/dashboard.go @@ -0,0 +1,80 @@ +package features + +import ( + "encoding/json" + + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/models" +) + +// DashboardEvent events related to dashboards +type dashboardEvent struct { + UID string `json:"uid"` + Action string `json:"action"` // saved, editing + UserID int64 `json:"userId,omitempty"` + SessionID string `json:"sessionId,omitempty"` +} + +// DashboardHandler manages all the `grafana/dashboard/*` channels +type DashboardHandler struct { + publisher models.ChannelPublisher +} + +// CreateDashboardHandler Initialize a dashboard handler +func CreateDashboardHandler(p models.ChannelPublisher) DashboardHandler { + return DashboardHandler{ + publisher: p, + } +} + +// GetHandlerForPath called on init +func (g *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) { + return g, nil // all dashboards share the same handler +} + +// GetChannelOptions called fast and often +func (g *DashboardHandler) GetChannelOptions(id string) centrifuge.ChannelOptions { + return centrifuge.ChannelOptions{ + Presence: true, + JoinLeave: true, // if enterprise? + } +} + +// OnSubscribe for now allows anyone to subscribe to any dashboard +func (g *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { + // TODO? check authentication + return nil +} + +// OnPublish called when an event is received from the websocket +func (g *DashboardHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { + // TODO -- verify and keep track of editors? + return e.Data, nil +} + +// DashboardSaved should broadcast to the appropriate stream +func (g *DashboardHandler) publish(event dashboardEvent) error { + msg, err := json.Marshal(event) + if err != nil { + return err + } + return g.publisher("grafana/dashboard/"+event.UID, msg) +} + +// DashboardSaved will broadcast to all connected dashboards +func (g *DashboardHandler) DashboardSaved(uid string, userID int64) error { + return g.publish(dashboardEvent{ + UID: uid, + Action: "saved", + UserID: userID, + }) +} + +// DashboardDeleted will broadcast to all connected dashboards +func (g *DashboardHandler) DashboardDeleted(uid string, userID int64) error { + return g.publish(dashboardEvent{ + UID: uid, + Action: "deleted", + UserID: userID, + }) +} diff --git a/pkg/services/live/features/testdata.go b/pkg/services/live/features/testdata.go new file mode 100644 index 000000000000..1b7fe527d452 --- /dev/null +++ b/pkg/services/live/features/testdata.go @@ -0,0 +1,124 @@ +package features + +import ( + "encoding/json" + "fmt" + "math/rand" + "time" + + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" + "github.com/grafana/grafana/pkg/models" +) + +// TestdataRunner manages all the `grafana/dashboard/*` channels +type testdataRunner struct { + publisher models.ChannelPublisher + running bool + speedMillis int + dropPercent float64 + channel string +} + +// TestdataSupplier manages all the `grafana/testdata/*` channels +type TestdataSupplier struct { + publisher models.ChannelPublisher +} + +// CreateTestdataSupplier Initialize a dashboard handler +func CreateTestdataSupplier(p models.ChannelPublisher) TestdataSupplier { + return TestdataSupplier{ + publisher: p, + } +} + +// GetHandlerForPath called on init +func (g *TestdataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) { + channel := "grafana/testdata/" + path + + if path == "random-2s-stream" { + return &testdataRunner{ + publisher: g.publisher, + running: false, + speedMillis: 2000, + dropPercent: 0, + channel: channel, + }, nil + } + + if path == "random-flakey-stream" { + return &testdataRunner{ + publisher: g.publisher, + running: false, + speedMillis: 400, + dropPercent: .6, + channel: channel, + }, nil + } + + return nil, fmt.Errorf("unknown channel") +} + +// GetChannelOptions called fast and often +func (g *testdataRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { + return centrifuge.ChannelOptions{} +} + +// OnSubscribe for now allows anyone to subscribe to any dashboard +func (g *testdataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { + if !g.running { + g.running = true + + // Run in the background + go g.runRandomCSV() + } + + // TODO? check authentication + return nil +} + +// OnPublish called when an event is received from the websocket +func (g *testdataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { + return nil, fmt.Errorf("can not publish to testdata") +} + +type randomWalkMessage struct { + Time int64 + Value float64 + Min float64 + Max float64 +} + +// RunRandomCSV just for an example +func (g *testdataRunner) runRandomCSV() { + spread := 50.0 + + walker := rand.Float64() * 100 + ticker := time.NewTicker(time.Duration(g.speedMillis) * time.Millisecond) + + line := randomWalkMessage{} + + for t := range ticker.C { + if rand.Float64() <= g.dropPercent { + continue + } + delta := rand.Float64() - 0.5 + walker += delta + + line.Time = t.UnixNano() / int64(time.Millisecond) + line.Value = walker + line.Min = walker - ((rand.Float64() * spread) + 0.01) + line.Max = walker + ((rand.Float64() * spread) + 0.01) + + bytes, err := json.Marshal(&line) + if err != nil { + logger.Warn("unable to marshal line", "error", err) + continue + } + + err = g.publisher(g.channel, bytes) + if err != nil { + logger.Warn("write", "channel", g.channel, "line", line) + } + } +} diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index ebbaeb3d1aee..e693f20d8e77 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -4,10 +4,13 @@ import ( "encoding/json" "fmt" "strings" + "sync" "github.com/centrifugal/centrifuge" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/services/live/features" ) var ( @@ -15,14 +18,39 @@ var ( loggerCF = log.New("live.centrifuge") ) +// CoreGrafanaScope list of core features +type CoreGrafanaScope struct { + Features map[string]models.ChannelHandlerProvider + + // The generic service to advertise dashboard changes + Dashboards models.DashboardActivityChannel +} + // GrafanaLive pretends to be the server type GrafanaLive struct { - node *centrifuge.Node - Handler interface{} // handler func + node *centrifuge.Node + + // The websocket handler + Handler interface{} + + // Full channel handler + channels map[string]models.ChannelHandler + channelsMu sync.RWMutex + + // The core internal features + GrafanaScope CoreGrafanaScope } -// InitalizeBroker initializes the broker and starts listening for requests. -func InitalizeBroker() (*GrafanaLive, error) { +// InitializeBroker initializes the broker and starts listening for requests. +func InitializeBroker() (*GrafanaLive, error) { + glive := &GrafanaLive{ + channels: make(map[string]models.ChannelHandler), + channelsMu: sync.RWMutex{}, + GrafanaScope: CoreGrafanaScope{ + Features: make(map[string]models.ChannelHandlerProvider), + }, + } + // We use default config here as starting point. Default config contains // reasonable values for available options. cfg := centrifuge.DefaultConfig @@ -30,6 +58,20 @@ func InitalizeBroker() (*GrafanaLive, error) { // cfg.LogLevel = centrifuge.LogLevelDebug cfg.LogHandler = handleLog + // This function is called fast and often -- it must be sychronized + cfg.ChannelOptionsFunc = func(channel string) (centrifuge.ChannelOptions, bool, error) { + handler, err := glive.GetChannelHandler(channel) + if err != nil { + logger.Error("ChannelOptionsFunc", "channel", channel, "err", err) + if err.Error() == "404" { // ???? + return centrifuge.ChannelOptions{}, false, nil + } + return centrifuge.ChannelOptions{}, true, err + } + opts := handler.GetChannelOptions(channel) + return opts, true, nil + } + // Node is the core object in Centrifuge library responsible for many useful // things. For example Node allows to publish messages to channels from server // side with its Publish method, but in this example we will publish messages @@ -38,10 +80,16 @@ func InitalizeBroker() (*GrafanaLive, error) { if err != nil { return nil, err } + glive.node = node - b := &GrafanaLive{ - node: node, - } + // Initialize the main features + dash := features.CreateDashboardHandler(glive.Publish) + tds := features.CreateTestdataSupplier(glive.Publish) + + glive.GrafanaScope.Dashboards = &dash + glive.GrafanaScope.Features["dashboard"] = &dash + glive.GrafanaScope.Features["testdata"] = &tds + glive.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{} // Set ConnectHandler called when client successfully connected to Node. Your code // inside handler must be synchronized since it will be called concurrently from @@ -56,54 +104,54 @@ func InitalizeBroker() (*GrafanaLive, error) { logger.Debug("client connected", "transport", transportName, "encoding", transportEncoding) }) + // Set Disconnect handler to react on client disconnect events. + node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) { + logger.Info("client disconnected") + }) + // Set SubscribeHandler to react on every channel subscription attempt // initiated by client. Here you can theoretically return an error or // disconnect client from server if needed. But now we just accept // all subscriptions to all channels. In real life you may use a more // complex permission check here. node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { - info := &channelInfo{ - Description: fmt.Sprintf("channel: %s", e.Channel), + reply := centrifuge.SubscribeReply{} + + handler, err := glive.GetChannelHandler(e.Channel) + if err != nil { + return reply, err } - bytes, err := json.Marshal(&info) + + err = handler.OnSubscribe(c, e) if err != nil { - return centrifuge.SubscribeReply{}, err + return reply, err } - logger.Debug("client subscribes on channel", "channel", e.Channel, "info", string(bytes)) - return centrifuge.SubscribeReply{ - ExpireAt: 0, // does not expire - ChannelInfo: bytes, - }, nil + return reply, nil }) node.OnUnsubscribe(func(c *centrifuge.Client, e centrifuge.UnsubscribeEvent) { - s, err := node.PresenceStats(e.Channel) - if err != nil { - logger.Warn("unable to get presence stats", "channel", e.Channel, "error", err) - } - logger.Debug("unsubscribe from channel", "channel", e.Channel, "clients", s.NumClients, "users", s.NumUsers) + logger.Debug("unsubscribe from channel", "channel", e.Channel, "user", c.UserID()) }) - // By default, clients can not publish messages into channels. By setting - // PublishHandler we tell Centrifuge that publish from client side is possible. - // Now each time client calls publish method this handler will be called and - // you have a possibility to validate publication request before message will - // be published into channel and reach active subscribers. In our simple chat - // app we allow everyone to publish into any channel. + // Called when something is written to the websocket node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { - // logger.Debug("client publishes into channel", "channel", e.Channel, "body", string(e.Data)) + reply := centrifuge.PublishReply{} + handler, err := glive.GetChannelHandler(e.Channel) + if err != nil { + return reply, err + } - // For now, broadcast any messages to everyone - _, err := node.Publish(e.Channel, e.Data) + data, err := handler.OnPublish(c, e) + if err != nil { + return reply, err + } + if len(data) > 0 { + _, err = node.Publish(e.Channel, e.Data) + } return centrifuge.PublishReply{}, err // returns an error if it could not publish }) - // Set Disconnect handler to react on client disconnect events. - node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) { - logger.Info("client disconnected") - }) - // Run node. This method does not block. if err := node.Run(); err != nil { return nil, err @@ -123,7 +171,7 @@ func InitalizeBroker() (*GrafanaLive, error) { WriteBufferSize: 1024, }) - b.Handler = func(ctx *models.ReqContext) { + glive.Handler = func(ctx *models.ReqContext) { user := ctx.SignedInUser if user == nil { ctx.Resp.WriteHeader(401) @@ -171,16 +219,71 @@ func InitalizeBroker() (*GrafanaLive, error) { // Unknown path ctx.Resp.WriteHeader(404) } - return b, nil + return glive, nil } -// Publish sends the data to the channel -func (b *GrafanaLive) Publish(channel string, data []byte) bool { - _, err := b.node.Publish(channel, data) +// GetChannelHandler gives threadsafe access to the channel +func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, error) { + g.channelsMu.RLock() + c, ok := g.channels[channel] + g.channelsMu.RUnlock() // defer? but then you can't lock further down + if ok { + return c, nil + } + + // Parse the identifier ${scope}/${namespace}/${path} + id, err := ParseChannelIdentifier(channel) if err != nil { - logger.Warn("error writing to channel", "channel", channel, "err", err) + return nil, err + } + logger.Info("initChannel", "channel", channel, "id", id) + + g.channelsMu.Lock() + defer g.channelsMu.Unlock() + c, ok = g.channels[channel] // may have filled in while locked + if ok { + return c, nil } - return err == nil + + c, err = g.initChannel(id) + if err != nil { + return nil, err + } + g.channels[channel] = c + return c, nil +} + +func (g *GrafanaLive) initChannel(id ChannelIdentifier) (models.ChannelHandler, error) { + if id.Scope == "grafana" { + p, ok := g.GrafanaScope.Features[id.Namespace] + if ok { + return p.GetHandlerForPath(id.Path) + } + return nil, fmt.Errorf("Unknown feature: %s", id.Namespace) + } + + if id.Scope == "ds" { + return nil, fmt.Errorf("todo... look up datasource: %s", id.Namespace) + } + + if id.Scope == "plugin" { + p, ok := plugins.Plugins[id.Namespace] + if ok { + h := &PluginHandler{ + Plugin: p, + } + return h.GetHandlerForPath(id.Path) + } + return nil, fmt.Errorf("unknown plugin: %s", id.Namespace) + } + + return nil, fmt.Errorf("invalid scope: %s", id.Scope) +} + +// Publish sends the data to the channel without checking permissions etc +func (g *GrafanaLive) Publish(channel string, data []byte) error { + _, err := g.node.Publish(channel, data) + return err } // Write to the standard log15 logger diff --git a/pkg/services/live/pluginHandler.go b/pkg/services/live/pluginHandler.go new file mode 100644 index 000000000000..9941dd05b575 --- /dev/null +++ b/pkg/services/live/pluginHandler.go @@ -0,0 +1,32 @@ +package live + +import ( + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins" +) + +// PluginHandler manages all the `grafana/dashboard/*` channels +type PluginHandler struct { + Plugin *plugins.PluginBase +} + +// GetHandlerForPath called on init +func (g *PluginHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) { + return g, nil // all dashboards share the same handler +} + +// GetChannelOptions called fast and often +func (g *PluginHandler) GetChannelOptions(id string) centrifuge.ChannelOptions { + return centrifuge.ChannelOptions{} +} + +// OnSubscribe for now allows anyone to subscribe to any dashboard +func (g *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { + return nil // anyone can subscribe +} + +// OnPublish called when an event is received from the websocket +func (g *PluginHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { + return e.Data, nil // broadcast any event +} diff --git a/public/app/core/components/Select/FolderPicker.test.tsx b/public/app/core/components/Select/FolderPicker.test.tsx index d9065e79ae3d..58512a0a5c2f 100644 --- a/public/app/core/components/Select/FolderPicker.test.tsx +++ b/public/app/core/components/Select/FolderPicker.test.tsx @@ -3,6 +3,7 @@ import { shallow } from 'enzyme'; import { FolderPicker } from './FolderPicker'; jest.mock('@grafana/runtime', () => ({ + ...((jest.requireActual('@grafana/runtime') as unknown) as object), getBackendSrv: () => ({ search: jest.fn(() => [ { title: 'Dash 1', id: 'A' }, diff --git a/public/app/features/admin/LiveAdmin.tsx b/public/app/features/admin/LiveAdmin.tsx index cb197239501a..6718cf18690d 100644 --- a/public/app/features/admin/LiveAdmin.tsx +++ b/public/app/features/admin/LiveAdmin.tsx @@ -169,13 +169,21 @@ export class LiveAdmin extends PureComponent {
Namespace
s.value === path) || ''} onChange={this.onPathChanged} /> +