Skip to content

Commit

Permalink
Issue #129: Support for server-sent events (SSE)
Browse files Browse the repository at this point in the history
Original PR #130 by @madeddie

Add a proxy.flushinterval option which enables periodic
flushing of the repsonse buffer for SSE connections which
have the 'Accept' header set to 'text/event-stream'.

This is really a route specific option and should be
configured as such once this becomes possible.
  • Loading branch information
magiconair committed Jul 30, 2016
1 parent 79e8284 commit ea7f9dd
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 3 deletions.
1 change: 1 addition & 0 deletions config/config.go
Expand Up @@ -54,6 +54,7 @@ type Proxy struct {
KeepAliveTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
FlushInterval time.Duration
LocalIP string
ClientIPHeader string
TLSHeader string
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Expand Up @@ -13,6 +13,7 @@ var Default = &Config{
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
LocalIP: LocalIPString(),
},
Registry: Registry{
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Expand Up @@ -104,6 +104,7 @@ func load(p *properties.Properties) (cfg *Config, err error) {
f.KVSliceVar(&cfg.CertSourcesValue, "proxy.cs", Default.CertSourcesValue, "certificate sources")
f.DurationVar(&cfg.Proxy.ReadTimeout, "proxy.readtimeout", Default.Proxy.ReadTimeout, "read timeout for incoming requests")
f.DurationVar(&cfg.Proxy.WriteTimeout, "proxy.writetimeout", Default.Proxy.WriteTimeout, "write timeout for outgoing responses")
f.DurationVar(&cfg.Proxy.FlushInterval, "proxy.flushinterval", Default.Proxy.FlushInterval, "flush interval for streaming responses")
f.StringVar(&cfg.Metrics.Target, "metrics.target", Default.Metrics.Target, "metrics backend")
f.StringVar(&cfg.Metrics.Prefix, "metrics.prefix", Default.Metrics.Prefix, "prefix for reported metrics")
f.DurationVar(&cfg.Metrics.Interval, "metrics.interval", Default.Metrics.Interval, "metrics reporting interval")
Expand Down
2 changes: 2 additions & 0 deletions config/load_test.go
Expand Up @@ -24,6 +24,7 @@ proxy.keepalivetimeout = 4s
proxy.dialtimeout = 60s
proxy.readtimeout = 5s
proxy.writetimeout = 10s
proxy.flushinterval = 15s
proxy.maxconn = 666
proxy.header.clientip = clientip
proxy.header.tls = tls
Expand Down Expand Up @@ -79,6 +80,7 @@ aws.apigw.cert.cn = furb
KeepAliveTimeout: 4 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
FlushInterval: 15 * time.Second,
ClientIPHeader: "clientip",
TLSHeader: "tls",
TLSHeaderValue: "tls-true",
Expand Down
12 changes: 11 additions & 1 deletion fabio.properties
Expand Up @@ -274,6 +274,16 @@
# proxy.dialtimeout = 30s


# proxy.flushinterval configures periodic flushing of the
# response buffer for SSE (server-sent events) connections.
# They are detected when the 'Accept' header is
# 'text/event-stream'.
#
# The default is
#
# proxy.flushinterval = 1s


# proxy.maxconn configures the maximum number of cached
# incoming and outgoing connections.
#
Expand Down Expand Up @@ -536,4 +546,4 @@
#
# The default is
#
# ui.title =
# ui.title =
4 changes: 3 additions & 1 deletion proxy/http.go
Expand Up @@ -4,10 +4,12 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"time"
)

func newHTTPProxy(t *url.URL, tr http.RoundTripper) http.Handler {
func newHTTPProxy(t *url.URL, tr http.RoundTripper, flush time.Duration) http.Handler {
rp := httputil.NewSingleHostReverseProxy(t)
rp.Transport = tr
rp.FlushInterval = flush
return rp
}
8 changes: 7 additions & 1 deletion proxy/proxy.go
Expand Up @@ -48,8 +48,14 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// To use the filtered proxy use
// h = newWSProxy(t.URL)

case r.Header.Get("Accept") == "text/event-stream":
// use the flush interval for SSE (server-sent events)
// must be > 0s to be effective
h = newHTTPProxy(t.URL, p.tr, p.cfg.FlushInterval)

default:
h = newHTTPProxy(t.URL, p.tr)
h = newHTTPProxy(t.URL, p.tr, time.Duration(0))
}

start := time.Now()
Expand Down

0 comments on commit ea7f9dd

Please sign in to comment.