Skip to content

Commit

Permalink
todo's cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Nov 10, 2018
1 parent 98497a1 commit b8af3d6
Show file tree
Hide file tree
Showing 33 changed files with 709 additions and 695 deletions.
5 changes: 4 additions & 1 deletion Makefile
Expand Up @@ -5,4 +5,7 @@ mycli:
@mycli -h 127.0.0.1 -u root -p secret

test:
go test `glide novendor -dir message`
go test `glide novendor`

test_stress:
go test -tags=stress `glide novendor`
23 changes: 20 additions & 3 deletions components/domain/eventproducer_test.go
@@ -1,14 +1,31 @@
package domain
package domain_test

import (
"testing"
"time"

"github.com/ThreeDotsLabs/watermill/components/domainents/domain"
"github.com/ThreeDotsLabs/watermill/components/domain"
"github.com/stretchr/testify/assert"
)

type testEvent struct {
Name string
name string
}

func (testEvent) OccurredOn() time.Time {
return time.Now()
}

func (t testEvent) Name() string {
return t.name
}

func (testEvent) AggregateID() []byte {
return []byte("1")
}

func (testEvent) AggregateType() string {
return "aggregate_type"
}

func TestEventProducer(t *testing.T) {
Expand Down
5 changes: 1 addition & 4 deletions internal/tests/asserts.go
@@ -1,7 +1,6 @@
package tests

import (
"fmt"
"sort"
"testing"

Expand Down Expand Up @@ -40,9 +39,7 @@ func AssertAllMessagesReceived(t *testing.T, sent message.Messages, received mes
sort.Strings(sentIDs)
sort.Strings(receivedIDs)

fmt.Println(difference(sentIDs, receivedIDs))

return assert.EqualValues(t, receivedIDs, sentIDs)
return assert.Equal(t, sentIDs, receivedIDs)
}

func AssertMessagesPayloads(
Expand Down
118 changes: 101 additions & 17 deletions message/infrastructure/gochannel/publisher.go
Expand Up @@ -4,50 +4,134 @@ import (
"sync"
"time"

"github.com/satori/go.uuid"

"github.com/pkg/errors"

"github.com/ThreeDotsLabs/watermill"

"github.com/ThreeDotsLabs/watermill/message"
)

type subscriber struct {
outputChannel chan message.Message
uuid string
outputChannel chan *message.Message
}

type GoChannel struct {
subscribers []*subscriber
type goChannel struct {
sendTimeout time.Duration
buffer int64

subscribers map[string][]*subscriber
subscribersLock *sync.RWMutex

logger watermill.LoggerAdapter

closed bool
}

func NewGoChannel(buffer int64, logger watermill.LoggerAdapter, sendTimeout time.Duration) message.PubSub {
return &goChannel{
sendTimeout: sendTimeout,
buffer: buffer,

subscribers: make(map[string][]*subscriber),
subscribersLock: &sync.RWMutex{},
logger: logger,
}
}

func (g GoChannel) Save(messages []message.Message) error {
func (g *goChannel) Publish(topic string, messages ...*message.Message) error {
for _, msg := range messages {
if err := g.sendMessage(topic, msg); err != nil {
return err
}
}

return nil
}

func (g *goChannel) sendMessage(topic string, message *message.Message) error {
messageLogFields := watermill.LogFields{
"message_uuid": message.UUID,
}

g.subscribersLock.RLock()
defer g.subscribersLock.RUnlock()

for _, message := range messages {
for _, s := range g.subscribers {
subscribers, ok := g.subscribers[topic]
if !ok {
return nil
}

for _, s := range subscribers {
subscriberLogFields := messageLogFields.Add(watermill.LogFields{
"subscriber_uuid": s.uuid,
})

SendToSubscriber:
for {
select {
case s.outputChannel <- message:
// todo - log it
//sendLogger.Debug("sent messages to subscriber")
case <-time.After(time.Second): // todo - config
// todo - log it
//sendLogger.Warn("cannot send messages")
select {
case <-message.Acked():
g.logger.Trace("message sent", subscriberLogFields)
break SendToSubscriber
case <-message.Nacked():
g.logger.Trace("nack received, resending message", subscriberLogFields)

// message have nack already sent, we need fresh message
message = resetMessage(message)

continue SendToSubscriber
}
case <-time.After(g.sendTimeout):
return errors.Errorf("sending message %s timeouted after %s", message.UUID, g.sendTimeout)
}
}
}

return nil
}

// todo - topics support
func (g GoChannel) Subscribe(topic string) (chan message.Message, error) {
func (g *goChannel) Subscribe(topic string, consumerGroup message.ConsumerGroup) (chan *message.Message, error) {
g.subscribersLock.Lock()
defer g.subscribersLock.Unlock()

s := &subscriber{}
g.subscribers = append(g.subscribers, s)
if _, ok := g.subscribers[topic]; !ok {
g.subscribers[topic] = make([]*subscriber, 0)
}

s := &subscriber{
uuid: uuid.NewV4().String(),
outputChannel: make(chan *message.Message, g.buffer),
}
g.subscribers[topic] = append(g.subscribers[topic], s)

return s.outputChannel, nil
}

func (g GoChannel) Close() error {
// todo
func (g *goChannel) Close() error {
g.subscribersLock.Lock()
defer g.subscribersLock.Unlock()

if g.closed {
return nil
}
g.closed = true

for _, topicSubscribers := range g.subscribers {
for _, subscriber := range topicSubscribers {
close(subscriber.outputChannel)
}
}

return nil
}

func resetMessage(oldMsg *message.Message) *message.Message {
m := message.NewMessage(oldMsg.UUID, oldMsg.Payload)
m.Metadata = m.Metadata

return m
}
35 changes: 35 additions & 0 deletions message/infrastructure/gochannel/pubsub_test.go
@@ -0,0 +1,35 @@
package gochannel_test

import (
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"

"github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/infrastructure"
)

func createPubSub(t *testing.T) message.PubSub {
return gochannel.NewGoChannel(
0,
watermill.NewStdLogger(true, true),
time.Second*10,
)
}

func TestPublishSubscribe(t *testing.T) {
infrastructure.TestPubSub(
t,
infrastructure.Features{
ConsumerGroups: false,
ExactlyOnceDelivery: true,
GuaranteedOrder: true,
Persistent: false,
RequireSingleInstance: true,
},
createPubSub,
)
}
5 changes: 2 additions & 3 deletions message/infrastructure/http/subscriber.go
Expand Up @@ -22,7 +22,6 @@ type Subscriber struct {
outputChannelsLock sync.Locker
}

// todo - test
func NewSubscriber(addr string, unmarshalMessageFunc UnmarshalMessageFunc, logger watermill.LoggerAdapter) (message.Subscriber, error) {
r := chi.NewRouter()
s := &http.Server{Addr: addr, Handler: r}
Expand Down Expand Up @@ -58,7 +57,7 @@ func (s *Subscriber) Subscribe(topic string, consumerGroup message.ConsumerGroup
w.WriteHeader(http.StatusBadRequest)
return
}
logFields := baseLogFields.Add(watermill.LogFields{"message_id": msg.UUID})
logFields := baseLogFields.Add(watermill.LogFields{"message_uuid": msg.UUID})

s.logger.Trace("Sending msg", logFields)
messages <- msg
Expand All @@ -82,7 +81,7 @@ func (s *Subscriber) Subscribe(topic string, consumerGroup message.ConsumerGroup
return messages, nil
}

func (s Subscriber) CloseSubscriber() error {
func (s Subscriber) Close() error {
defer func() {
for _, ch := range s.outputChannels {
close(ch)
Expand Down
30 changes: 17 additions & 13 deletions message/infrastructure/kafka/marshal/confluent.go
Expand Up @@ -7,12 +7,17 @@ import (
"github.com/pkg/errors"
)

// todo - rename
type ConfluentKafka struct{}
const UUIDHeaderKey = "_watermill_message_uuid"

type KafkaJson struct{}

func (KafkaJson) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
return nil, errors.Errorf("metadata %s is reserved by watermil for message UUID", UUIDHeaderKey)
}

func (ConfluentKafka) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
headers := []confluentKafka.Header{{
Key: "uuid", // todo - make it reserved
Key: UUIDHeaderKey,
Value: []byte(msg.UUID),
}}
for key, value := range msg.Metadata {
Expand All @@ -29,12 +34,12 @@ func (ConfluentKafka) Marshal(topic string, msg *message.Message) (*confluentKaf
}, nil
}

func (ConfluentKafka) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error) {
func (KafkaJson) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error) {
var messageID string
metadata := make(message.Metadata, len(kafkaMsg.Headers))

for _, header := range kafkaMsg.Headers {
if header.Key == "uuid" {
if header.Key == UUIDHeaderKey {
messageID = string(header.Value)
} else {
metadata.Set(header.Key, string(header.Value))
Expand All @@ -52,19 +57,18 @@ func (ConfluentKafka) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Mess

type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)

// todo - check that working and make sense?
type jsonWithPartitioning struct {
ConfluentKafka
type kafkaJsonWithPartitioning struct {
KafkaJson

generatePartitionKey GeneratePartitionKey
}

func NewJsonWithPartitioning(generatePartitionKey GeneratePartitionKey) kafka.MarshalerUnmarshaler {
return jsonWithPartitioning{generatePartitionKey: generatePartitionKey}
func NewKafkaJsonWithPartitioning(generatePartitionKey GeneratePartitionKey) kafka.MarshalerUnmarshaler {
return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
}

func (j jsonWithPartitioning) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
kafkaMsg, err := j.ConfluentKafka.Marshal(topic, msg)
func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
kafkaMsg, err := j.KafkaJson.Marshal(topic, msg)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b8af3d6

Please sign in to comment.