Skip to content

Commit

Permalink
Live: broadcast events when dashboard is saved (#27583)
Browse files Browse the repository at this point in the history
Co-authored-by: kay delaney <45561153+kaydelaney@users.noreply.github.com>
Co-authored-by: Torkel Ödegaard <torkel@grafana.org>
  • Loading branch information
3 people committed Oct 1, 2020
1 parent 44c9aea commit 8a5fc00
Show file tree
Hide file tree
Showing 28 changed files with 974 additions and 207 deletions.
70 changes: 52 additions & 18 deletions packages/grafana-data/src/types/live.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface LiveChannelConfig<TMessage = any> {
/**
* The channel keeps track of who else is connected to the same channel
*/
hasPresense?: boolean;
hasPresence?: boolean;

/**
* This method will be defined if it is possible to publish in this channel.
Expand All @@ -61,10 +61,19 @@ export enum LiveChannelConnectionState {
Invalid = 'invalid',
}

export enum LiveChannelEventType {
Status = 'status',
Join = 'join',
Leave = 'leave',
Message = 'message',
}

/**
* @experimental
*/
export interface LiveChannelStatus {
export interface LiveChannelStatusEvent {
type: LiveChannelEventType.Status;

/**
* {scope}/{namespace}/{path}
*/
Expand All @@ -85,28 +94,53 @@ export interface LiveChannelStatus {
/**
* The last error.
*
* This will remain in the status until a new message is succesfully recieved from the channel
* This will remain in the status until a new message is succesfully received from the channel
*/
error?: any;
}

/**
* @experimental
*/
export interface LiveChannelJoinLeave {
user: any;
export interface LiveChannelJoinEvent {
type: LiveChannelEventType.Join;
user: any; // @experimental -- will be filled in when we improve the UI
}

export interface LiveChannelLeaveEvent {
type: LiveChannelEventType.Leave;
user: any; // @experimental -- will be filled in when we improve the UI
}

export interface LiveChannelMessageEvent<T> {
type: LiveChannelEventType.Message;
message: T;
}

export type LiveChannelEvent<T = any> =
| LiveChannelStatusEvent
| LiveChannelJoinEvent
| LiveChannelLeaveEvent
| LiveChannelMessageEvent<T>;

export function isLiveChannelStatusEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelStatusEvent {
return evt.type === LiveChannelEventType.Status;
}

export function isLiveChannelJoinEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelJoinEvent {
return evt.type === LiveChannelEventType.Join;
}

export function isLiveChannelLeaveEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelLeaveEvent {
return evt.type === LiveChannelEventType.Leave;
}

export function isLiveChannelMessageEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelMessageEvent<T> {
return evt.type === LiveChannelEventType.Message;
}

/**
* @experimental
*/
export interface LiveChannelPresense {
users: any;
}

export interface LiveChannelMessage<TMessage = any> {
type: 'status' | 'message' | 'join' | 'leave';
message: TMessage | LiveChannelStatus | LiveChannelJoinLeave;
export interface LiveChannelPresenceStatus {
users: any; // @experimental -- will be filled in when we improve the UI
}

/**
Expand Down Expand Up @@ -134,14 +168,14 @@ export interface LiveChannel<TMessage = any, TPublish = any> {
/**
* Watch all events in this channel
*/
getStream: () => Observable<LiveChannelMessage<TMessage>>;
getStream: () => Observable<LiveChannelEvent<TMessage>>;

/**
* For channels that support presense, this will request the current state from the server.
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresense?: () => Promise<LiveChannelPresense>;
getPresence?: () => Promise<LiveChannelPresenceStatus>;

/**
* Write a message into the channel
Expand Down
11 changes: 11 additions & 0 deletions pkg/api/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ func (hs *HTTPServer) PostDashboard(c *models.ReqContext, cmd models.SaveDashboa
}
}

// Tell everyone listening that the dashboard changed
if hs.Live != nil {
err := hs.Live.GrafanaScope.Dashboards.DashboardSaved(
dashboard.Uid,
c.UserId,
)
if err != nil {
hs.log.Warn("unable to broadcast save event", "uid", dashboard.Uid, "error", err)
}
}

c.TimeRequest(metrics.MApiDashboardSave)
return JSON(200, util.DynMap{
"status": "success",
Expand Down
6 changes: 1 addition & 5 deletions pkg/api/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,11 @@ func (hs *HTTPServer) Init() error {

// Set up a websocket broker
if hs.Cfg.IsLiveEnabled() { // feature flag
node, err := live.InitalizeBroker()
node, err := live.InitializeBroker()
if err != nil {
return err
}
hs.Live = node

// Spit random walk to example
go live.RunRandomCSV(hs.Live, "grafana/testdata/random-2s-stream", 2000, 0)
go live.RunRandomCSV(hs.Live, "grafana/testdata/random-flakey-stream", 400, .6)
}

hs.macaron = hs.newMacaron()
Expand Down
30 changes: 30 additions & 0 deletions pkg/models/live.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package models

import "github.com/centrifugal/centrifuge"

// ChannelPublisher writes data into a channel
type ChannelPublisher func(channel string, data []byte) error

// ChannelHandler defines the core channel behavior
type ChannelHandler interface {
// This is called fast and often -- it must be synchrnozed
GetChannelOptions(id string) centrifuge.ChannelOptions

// Called when a client wants to subscribe to a channel
OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error

// Called when something writes into the channel. The returned value will be broadcast if len() > 0
OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error)
}

// ChannelHandlerProvider -- this should be implemented by any core feature
type ChannelHandlerProvider interface {
// This is called fast and often -- it must be synchrnozed
GetHandlerForPath(path string) (ChannelHandler, error)
}

// DashboardActivityChannel is a service to advertise dashboard activity
type DashboardActivityChannel interface {
DashboardSaved(uid string, userID int64) error
DashboardDeleted(uid string, userID int64) error
}
27 changes: 27 additions & 0 deletions pkg/services/live/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package live

import (
"fmt"
"strings"
)

// ChannelIdentifier is the channel id split by parts
type ChannelIdentifier struct {
Scope string // grafana, ds, or plugin
Namespace string // feature, id, or name
Path string // path within the channel handler
}

// ParseChannelIdentifier parses the parts from a channel id:
// ${scope} / ${namespace} / ${path}
func ParseChannelIdentifier(id string) (ChannelIdentifier, error) {
parts := strings.SplitN(id, "/", 3)
if len(parts) == 3 {
return ChannelIdentifier{
Scope: parts[0],
Namespace: parts[1],
Path: parts[2],
}, nil
}
return ChannelIdentifier{}, fmt.Errorf("Invalid channel id: %s", id)
}
30 changes: 30 additions & 0 deletions pkg/services/live/channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package live

import (
"testing"

"github.com/google/go-cmp/cmp"
)

func TestParseChannelIdentifier(t *testing.T) {
ident, err := ParseChannelIdentifier("aaa/bbb/ccc/ddd")
if err != nil {
t.FailNow()
}

ex := ChannelIdentifier{
Scope: "aaa",
Namespace: "bbb",
Path: "ccc/ddd",
}

if diff := cmp.Diff(ident, ex); diff != "" {
t.Fatalf("Result mismatch (-want +got):\n%s", diff)
}

// Check an invalid identifier
_, err = ParseChannelIdentifier("aaa/bbb")
if err == nil {
t.FailNow()
}
}
54 changes: 0 additions & 54 deletions pkg/services/live/channels.go

This file was deleted.

32 changes: 32 additions & 0 deletions pkg/services/live/features/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package features

import (
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
)

// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels
// This makes no assumptions about the shape of the data and will broadcast it to anyone listening
type BroadcastRunner struct{}

// GetHandlerForPath called on init
func (g *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return g, nil // for now all channels share config
}

// GetChannelOptions called fast and often
func (g *BroadcastRunner) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
}

// OnSubscribe for now allows anyone to subscribe to any dashboard
func (g *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
// anyone can subscribe
return nil
}

// OnPublish called when an event is received from the websocket
func (g *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) {
// expect the data to be the right shape?
return e.Data, nil
}

0 comments on commit 8a5fc00

Please sign in to comment.