Skip to content

Commit

Permalink
feat: Adding support to propagate unique ID to message for Kafka Even…
Browse files Browse the repository at this point in the history
…tSource messages (#1453)
  • Loading branch information
dpadhiar authored and whynowy committed Dec 9, 2021
1 parent 202e50e commit 76777a7
Show file tree
Hide file tree
Showing 28 changed files with 105 additions and 44 deletions.
13 changes: 13 additions & 0 deletions eventsources/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import "github.com/cloudevents/sdk-go/v2/event"

type Options func(*event.Event) error

// Option to set different ID for event
func WithID(id string) Options {
return func(e *event.Event) error {
e.SetID(id)
return nil
}
}
5 changes: 3 additions & 2 deletions eventsources/common/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
metrics "github.com/argoproj/argo-events/metrics"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
)
Expand Down Expand Up @@ -177,7 +178,7 @@ func activateRoute(router Router, controller *Controller) {
}

// manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed
func manageRouteChannels(router Router, dispatch func([]byte) error) {
func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Options) error) {
route := router.GetRoute()
logger := route.Logger
for {
Expand All @@ -198,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte) error) {
}

// ManagerRoute manages the lifecycle of a route
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte) error) error {
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
route := router.GetRoute()

logger := route.Logger
Expand Down
11 changes: 9 additions & 2 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/eventbus"
eventbusdriver "github.com/argoproj/argo-events/eventbus/driver"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources/amqp"
"github.com/argoproj/argo-events/eventsources/sources/awssns"
"github.com/argoproj/argo-events/eventsources/sources/awssqs"
Expand Down Expand Up @@ -63,7 +64,7 @@ type EventingServer interface {
GetEventSourceType() apicommon.EventSourceType

// Function to start listening events.
StartListening(ctx context.Context, dispatch func([]byte) error) error
StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error
}

// GetEventingServers returns the mapping of event source type and list of eventing servers
Expand Down Expand Up @@ -392,13 +393,19 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
Jitter: &jitter,
}
if err = common.Connect(&backoff, func() error {
return s.StartListening(ctx, func(data []byte) error {
return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Options) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
event.SetType(string(s.GetEventSourceType()))
event.SetSource(s.GetEventSourceName())
event.SetSubject(s.GetEventName())
event.SetTime(time.Now())
for _, opt := range opts {
err := opt(&event)
if err != nil {
return err
}
}
err := event.SetData(cloudevents.ApplicationJSON, data)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down Expand Up @@ -153,7 +154,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/awssns/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
commonaws "github.com/argoproj/argo-events/eventsources/common/aws"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/eventsources/sources"
Expand Down Expand Up @@ -267,7 +268,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down
5 changes: 3 additions & 2 deletions eventsources/sources/awssqs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
awscommon "github.com/argoproj/argo-events/eventsources/common/aws"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the AWS SQS event source...")
Expand Down Expand Up @@ -122,7 +123,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte) error, ack func(), log *zap.SugaredLogger) {
func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Options) error, ack func(), log *zap.SugaredLogger) {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Azure Events Hub event source...")
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/bitbucketserver/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/eventsources/sources"
"github.com/argoproj/argo-events/pkg/apis/events"
Expand Down Expand Up @@ -157,7 +158,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
defer sources.Recover(el.GetEventName())

bitbucketserverEventSource := &el.BitbucketServerEventSource
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/calendar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/persist"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
el.log = logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
el.log.Info("started processing the calendar event source...")
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Emitter event source...")
Expand Down
7 changes: 4 additions & 3 deletions eventsources/sources/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/fsevent"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand All @@ -81,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

// listenEvents listen to file related events.
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -161,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte)
}

// listenEvents listen to file related events using polling.
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/gcppubsub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to GCP PubSub events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
// In order to listen events from GCP PubSub,
// 1. Parse the event source that contains configuration to connect to GCP PubSub
// 2. Create a new PubSub client
Expand Down
5 changes: 3 additions & 2 deletions eventsources/sources/generic/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/sources"
metrics "github.com/argoproj/argo-events/metrics"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to generic events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
logger := logging.FromContext(ctx).
With(zap.String(logging.LabelEventSourceType, string(el.GetEventSourceType())),
zap.String(logging.LabelEventName, el.GetEventName()),
Expand Down Expand Up @@ -93,7 +94,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(event *Event, dispatch func([]byte) error, logger *zap.SugaredLogger) error {
func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Options) error, logger *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/github/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/pkg/apis/events"
)
Expand Down Expand Up @@ -156,7 +157,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
logger.Info("started processing the Github event source...")
Expand Down
3 changes: 2 additions & 1 deletion eventsources/sources/gitlab/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/eventsources/sources"
"github.com/argoproj/argo-events/pkg/apis/events"
Expand Down Expand Up @@ -143,7 +144,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
logger.Info("started processing the Gitlab event source...")
Expand Down
5 changes: 3 additions & 2 deletions eventsources/sources/hdfs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"

"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/fsevent"
"github.com/argoproj/argo-events/eventsources/common/naivewatcher"
"github.com/argoproj/argo-events/eventsources/sources"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Emitter event source...")
Expand Down Expand Up @@ -156,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down

0 comments on commit 76777a7

Please sign in to comment.