Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: eventsource #846

Merged
merged 2 commits into from
Aug 26, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 12 additions & 7 deletions eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

handler := func(c context.Context, event *eventhub.Event) error {
log.Info("received an event from eventshub...")

eventData := &events.AzureEventsHubEventData{
Id: event.ID,
Body: event.Data,
Expand All @@ -98,6 +100,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return errors.Wrapf(err, "failed to marshal the event data for event source %s and message id %s", el.GetEventName(), event.ID)
}

log.Info("dispatching the event to eventbus...")
err = dispatch(eventBytes)
if err != nil {
log.Error("failed to dispatch Azure EventHub event", zap.Error(err))
Expand All @@ -113,23 +116,25 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return errors.Wrapf(err, "failed to get the hub runtime information for %s", el.GetEventName())
}

var listenerHandles []*eventhub.ListenerHandle
if runtimeInfo == nil {
return errors.Wrapf(err, "runtime information is not available for %s", el.GetEventName())
}

if runtimeInfo.PartitionIDs == nil {
return errors.Wrapf(err, "no partition ids are available for %s", el.GetEventName())
}

log.Info("handling the partitions...")
for _, partitionID := range runtimeInfo.PartitionIDs {
listenerHandle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
if err != nil {
if _, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset()); err != nil {
return errors.Wrapf(err, "failed to receive events from partition %s", partitionID)
}
listenerHandles = append(listenerHandles, listenerHandle)
}

<-ctx.Done()
log.Info("stopping listener handlers")

for _, handler := range listenerHandles {
handler.Close(ctx)
}
hub.Close(context.Background())

return nil
}