From dc7fd4e0849919b4abaac73548bc23e3935a7bd1 Mon Sep 17 00:00:00 2001 From: Andrew Neudegg Date: Tue, 29 Dec 2020 13:54:11 +0000 Subject: [PATCH] feat: groundwork for messaging events Signed-off-by: Andrew Neudegg --- pkg/e2e/sink_test/basic_test.go | 94 ++++++++++++++++++++++++++------- pkg/messaging/http/README.md | 3 ++ pkg/messaging/memory/README.md | 3 ++ pkg/messaging/memory/relay.go | 2 +- 4 files changed, 83 insertions(+), 19 deletions(-) create mode 100644 pkg/messaging/http/README.md create mode 100644 pkg/messaging/memory/README.md diff --git a/pkg/e2e/sink_test/basic_test.go b/pkg/e2e/sink_test/basic_test.go index 4d14fd9..54248c1 100644 --- a/pkg/e2e/sink_test/basic_test.go +++ b/pkg/e2e/sink_test/basic_test.go @@ -7,42 +7,54 @@ import ( "time" "github.com/andrewneudegg/delta/pkg/events" - "github.com/andrewneudegg/delta/pkg/relay" + "github.com/andrewneudegg/delta/pkg/messaging/memory" "github.com/andrewneudegg/delta/pkg/sink" "github.com/stretchr/testify/assert" ) -func TestSmoke(t *testing.T) { - mq := make(chan events.Event) - re := make(chan events.Event) - resultantEvents := []events.Event{} - numEvents := 100000 +func TestE2EMemory(t *testing.T) { + // Create a channel to pass messages from the api server. + incomingEvents := make(chan events.Event) - relay := relay.NewNoOpRelay(&relay.NoOpRelayOpts{ - Output: re, - }) - go relay.Do(context.TODO(), mq) - go func() { + // Create a channel to collect 'distributed messages'. + distributedMessageChan := make(chan events.Event) + distributedMessages := []events.Event{} + // pretend to be doing something with all these distributed events. + go func(ch chan events.Event) { for { - msg := <-re - resultantEvents = append(resultantEvents, msg) + event := <-ch + distributedMessages = append(distributedMessages, event) } - }() + }(distributedMessageChan) + + // phonebook will route all events to the specified channel. + phonebook := memory.Phonebook{ + Ch: distributedMessageChan, + } + // relay will route messages from incomingEvents to distributedMessages. + relay := memory.Relay{} + go relay.Do(context.TODO(), incomingEvents, phonebook) + // sinkServer will catch all events. sinkServer, _ := sink.NewHTTPSinkServer(&sink.HTTPSinkServerConfiguration{ ServerConfiguration: sink.ServerConfiguration{ - ToChan: mq, + ToChan: incomingEvents, }, ListenAddr: ":8090", MaxBodySize: 2097152, }) go sinkServer.Serve(context.TODO()) + + // pause for a bit in case we haven't context switches goroutines yet. time.Sleep(time.Second) + // create a client. client := SinkClient{ Addr: "http://localhost:8090", } + // send a bunch of events. + numEvents := 100000 for i := 0; i < numEvents; i++ { result, err := client.Send("/test/hello", map[string][]string{ "Host": {fmt.Sprintf("%d.com", i)}, @@ -52,7 +64,53 @@ func TestSmoke(t *testing.T) { assert.NotEqual(t, "", result) } - assert.Equal(t, "/test/hello", resultantEvents[0].GetURI()) - assert.Equal(t, numEvents, len(resultantEvents)) - sinkServer.Stop(context.Background()) + // check that we received the right number of events... + assert.Equal(t, numEvents, len(distributedMessages)) } + + + +// func TestSmoke(t *testing.T) { +// mq := make(chan events.Event) +// re := make(chan events.Event) +// resultantEvents := []events.Event{} +// numEvents := 100000 + +// relay := relay.NewNoOpRelay(&relay.NoOpRelayOpts{ +// Output: re, +// }) +// go relay.Do(context.TODO(), mq) +// go func() { +// for { +// msg := <-re +// resultantEvents = append(resultantEvents, msg) +// } +// }() + +// sinkServer, _ := sink.NewHTTPSinkServer(&sink.HTTPSinkServerConfiguration{ +// ServerConfiguration: sink.ServerConfiguration{ +// ToChan: mq, +// }, +// ListenAddr: ":8090", +// MaxBodySize: 2097152, +// }) +// go sinkServer.Serve(context.TODO()) +// time.Sleep(time.Second) + +// client := SinkClient{ +// Addr: "http://localhost:8090", +// } + +// for i := 0; i < numEvents; i++ { +// result, err := client.Send("/test/hello", map[string][]string{ +// "Host": {fmt.Sprintf("%d.com", i)}, +// "Content-Type": {"application/json"}, +// }, []byte("hello")) +// assert.Nil(t, err) +// assert.NotEqual(t, "", result) +// } + +// assert.Equal(t, "/test/hello", resultantEvents[0].GetURI()) +// assert.Equal(t, numEvents, len(resultantEvents)) +// sinkServer.Stop(context.Background()) +// } diff --git a/pkg/messaging/http/README.md b/pkg/messaging/http/README.md new file mode 100644 index 0000000..f059dd5 --- /dev/null +++ b/pkg/messaging/http/README.md @@ -0,0 +1,3 @@ +# HTTP messaging + +Is the implementation that routes messages over HTTP, via a dns entry. When running locally this will be `localhost`. diff --git a/pkg/messaging/memory/README.md b/pkg/messaging/memory/README.md new file mode 100644 index 0000000..aa3c013 --- /dev/null +++ b/pkg/messaging/memory/README.md @@ -0,0 +1,3 @@ +# Memory messaging + +Is the reference design for all other messaging implementations. diff --git a/pkg/messaging/memory/relay.go b/pkg/messaging/memory/relay.go index 429f4d7..5963139 100644 --- a/pkg/messaging/memory/relay.go +++ b/pkg/messaging/memory/relay.go @@ -14,7 +14,7 @@ type Relay struct { // Do will pass events from a channel to a target, by looking them up. func (r Relay) Do(ctx context.Context, ch <-chan events.Event, p Phonebook) error { - for ctx.Err() != nil { + for ctx.Err() == nil { e := <-ch t, err := p.Lookup(e) if err != nil {