Skip to content

Commit

Permalink
Add broadcaster logic (#6)
Browse files Browse the repository at this point in the history
* Add broadcaster logic

* Update db schema

* Update tests

* Fix tests
  • Loading branch information
heppu committed Mar 17, 2024
1 parent e144df1 commit eec83ab
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 95 deletions.
55 changes: 13 additions & 42 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package api

import (
"bytes"
"embed"
"encoding/json"
"io/fs"
"log/slog"
"net/http"
Expand All @@ -24,12 +22,14 @@ type Store interface {
type API struct {
r *http.ServeMux
s Store
b *Broadcaster
}

func New(s Store) (*API, error) {
func New(s Store, b *Broadcaster) (*API, error) {
a := &API{
r: http.NewServeMux(),
s: s,
b: b,
}

assetsFS, err := fs.Sub(assets, "assets")
Expand All @@ -38,8 +38,7 @@ func New(s Store) (*API, error) {
}

a.r.Handle("GET /", templ.Handler(templates.Index()))
a.r.HandleFunc("GET /api/v1/stats", a.stats)
a.r.HandleFunc("GET /api/v1/events", a.events)
a.r.HandleFunc("GET /api/v1/stream", a.stream)
a.r.Handle("GET /assets/", http.StripPrefix("/assets/", http.FileServer(http.FS(assetsFS))))

return a, nil
Expand All @@ -49,56 +48,28 @@ func (a *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.r.ServeHTTP(w, r)
}

func (a *API) stats(w http.ResponseWriter, r *http.Request) {
stats, err := a.s.Stats()
if err != nil {
slog.Error("failed to get stats", slog.Any("error", err))
http.Error(w, "failed to get stats", http.StatusInternalServerError)
return
}

if r.Header.Get("Accept") == "application/json" {
if err := json.NewEncoder(w).Encode(stats); err != nil {
slog.Error("failed encode stats", slog.Any("error", err))
return
}
return
}

if err := templates.Stats(stats).Render(r.Context(), w); err != nil {
slog.Error("failed render stats", slog.Any("error", err))
return
}
}

func (a *API) events(w http.ResponseWriter, r *http.Request) {
func (a *API) stream(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

slog.Info("client connected", slog.String("remote_addr", r.RemoteAddr))

ch, unsubscribe := a.b.subscribe()
defer unsubscribe()
for {
select {
case <-r.Context().Done(): // Client disconnected
slog.Info("client disconnected", slog.String("remote_addr", r.RemoteAddr))
return
case event, ok := <-a.s.Events():
if !ok {
return
}

buf := &bytes.Buffer{}
buf.WriteString("data: ")
if err := templates.Row(event).Render(r.Context(), buf); err != nil {
slog.Error("failed render row", slog.Any("error", err))
case data := <-ch:
if _, err := w.Write(data); err != nil {
slog.Error("failed to write to client, closing connection", slog.String("remote_addr", r.RemoteAddr), slog.Any("err", err))
return
}
buf.WriteString("\n\n")
if _, err := buf.WriteTo(w); err != nil {
slog.Error("failed send event", slog.Any("error", err))
}
w.(http.Flusher).Flush()
case <-a.b.done:
slog.Info("broadcaster stopped, closing connection", slog.String("remote_addr", r.RemoteAddr))
return
}
}
}
145 changes: 145 additions & 0 deletions api/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package api

import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"sync"
"time"

"github.com/a-h/templ"
"github.com/gevulotnetwork/devnet-explorer/api/templates"
"github.com/gevulotnetwork/devnet-explorer/model"
)

type Broadcaster struct {
s Store
clientsMu sync.Mutex
nextID uint64
clients map[uint64]chan<- []byte
headIndex uint8
head [50][]byte

done chan struct{}
}

func NewBroadcaster(s Store) *Broadcaster {
return &Broadcaster{
s: s,
clients: make(map[uint64]chan<- []byte),
done: make(chan struct{}),
}
}

func (b *Broadcaster) subscribe() (data <-chan []byte, unsubscribe func()) {
b.clientsMu.Lock()
defer b.clientsMu.Unlock()

id := b.nextID
ch := make(chan []byte, len(b.head)+2)
b.clients[id] = ch
b.nextID++
slog.Info("client subscribed", slog.Uint64("id", id))

for i := 1; i <= len(b.head); i++ {
idx := (b.headIndex + uint8(i)) % uint8(len(b.head))
if b.head[idx] != nil {
ch <- b.head[idx]
}
}

return ch, func() {
slog.Info("client unsubscribed", slog.Uint64("id", id))
b.clientsMu.Lock()
defer b.clientsMu.Unlock()
delete(b.clients, id)
close(ch)
}
}

func (b *Broadcaster) Run() error {
t := time.NewTicker(time.Second * 2)
for {
var ev EventComponent
select {
case event, ok := <-b.s.Events():
if !ok {
slog.Info("store.Events() channel closed, broadcasting stopped")
return nil
}
slog.Debug("new tx event received")
ev = TXRowEvent(event)
case <-t.C:
stats, err := b.s.Stats()
if err != nil {
return fmt.Errorf("failed to get stats: %w", err)
}
slog.Debug("stats updated")
ev = StatEvent(stats)
case <-b.done:
return nil
}

buf := &bytes.Buffer{}
if err := writeEvent(buf, ev); err != nil {
slog.Error("failed write event into buffer", slog.Any("error", err))
continue
}

b.broadcast(buf.Bytes())
}
}

func (b *Broadcaster) broadcast(data []byte) {
b.clientsMu.Lock()
defer b.clientsMu.Unlock()
b.head[b.headIndex] = data
b.headIndex = (b.headIndex + 1) % uint8(len(b.head))
for id, c := range b.clients {
select {
case c <- data:
slog.Debug("data broadcasted", slog.Uint64("id", id))
default:
slog.Info("client blocked, broadcasting event skipped", slog.Uint64("id", id))
}
}
}

func (b *Broadcaster) Stop() error {
close(b.done)
return nil
}

type EventComponent struct {
templ.Component
name string
}

func (e EventComponent) Name() string {
return e.name
}

func TXRowEvent(e model.Event) EventComponent {
return EventComponent{
Component: templates.Row(e),
name: templates.EventTXRow,
}
}

func StatEvent(s model.Stats) EventComponent {
return EventComponent{
Component: templates.Stats(s),
name: templates.EventStats,
}
}

func writeEvent(w io.Writer, c EventComponent) error {
fmt.Fprintf(w, "event: %s\ndata: ", c.Name())
if err := c.Render(context.Background(), w); err != nil {
return fmt.Errorf("failed render html: %w", err)
}
fmt.Fprint(w, "\n\n")
return nil
}
4 changes: 2 additions & 2 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type Server struct {
srv *http.Server
}

func NewServer(addr string, s Store) (*Server, error) {
a, err := New(s)
func NewServer(addr string, s Store, b *Broadcaster) (*Server, error) {
a, err := New(s, b)
if err != nil {
return nil, fmt.Errorf("failed to create api: %w", err)
}
Expand Down
11 changes: 8 additions & 3 deletions api/templates/index.templ
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package templates
import "github.com/gevulotnetwork/devnet-explorer/model"
import "strconv"

const (
EventTXRow = "tx-row"
EventStats = "stats"
)

templ Index() {
<!DOCTYPE html>
<html lang="en">
@head()
<body>
<div id="container">
<div id="container" hx-ext="sse" sse-connect="/api/v1/stream">
@header()
@Stats(model.Stats{})
@Table(nil)
Expand All @@ -19,7 +24,7 @@ templ Index() {
}

templ Stats(stats model.Stats) {
<div id="stats" hx-get="/api/v1/stats" hx-trigger="every 2s" hx-swap="outerHTML">
<div id="stats" sse-swap={ EventStats } hx-swap="outerHTML">
<div class="number-block">
<div class="rolling-number" id="registered_users">{ strconv.Itoa(int(stats.RegisteredUsers)) }</div>
<div class="number-title">Registered<br/>Users</div>
Expand Down Expand Up @@ -52,7 +57,7 @@ templ Table(events []model.Event) {
<div class="th"></div>
</div>
</div>
<div class="tbody" hx-ext="sse" sse-connect="/api/v1/events" sse-swap="message" hx-swap="afterbegin">
<div class="tbody" sse-swap={ EventTXRow } hx-swap="afterbegin">
for _, e := range events {
@Row(e)
}
Expand Down
Loading

0 comments on commit eec83ab

Please sign in to comment.