Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for fine-grained websocket filtering #82

Merged
merged 7 commits into from Dec 12, 2019
@@ -9,3 +9,5 @@ vendor
*.cov
.idea/*
artifacts
remod.dev
.remod
@@ -943,6 +943,7 @@ type mockSession struct {
func (s *mockSession) Identifier() string { return "" }
func (s *mockSession) Parameter(string) string { return "" }
func (s *mockSession) Header(string) string { return "" }
func (s *mockSession) PushConfig() *elemental.PushConfig { return nil }
func (s *mockSession) SetClaims(c []string) { s.claims = c }
func (s *mockSession) Claims() []string { return s.claims }
func (s *mockSession) ClaimsMap() map[string]string { return nil }
@@ -248,13 +248,21 @@ type PushDispatchHandler interface {
RelatedEventIdentities(string) []string

// SummarizeEvent is called once per event and allows the implementation
// to return am intenface that will be passed to ShouldDispatch.
// to return an interface that will be passed to ShouldDispatch.
// If you need to decode an event to read some information to make a
// dispatch decision, this is a good place as it will allow to only
// dispatch decision, this is a good place as it will allow you to only
// do this once.
SummarizeEvent(event *elemental.Event) (interface{}, error)
}

// CloserNotifierError is an error type that can optionally be returned by the callback, ShouldDispatch(PushSession, *elemental.Event, interface{}),
// defined in the PushDispatchHandler interface. If the callback returns an error that satisfies the CloserNotifierError,
// bahamut will call the `ShouldCloseSocket()` to determine whether the socket connection should be closed. Useful in situations where
// it makes no sense keeping the socket alive anymore (e.g. client sent a push config that contains an unsupported filter comparator)
type CloserNotifierError interface {
ShouldCloseSocket() (bool, int)
}

// PushPublishHandler is the interface that must be implemented in order to
// to be used as the Bahamut Push Publish handler.
type PushPublishHandler interface {
@@ -278,6 +286,7 @@ type Session interface {
Identifier() string
Parameter(string) string
Header(string) string
PushConfig() *elemental.PushConfig
SetClaims([]string)
Claims() []string
ClaimsMap() map[string]string
@@ -30,27 +30,27 @@ import (
type unregisterFunc func(*wsPushSession)

type wsPushSession struct {
dataCh chan []byte
filter *elemental.PushConfig
currentFilterLock sync.RWMutex
parametersLock sync.RWMutex
claims []string
claimsMap map[string]string
cfg config
headers http.Header
id string
metadata interface{}
parameters url.Values
remoteAddr string
conn wsc.Websocket
startTime time.Time
unregister unregisterFunc
tlsConnectionState *tls.ConnectionState
ctx context.Context
cancel context.CancelFunc
closeCh chan struct{}
encodingRead elemental.EncodingType
encodingWrite elemental.EncodingType
dataCh chan []byte
pushConfig *elemental.PushConfig
currentPushConfigLock sync.RWMutex
parametersLock sync.RWMutex
claims []string
claimsMap map[string]string
cfg config
headers http.Header
id string
metadata interface{}
parameters url.Values
remoteAddr string
conn wsc.Websocket
startTime time.Time
unregister unregisterFunc
tlsConnectionState *tls.ConnectionState
ctx context.Context
cancel context.CancelFunc
closeCh chan struct{}
encodingRead elemental.EncodingType
encodingWrite elemental.EncodingType
}

func newWSPushSession(
@@ -92,7 +92,7 @@ func (s *wsPushSession) DirectPush(events ...*elemental.Event) {
continue
}

f := s.currentFilter()
f := s.currentPushConfig()
if f != nil && f.IsFilteredOut(event.Identity, event.Type) {
continue
}
@@ -156,30 +156,30 @@ func (s *wsPushSession) setConn(conn wsc.Websocket) { s.conn
func (s *wsPushSession) close(code int) { s.conn.Close(code) }
func (s *wsPushSession) setTLSConnectionState(st *tls.ConnectionState) { s.tlsConnectionState = st }
func (s *wsPushSession) Header(key string) string { return s.headers.Get(key) }
func (s *wsPushSession) PushConfig() *elemental.PushConfig { return s.currentPushConfig() }
func (s *wsPushSession) Parameter(key string) string {
s.parametersLock.RLock()
defer s.parametersLock.RUnlock()
return s.parameters.Get(key)
}

func (s *wsPushSession) currentFilter() *elemental.PushConfig {
func (s *wsPushSession) currentPushConfig() *elemental.PushConfig {
s.currentPushConfigLock.RLock()
defer s.currentPushConfigLock.RUnlock()

s.currentFilterLock.RLock()
defer s.currentFilterLock.RUnlock()

if s.filter == nil {
if s.pushConfig == nil {
return nil
}

return s.filter.Duplicate()
return s.pushConfig.Duplicate()
}

func (s *wsPushSession) setCurrentFilter(f *elemental.PushConfig) {
func (s *wsPushSession) setCurrentPushConfig(f *elemental.PushConfig) {

s.currentFilterLock.Lock()
defer s.currentFilterLock.Unlock()
s.currentPushConfigLock.Lock()
defer s.currentPushConfigLock.Unlock()

s.filter = f
s.pushConfig = f
if f == nil {
return
}
@@ -217,13 +217,23 @@ func (s *wsPushSession) listen() {

case data := <-s.conn.Read():

filter := elemental.NewPushConfig()
if err := elemental.Decode(s.encodingRead, data, filter); err != nil {
pushConfig := elemental.NewPushConfig()
if err := elemental.Decode(s.encodingRead, data, pushConfig); err != nil {
s.close(websocket.CloseUnsupportedData)
return
}

if err := pushConfig.ParseIdentityFilters(); err != nil {
zap.L().Debug("error parsing filter(s) in the received *elemental.PushConfig",
zap.Error(err),
zap.String("sessionID", s.id),
zap.String("pushConfig", pushConfig.String()),
)
s.close(websocket.CloseUnsupportedData)
return
}

s.setCurrentFilter(filter)
s.setCurrentPushConfig(pushConfig)

case err := <-s.conn.Error():
zap.L().Error("Error received from websocket", zap.String("session", s.id), zap.Error(err))
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.