Skip to content

Commit

Permalink
Merge pull request #3588 from katiewasnothere/exposeEventPublisher
Browse files Browse the repository at this point in the history
Export shim publisher functions
  • Loading branch information
crosbymichael committed Aug 28, 2019
2 parents d177ffd + 2d8a65b commit 0293cbd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
18 changes: 9 additions & 9 deletions runtime/v2/shim/publisher.go
Expand Up @@ -41,13 +41,13 @@ type item struct {
count int
}

func newPublisher(address string) (*remoteEventsPublisher, error) {
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
client, err := ttrpcutil.NewClient(address)
if err != nil {
return nil, err
}

l := &remoteEventsPublisher{
l := &RemoteEventsPublisher{
client: client,
closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
Expand All @@ -57,26 +57,26 @@ func newPublisher(address string) (*remoteEventsPublisher, error) {
return l, nil
}

type remoteEventsPublisher struct {
type RemoteEventsPublisher struct {
client *ttrpcutil.Client
closed chan struct{}
closer sync.Once
requeue chan *item
}

func (l *remoteEventsPublisher) Done() <-chan struct{} {
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}

func (l *remoteEventsPublisher) Close() (err error) {
func (l *RemoteEventsPublisher) Close() (err error) {
err = l.client.Close()
l.closer.Do(func() {
close(l.closed)
})
return err
}

func (l *remoteEventsPublisher) processQueue() {
func (l *RemoteEventsPublisher) processQueue() {
for i := range l.requeue {
if i.count > maxRequeue {
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
Expand All @@ -91,7 +91,7 @@ func (l *remoteEventsPublisher) processQueue() {
}
}

func (l *remoteEventsPublisher) queue(i *item) {
func (l *RemoteEventsPublisher) queue(i *item) {
go func() {
i.count++
// re-queue after a short delay
Expand All @@ -100,7 +100,7 @@ func (l *remoteEventsPublisher) queue(i *item) {
}()
}

func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
Expand All @@ -127,7 +127,7 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
return nil
}

func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
_, err := l.client.EventsService().Forward(ctx, req)
if err == nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion runtime/v2/shim/shim.go
Expand Up @@ -169,7 +169,7 @@ func run(id string, initFunc Init, config Config) error {

ttrpcAddress := os.Getenv(ttrpcAddressEnv)

publisher, err := newPublisher(ttrpcAddress)
publisher, err := NewPublisher(ttrpcAddress)
if err != nil {
return err
}
Expand Down

0 comments on commit 0293cbd

Please sign in to comment.