Skip to content

Commit

Permalink
initial event streaming ui
Browse files Browse the repository at this point in the history
  • Loading branch information
d-led committed Jun 27, 2024
1 parent 579fd34 commit 1761ea4
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 13 deletions.
6 changes: 3 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (ps *Cluster) listenToInternalEventsForever() {
case VisitorJoinedEvent:
ps.counter.Increment(NewConnectionsCounter)
ps.counter.Increment(StartedConnectionsCounter)
ps.events.Pub(NewEventWithParam(TotalVisitorsEvent, ps.counter.Value(NewConnectionsCounter)), Topic)
ps.events.Pub(NewEventWithParam(TotalVisitorsEvent, ps.counter.Value(NewConnectionsCounter)), Topic, ClusterMessageTopic)
sendInitialClusterConnectionCount(ps.events, ps.counter)
case VisitorLeftEvent:
ps.counter.Increment(ClosedConnectionsCounter)
Expand All @@ -121,7 +121,7 @@ func (ps *Cluster) getPeers() {
log.Printf("Peers changed %v -> %v", ps.peers, peers)
ps.peers = peers
ps.counter.UpdatePeers(zmqPeers(peers))
ps.events.Pub(GetReplicasEvent(replicaCount), Topic)
ps.events.Pub(GetReplicasEvent(replicaCount), Topic, ClusterMessageTopic)
}
}

Expand Down Expand Up @@ -160,7 +160,7 @@ func sendInitialClusterConnectionCount(
started := counter.Value(StartedConnectionsCounter)
closed := counter.Value(ClosedConnectionsCounter)

events.Pub(NewEventWithParam(TotalClusterVisitorsActiveEvent, started-closed))
events.Pub(NewEventWithParam(TotalClusterVisitorsActiveEvent, started-closed), Topic, ClusterMessageTopic)
}

type nullPeerLocator struct {
Expand Down
6 changes: 3 additions & 3 deletions counter_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (n *CounterListener) OnNewCount(ev percounter.CountEvent) {
switch ev.Name {
case NewConnectionsCounter:
log.Println("New visitor count:", ev.Count)
n.events.Pub(NewEventWithParam(TotalVisitorsEvent, ev.Count), Topic)
n.events.Pub(NewEventWithParam(TotalVisitorsEvent, ev.Count), Topic, ClusterMessageTopic)

case StartedConnectionsCounter:
log.Printf("started event: %v", ev)
Expand All @@ -35,7 +35,7 @@ func (n *CounterListener) OnNewCount(ev percounter.CountEvent) {
TotalClusterVisitorsActiveEvent,
n.startedConnections-
n.closedConnections,
), Topic)
), Topic, ClusterMessageTopic)

case ClosedConnectionsCounter:
log.Printf("closed event: %v", ev)
Expand All @@ -44,7 +44,7 @@ func (n *CounterListener) OnNewCount(ev percounter.CountEvent) {
TotalClusterVisitorsActiveEvent,
n.startedConnections-
n.closedConnections,
), Topic)
), Topic, ClusterMessageTopic)

default:
// ignore the event
Expand Down
40 changes: 40 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,46 @@ func (s *Server) setupRoutes() {
}
})
})

clusterGroup := s.server.Group("cluster")
// httpie> http -S http://localhost:8080/cluster/events
clusterGroup.GET("/events", func(c *gin.Context) {
c.Header("Connection", "Keep-Alive")
c.Header("Keep-Alive", "timeout=10, max=1000")

ctx := c.Request.Context()
closeNotify := c.Writer.CloseNotify()

myEvents := s.events.Sub(ClusterMessageTopic)
defer s.events.Unsub(myEvents, ClusterMessageTopic)

streamOneEvent(c, NewSimpleEvent("StartedListening"))
streamOneEvent(c, NewEventWithParam("ConnectedToRegion", getFlyRegion()))
streamOneEvent(c, GetReplicasEvent(1))
streamOneEvent(c, NewEventWithParam("ConnectedToReplica", getPublicReplicaId()))

// callback returns false on end of processing
c.Stream(func(w io.Writer) bool {
select {
case <-s.serverContext.Done():
log.Printf("closing the connection: server shutting down")
return false

case <-ctx.Done():
log.Printf("client disconnected")
return false

case <-closeNotify:
log.Printf("client closed the connection")
return false

case event := <-myEvents:
streamOneEvent(c, event)

return true
}
})
})
}

func (s *Server) setupSignalHandler() {
Expand Down
2 changes: 2 additions & 0 deletions transpile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func Refresh() {
func copyStatic() {
for _, f := range []string{
"index.html",
"cluster.html",
"index.css",
} {
text, err := os.ReadFile(filepath.Join(uiSrc, f))
Expand All @@ -29,6 +30,7 @@ func transpile() {
result := api.Build(api.BuildOptions{
EntryPoints: []string{
filepath.Join(uiSrc, "index.ts"),
filepath.Join(uiSrc, "cluster.ts"),
},
Bundle: true,
Outdir: dist,
Expand Down
87 changes: 87 additions & 0 deletions ui-src/cluster.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<!doctype html>
<html lang="en">
<head>
<title>Mermaid.js Live Update Demo: Cluster Messages</title>

<meta charset="utf-8" />
<meta
name="viewport"
content="width=device-width, initial-scale=1, shrink-to-fit=no"
/>
<script src="https://cdn.jsdelivr.net/npm/jquery@3.7.1/dist/jquery.min.js" integrity="sha256-/JqT3SQfawRcv/BIHPThkBvs0OEvtFFmqPF/lYI/Cxo=" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/js/bootstrap.bundle.min.js" integrity="sha256-CDOy6cOibCWEdsRiZuaHf8dSGGJRYuBGC+mjoJimHGw=" crossorigin="anonymous"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" integrity="sha256-PI8n5gCcz9cQqQXm3PEtDuPG8qx9oFsFctPg0S5zb8g=" crossorigin="anonymous">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@fortawesome/fontawesome-free@6.5.2/css/all.min.css" integrity="sha256-XOqroi11tY4EFQMR9ZYwZWKj5ZXiftSx36RRuC3anlA=" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/mermaid@10.9.1/dist/mermaid.min.js" integrity="sha256-YbM1pG3wWnzhyYN49g5fPnen+2CKEFaZfopkkwSpNtY=" crossorigin="anonymous"></script>

<link rel="stylesheet" href="./index.css" />
<script src="./cluster.js"></script>
</head>

<body>
<main role="main" class="container">
<div class="container">
<h4>Cluster Events</h4>
<p>
<small>Visit the UI of replicas several times to observe cluster events</small>
<br>
<small>This will be empty if only one replica is available.</small>
</p>
<div class="d-flex flex-row mt-2 mb-2" id="graph">
<pre class="mermaid">
sequenceDiagram
A->>+B: {"a": 42}
B->>+A: {"b": 33}
</pre>
</div>

<h4>Server-side updates & info</h4>

<div id="alerts">
<div id="alert-placeholder"></div>
<div class="alert alert-warning" role="alert" id="offline-alert" style="display: none;">
Offline, hold on ...
</div>
<div class="alert alert-success fade" id="connected-alert" style="display: none;">
Connected
</div>
</div>

<div class="table-responsive-sm d-flex flex-row mt-2 mb-2">
<table class="table table-fit">
<thead class="thead-light">
<tr>
<th scope="col" style="display: none;">Info</th>
<th scope="col" style="display: none;">Value</th>
</tr>
</thead>
<tbody>
<tr class="monospaced">
<td>Visitors active on this replica</td>
<td><span id="visitors-active"></span></td>
</tr>
<tr class="monospaced">
<td>Visitors active in the cluster</td>
<td><span id="visitors-active-cluster"></span></td>
</tr>
<tr class="monospaced">
<td>Replicas</td>
<td><span id="replicas"></span></td>
</tr>
<tr class="monospaced">
<td>Total started connections</td>
<td><span id="total-visitors"></span></td>
</tr>
<tr class="monospaced">
<td>Source</td>
<td><a href="https://github.com/d-led/mermaidlive">github.com/d-led/mermaidlive</a></td>
</tr>
</tbody>
</table>
</div>
</main>
<script type="module">
mermaid.initialize({ startOnLoad: false });
</script>
</body>
</html>
Loading

0 comments on commit 1761ea4

Please sign in to comment.