Skip to content

Commit

Permalink
Fix failing Snyk check
Browse files Browse the repository at this point in the history
  • Loading branch information
MiroslavGatsanoga committed Nov 27, 2020
1 parent 7ddf28e commit 994b898
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 37 deletions.
16 changes: 8 additions & 8 deletions consumer/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ func TestMapToUpdateNotification(t *testing.T) {

standout := map[string]interface{}{"scoop": true}
payload := map[string]interface{}{"title": "This is a title", "standout": standout, "type": "Article"}

id, _ := uuid.NewV4()
event := ContentMessage{
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8081/list/blah/" + uuid.NewV4().String(),
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8081/list/blah/" + id.String(),
LastModified: "2016-11-02T10:54:22.234Z",
Payload: payload,
}
Expand Down Expand Up @@ -59,9 +59,9 @@ func TestMapToUpdateNotification_ForContentWithVersion3UUID(t *testing.T) {

func TestMapToDeleteNotification(t *testing.T) {
t.Parallel()

id, _ := uuid.NewV4()
event := ContentMessage{
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8080/list/blah/" + uuid.NewV4().String(),
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8080/list/blah/" + id.String(),
LastModified: "2016-11-02T10:54:22.234Z",
Payload: "",
}
Expand All @@ -79,9 +79,9 @@ func TestMapToDeleteNotification(t *testing.T) {

func TestMapToDeleteNotification_ContentTypeHeader(t *testing.T) {
t.Parallel()

id, _ := uuid.NewV4()
event := ContentMessage{
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8080/list/blah/" + uuid.NewV4().String(),
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8080/list/blah/" + id.String(),
LastModified: "2016-11-02T10:54:22.234Z",
ContentTypeHeader: "application/vnd.ft-upp-article+json",
Payload: "",
Expand Down Expand Up @@ -122,9 +122,9 @@ func TestNotificationMappingFieldsNotExtractedFromPayload(t *testing.T) {
t.Parallel()

payload := map[string]interface{}{"foo": "bar"}

id, _ := uuid.NewV4()
event := ContentMessage{
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8081/list/blah/" + uuid.NewV4().String(),
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8081/list/blah/" + id.String(),
LastModified: "2016-11-02T10:54:22.234Z",
Payload: payload,
}
Expand Down
15 changes: 11 additions & 4 deletions dispatch/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,22 @@ func (d *Dispatcher) Subscribers() []Subscriber {
return subs
}

func (d *Dispatcher) Subscribe(address string, subTypes []string, monitoring bool) Subscriber {
func (d *Dispatcher) Subscribe(address string, subTypes []string, monitoring bool) (Subscriber, error) {
var s NotificationConsumer
var err error
if monitoring {
s = NewMonitorSubscriber(address, subTypes)
s, err = NewMonitorSubscriber(address, subTypes)
} else {
s = NewStandardSubscriber(address, subTypes)
s, err = NewStandardSubscriber(address, subTypes)
}

if err != nil {
return nil, err
}

d.addSubscriber(s)
return s

return s, nil
}

func (d *Dispatcher) Unsubscribe(subscriber Subscriber) {
Expand Down
18 changes: 10 additions & 8 deletions dispatch/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func TestShouldDispatchNotificationsToMultipleSubscribers(t *testing.T) {
h := NewHistory(historySize)
d := NewDispatcher(delay, h, l)

m := d.Subscribe("192.168.1.2", contentSubscribeTypes, true)
s := d.Subscribe("192.168.1.3", contentSubscribeTypes, false)
m, _ := d.Subscribe("192.168.1.2", contentSubscribeTypes, true)
s, _ := d.Subscribe("192.168.1.3", contentSubscribeTypes, false)

go d.Start()
defer d.Stop()
Expand Down Expand Up @@ -93,9 +93,9 @@ func TestShouldDispatchNotificationsToSubscribersByType(t *testing.T) {
h := NewHistory(historySize)
d := NewDispatcher(delay, h, l)

m := d.Subscribe("192.168.1.2", contentSubscribeTypes, true)
s := d.Subscribe("192.168.1.3", []string{typeArticle}, false)
annSub := d.Subscribe("192.168.1.4", []string{annotationSubType}, false)
m, _ := d.Subscribe("192.168.1.2", contentSubscribeTypes, true)
s, _ := d.Subscribe("192.168.1.3", []string{typeArticle}, false)
annSub, _ := d.Subscribe("192.168.1.4", []string{annotationSubType}, false)

go d.Start()
defer d.Stop()
Expand Down Expand Up @@ -156,8 +156,10 @@ func TestAddAndRemoveOfSubscribers(t *testing.T) {
h := NewHistory(historySize)
d := NewDispatcher(delay, h, l)

m := d.Subscribe("192.168.1.2", contentSubscribeTypes, true).(NotificationConsumer)
s := d.Subscribe("192.168.1.3", contentSubscribeTypes, false).(NotificationConsumer)
m, _ := d.Subscribe("192.168.1.2", contentSubscribeTypes, true)
m = m.(NotificationConsumer)
s, _ := d.Subscribe("192.168.1.3", contentSubscribeTypes, false)
s = s.(NotificationConsumer)

go d.Start()
defer d.Stop()
Expand Down Expand Up @@ -186,7 +188,7 @@ func TestDispatchDelay(t *testing.T) {
h := NewHistory(historySize)
d := NewDispatcher(delay, h, l)

s := d.Subscribe("192.168.1.3", contentSubscribeTypes, false)
s, _ := d.Subscribe("192.168.1.3", contentSubscribeTypes, false)

go d.Start()
defer d.Stop()
Expand Down
22 changes: 16 additions & 6 deletions dispatch/subscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,20 @@ type StandardSubscriber struct {
}

// NewStandardSubscriber returns a new instance of a standard subscriber
func NewStandardSubscriber(address string, subTypes []string) *StandardSubscriber {
func NewStandardSubscriber(address string, subTypes []string) (*StandardSubscriber, error) {
notificationChannel := make(chan string, notificationBuffer)
id, err := uuid.NewV4()
if err != nil {
return nil, err
}

return &StandardSubscriber{
id: uuid.NewV4().String(),
id: id.String(),
notificationChannel: notificationChannel,
addr: address,
sinceTime: time.Now(),
acceptedTypes: subTypes,
}
}, nil
}

// Id returns the uniquely generated subscriber identifier
Expand Down Expand Up @@ -167,15 +172,20 @@ func (m *MonitorSubscriber) Send(n Notification) error {
}

// NewMonitorSubscriber returns a new instance of a Monitor subscriber
func NewMonitorSubscriber(address string, subTypes []string) *MonitorSubscriber {
func NewMonitorSubscriber(address string, subTypes []string) (*MonitorSubscriber, error) {
notificationChannel := make(chan string, notificationBuffer)
id, err := uuid.NewV4()
if err != nil {
return nil, err
}

return &MonitorSubscriber{
id: uuid.NewV4().String(),
id: id.String(),
notificationChannel: notificationChannel,
addr: address,
sinceTime: time.Now(),
acceptedTypes: subTypes,
}
}, nil
}

func buildMonitorNotificationMsg(n Notification) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/onsi/gomega v1.7.0 // indirect
github.com/pkg/errors v0.8.0
github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13
github.com/satori/go.uuid v1.1.1-0.20170321230731-5bf94b69c6b6
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/sirupsen/logrus v1.0.5
github.com/stretchr/testify v1.3.0
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhD
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 h1:4AQBn5RJY4WH8t8TLEMZUsWeXHAUcoao42TCAfpEJJE=
github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.1.1-0.20170321230731-5bf94b69c6b6 h1:oZag5hylqWwZrDdj/laMwWQnXaeWBQf66qm4PGQI6Wc=
github.com/satori/go.uuid v1.1.1-0.20170321230731-5bf94b69c6b6/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
Expand Down
4 changes: 2 additions & 2 deletions mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (m *Dispatcher) Subscribers() []dispatch.Subscriber {
return args.Get(0).([]dispatch.Subscriber)
}

func (m *Dispatcher) Subscribe(address string, subTypes []string, monitoring bool) dispatch.Subscriber {
func (m *Dispatcher) Subscribe(address string, subTypes []string, monitoring bool) (dispatch.Subscriber, error) {
args := m.Called(address, subTypes, monitoring)
return args.Get(0).(dispatch.Subscriber)
return args.Get(0).(dispatch.Subscriber), nil
}
func (m *Dispatcher) Unsubscribe(s dispatch.Subscriber) {
m.Called(s)
Expand Down
9 changes: 7 additions & 2 deletions resources/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type keyValidator interface {
}

type notifier interface {
Subscribe(address string, subTypes []string, monitoring bool) dispatch.Subscriber
Subscribe(address string, subTypes []string, monitoring bool) (dispatch.Subscriber, error)
Unsubscribe(subscriber dispatch.Subscriber)
}

Expand Down Expand Up @@ -100,7 +100,12 @@ func (h *SubHandler) HandleSubscription(w http.ResponseWriter, r *http.Request)
monitorParam := r.URL.Query().Get("monitor")
isMonitor, _ := strconv.ParseBool(monitorParam)

s := h.notif.Subscribe(getClientAddr(r), subscriptionParams, isMonitor)
s, err := h.notif.Subscribe(getClientAddr(r), subscriptionParams, isMonitor)
if err != nil {
h.log.WithError(err).Error("Error creating subscription")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer h.notif.Unsubscribe(s)

ctx, cancel := context.WithCancel(r.Context())
Expand Down
8 changes: 4 additions & 4 deletions resources/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestSubscription(t *testing.T) {
v.On("Validate", mock.Anything, apiKey).Return(nil)

if test.ExpectStream {
sub := dispatch.NewStandardSubscriber(subAddress, test.ExpectedType)
sub, _ := dispatch.NewStandardSubscriber(subAddress, test.ExpectedType)
d.On("Subscribe", subAddress, test.ExpectedType, test.IsMonitor).Run(func(args mock.Arguments) {
go func() {
<-time.After(time.Millisecond * 10)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestPassKeyAsParameter(t *testing.T) {
v := &mocks.KeyValidator{}
v.On("Validate", mock.Anything, keyAPI).Return(nil)

sub := dispatch.NewStandardSubscriber(req.RemoteAddr, []string{defaultSubscriptionType})
sub, _ := dispatch.NewStandardSubscriber(req.RemoteAddr, []string{defaultSubscriptionType})
d := &mocks.Dispatcher{}
d.On("Subscribe", req.RemoteAddr, []string{defaultSubscriptionType}, false).Run(func(args mock.Arguments) {
go func() {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestHeartbeat(t *testing.T) {
v := &mocks.KeyValidator{}
v.On("Validate", mock.Anything, keyAPI).Return(nil)

sub := dispatch.NewStandardSubscriber(subAddress, []string{defaultSubscriptionType})
sub, _ := dispatch.NewStandardSubscriber(subAddress, []string{defaultSubscriptionType})
d := &mocks.Dispatcher{}
d.On("Subscribe", subAddress, []string{defaultSubscriptionType}, false).Return(sub)
d.On("Unsubscribe", mock.AnythingOfType("*dispatch.StandardSubscriber")).Return()
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestPushNotificationDelay(t *testing.T) {
v := &mocks.KeyValidator{}
v.On("Validate", mock.Anything, keyAPI).Return(nil)

sub := dispatch.NewStandardSubscriber(subAddress, []string{defaultSubscriptionType})
sub, _ := dispatch.NewStandardSubscriber(subAddress, []string{defaultSubscriptionType})
d := &mocks.Dispatcher{}
d.On("Subscribe", subAddress, []string{defaultSubscriptionType}, false).Run(func(args mock.Arguments) {
go func() {
Expand Down

0 comments on commit 994b898

Please sign in to comment.