Skip to content

Commit

Permalink
feat: groundwork for messaging events
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Dec 29, 2020
1 parent cb02393 commit dc7fd4e
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 19 deletions.
94 changes: 76 additions & 18 deletions pkg/e2e/sink_test/basic_test.go
Expand Up @@ -7,42 +7,54 @@ import (
"time"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/relay"
"github.com/andrewneudegg/delta/pkg/messaging/memory"
"github.com/andrewneudegg/delta/pkg/sink"
"github.com/stretchr/testify/assert"
)

func TestSmoke(t *testing.T) {
mq := make(chan events.Event)
re := make(chan events.Event)
resultantEvents := []events.Event{}
numEvents := 100000
func TestE2EMemory(t *testing.T) {
// Create a channel to pass messages from the api server.
incomingEvents := make(chan events.Event)

relay := relay.NewNoOpRelay(&relay.NoOpRelayOpts{
Output: re,
})
go relay.Do(context.TODO(), mq)
go func() {
// Create a channel to collect 'distributed messages'.
distributedMessageChan := make(chan events.Event)
distributedMessages := []events.Event{}
// pretend to be doing something with all these distributed events.
go func(ch chan events.Event) {
for {
msg := <-re
resultantEvents = append(resultantEvents, msg)
event := <-ch
distributedMessages = append(distributedMessages, event)
}
}()
}(distributedMessageChan)

// phonebook will route all events to the specified channel.
phonebook := memory.Phonebook{
Ch: distributedMessageChan,
}
// relay will route messages from incomingEvents to distributedMessages.
relay := memory.Relay{}
go relay.Do(context.TODO(), incomingEvents, phonebook)

// sinkServer will catch all events.
sinkServer, _ := sink.NewHTTPSinkServer(&sink.HTTPSinkServerConfiguration{
ServerConfiguration: sink.ServerConfiguration{
ToChan: mq,
ToChan: incomingEvents,
},
ListenAddr: ":8090",
MaxBodySize: 2097152,
})
go sinkServer.Serve(context.TODO())

// pause for a bit in case we haven't context switches goroutines yet.
time.Sleep(time.Second)

// create a client.
client := SinkClient{
Addr: "http://localhost:8090",
}

// send a bunch of events.
numEvents := 100000
for i := 0; i < numEvents; i++ {
result, err := client.Send("/test/hello", map[string][]string{
"Host": {fmt.Sprintf("%d.com", i)},
Expand All @@ -52,7 +64,53 @@ func TestSmoke(t *testing.T) {
assert.NotEqual(t, "", result)
}

assert.Equal(t, "/test/hello", resultantEvents[0].GetURI())
assert.Equal(t, numEvents, len(resultantEvents))
sinkServer.Stop(context.Background())
// check that we received the right number of events...
assert.Equal(t, numEvents, len(distributedMessages))
}



// func TestSmoke(t *testing.T) {
// mq := make(chan events.Event)
// re := make(chan events.Event)
// resultantEvents := []events.Event{}
// numEvents := 100000

// relay := relay.NewNoOpRelay(&relay.NoOpRelayOpts{
// Output: re,
// })
// go relay.Do(context.TODO(), mq)
// go func() {
// for {
// msg := <-re
// resultantEvents = append(resultantEvents, msg)
// }
// }()

// sinkServer, _ := sink.NewHTTPSinkServer(&sink.HTTPSinkServerConfiguration{
// ServerConfiguration: sink.ServerConfiguration{
// ToChan: mq,
// },
// ListenAddr: ":8090",
// MaxBodySize: 2097152,
// })
// go sinkServer.Serve(context.TODO())
// time.Sleep(time.Second)

// client := SinkClient{
// Addr: "http://localhost:8090",
// }

// for i := 0; i < numEvents; i++ {
// result, err := client.Send("/test/hello", map[string][]string{
// "Host": {fmt.Sprintf("%d.com", i)},
// "Content-Type": {"application/json"},
// }, []byte("hello"))
// assert.Nil(t, err)
// assert.NotEqual(t, "", result)
// }

// assert.Equal(t, "/test/hello", resultantEvents[0].GetURI())
// assert.Equal(t, numEvents, len(resultantEvents))
// sinkServer.Stop(context.Background())
// }
3 changes: 3 additions & 0 deletions pkg/messaging/http/README.md
@@ -0,0 +1,3 @@
# HTTP messaging

Is the implementation that routes messages over HTTP, via a dns entry. When running locally this will be `localhost`.
3 changes: 3 additions & 0 deletions pkg/messaging/memory/README.md
@@ -0,0 +1,3 @@
# Memory messaging

Is the reference design for all other messaging implementations.
2 changes: 1 addition & 1 deletion pkg/messaging/memory/relay.go
Expand Up @@ -14,7 +14,7 @@ type Relay struct {

// Do will pass events from a channel to a target, by looking them up.
func (r Relay) Do(ctx context.Context, ch <-chan events.Event, p Phonebook) error {
for ctx.Err() != nil {
for ctx.Err() == nil {
e := <-ch
t, err := p.Lookup(e)
if err != nil {
Expand Down

0 comments on commit dc7fd4e

Please sign in to comment.