Skip to content

Commit

Permalink
feat(hub): add an option to support legacy versions of the protocol (#…
Browse files Browse the repository at this point in the history
…670)

* feat(hub): add an option to support legacy versions of the protocol

* review and improvements

* review and improvements
  • Loading branch information
dunglas committed Jul 22, 2022
1 parent bc1839f commit ec70440
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 24 deletions.
22 changes: 22 additions & 0 deletions caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Mercure struct {
// The name of the authorization cookie. Defaults to "mercureAuthorization".
CookieName string `json:"cookie_name,omitempty"`

// The version of the Mercure protocol to be backward compatible with (only version 7 is supported)
ProtocolVersionCompatibility int `json:"protocol_version_compatibility,omitempty"`

hub *mercure.Hub
logger *zap.Logger
}
Expand Down Expand Up @@ -205,6 +208,9 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
if len(m.CORSOrigins) > 0 {
opts = append(opts, mercure.WithCORSOrigins(m.CORSOrigins))
}
if m.ProtocolVersionCompatibility != 0 {
opts = append(opts, mercure.WithProtocolVersionCompatibility(m.ProtocolVersionCompatibility))
}

h, err := mercure.NewHub(opts...)
if err != nil {
Expand Down Expand Up @@ -347,6 +353,22 @@ func (m *Mercure) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { //nolint:fu
}

m.CookieName = d.Val()

case "protocol_version_compatibility":
if !d.NextArg() {
return d.ArgErr()
}

v, err := strconv.Atoi(d.Val())
if err != nil {
return err //nolint:wrapcheck
}

if v != 7 {
return errors.New("compatibility mode only supports protocol version 7")
}

m.ProtocolVersionCompatibility = v
}
}
}
Expand Down
58 changes: 40 additions & 18 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package mercure

import (
"errors"
"fmt"
"net/http"
"time"
Expand All @@ -12,6 +13,9 @@ import (
"go.uber.org/zap"
)

// ErrUnsupportedProtocolVersion is returned when the version passed is unsupported.
var ErrUnsupportedProtocolVersion = errors.New("compatibility mode only supports protocol version 7")

// Option instances allow to configure the library.
type Option func(h *opt) error

Expand Down Expand Up @@ -193,6 +197,19 @@ func WithCookieName(cookieName string) Option {
}
}

// WithProtocolVersionCompatibility sets the version of the Mercure protocol to be backward compatible with (only version 7 is supported).
func WithProtocolVersionCompatibility(protocolVersionCompatibility int) Option {
return func(o *opt) error {
if protocolVersionCompatibility != 7 {
return ErrUnsupportedProtocolVersion
}

o.protocolVersionCompatibility = protocolVersionCompatibility

return nil
}
}

type jwtConfig struct {
key []byte
signingMethod jwt.SigningMethod
Expand All @@ -202,24 +219,29 @@ type jwtConfig struct {
//
// If you change this, also update the Caddy module and the documentation.
type opt struct {
transport Transport
topicSelectorStore *TopicSelectorStore
anonymous bool
debug bool
subscriptions bool
ui bool
demo bool
logger Logger
writeTimeout time.Duration
dispatchTimeout time.Duration
heartbeat time.Duration
publisherJWT *jwtConfig
subscriberJWT *jwtConfig
metrics Metrics
allowedHosts []string
publishOrigins []string
corsOrigins []string
cookieName string
transport Transport
topicSelectorStore *TopicSelectorStore
anonymous bool
debug bool
subscriptions bool
ui bool
demo bool
logger Logger
writeTimeout time.Duration
dispatchTimeout time.Duration
heartbeat time.Duration
publisherJWT *jwtConfig
subscriberJWT *jwtConfig
metrics Metrics
allowedHosts []string
publishOrigins []string
corsOrigins []string
cookieName string
protocolVersionCompatibility int
}

func (o *opt) isBackwardCompatiblyEnabledWith(version int) bool {
return o.protocolVersionCompatibility != 0 && version >= o.protocolVersionCompatibility
}

// Hub stores channels with clients currently subscribed and allows to dispatch updates.
Expand Down
20 changes: 20 additions & 0 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ func TestStop(t *testing.T) {
wg.Wait()
}

func TestWithProtocolVersionCompatibility(t *testing.T) {
op := &opt{}

assert.False(t, op.isBackwardCompatiblyEnabledWith(7))

o := WithProtocolVersionCompatibility(7)
require.Nil(t, o(op))
assert.Equal(t, 7, op.protocolVersionCompatibility)
assert.True(t, op.isBackwardCompatiblyEnabledWith(7))
assert.True(t, op.isBackwardCompatiblyEnabledWith(8))
assert.False(t, op.isBackwardCompatiblyEnabledWith(6))
}

func TestInvalidWithProtocolVersionCompatibility(t *testing.T) {
op := &opt{}

o := WithProtocolVersionCompatibility(6)
require.NotNil(t, o(op))
}

func createDummy(options ...Option) *Hub {
tss, _ := NewTopicSelectorStoreLRU(0, 0)
options = append(
Expand Down
14 changes: 9 additions & 5 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {

// registerSubscriber initializes the connection.
func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request) *Subscriber {
s := NewSubscriber(retrieveLastEventID(r, h.logger), h.logger)
s := NewSubscriber(retrieveLastEventID(r, h.opt, h.logger), h.logger)
s.Debug = h.debug
s.RemoteAddr = r.RemoteAddr
var privateTopics []string
Expand Down Expand Up @@ -153,7 +153,7 @@ func sendHeaders(w http.ResponseWriter, s *Subscriber) {
}

// retrieveLastEventID extracts the Last-Event-ID from the corresponding HTTP header with a fallback on the query parameter.
func retrieveLastEventID(r *http.Request, logger Logger) string {
func retrieveLastEventID(r *http.Request, opt *opt, logger Logger) string {
if id := r.Header.Get("Last-Event-ID"); id != "" {
return id
}
Expand All @@ -164,10 +164,14 @@ func retrieveLastEventID(r *http.Request, logger Logger) string {
}

if legacyEventIDValues, present := query["Last-Event-ID"]; present {
logger.Info("deprecation: the 'Last-Event-ID' query parameter is deprecated, use 'lastEventID' instead.")
if opt.isBackwardCompatiblyEnabledWith(7) {
logger.Info("deprecation: the 'Last-Event-ID' query parameter is deprecated since the version 8 of the protocol, use 'lastEventID' instead.")

if len(legacyEventIDValues) != 0 {
return legacyEventIDValues[0]
if len(legacyEventIDValues) != 0 {
return legacyEventIDValues[0]
}
} else {
logger.Info("unsupported: the 'Last-Event-ID' query parameter is not supported anymore, use 'lastEventID' instead or enable backward compatibility with version 7 of the protocol.")
}
}

Expand Down
2 changes: 1 addition & 1 deletion subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func TestSubscribeAll(t *testing.T) {

func TestSendMissedEvents(t *testing.T) {
bt := createBoltTransport("bolt://test.db")
hub := createAnonymousDummy(WithLogger(bt.logger), WithTransport(bt))
hub := createAnonymousDummy(WithLogger(bt.logger), WithTransport(bt), WithProtocolVersionCompatibility(7))
transport := hub.transport.(*BoltTransport)
defer transport.Close()
defer os.Remove("test.db")
Expand Down

0 comments on commit ec70440

Please sign in to comment.