Skip to content

Commit

Permalink
Create a package, add some godoc
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Jul 15, 2018
1 parent 1ff63e1 commit 785839c
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 195 deletions.
46 changes: 46 additions & 0 deletions hub/hub.go
@@ -0,0 +1,46 @@
package hub

import "log"

// Partially based on https://github.com/kljensen/golang-html5-sse-example

// Hub stores channels with clients currently subcribed
type Hub struct {
subscribers map[chan Resource]bool
newSubscribers chan chan Resource
removedSubscribers chan chan Resource
resources chan Resource
}

// NewHub creates a hub
func NewHub() Hub {
return Hub{
make(map[chan Resource]bool),
make(chan (chan Resource)),
make(chan (chan Resource)),
make(chan Resource),
}
}

// Start starts the hub
func (h *Hub) Start() {
go func() {
for {
select {

case s := <-h.newSubscribers:
h.subscribers[s] = true

case s := <-h.removedSubscribers:
delete(h.subscribers, s)
close(s)

case content := <-h.resources:
for s := range h.subscribers {
s <- content
}
log.Printf("Broadcast resource to %d subscribers", len(h.subscribers))
}
}
}()
}
36 changes: 36 additions & 0 deletions hub/publishHandler.go
@@ -0,0 +1,36 @@
package hub

import (
"fmt"
"net/http"
)

// PublishHandler allows publisher to broadcast resources to all subscribers
func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "Invalid request")

return
}

iri := r.Form.Get("iri")
if iri == "" {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "Missing \"iri\" parameter")

return
}

data := r.Form.Get("data")
if data == "" {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "Missing \"data\" parameter")

return
}

// Broadcast the resource
h.resources <- NewResource(iri, data)
}
21 changes: 21 additions & 0 deletions hub/resource.go
@@ -0,0 +1,21 @@
package hub

import (
"fmt"
"strings"
)

// Resource contains a server-sent event
type Resource struct {
// The Internationalized Resource Identifier (RFC3987) of the resource (will most likely be an URI), prefixed by "id: "
IRI string

// Data, encoded in the sever-sent event format: every line starts with the string "data: "
// https://www.w3.org/TR/eventsource/#dispatchMessage
Data string
}

// NewResource creates a new resource and encodes the data property
func NewResource(iri string, data string) Resource {
return Resource{iri, fmt.Sprintf("data: %s\n\n", strings.Replace(data, "\n", "\ndata: ", -1))}
}
96 changes: 96 additions & 0 deletions hub/subscribeHandler.go
@@ -0,0 +1,96 @@
package hub

import (
"fmt"
"log"
"net/http"
"regexp"

"github.com/yosida95/uritemplate"
)

// SubscribeHandler create a keep alive connection and send the events to the subscribers
func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
log.Panic("The Reponse Writter must be an instance of Flusher.")
return
}

iris := r.URL.Query()["iri[]"]
if len(iris) == 0 {
http.Error(w, "Missing \"iri[]\" parameters.", http.StatusBadRequest)
return
}

var regexps = make([]*regexp.Regexp, len(iris))
for index, iri := range iris {
tpl, err := uritemplate.New(iri)
if nil != err {
http.Error(w, fmt.Sprintf("\"%s\" is not a valid URI template (RFC6570).", iri), http.StatusBadRequest)
return
}
regexps[index] = tpl.Regexp()
}

log.Printf("%s connected.", r.RemoteAddr)

// Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive
w.Header().Set("Connection", "keep-alive")

// Server-sent events https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Sending_events_from_the_server
w.Header().Set("Content-Type", "text/event-stream")

// Disable cache, even for old browsers and proxies
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
w.Header().Set("Pragma", "no-cache")
w.Header().Set("Expire", "0")

// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
w.Header().Set("X-Accel-Buffering", "no")

// Create a new channel, over which the hub can send can send resources to this subscriber.
resourceChan := make(chan Resource)

// Add this client to the map of those that should
// receive updates
h.newSubscribers <- resourceChan

// Listen to the closing of the http connection via the CloseNotifier
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
// Remove this client from the map of attached clients
// when `EventHandler` exits.
h.removedSubscribers <- resourceChan
log.Printf("%s disconnected.", r.RemoteAddr)
}()

for {
// Read from our resourceChan.
resource, open := <-resourceChan

if !open {
// If our resourceChan was closed, this means that the client has disconnected.
break
}

match := false
for _, r := range regexps {
if r.MatchString(resource.IRI) {
match = true
break
}
}
if !match {
continue
}

fmt.Fprint(w, "event: mercure\n")
fmt.Fprintf(w, "id: %s\n", resource.IRI)
fmt.Fprint(w, resource.Data)

f.Flush()
}
}

0 comments on commit 785839c

Please sign in to comment.