Skip to content

Commit

Permalink
add event writer
Browse files Browse the repository at this point in the history
  • Loading branch information
btubbs committed Oct 20, 2018
1 parent e022eac commit 9cbc314
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 54 deletions.
9 changes: 3 additions & 6 deletions client_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ import (
)

func main() {
req, err := http.NewRequest(
http.MethodGet,
// This app will output a stream of incrementing integers.
"http://localhost:8080",
nil,
)
// If you run server_example/main.go, then it will be listening on this host/port.
url := "http://localhost:8080"
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
panic(err.Error())
}
Expand Down
59 changes: 20 additions & 39 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ func TestClient(t *testing.T) {
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
// assert that the required request headers are present
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
evw := NewEventWriter(w)
assert.Equal(t, noCache, r.Header.Get(cacheControlHeader))
writeEvent(w, Event{Data: []byte("msg1")})
writeEvent(w, Event{Data: []byte("msg2")})
assert.Nil(t, evw.Write(Event{Data: []byte("msg1")}))
assert.Nil(t, evw.Write(Event{Data: []byte("msg2")}))
}},
expectedEvents: []Event{
{Data: []byte("msg1")},
Expand All @@ -39,8 +38,7 @@ func TestClient(t *testing.T) {
desc: "cache control header",
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
NewEventWriter(w)
assert.Equal(t, noCache, r.Header.Get(cacheControlHeader))
}},
expectedEvents: []Event{},
Expand All @@ -49,8 +47,7 @@ func TestClient(t *testing.T) {
desc: "last ID option",
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
NewEventWriter(w)
assert.Equal(t, "foo", r.Header.Get(lastEventIDHeader))
}},
expectedEvents: []Event{},
Expand All @@ -59,10 +56,7 @@ func TestClient(t *testing.T) {
{
desc: "bad content type",
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, "foo/foo")
}},
func(w http.ResponseWriter, r *http.Request) {}},
expectedEvents: []Event{},
expectedErrRegexp: "response does not have text/event-stream Content-Type",
},
Expand All @@ -80,10 +74,9 @@ func TestClient(t *testing.T) {
desc: "IDs",
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
writeEvent(w, Event{Data: []byte("msg1"), ID: "foo"})
writeEvent(w, Event{Data: []byte("msg2"), ID: "bar"})
evw := NewEventWriter(w)
assert.Nil(t, evw.Write(Event{Data: []byte("msg1"), ID: "foo"}))
assert.Nil(t, evw.Write(Event{Data: []byte("msg2"), ID: "bar"}))
},
},
expectedEvents: []Event{
Expand All @@ -98,10 +91,9 @@ func TestClient(t *testing.T) {
desc: "retries",
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
writeEvent(w, Event{Data: []byte("msg1"), Retry: 500 * time.Millisecond})
writeEvent(w, Event{Data: []byte("msg2"), Retry: 1200 * time.Millisecond})
evw := NewEventWriter(w)
assert.Nil(t, evw.Write(Event{Data: []byte("msg1"), Retry: 500 * time.Millisecond}))
assert.Nil(t, evw.Write(Event{Data: []byte("msg2"), Retry: 1200 * time.Millisecond}))
},
},
expectedEvents: []Event{
Expand All @@ -125,23 +117,21 @@ func TestClient(t *testing.T) {
desc: "auto retry on disconnect",
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
writeEvent(w, Event{Data: []byte("msg1"), Retry: 500 * time.Millisecond})
evw := NewEventWriter(w)
assert.Nil(t, evw.Write(Event{Data: []byte("msg1"), Retry: 500 * time.Millisecond}))
},
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
NewEventWriter(w)
if _, err := w.Write([]byte(":\n")); err != nil {
panic("i hate you, linter")
}
},
func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
writeEvent(w, Event{Data: []byte("msg2")})
evw := NewEventWriter(w)
assert.Nil(t, evw.Write(Event{Data: []byte("msg2")}))
},
func(w http.ResponseWriter, r *http.Request) {
NewEventWriter(w)
panic("boom") // force breaking out of the loop
},
},
Expand Down Expand Up @@ -179,9 +169,8 @@ func TestClient(t *testing.T) {

func TestSubscribe(t *testing.T) {
f := func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set(contentTypeHeader, textEventStream)
writeEvent(w, Event{Data: []byte("msg1"), Retry: 500 * time.Millisecond})
evw := NewEventWriter(w)
assert.Nil(t, evw.Write(Event{Data: []byte("msg1"), Retry: 500 * time.Millisecond}))
}

server := httptest.NewServer(http.HandlerFunc(f))
Expand All @@ -202,11 +191,3 @@ func buildTestServer(handlers []http.HandlerFunc) http.HandlerFunc {
}
}
}

func writeEvent(w http.ResponseWriter, e Event) {
if _, err := w.Write(e.Bytes()); err != nil {
panic(err.Error())
}
flusher := w.(http.Flusher)
flusher.Flush()
}
31 changes: 31 additions & 0 deletions event_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package sse

import (
"net/http"
)

// EventWriter is a helper for writing events to an io.Writer stream.
type EventWriter struct {
w http.ResponseWriter
}

// NewEventWriter returns a new EventWriter.
func NewEventWriter(w http.ResponseWriter) *EventWriter {
w.Header().Set(contentTypeHeader, textEventStream)
return &EventWriter{w: w}
}

// Write takes a io.Writer and an Event, serializes the Event to bytes, and writes them out to the
// writer. If the writer is also an http.Flusher, then it will also flush the bytes out so the
// event will be sent over the wire immediately. It automatically sets the Content-Type header to
// text/event-stream.
func (evw *EventWriter) Write(e Event) error {

if _, err := evw.w.Write(e.Bytes()); err != nil {
return err
}
if flusher, ok := evw.w.(http.Flusher); ok {
flusher.Flush()
}
return nil
}
18 changes: 9 additions & 9 deletions server_example/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"fmt"
"log"
"net/http"
"strconv"
"time"
Expand All @@ -9,22 +11,20 @@ import (
)

func main() {
http.ListenAndServe(
log.Fatal(http.ListenAndServe(
":8080",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set("Content-Type", "text/event-stream")
// loop forever and emit a counter
ew := sse.NewEventWriter(w)
n := 0
for {
n++
e := sse.Event{Data: []byte(strconv.Itoa(n))}
if _, err := w.Write(e.Bytes()); err != nil {
panic(err.Error())
if err := ew.Write(e); err != nil {
// web browsers are expected to disconnect at any time. That will raise an error here.
fmt.Println(err)
return
}
flusher := w.(http.Flusher)
flusher.Flush()
time.Sleep(time.Second)
}
}))
})))
}

0 comments on commit 9cbc314

Please sign in to comment.