Skip to content

Commit

Permalink
[minor] added new Broadcast method to sse extension for easily broa…
Browse files Browse the repository at this point in the history
…dcasting a message to all active clients (#40)

[-] updated sample app for making things *prettier*
  • Loading branch information
bnkamalesh committed Mar 13, 2022
1 parent f4bb376 commit 0fed900
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[![](https://godoc.org/github.com/nathany/looper?status.svg)](http://godoc.org/github.com/bnkamalesh/webgo)
[![](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go#web-frameworks)

# WebGo v6.5.1
# WebGo v6.6.0

WebGo is a minimalistic framework for [Go](https://golang.org) to build web applications (server side) with no 3rd party dependencies. WebGo will always be Go standard library compliant; with the HTTP handlers having the same signature as [http.HandlerFunc](https://golang.org/pkg/net/http/#HandlerFunc).

Expand Down
3 changes: 2 additions & 1 deletion cmd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

![sse-demo](https://user-images.githubusercontent.com/1092882/158047065-447eb868-1efd-4a8d-b748-7caee2b3fcfd.png)

This picture shows the sample SSE implementation provided with this application.
This picture shows the sample SSE implementation provided with this application. In the sample app, the server is
sending timestamp every second, to all the clients.

**Important**: *[SSE](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
is a live connection between server & client. So a short WriteTimeout duration in webgo.Config will
Expand Down
10 changes: 3 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,9 @@ func main() {
retry := time.Millisecond * 500
for {
now := time.Now().Format(time.RFC1123Z)
sseService.Clients.Range(func(key, value interface{}) bool {
msg, _ := value.(chan *sse.Message)
msg <- &sse.Message{
Data: now + fmt.Sprintf(" (%d)", sseService.ClientsCount()),
Retry: retry,
}
return true
sseService.Broadcast(sse.Message{
Data: now + fmt.Sprintf(" (%d)", sseService.ActiveClients()),
Retry: retry,
})
time.Sleep(time.Second)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/static/css/main.css
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ a:hover {
section.main {
background: #fff;
width: 90%;
max-width: 360px;
max-width: 370px;
margin: 10vw auto;
padding: 0 2em;
border-radius: 4px;
Expand Down
2 changes: 1 addition & 1 deletion cmd/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ <h1 align="center">WebGo</h1>
</tr>
<tr>
<td><a
href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">SSE time</a>
href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">SSE</a> data
</td>
<td><span id="sse"></span></td>
</tr>
Expand Down
38 changes: 32 additions & 6 deletions cmd/static/js/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ const webgo = async () => {

const start = () => {
const source = new EventSource(url);
const configState = { initialBackoff, maxBackoff, backoffStep, backoff };

source.onopen = () => {
// reset backoff to initial, so further failures will again start with initial backoff
// instead of previous duration
backoff = initialBackoff;
configState.backoff = backoff
};

source.onmessage = (event) => {
onMessage && onMessage(event);
source.onmessage = (event, configState) => {
onMessage && onMessage(event, configState);
};

source.onerror = (err) => {
Expand All @@ -36,7 +39,7 @@ const webgo = async () => {
}
}
}, backoff);
onError && onError(err);
onError && onError(err, configState);
};
};
return start;
Expand All @@ -50,6 +53,14 @@ const webgo = async () => {
const sseClientsDOM = document.getElementById("sse-clients");
const sseClientIDDOM = document.getElementById("sse-client-id");

const formatBackoff = (backoff, precision = 2) => {
let boff = `${backoff}ms`;
if (backoff >= 1000) {
boff = `${parseFloat(backoff / 1000).toFixed(precision)}s`;
}
return boff;
};

sse(`/sse/${clientID}`, {
onMessage: (event) => {
const parts = event.data?.split("(");
Expand All @@ -59,11 +70,26 @@ const webgo = async () => {
sseClientsDOM.innerText = activeClients;
sseClientIDDOM.innerText = clientID;
},
onError: (err) => {
onError: (err, { backoff }) => {
sseClientsDOM.innerText = "N/A";

let interval = null;
interval = window.setInterval(() => {
sseDOM.innerHTML = `SSE failed, attempting reconnect in <strong>${formatBackoff(
backoff,
0
)}</strong>`;
backoff -= 1000;
if (backoff < 0) {
sseDOM.innerHTML = `SSE failed, attempting reconnect in <strong>0s</strong>`;
window.clearInterval(interval);
}
}, 1000);

console.log(err);
sseDOM.innerText = `SSE error, restarting`;
},
backoffStep: 150,
initialBackoff: 1000,
backoffStep: 1000,
})();
};
webgo();
27 changes: 22 additions & 5 deletions extensions/sse/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
)

type SSE struct {
Clients sync.Map
clientsCount atomic.Value
ClientIDHeader string
Clients sync.Map
clientsCount atomic.Value
// ClientIDHeader is the HTTP request header in which the client ID is set. Default is `sse-clientid`
ClientIDHeader string
// UnsupportedMessage is used to send the error response to client if the
// server doesn't support SSE
UnsupportedMessage func(http.ResponseWriter, *http.Request) error
}

Expand All @@ -34,6 +37,7 @@ func (sse *SSE) Handler(w http.ResponseWriter, r *http.Request) error {
clientID := r.Header.Get(sse.ClientIDHeader)
msg, _ := sse.ClientMessageChan(clientID)
defer sse.RemoveClientMessageChan(clientID)

ctx := r.Context()
for {
select {
Expand Down Expand Up @@ -72,13 +76,23 @@ func (sse *SSE) ClientMessageChan(clientID string) (chan *Message, bool) {
return msg.(chan *Message), !ok
}

// RemoveClientMessageChan removes the channel from clients map given a clientID
func (sse *SSE) RemoveClientMessageChan(clientID string) {
sse.Clients.Delete(clientID)
count := sse.clientsCount.Load().(int)
sse.clientsCount.Store(count - 1)
}

func (sse *SSE) ClientsCount() int {
// Broadcast sends the message to all active clients
func (sse *SSE) Broadcast(msg Message) {
sse.Clients.Range(func(key, value interface{}) bool {
mchan, _ := value.(chan *Message)
mchan <- &msg
return true
})
}

func (sse *SSE) ActiveClients() int {
return sse.clientsCount.Load().(int)
}

Expand Down Expand Up @@ -121,16 +135,19 @@ func (m *Message) Bytes() []byte {
}

func New() *SSE {
clientsCount := atomic.Value{}
clientsCount.Store(int(0))

s := &SSE{
Clients: sync.Map{},
clientsCount: clientsCount,
ClientIDHeader: "sse-clientid",
UnsupportedMessage: func(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusNotImplemented)
_, err := w.Write([]byte("Streaming not supported"))
return err
},
}
s.clientsCount.Store(int(0))

return s
}

0 comments on commit 0fed900

Please sign in to comment.