From 785839c199daa24955049fc1b6d14ff9dd1930c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Sun, 15 Jul 2018 15:12:36 +0200 Subject: [PATCH] Create a package, add some godoc --- hub/hub.go | 46 +++++++ hub/publishHandler.go | 36 ++++++ hub/resource.go | 21 ++++ hub/subscribeHandler.go | 96 +++++++++++++++ main.go | 200 +------------------------------ {templates => public}/index.html | 2 +- 6 files changed, 206 insertions(+), 195 deletions(-) create mode 100644 hub/hub.go create mode 100644 hub/publishHandler.go create mode 100644 hub/resource.go create mode 100644 hub/subscribeHandler.go rename {templates => public}/index.html (96%) diff --git a/hub/hub.go b/hub/hub.go new file mode 100644 index 00000000..82a62cad --- /dev/null +++ b/hub/hub.go @@ -0,0 +1,46 @@ +package hub + +import "log" + +// Partially based on https://github.com/kljensen/golang-html5-sse-example + +// Hub stores channels with clients currently subcribed +type Hub struct { + subscribers map[chan Resource]bool + newSubscribers chan chan Resource + removedSubscribers chan chan Resource + resources chan Resource +} + +// NewHub creates a hub +func NewHub() Hub { + return Hub{ + make(map[chan Resource]bool), + make(chan (chan Resource)), + make(chan (chan Resource)), + make(chan Resource), + } +} + +// Start starts the hub +func (h *Hub) Start() { + go func() { + for { + select { + + case s := <-h.newSubscribers: + h.subscribers[s] = true + + case s := <-h.removedSubscribers: + delete(h.subscribers, s) + close(s) + + case content := <-h.resources: + for s := range h.subscribers { + s <- content + } + log.Printf("Broadcast resource to %d subscribers", len(h.subscribers)) + } + } + }() +} diff --git a/hub/publishHandler.go b/hub/publishHandler.go new file mode 100644 index 00000000..ef87b3e7 --- /dev/null +++ b/hub/publishHandler.go @@ -0,0 +1,36 @@ +package hub + +import ( + "fmt" + "net/http" +) + +// PublishHandler allows publisher to broadcast resources to all subscribers +func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "Invalid request") + + return + } + + iri := r.Form.Get("iri") + if iri == "" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "Missing \"iri\" parameter") + + return + } + + data := r.Form.Get("data") + if data == "" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "Missing \"data\" parameter") + + return + } + + // Broadcast the resource + h.resources <- NewResource(iri, data) +} diff --git a/hub/resource.go b/hub/resource.go new file mode 100644 index 00000000..0278e69a --- /dev/null +++ b/hub/resource.go @@ -0,0 +1,21 @@ +package hub + +import ( + "fmt" + "strings" +) + +// Resource contains a server-sent event +type Resource struct { + // The Internationalized Resource Identifier (RFC3987) of the resource (will most likely be an URI), prefixed by "id: " + IRI string + + // Data, encoded in the sever-sent event format: every line starts with the string "data: " + // https://www.w3.org/TR/eventsource/#dispatchMessage + Data string +} + +// NewResource creates a new resource and encodes the data property +func NewResource(iri string, data string) Resource { + return Resource{iri, fmt.Sprintf("data: %s\n\n", strings.Replace(data, "\n", "\ndata: ", -1))} +} diff --git a/hub/subscribeHandler.go b/hub/subscribeHandler.go new file mode 100644 index 00000000..b457c669 --- /dev/null +++ b/hub/subscribeHandler.go @@ -0,0 +1,96 @@ +package hub + +import ( + "fmt" + "log" + "net/http" + "regexp" + + "github.com/yosida95/uritemplate" +) + +// SubscribeHandler create a keep alive connection and send the events to the subscribers +func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + log.Panic("The Reponse Writter must be an instance of Flusher.") + return + } + + iris := r.URL.Query()["iri[]"] + if len(iris) == 0 { + http.Error(w, "Missing \"iri[]\" parameters.", http.StatusBadRequest) + return + } + + var regexps = make([]*regexp.Regexp, len(iris)) + for index, iri := range iris { + tpl, err := uritemplate.New(iri) + if nil != err { + http.Error(w, fmt.Sprintf("\"%s\" is not a valid URI template (RFC6570).", iri), http.StatusBadRequest) + return + } + regexps[index] = tpl.Regexp() + } + + log.Printf("%s connected.", r.RemoteAddr) + + // Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive + w.Header().Set("Connection", "keep-alive") + + // Server-sent events https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Sending_events_from_the_server + w.Header().Set("Content-Type", "text/event-stream") + + // Disable cache, even for old browsers and proxies + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Set("Pragma", "no-cache") + w.Header().Set("Expire", "0") + + // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering + w.Header().Set("X-Accel-Buffering", "no") + + // Create a new channel, over which the hub can send can send resources to this subscriber. + resourceChan := make(chan Resource) + + // Add this client to the map of those that should + // receive updates + h.newSubscribers <- resourceChan + + // Listen to the closing of the http connection via the CloseNotifier + notify := w.(http.CloseNotifier).CloseNotify() + go func() { + <-notify + // Remove this client from the map of attached clients + // when `EventHandler` exits. + h.removedSubscribers <- resourceChan + log.Printf("%s disconnected.", r.RemoteAddr) + }() + + for { + // Read from our resourceChan. + resource, open := <-resourceChan + + if !open { + // If our resourceChan was closed, this means that the client has disconnected. + break + } + + match := false + for _, r := range regexps { + if r.MatchString(resource.IRI) { + match = true + break + } + } + if !match { + continue + } + + fmt.Fprint(w, "event: mercure\n") + fmt.Fprintf(w, "id: %s\n", resource.IRI) + fmt.Fprint(w, resource.Data) + + f.Flush() + } +} diff --git a/main.go b/main.go index d7f516cb..54a82d71 100644 --- a/main.go +++ b/main.go @@ -1,209 +1,21 @@ package main import ( - "fmt" - "html/template" "log" "net/http" "os" - "regexp" - "strings" + "github.com/dunglas/mercure/hub" _ "github.com/joho/godotenv/autoload" - "github.com/yosida95/uritemplate" ) -type content struct { - iri string - data string -} - -type Broker struct { - // key: client, value: useless - clients map[chan content]bool - - // Channel into which new clients can be pushed - newClients chan chan content - - // Channel into which disconnected clients should be pushed - defunctClients chan chan content - - // Channel into which messages are pushed to be broadcast out - // to attahed clients. - contents chan content -} - -func (b *Broker) Start() { - go func() { - for { - select { - - case s := <-b.newClients: - b.clients[s] = true - log.Println("Added new client") - - case s := <-b.defunctClients: - delete(b.clients, s) - close(s) - log.Println("Removed client") - - case content := <-b.contents: - for s := range b.clients { - s <- content - } - log.Printf("Broadcast message to %d clients", len(b.clients)) - } - } - }() -} - -func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { - f, ok := w.(http.Flusher) - if !ok { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - log.Panic("The Reponse Writter must be an instance of Flusher.") - return - } - - iris := r.URL.Query()["iri[]"] - if len(iris) == 0 { - http.Error(w, "Missing \"iri[]\" parameters.", http.StatusBadRequest) - return - } - - var regexps = make([]*regexp.Regexp, len(iris)) - for index, iri := range iris { - tpl, err := uritemplate.New(iri) - if nil != err { - http.Error(w, fmt.Sprintf("\"%s\" is not a valid URI template (RFC6570).", iri), http.StatusBadRequest) - return - } - regexps[index] = tpl.Regexp() - } - - // Server-sent events https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Sending_events_from_the_server - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Content-Type", "text/event-stream") - - // Disable cache, even for old browsers and proxies - w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") - w.Header().Set("Pragma", "no-cache") - w.Header().Set("Expire", "0") - - // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering - w.Header().Set("X-Accel-Buffering", "no") - - // Create a new channel, over which the broker can - // send this client messages. - contentChan := make(chan content) - - // Add this client to the map of those that should - // receive updates - b.newClients <- contentChan - - // Listen to the closing of the http connection via the CloseNotifier - notify := w.(http.CloseNotifier).CloseNotify() - go func() { - <-notify - // Remove this client from the map of attached clients - // when `EventHandler` exits. - b.defunctClients <- contentChan - log.Println("HTTP connection just closed.") - }() - - for { - // Read from our messageChan. - content, open := <-contentChan - - if !open { - // If our messageChan was closed, this means that the client has - // disconnected. - break - } - - match := false - for _, r := range regexps { - log.Printf("%v", r) - if r.MatchString(content.iri) { - match = true - break - } - } - if !match { - continue - } - - fmt.Fprint(w, "event: mercure\n") - fmt.Fprintf(w, "id: %s\n", content.iri) - fmt.Fprint(w, content.data) - - f.Flush() - } - - log.Println("Finished HTTP request at ", r.URL.Path) -} - -func (b *Broker) PublishHandler(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err != nil { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, "Invalid request") - - return - } - - iri := r.Form.Get("iri") - if iri == "" { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, "Missing \"iri\" parameter") - - return - } - - data := r.Form.Get("data") - if data == "" { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, "Missing \"data\" parameter") - - return - } - - // Encode the message: replace newlines by "\ndata: ", https://www.w3.org/TR/eventsource/#dispatchMessage - encodedBody := fmt.Sprintf("data: %s\n\n", strings.Replace(data, "\n", "\ndata: ", -1)) - b.contents <- content{iri, encodedBody} - - fmt.Fprintf(w, "Published a new message: %s", iri) -} - -func IndexHandler(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/" { - http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) - return - } - - t, err := template.ParseFiles("templates/index.html") - if err != nil { - log.Fatal("Missing template file.") - } - - t.Execute(w, nil) -} - -// Main routine -// func main() { - b := &Broker{ - make(map[chan content]bool), - make(chan (chan content)), - make(chan (chan content)), - make(chan content), - } - b.Start() - - http.Handle("/events/", b) + hub := hub.NewHub() + hub.Start() - http.Handle("/publish", http.HandlerFunc(b.PublishHandler)) - http.Handle("/", http.HandlerFunc(IndexHandler)) + http.Handle("/subscribe", http.HandlerFunc(hub.SubscribeHandler)) + http.Handle("/publish", http.HandlerFunc(hub.PublishHandler)) + http.Handle("/", http.FileServer(http.Dir("public"))) listen := os.Getenv("LISTEN") if listen == "" { diff --git a/templates/index.html b/public/index.html similarity index 96% rename from templates/index.html rename to public/index.html index 876cf343..4c956887 100644 --- a/templates/index.html +++ b/public/index.html @@ -63,7 +63,7 @@

} const iris = document.querySelector('#iri').value.split("\n"); - const url = new URL('/events/', document.location); + const url = new URL('/subscribe', document.location); iris.forEach((iri) => { url.searchParams.append('iri[]', iri) }); eventSource = new EventSource(url);