Skip to content
Permalink
Browse files

Support duplex communication (#11)

* Support duplex communication

* Partial implementation. Missing CSRF protection.

* Prevent CSRF attacks. Return the event ID when publishing

* Fix tests and some bugs

* Test header auth mechanism

* Improve tests

* Wording
  • Loading branch information...
dunglas committed Oct 21, 2018
1 parent 0555391 commit 9d81bb03aef9e8b32b62e31fd143a8636a39abf6
Showing with 966 additions and 473 deletions.
  1. +10 −7 .env
  2. +31 −28 README.md
  3. +4 −2 examples/publisher-node.js
  4. +5 −2 examples/publisher-php.php
  5. +1 −0 go.mod
  6. +2 −0 go.sum
  7. +113 −0 hub/authorization.go
  8. +212 −0 hub/authorization_test.go
  9. +3 −15 hub/demo.go
  10. +3 −3 hub/demo_test.go
  11. +3 −2 hub/history_test.go
  12. +7 −13 hub/hub_test.go
  13. +24 −13 hub/options.go
  14. +13 −11 hub/options_test.go
  15. +22 −25 hub/publish.go
  16. +67 −20 hub/publish_test.go
  17. +17 −16 hub/server.go
  18. +19 −8 hub/server_test.go
  19. +4 −35 hub/subscribe.go
  20. +52 −21 hub/subscribe_test.go
  21. +4 −3 hub/subscriber.go
  22. +1 −1 public/app.css
  23. +132 −116 public/app.js
  24. +133 −101 public/index.html
  25. +84 −31 spec/mercure.md
17 .env
@@ -1,12 +1,15 @@
ACME_CERT_DIR=
ACME_HOSTS=
ADDR=:3001
PUBLISHER_JWT_KEY=!UnsecureChangeMePublisher!
SUBSCRIBER_JWT_KEY=!UnsecureChangeMeSubscriber!
ALLOW_ANONYMOUS=1
DEBUG=1
CORS_ALLOWED_ORIGINS=
DB_PATH=
ACME_HOSTS=
ACME_CERT_DIR=
CERT_FILE=
CERT_KEY=
CORS_ALLOWED_ORIGINS=
DB_PATH=
DEBUG=1
DEMO=
JWT_KEY=!UnsecureChangeMe!
LOG_FORMAT=JSON
PUBLISH_ALLOWED_ORIGINS=http://localhost:3000
PUBLISHER_JWT_KEY=
SUBSCRIBER_JWT_KEY=
@@ -1,4 +1,4 @@
# Mercure, Server-Sent Live Updates
# Mercure, Live Updates Made Easy
*Protocol and Reference Implementation*

[![GoDoc](https://godoc.org/github.com/dunglas/mercure?status.svg)](https://godoc.org/github.com/dunglas/mercure/hub)
@@ -20,7 +20,7 @@ In addition, a managed and high-scalability version of Mercure is [available in

## Mercure in a Few Words

* native browser support, no lib nor SDK required (built on top of [server-sent events](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/))
* native browser support, no lib nor SDK required (built on top of HTTP and [server-sent events](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/))
* compatible with all existing servers, even those who don't support persistent connections (serverless architecture, PHP, FastCGI...)
* built-in connection re-establishment and state reconciliation
* [JWT](https://jwt.io/)-based authorization mechanism (securely dispatch an update to some selected subscribers)
@@ -46,7 +46,7 @@ Example implementation of a client (the subscriber), in JavaScript:
```javascript
// The subscriber subscribes to updates for the https://example.com/foo topic
// and to any topic matching https://example.com/books/{name}
const url = new URL('https://hub.example.com/subscribe');
const url = new URL('https://example.com/hub');
url.searchParams.append('topic', 'https://example.com/books/{id}');
url.searchParams.append('topic', 'https://example.com/users/dunglas');
@@ -59,15 +59,15 @@ eventSource.onmessage = e => console.log(e); // do something with the payload
Optionaly, the hub URL can be automatically discovered:

```javascript
fetch('https://example.com/books/1') // Has this header `Link: <https://hub.example.com/subscribe>; rel="mercure"`
fetch('https://example.com/books/1') // Has this header `Link: <https://example.com/hub>; rel="mercure"`
.then(response => {
// Extract the hub URL from the Link header
const hubUrl = response.headers.get('Link').match(/<(.*)>.*rel="mercure".*/)[1];
// Subscribe to updates using the first snippet, do something with response's body...
});
```

To dispatch an update, the application server (the publisher) just need to send a `POST` HTTP request to the hub.
To dispatch an update, the publisher (an application server, a web browser...) just need to send a `POST` HTTP request to the hub.
Example using [Node.js](https://nodejs.org/) / [Serverless](https://serverless.com/):

```javascript
@@ -82,12 +82,14 @@ const postData = querystring.stringify({
});
const req = https.request({
hostname: 'hub.example.com',
hostname: 'example.com',
port: '443',
path: '/publish',
path: '/hub',
method: 'POST',
headers: {
Authorization: 'Bearer <valid-jwt-token>', // the JWT key must be shared between the hub and the server
Authorization: 'Bearer <valid-jwt-token>',
// the JWT must have a mercure.pulish key containing an array of targets (can be empty for public updates)
// the JWT key must be shared between the hub and the server
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(postData),
}
@@ -141,22 +143,22 @@ A managed, high-scalability version of Mercure is available in private beta.

Grab a binary from the release page and run:

PUBLISHER_JWT_KEY=myPublisherKey SUBSCRIBER_JWT_KEY=mySubcriberKey ADDR=:3000 DEMO=1 ALLOW_ANONYMOUS=1 ./mercure
JWT_KEY=myJWTKey ADDR=:3000 DEMO=1 ALLOW_ANONYMOUS=1 PUBLISH_ALLOWED_ORIGINS=http://localhost:3000 ./mercure

The server is now available on `http://localhost:3000`, with the demo mode enabled. Because `ALLOW_ANONYMOUS` is set to `1`, anonymous subscribers are allowed.

To run it in production mode, and generate automatically a Let's Encrypt TLS certificate, just run the following command as root:

PUBLISHER_JWT_KEY=myPublisherKey SUBSCRIBER_JWT_KEY=mySubcriberKey ACME_HOSTS=example.com ./mercure
JWT_KEY=myJWTKey ACME_HOSTS=example.com ./mercure

The value of the `ACME_HOSTS` environment variable must be updated to match your domain name(s).
A Let's Enctypt TLS certificate will be automatically generated.
If you omit this variable, the server will be exposed on an (unsecure) HTTP connection.

When the server is up and running, the following endpoints are available:

* `POST https://example.com/publish`: to publish updates
* `GET https://example.com/subscribe`: to subscribe to updates
* `POST https://example.com/hub`: to publish updates
* `GET https://example.com/hub`: to subscribe to updates

See [the protocol](spec/mercure.md) for further informations.

@@ -167,7 +169,7 @@ To compile the development version and register the demo page, see [CONTRIBUTING
A Docker image is available on Docker Hub. The following command is enough to get a working server in demo mode:

docker run \
-e PUBLISHER_JWT_KEY=myPublisherKey -e SUBSCRIBER_JWT_KEY=mySubcriberKey -e DEMO=1 -e ALLOW_ANONYMOUS=1 \
-e JWT_KEY=myJWTKey -e DEMO=1 -e ALLOW_ANONYMOUS=1 -e PUBLISH_ALLOWED_ORIGINS=http://localhost \
-p 80:80 \
dunglas/mercure

@@ -176,7 +178,7 @@ The server, in demo mode, is available on `http://localhost:80`. Anonymous subsc
In production, run:

docker run \
-e PUBLISHER_JWT_KEY=myPublisherKey -e SUBSCRIBER_JWT_KEY=mySubcriberKey -e ACME_HOSTS=example.com \
-e JWT_KEY=myJWTKey -e ACME_HOSTS=example.com \
-p 80:80 -p 443:443 \
dunglas/mercure

@@ -188,15 +190,17 @@ Be sure to update the value of `ACME_HOSTS` to match your domain name(s), a Let'
* `ACME_HOSTS`: a comma separated list of hosts for which Let's Encrypt certificates must be issues
* `ADDR`: the address to listen on (example: `127.0.0.1:3000`, default to `:80` or `:http` or `:https` depending if HTTPS is enabled or not)
* `ALLOW_ANONYMOUS`: set to `1` to allow subscribers with no valid JWT to connect
* `DB_PATH`: the path of the [bbolt](https://github.com/etcd-io/bbolt) database (default to `updates.db` in the current directory)
* `CERT_FILE`: a cert file (to use a custom certificate)
* `CERT_KEY`: a cert key (to use a custom certificate)
* `CORS_ALLOWED_ORIGINS`: a comma separated list of hosts allowed CORS origins
* `CORS_ALLOWED_ORIGINS`: a comma separated list of allowed CORS origins, can be `*` for all
* `DB_PATH`: the path of the [bbolt](https://github.com/etcd-io/bbolt) database (default to `updates.db` in the current directory)
* `DEBUG`: set to `1` to enable the debug mode (prints recovery stack traces)
* `DEMO`: set to `1` to enable the demo mode (automatically enabled when `DEBUG=1`)
* `JWT_KEY`: the JWT key to use for both publishers and subscribers
* `LOG_FORMAT`: the log format, can be `JSON`, `FLUENTD` or `TEXT` (default)
* `PUBLISHER_JWT_KEY`: must contain the secret key to valid publishers' JWT
* `SUBSCRIBER_JWT_KEY`: must contain the secret key to valid subscribers' JWT
* `PUBLISH_ALLOWED_ORIGINS`: a comma separated list of origins allowed to publish (only applicable when using cookie-based auth)
* `PUBLISHER_JWT_KEY`: must contain the secret key to valid publishers' JWT, can be omited if `JWT_KEY` is set
* `SUBSCRIBER_JWT_KEY`: must contain the secret key to valid subscribers' JWT, can be omited if `JWT_KEY` is set

If `ACME_HOSTS` or both `CERT_FILE` and `CERT_KEY` are provided, an HTTPS server supporting HTTP/2 connection will be started.
If not, an HTTP server will be started (**not secure**).
@@ -218,32 +222,31 @@ Mercure can easily be integrated with Apollo GraphQL by creating [a dedicated tr

### What's the Difference Between Mercure and WebSocket?

[WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) is a low level and bidirectional protocol. Mercure is a high level and unidirectional protocol (servers-to-clients, but we will come back to that later).
Unlike Mercure (which is built on top of Server-Sent Events), WebSocket [is not designed to leverage HTTP/2](https://www.infoq.com/articles/websocket-and-http2-coexist).

Also, Mercure provides convenient built-in features (authorization, re-connection, state reconciliation...) while with WebSocket, you need to implement them yourself.
[WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) is a low level protocol, Mercure is a high level one.
Mercure provides convenient built-in features such as authorization, re-connection and state reconciliation ; while with WebSocket, you need to implement them yourself.
Also, unlike Mercure (which is built on top of HTTP and Server-Sent Events), WebSocket [is not designed to leverage HTTP/2](https://www.infoq.com/articles/websocket-and-http2-coexist).

HTTP/2 connections are multiplexed and bidirectional by default (it was not the case of HTTP/1).
Even if Mercure is unidirectional, when using it over a h2 connection (recommended), your app can receive data through Server-Sent Events, and send data to the server with regular `POST` (or `PUT`/`PATCH`/`DELETE`) requests, with no overhead.
When using Mercure over a h2 connection (recommended), your app can receive data through Server-Sent Events, and send data to the server with regular `POST` (or `PUT`/`PATCH`/`DELETE`) requests, with no overhead.

Basically, in most cases Mercure can be used as a modern, easier to use replacement for WebSocket, but it is a higher level protocol.
Basically, in most cases Mercure can be used as a modern and easier to use replacement for WebSocket.

### What's the Difference Between Mercure and WebSub?

[WebSub](https://www.w3.org/TR/websub/) is a server-to-server protocol while Mercure is mainly a server-to-client protocol (that can also be used for server-to-server communication, but it's not is main interest).
[WebSub](https://www.w3.org/TR/websub/) is a server-to-server only protocol, while Mercure is also a server-to-client and client-to-client protocol.

Mercure has been heavily inspired by WebSub, and we tried to make the protocol as close as possible from the WebSub one.

Mercure uses Server-Sent Events to dispatch the updates, while WebSub use `POST` requests. Also, Mercure has an advanced authorization mechanism, and allows to subscribe to several topics with only one connection using templated URIs.

### What's the Difference Between Mercure and Web Push?

The [Push API](https://developer.mozilla.org/en-US/docs/Web/API/Push_API) is [mainly designed](https://developers.google.com/web/fundamentals/push-notifications/) to send [notifications](https://developer.mozilla.org/en-US/docs/Web/API/Notifications_API) to devices currently not connected to the application.
The [Push API](https://developer.mozilla.org/en-US/docs/Web/API/Push_API) is a simplex protocol [mainly designed](https://developers.google.com/web/fundamentals/push-notifications/) to send [notifications](https://developer.mozilla.org/en-US/docs/Web/API/Notifications_API) to devices currently not connected to the application.
In most implementations, the size of the payload to dispatch is very limited, and the messages are sent through the proprietary APIs and servers of the browsers' and operating systems' vendors.

On the other hand, Mercure is designed to send live updates to devices currently connected to the web or mobile app. The payload is not limited, and the message goes directly from your servers to the clients.
On the other hand, Mercure is a duplex protocol designed to send live updates to devices currently connected to the web or mobile app. The payload is not limited, and the message goes directly from your servers to the clients.

In summary, use the Push API to send notifications to offline users (that will be available in Chrome, Android and iOS's notification centers), and use Mercure to receive live updates when the user is using the app.
In summary, use the Push API to send notifications to offline users (that will be available in Chrome, Android and iOS's notification centers), and use Mercure to receive and publish live updates when the user is using the app.

## Resources

@@ -1,6 +1,8 @@
const http = require('http');
const querystring = require('querystring');

const demoJwt = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.LRLvirgONK13JgacQ_VbcjySbVhkSmHy3IznH3tA9PM';

const postData = querystring.stringify({
'topic': 'http://localhost:3000/demo/books/1.jsonld',
'data': JSON.stringify({ key: 'updated value' }),
@@ -9,10 +11,10 @@ const postData = querystring.stringify({
const req = http.request({
hostname: 'localhost',
port: '3000',
path: '/publish',
path: '/hub',
method: 'POST',
headers: {
Authorization: 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.HB0k08BaV8KlLZ3EafCRlTDGbkd9qdznCzJQ_l8ELTU',
Authorization: `Bearer ${demoJwt}`,
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(postData),
}
@@ -1,11 +1,14 @@
<?php
define('DEMO_JWT', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.LRLvirgONK13JgacQ_VbcjySbVhkSmHy3IznH3tA9PM');
$postData = http_build_query([
'topic' => 'http://localhost:3000/demo/books/1.jsonld',
'data' => json_encode(['key' => 'updated value']),
]);
echo file_get_contents('http://localhost:3000/publish', false, stream_context_create(['http' => [
echo file_get_contents('http://localhost:3000/hub', false, stream_context_create(['http' => [
'method' => 'POST',
'header' => "Content-type: application/x-www-form-urlencoded\r\nAuthorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.HB0k08BaV8KlLZ3EafCRlTDGbkd9qdznCzJQ_l8ELTU",
'header' => "Content-type: application/x-www-form-urlencoded\r\nAuthorization: Bearer ".DEMO_JWT,
'content' => $postData,
]]));
1 go.mod
@@ -6,6 +6,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gorilla/handlers v1.4.0
github.com/gorilla/mux v1.6.2
github.com/joho/godotenv v1.3.0
github.com/joonix/log v0.0.0-20180502111528-d2d3f2f4a806
github.com/kr/pretty v0.1.0 // indirect
2 go.sum
@@ -12,6 +12,8 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZsA=
github.com/gorilla/handlers v1.4.0/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
@@ -0,0 +1,113 @@
package hub

import (
"errors"
"fmt"
"net/http"
"net/url"

jwt "github.com/dgrijalva/jwt-go"
)

// Claims contains Mercure's JWT claims
type claims struct {
Mercure mercureClaim `json:"mercure"`
jwt.StandardClaims
}

type mercureClaim struct {
Publish []string `json:"publish"`
Subscribe []string `json:"subscribe"`
}

// Authorize validates the JWT that may be provided through an "Authorization" HTTP header or a "mercureAuthorization" cookie.
// It returns the claims contained in the token if it exists and is valid, nil if no token is provided (anonymous mode), and an error if the token is not valid.
func authorize(r *http.Request, jwtKey []byte, publishAllowedOrigins []string) (*claims, error) {
authorizationHeaders, headerExists := r.Header["Authorization"]
if headerExists {
if len(authorizationHeaders) != 1 || len(authorizationHeaders[0]) < 48 || authorizationHeaders[0][:7] != "Bearer " {
return nil, errors.New("Invalid \"Authorization\" HTTP header")
}

return validateJWT(authorizationHeaders[0][7:], jwtKey)
}

cookie, err := r.Cookie("mercureAuthorization")
if err != nil {
// Anonymous
return nil, nil
}

// CSRF attacks cannot occurs when using safe methods
if r.Method != "POST" {
return validateJWT(cookie.Value, jwtKey)
}

origin := r.Header.Get("Origin")
if origin == "" {
// Try to extract the origin from the Referer, or return an error
referer := r.Header.Get("Referer")
if referer == "" {
return nil, errors.New("An \"Origin\" or a \"Referer\" HTTP header must be present to use the cookie-based authorization mechanism")
}

u, err := url.Parse(referer)
if err != nil {
return nil, err
}

origin = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
}

for _, allowedOrigin := range publishAllowedOrigins {
if origin == allowedOrigin {
return validateJWT(cookie.Value, jwtKey)
}
}

return nil, fmt.Errorf("The origin \"%s\" is not allowed to post updates", origin)
}

// validateJWT validates that the provided JWT token is a valid Mercure token
func validateJWT(encodedToken string, key []byte) (*claims, error) {
token, err := jwt.ParseWithClaims(encodedToken, &claims{}, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
return key, nil
})

if err != nil {
return nil, err
}

if claims, ok := token.Claims.(*claims); ok && token.Valid {
return claims, nil
}

return nil, errors.New("Invalid JWT")
}

func authorizedTargets(claims *claims, publisher bool) (all bool, targets map[string]struct{}) {
if claims == nil {
return false, map[string]struct{}{}
}

var providedTargets []string
if publisher {
providedTargets = claims.Mercure.Publish
} else {
providedTargets = claims.Mercure.Subscribe
}

authorizedTargets := make(map[string]struct{}, len(providedTargets))
for _, target := range providedTargets {
if target == "*" {
return true, nil
}

authorizedTargets[target] = struct{}{}
}

return false, authorizedTargets
}
Oops, something went wrong.

0 comments on commit 9d81bb0

Please sign in to comment.
You can’t perform that action at this time.