Skip to content

Commit

Permalink
Dispatch connection and disconnection events
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Feb 18, 2020
1 parent e52bba8 commit 40895b7
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 50 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Expand Up @@ -16,3 +16,4 @@ issues:
linters:
- gochecknoglobals
- funlen
- godox
55 changes: 28 additions & 27 deletions docs/hub/config.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions hub/config.go
Expand Up @@ -23,6 +23,8 @@ func SetConfigDefaults(v *viper.Viper) {
v.SetDefault("compress", false)
v.SetDefault("use_forwarded_headers", false)
v.SetDefault("demo", false)
v.SetDefault("dispatch_subscriptions", false)
v.SetDefault("subscriptions_include_ip", false)
}

// ValidateConfig validates a Viper instance
Expand Down Expand Up @@ -64,6 +66,8 @@ func SetFlags(fs *pflag.FlagSet, v *viper.Viper) {
fs.BoolP("use-forwarded-headers", "f", false, "enable headers forwarding")
fs.BoolP("demo", "D", false, "enable the demo mode")
fs.StringP("log-format", "l", "", "the log format (JSON, FLUENTD or TEXT)")
fs.BoolP("dispatch-subscriptions", "s", false, "dispatch updates when subscriptions are created or terminated")
fs.BoolP("subscriptions-include-ip", "I", false, "include the IP address of the subscriber in the subscription update")

fs.VisitAll(func(f *pflag.Flag) {
v.BindPFlag(strings.ReplaceAll(f.Name, "-", "_"), fs.Lookup(f.Name))
Expand Down
2 changes: 1 addition & 1 deletion hub/config_test.go
Expand Up @@ -37,7 +37,7 @@ func TestSetFlags(t *testing.T) {
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
SetFlags(fs, v)

assert.Subset(t, []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins"}, v.AllKeys())
assert.Subset(t, v.AllKeys(), []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins", "dispatch_subscriptions", "subscriptions_include_ip"})
}

func TestInitConfig(t *testing.T) {
Expand Down
101 changes: 91 additions & 10 deletions hub/subscribe.go
Expand Up @@ -2,16 +2,29 @@ package hub

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"

"github.com/gofrs/uuid"
log "github.com/sirupsen/logrus"
"github.com/yosida95/uritemplate"
)

type subscription struct {
ID string `json:"@id"`
Type string `json:"@type"`
Topic string `json:"topic"`
Active bool `json:"active"`
mercureClaim
Address string `json:"address,omitempty"`
}

// SubscribeHandler create a keep alive connection and send the events to the subscribers
func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
f, ok := w.(http.Flusher)
Expand Down Expand Up @@ -80,39 +93,57 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
}
fields["subscriber_topics"] = topics

var rawTopics = make([]string, 0, len(topics))
var templateTopics = make([]*uritemplate.Template, 0, len(topics))
for _, topic := range topics {
if tpl := h.getURITemplate(topic); tpl == nil {
rawTopics = append(rawTopics, topic)
} else {
templateTopics = append(templateTopics, tpl)
}
}
rawTopics, templateTopics := h.parseTopics(topics)

authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false)
subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, topics, rawTopics, templateTopics, retrieveLastEventID(r))

encodedTopics := escapeTopics(topics)

// Connection events must be sent before creating the pipe to prevent a deadlock
connectionID := uuid.Must(uuid.NewV4()).String()
var address string
if h.config.GetBool("subscriptions_include_ip") {
address, _, _ = net.SplitHostPort(r.RemoteAddr)
}
h.dispatchSubscriptionUpdate(topics, encodedTopics, connectionID, claims, true, address)
pipe, err := h.transport.CreatePipe(subscriber.LastEventID)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
h.dispatchSubscriptionUpdate(topics, encodedTopics, connectionID, claims, false, address)
log.WithFields(fields).Error(err)
return nil, nil, false
}

sendHeaders(w)

log.WithFields(fields).Info("New subscriber")

// Listen to the closing of the http connection via the Request's Context
go func() {
<-r.Context().Done()
pipe.Close()

h.dispatchSubscriptionUpdate(topics, encodedTopics, connectionID, claims, false, address)
log.WithFields(fields).Info("Subscriber disconnected")
}()

return subscriber, pipe, true
}

func (h *Hub) parseTopics(topics []string) (rawTopics []string, templateTopics []*uritemplate.Template) {
rawTopics = make([]string, 0, len(topics))
templateTopics = make([]*uritemplate.Template, 0, len(topics))
for _, topic := range topics {
if tpl := h.getURITemplate(topic); tpl == nil {
rawTopics = append(rawTopics, topic)
} else {
templateTopics = append(templateTopics, tpl)
}
}

return rawTopics, templateTopics
}

// getURITemplate retrieves or creates the uritemplate.Template associated with this topic, or nil if it's not a template
func (h *Hub) getURITemplate(topic string) *uritemplate.Template {
var tpl *uritemplate.Template
Expand Down Expand Up @@ -202,3 +233,53 @@ func (h *Hub) cleanup(s *Subscriber) {
}
h.uriTemplates.Unlock()
}

func (h *Hub) dispatchSubscriptionUpdate(topics, encodedTopics []string, connectionID string, claims *claims, active bool, address string) {
if !h.config.GetBool("dispatch_subscriptions") {
return
}

for k, topic := range topics {
connection := &subscription{
ID: "https://mercure.rocks/subscriptions/" + encodedTopics[k] + "/" + connectionID,
Type: "https://mercure.rocks/Subscription",
Topic: topic,
Active: active,
Address: address,
}

if claims == nil {
connection.mercureClaim.Publish = []string{}
connection.mercureClaim.Subscribe = []string{}
} else {
if connection.mercureClaim.Publish == nil {
connection.mercureClaim.Publish = []string{}
}
if connection.mercureClaim.Subscribe == nil {
connection.mercureClaim.Subscribe = []string{}
}
}

json, err := json.MarshalIndent(connection, "", " ")
if err != nil {
panic(err)
}

u := &Update{
Topics: []string{connection.ID},
Targets: map[string]struct{}{"https://mercure.rocks/targets/subscriptions": {}, "https://mercure.rocks/targets/subscriptions/" + encodedTopics[k]: {}},
Event: Event{Data: string(json), ID: uuid.Must(uuid.NewV4()).String()},
}

h.transport.Write(u)
}
}

func escapeTopics(topics []string) []string {
encodedTopics := make([]string, 0, len(topics))
for _, topic := range topics {
encodedTopics = append(encodedTopics, url.QueryEscape(topic))
}

return encodedTopics
}
90 changes: 89 additions & 1 deletion hub/subscribe_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -48,6 +49,9 @@ func (rt *responseTester) Write(buf []byte) (int, error) {

if rt.body == rt.expectedBody {
rt.cancel()
} else if !strings.HasPrefix(rt.expectedBody, rt.body) {
rt.t.Errorf(`Received body "%s" doesn't match expected body "%s"`, rt.body, rt.expectedBody)
rt.cancel()
}

return len(buf), nil
Expand Down Expand Up @@ -244,7 +248,7 @@ func testSubscribe(numberOfSubscribers int, t *testing.T) {
}

func TestSubscribe(t *testing.T) {
log.SetLevel(log.DebugLevel)
log.SetLevel(log.ErrorLevel)
testSubscribe(3, t)
}

Expand Down Expand Up @@ -329,6 +333,90 @@ func TestSubscribeTarget(t *testing.T) {
hub.Stop()
}

func TestSubscriptionEvents(t *testing.T) {
hub := createDummy()
hub.config.Set("dispatch_subscriptions", true)
hub.config.Set("subscriptions_include_ip", true)

var wg sync.WaitGroup
ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())
wg.Add(3)
go func() {
// Authorized to receive connection events
defer wg.Done()
req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://mercure.rocks/subscriptions/{topic}/{connectionID}", nil).WithContext(ctx1)
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{"https://mercure.rocks/targets/subscriptions"})})
w := httptest.NewRecorder()
hub.SubscribeHandler(w, req)

resp := w.Result()
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)

assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyContent := string(body)
assert.Contains(t, bodyContent, `data: "@id": "https://mercure.rocks/subscriptions/https%3A%2F%2Fexample.com/`)
assert.Contains(t, bodyContent, `data: "@type": "https://mercure.rocks/Subscription",`)
assert.Contains(t, bodyContent, `data: "topic": "https://example.com",`)
assert.Contains(t, bodyContent, `data: "publish": [],`)
assert.Contains(t, bodyContent, `data: "subscribe": []`)
assert.Contains(t, bodyContent, `data: "active": true,`)
assert.Contains(t, bodyContent, `data: "active": false,`)
assert.Contains(t, bodyContent, `data: "address": "`)
}()

go func() {
// Not authorized to receive connection events
defer wg.Done()
req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://mercure.rocks/subscriptions/{topic}/{connectionID}", nil).WithContext(ctx2)
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{})})
w := httptest.NewRecorder()
hub.SubscribeHandler(w, req)

resp := w.Result()
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, ":\n", string(body))
}()

go func() {
defer wg.Done()

s, _ := hub.transport.(*LocalTransport)
for {
s.RLock()
ready := len(s.pipes) == 2
s.RUnlock()

log.Info("Waiting for subscriber...")
if ready {
break
}
}

ctx, cancelRequest2 := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://example.com", nil).WithContext(ctx)
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{})})

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: ":\n",
t: t,
cancel: cancelRequest2,
}
hub.SubscribeHandler(w, req)
time.Sleep(1 * time.Second) // TODO: find a better way to wait for the disconnection update to be dispatched
cancel2()
cancel1()
}()

wg.Wait()
hub.Stop()
}

func TestSubscribeAllTargets(t *testing.T) {
hub := createDummy()
s, _ := hub.transport.(*LocalTransport)
Expand Down
60 changes: 49 additions & 11 deletions spec/mercure.md
Expand Up @@ -19,9 +19,8 @@ organization = "Les-Tilleuls.coop"
email = "kevin@les-tilleuls.coop"
[author.address.postal]
city = "Lille"
street = "2 rue Hegel"
code = "59000"
postalline= ["Bâtiment Canal"]
street = "82 rue Winston Churchill"
code = "59160"
country = "France"
%%%

Expand Down Expand Up @@ -195,18 +194,18 @@ following data:
alternate URLs. The hub **MUST** dispatch this update to subscribers that are subscribed to both
canonical or alternate URLs.

* `data`: The content of the new version of this topic.
* `data`: the content of the new version of this topic.

* `target` (optional): Target audience of this update. This key can be present several times. See
* `target` (optional): target audience of this update. This key can be present several times. See
section #Authorization for further information.

* `id` (optional): The topic's revision identifier: it will be used as the SSE's `id` property.
If omitted, the hub **MUST** generate a valid globally unique id. It **MAY** be a UUID. Even if
provided, the hub **MAY** ignore the id provided by the client and generate its own id.
* `id` (optional): the topic's revision identifier: it will be used as the SSE's `id` property. If
omitted, the hub **MUST** generate a valid globally unique id. It **MAY** be a UUID [@RFC4122].
Even if provided, the hub **MAY** ignore the id provided by the client and generate its own id.

* `type` (optional): The SSE's `event` property (a specific event type).
* `type` (optional): the SSE's `event` property (a specific event type).

* `retry` (optional): The SSE's `retry` property (the reconnection time).
* `retry` (optional): the SSE's `retry` property (the reconnection time).

In the event of success, the HTTP response's body **MUST** be the `id` associated to this update
generated by the hub and a success HTTP status code **MUST** be returned. The publisher **MUST** be
Expand Down Expand Up @@ -298,7 +297,7 @@ authorized to receive updates destined for all targets.

To allow re-establishment in case of connection lost, events dispatched by the hub **SHOULD**
include an `id` property. The value contained in this `id` property **SHOULD** be a globally unique
identifier. To do so, a UUID [@!RFC4122] **MAY** be used.
identifier. To do so, a UUID [@RFC4122] **MAY** be used.

According to the server-sent events specification, in case of connection
lost the subscriber will try to automatically re-connect. During the
Expand All @@ -324,6 +323,45 @@ after a long disconnection time).
The hub **MAY** also specify the reconnection time using the `retry` key, as specified in the
server-sent events format.

# Subscription Events

The hub **MAY** publish an update when a subscription to a topic is created or terminated. If this
feature is implemented by the hub, an update **MUST** be dispatched every time that a subscription
is created or terminated, and for each topic to which the client subscribes.

The topic of this update **MUST** follow the pattern
`https://mercure.rocks/subscriptions/{topic}/{subscriptionID}` where `topic` is the URL-encoded
value of the subscribed topic and `subscriptionID` is an unique identifier for this subscription.
`subscriptionID` **MAY** be a UUID [@RFC4122].

The content of the update **MUST** be a JSON-LD [@!W3C.REC-json-ld-20140116] document containing at
least the following properties:

* `@id`: the identifier of this update, it **MUST** be the same value as the subscription update's
topic

* `@type`: the fixed value `https://mercure.rocks/Subscription`

* `topic`: the topic to which the subscription refers

* `active`: `true` when the subscription is created, and `false` when it is terminated

* `subscribe`: the subscription targets provided by the subscriber (see section #Authorization)

* `publish`: the publication targets provided by the subscriber (see section #Authorization)

* `address` (optional): the IP address ([@!RFC791], [@!RFC8200]) of the subscriber

The JSON-LD document **MAY** contain other properties.

In order to only allow authorized subscribers to receive subscription events, the subscription
update **MUST** be marked as intended for subscribers providing the following targets:

* the fixed value `https://mercure.rocks/targets/subscriptions`

* a URL following the pattern `https://mercure.rocks/targets/subscriptions/{topic}` where topic is
the URL-encoded value of the subscribed topic

# Encryption

Using HTTPS does not prevent the hub from accessing the update's content. Depending of the intended
Expand Down

0 comments on commit 40895b7

Please sign in to comment.