Skip to content

Commit

Permalink
refactor: major refactor for the relay defintions.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Dec 29, 2020
1 parent e2eee73 commit a9f68c0
Show file tree
Hide file tree
Showing 26 changed files with 265 additions and 381 deletions.
69 changes: 27 additions & 42 deletions cmd/bridge/subcmd/simulator/bridge.go
Expand Up @@ -2,25 +2,17 @@
package simulator

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/google/uuid"
"github.com/spf13/cobra"

"github.com/andrewneudegg/delta/cmd/bridge/apputil"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/queue"
)

type simulatorOpts struct {
Interval time.Duration

appState *apputil.AppState

Qimpl queue.Q
}

// Cmd demonstrates how to configure a new subcommand.
Expand All @@ -35,13 +27,6 @@ func Cmd(appState *apputil.AppState) *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {
appState.Logger.Debug("starting example cmd")

go simulator(&simulatorOpts{
Interval: generationInterval,
appState: appState,
Qimpl: queue.NewMemoryQ(),
})

appState.Block(context.TODO())
},
}

Expand All @@ -50,32 +35,32 @@ func Cmd(appState *apputil.AppState) *cobra.Command {
return cmd
}

func simulator(opts *simulatorOpts) {
r := rand.New(rand.NewSource(0))
// func simulator(opts *simulatorOpts) {
// r := rand.New(rand.NewSource(0))

mq := make(chan events.Event)
go func() {
for {
msg := <-mq
opts.appState.Logger.Debugf("queuing msg with id: '%s'", msg.GetMessageID())
opts.Qimpl.Push(msg)
}
}()
// mq := make(chan events.Event)
// go func() {
// for {
// msg := <-mq
// opts.appState.Logger.Debugf("queuing msg with id: '%s'", msg.GetMessageID())
// opts.Qimpl.Push(msg)
// }
// }()

go func() {
for {
time.Sleep(opts.Interval)
opts.appState.Logger.Debug("generating event")
msg := &events.EventMsg{
ID: uuid.New().String(),
Headers: map[string][]string{
"User-Agent": {"test"},
"Host": {"test.com"},
},
URI: "/test/1/2/3",
Content: []byte(fmt.Sprintf("%d", r.Intn(100000))),
}
mq <- msg
}
}()
}
// go func() {
// for {
// time.Sleep(opts.Interval)
// opts.appState.Logger.Debug("generating event")
// msg := &events.EventMsg{
// ID: uuid.New().String(),
// Headers: map[string][]string{
// "User-Agent": {"test"},
// "Host": {"test.com"},
// },
// URI: "/test/1/2/3",
// Content: []byte(fmt.Sprintf("%d", r.Intn(100000))),
// }
// mq <- msg
// }
// }()
// }
2 changes: 1 addition & 1 deletion pkg/queue/memory.go → pkg/_queue/memory.go
@@ -1,4 +1,4 @@
package queue
package _queue

import (
"container/list"
Expand Down
2 changes: 1 addition & 1 deletion pkg/queue/memory_test.go → pkg/_queue/memory_test.go
@@ -1,4 +1,4 @@
package queue
package _queue

import (
"testing"
Expand Down
2 changes: 1 addition & 1 deletion pkg/queue/queue.go → pkg/_queue/queue.go
@@ -1,4 +1,4 @@
package queue
package _queue

// Q is a any type of queue, memory, RPC, HTTP, Kafka, etc.
type Q interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/relay/httpRelay.go → pkg/_relay/httpRelay.go
@@ -1,4 +1,4 @@
package relay
package _relay

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/relay/noop.go → pkg/_relay/noop.go
@@ -1,4 +1,4 @@
package relay
package _relay

import (
"context"
Expand Down
34 changes: 34 additions & 0 deletions pkg/_relay/relay.go
@@ -0,0 +1,34 @@
package _relay

import (
"context"

"github.com/andrewneudegg/delta/pkg/events"
)

// ------------ RELAY -------------

// R is the relay interface, all relays should adhere to this.
type R interface {
Do(context.Context, <-chan events.Event)
}

// ------------ / RELAY -------------

// Server will serve events to requesting clients.
type Server interface {
events.Distributor
Start() error // Start the server.
Stop(ctx context.Context) error // Stop the server.
}

// Client provides clients with a mechanism to access events.
type Client interface {
GetNextEvent() (events.Event, error) // GetNextEvent will return the next available event.
GetNextNEvents(int) ([]events.Event, error) // GetNextNEvents will return up to N next events.
CompleteEvent(events.Event) error // CompleteEvent will complete the underlying event if required.
CompleteNEvents([]events.Event) error // CompleteNEvents will complete N underlying events if required.
FailEvent(events.Event) error // FailEvent will inform the RelayServer that the given event could not be processed.
FailNEvents([]events.Event) error // FailNEvents provides a convenience mechanism for bulk failing events.
EstimateLoad() (int, error) // EstimateLoad provides an estimate for the amount of work being done by this server, between 0 and 100.
}
10 changes: 2 additions & 8 deletions pkg/e2e/sink_test/basic_test.go
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/messaging/memory"
"github.com/andrewneudegg/delta/pkg/relay/memory"
"github.com/andrewneudegg/delta/pkg/sink"
"github.com/stretchr/testify/assert"
)
Expand All @@ -27,13 +27,9 @@ func TestE2EMemory(t *testing.T) {
}
}(distributedMessageChan)

// phonebook will route all events to the specified channel.
phonebook := memory.Phonebook{
Ch: distributedMessageChan,
}
// relay will route messages from incomingEvents to distributedMessages.
relay := memory.Relay{}
go relay.Do(context.TODO(), incomingEvents, phonebook)
go relay.Do(context.TODO(), incomingEvents, distributedMessageChan)

// sinkServer will catch all events.
sinkServer, _ := sink.NewHTTPSinkServer(&sink.HTTPSinkServerConfiguration{
Expand Down Expand Up @@ -68,8 +64,6 @@ func TestE2EMemory(t *testing.T) {
assert.Equal(t, numEvents, len(distributedMessages))
}



// func TestSmoke(t *testing.T) {
// mq := make(chan events.Event)
// re := make(chan events.Event)
Expand Down
17 changes: 0 additions & 17 deletions pkg/messaging/dialer.go

This file was deleted.

20 changes: 0 additions & 20 deletions pkg/messaging/memory/phonebook.go

This file was deleted.

30 changes: 0 additions & 30 deletions pkg/messaging/memory/relay.go

This file was deleted.

20 changes: 0 additions & 20 deletions pkg/messaging/memory/target.go

This file was deleted.

18 changes: 0 additions & 18 deletions pkg/messaging/memory/transform.go

This file was deleted.

12 changes: 0 additions & 12 deletions pkg/messaging/relay.go

This file was deleted.

37 changes: 0 additions & 37 deletions pkg/messaging/tcp/phonebook.go

This file was deleted.

0 comments on commit a9f68c0

Please sign in to comment.