Skip to content

Commit

Permalink
live sequence diagram of cluster events
Browse files Browse the repository at this point in the history
  • Loading branch information
d-led committed Jun 27, 2024
1 parent 1761ea4 commit 3626cb8
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 35 deletions.
37 changes: 23 additions & 14 deletions cluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type PersistentClusterObserver struct {
phony.Inbox
identity string
peerIpToIdentity map[string]string
peerIdentities map[string]bool
messagesUpToNow []*messageEvent
events *pubsub.PubSub[string, Event]
}
Expand All @@ -30,23 +31,24 @@ func NewPersistentClusterObserver(identity string, myIP string, events *pubsub.P
return &PersistentClusterObserver{
identity: identity,
peerIpToIdentity: map[string]string{myIP: identity},
peerIdentities: map[string]bool{identity: true},
messagesUpToNow: []*messageEvent{},
events: events,
}
}

func (o *PersistentClusterObserver) AfterMessageSent(peer string, msg []byte) {
o.Act(o, func() {
msgString := string(msg)
o.messagesUpToNow = append(o.messagesUpToNow,
&messageEvent{SeenAt: o.identity, Src: o.identity, Dst: peer, Msg: msgString},
)
if peerIP, err := getIPOf(peer); err == nil {
peerIdentity, ok := o.peerIpToIdentity[peerIP]
if ok {
peer = peerIdentity
}
}
msgString := string(msg)
o.messagesUpToNow = append(o.messagesUpToNow,
&messageEvent{SeenAt: o.identity, Src: o.identity, Dst: peer, Msg: msgString},
)
log.Printf("Message sent to %s: %s", peer, msgString)
})
}
Expand Down Expand Up @@ -77,17 +79,17 @@ func (o *PersistentClusterObserver) trackCounterIdentitySync(msg *percounter.Net
return
}
o.peerIpToIdentity[peerIP] = msg.SourcePeer
o.peerIdentities[msg.SourcePeer] = true
o.processEventsSync()
}

func (o *PersistentClusterObserver) processEventsSync() {
if len(o.messagesUpToNow) == 0 ||
!o.anyUnkownPeersSync() ||
o.countUnkownPeersSync() < maxEventsWithUnknownPeersBeforePublishingAllEvents {
return
if len(o.messagesUpToNow) > 0 &&
(!o.anyUnkownPeersSync() ||
o.countUnkownPeersSync() >= maxEventsWithUnknownPeersBeforePublishingAllEvents) {
log.Println("publishing cluster messages")
o.publishPendingEventsSync()
}
log.Println("ready to publish message events")
o.publishPendingEventsSync()
}

func (o *PersistentClusterObserver) anyUnkownPeersSync() bool {
Expand Down Expand Up @@ -116,17 +118,24 @@ func (o *PersistentClusterObserver) countUnkownPeersSync() int {
}

func (o *PersistentClusterObserver) unknownPeerSync(peer string) bool {
_, ok := o.peerIpToIdentity[peer]
return !ok
_, ipKnown := o.peerIpToIdentity[peer]
return !ipKnown && !o.peerIdentities[peer]
}

func (o *PersistentClusterObserver) idOfSync(peer string) string {
if id, ok := o.peerIpToIdentity[peer]; ok {
return id
}
return peer
}

func (o *PersistentClusterObserver) publishPendingEventsSync() {
for _, msg := range o.messagesUpToNow {
e := NewSimpleEvent(ClusterMessageEvent)
e.Properties = map[string]interface{}{
"seen_at": msg.SeenAt,
"src": msg.Src,
"dst": msg.Dst,
"src": o.idOfSync(msg.Src),
"dst": o.idOfSync(msg.Dst),
"msg": msg.Msg,
}
o.events.Pub(e, ClusterMessageTopic)
Expand Down
5 changes: 1 addition & 4 deletions ui-src/cluster.html
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<!doctype html>
<html lang="en">
<head>
<title>Mermaid.js Live Update Demo: Cluster Messages</title>
<title>Mermaid.js Live Update Demo: Cluster Events</title>

<meta charset="utf-8" />
<meta
Expand Down Expand Up @@ -29,9 +29,6 @@ <h4>Cluster Events</h4>
</p>
<div class="d-flex flex-row mt-2 mb-2" id="graph">
<pre class="mermaid">
sequenceDiagram
A->>+B: {"a": 42}
B->>+A: {"b": 33}
</pre>
</div>

Expand Down
97 changes: 81 additions & 16 deletions ui-src/cluster.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
console.log(`loaded index.js`);
console.log(`loaded cluster.js`);

document.lastInput = "";
document.myReplica = null;
var clusterEvents = [];

const sourceReplicaIdKey = "Source-Replica-Id";

Expand Down Expand Up @@ -130,8 +130,6 @@ async function processEvent(event) {
return;
}

console.log("INCOMING_EVENT:", event);

switch (event.name) {
case "StartedListening":
case "ConnectedToRegion":
Expand All @@ -155,40 +153,107 @@ async function processEvent(event) {
return;
case "ConnectedToReplica":
document.myReplica = event?.properties?.param;
// do not show this event in the log
return;
break;
case "TotalVisitors":
showTotalVisitors(event?.properties?.param);
// do not show this event in the log
return;
case "ClusterMessage":
processClusterMessage(event);
await reRenderGraph();
// do not show this event in the log
return;
default:
console.log(`unhandled event: ${event.name}`);
// await reRenderGraph("", "");
break;
return;
}

console.log("INCOMING_EVENT:", event);
}

async function reRenderGraph() {
if (!clusterEvents?.length) {
console.log("nothing to render");
return;
}
let input = updateGraphDefinition();
if (input === document.lastInput) {
console.log("nothing to re-render");
return;
}
document.lastInput = input;
let rendered = await mermaid.mermaidAPI.render("temporary-graph", input);
let graph = document.querySelector("#graph");
if (graph) {
graph.innerHTML = rendered.svg;
} else {
console.log("ERROR: could not find target element for redrawing");
try {
let rendered = await mermaid.mermaidAPI.render("temporary-graph", input);
let graph = document.querySelector("#graph");
if (graph) {
graph.innerHTML = rendered.svg;
} else {
console.log("ERROR: could not find target element for redrawing");
}
} catch (e) {
console.log("error rendering graph:", e);
}
}

function processClusterMessage(event) {
clusterEvents.push(renderableClusterEvent(event))
}

function renderableClusterEvent(event) {
let arrowText = arrowTextFrom(event);
let from = normalizeParticipant(event?.properties?.src ?? 'unknown-src');
let to = normalizeParticipant(event?.properties?.dst ?? 'unknown-dst');
let re = {
from,
to,
arrowText,
};
console.log("event to render:", re)
return re;
}

function normalizeParticipant(participant) {
if (participant.indexOf(':') == -1) {
return participant;
}
try {
const url = new URL(participant);
return url.hostname;
} catch (e) {
console.log("could not parse participant:", participant, e);
return `${participant}`.replaceAll(':', '/');
}
}

function arrowTextFrom(event) {
try {
let orig = JSON.parse(event?.properties?.msg);
if (orig.name && orig.peers) {
return JSON.stringify({
name: orig.name,
peers: orig.peers,
total: sumUp(orig.peers),
});
}
return event?.properties?.msg ?? 'unknown-msg';
} catch (e) {
console.log("error parsing message", e)
}
}

function sumUp(peers) {
let sum = 0;
Object.keys(peers).forEach(peer=>sum+=parseInt(peers[peer]));
return sum;
}

function updateGraphDefinition() {
let res = `sequenceDiagram
A->>+B: {"a": 42}
B->>+A: {"b": 33}
`;
clusterEvents.forEach(e => {
res+=`${e.from}->>+${e.to}: ${e.arrowText}
`
});
return res;
}

Expand Down
2 changes: 1 addition & 1 deletion ui-src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ <h4>Server-side updates & info</h4>
<tbody>
<tr class="monospaced">
<td>Last event</td>
<td><span id="last-event"></p></td>
<td><span id="last-event"></span></td>
</tr>
<tr class="monospaced">
<td>Last error</td>
Expand Down

0 comments on commit 3626cb8

Please sign in to comment.