diff --git a/examples/gateways/github-gateway-configmap.yaml b/examples/gateways/github-gateway-configmap.yaml index d8102dc53f..7930457657 100644 --- a/examples/gateways/github-gateway-configmap.yaml +++ b/examples/gateways/github-gateway-configmap.yaml @@ -4,6 +4,7 @@ metadata: name: github-gateway-configmap data: project_1: |- + id: 1234 owner: "owner-example" repository: "repo-example" hook: diff --git a/examples/gateways/gitlab-gateway-configmap.yaml b/examples/gateways/gitlab-gateway-configmap.yaml index 205ab86136..9eee3acfad 100644 --- a/examples/gateways/gitlab-gateway-configmap.yaml +++ b/examples/gateways/gitlab-gateway-configmap.yaml @@ -4,6 +4,7 @@ metadata: name: gitlab-gateway-configmap data: project_1: |- + id: 1234 projectId: "1" hook: endpoint: "/push" diff --git a/gateways/common/fake.go b/gateways/common/fake.go index e57811ec3a..918c3a67f2 100644 --- a/gateways/common/fake.go +++ b/gateways/common/fake.go @@ -32,16 +32,35 @@ func (f *FakeHttpWriter) WriteHeader(status int) { f.HeaderStatus = status } -func GetFakeRouteConfig() *RouteConfig { - return &RouteConfig{ +type FakeRouteConfig struct { + route *Route +} + +func (f *FakeRouteConfig) GetRoute() *Route { + return f.route +} + +func (f *FakeRouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { +} + +func (f *FakeRouteConfig) PostStart() error { + return nil +} + +func (f *FakeRouteConfig) PostStop() error { + return nil +} + +func GetFakeRoute() *Route { + logger := common.GetLoggerContext(common.LoggerConf()).Logger() + return &Route{ Webhook: Hook, EventSource: &gateways.EventSource{ Name: "fake-event-source", Data: "hello", Id: "123", }, - Log: common.GetLoggerContext(common.LoggerConf()).Logger(), - Configs: make(map[string]interface{}), + Logger: &logger, StartCh: make(chan struct{}), } } diff --git a/gateways/common/webhook.go b/gateways/common/webhook.go index b8f7578fcc..902492dcb5 100644 --- a/gateways/common/webhook.go +++ b/gateways/common/webhook.go @@ -57,9 +57,9 @@ type WebhookHelper struct { // ActiveEndpoints keep track of endpoints that are already registered with server and their status active or inactive ActiveEndpoints map[string]*Endpoint // RouteActivateChan handles assigning new route to server. - RouteActivateChan chan *RouteConfig + RouteActivateChan chan RouteManager // RouteDeactivateChan handles deactivating existing route - RouteDeactivateChan chan *RouteConfig + RouteDeactivateChan chan RouteManager } // HTTP Muxer @@ -73,16 +73,20 @@ type activeServer struct { errChan chan error } -// RouteConfig contains configuration about an http route -type RouteConfig struct { - Webhook *Webhook - Configs map[string]interface{} - EventSource *gateways.EventSource - Log zerolog.Logger - StartCh chan struct{} - RouteActiveHandler func(writer http.ResponseWriter, request *http.Request, rc *RouteConfig) - PostActivate func(rc *RouteConfig) error - PostStop func(rc *RouteConfig) error +// Route contains common information for a route +type Route struct { + Webhook *Webhook + Logger *zerolog.Logger + StartCh chan struct{} + EventSource *gateways.EventSource +} + +// RouteManager is an interface to manage the configuration for a route +type RouteManager interface { + GetRoute() *Route + RouteHandler(writer http.ResponseWriter, request *http.Request) + PostStart() error + PostStop() error } // endpoint contains state of an http endpoint @@ -93,14 +97,14 @@ type Endpoint struct { DataCh chan []byte } -// NewWebhookHelper returns new webhook helper +// NewWebhookHelper returns new Webhook helper func NewWebhookHelper() *WebhookHelper { return &WebhookHelper{ ActiveEndpoints: make(map[string]*Endpoint), ActiveServers: make(map[string]*activeServer), Mutex: sync.Mutex{}, - RouteActivateChan: make(chan *RouteConfig), - RouteDeactivateChan: make(chan *RouteConfig), + RouteActivateChan: make(chan RouteManager), + RouteDeactivateChan: make(chan RouteManager), } } @@ -110,11 +114,12 @@ func InitRouteChannels(helper *WebhookHelper) { select { case config := <-helper.RouteActivateChan: // start server if it has not been started on this port - config.startHttpServer(helper) - config.StartCh <- struct{}{} + startHttpServer(config, helper) + startCh := config.GetRoute().StartCh + startCh <- struct{}{} case config := <-helper.RouteDeactivateChan: - webhook := config.Webhook + webhook := config.GetRoute().Webhook _, ok := helper.ActiveServers[webhook.Port] if ok { helper.ActiveEndpoints[webhook.Endpoint].Active = false @@ -129,20 +134,21 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // starts a http server -func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) { +func startHttpServer(routeManager RouteManager, helper *WebhookHelper) { // start a http server only if no other configuration previously started the server on given port helper.Mutex.Lock() - if _, ok := helper.ActiveServers[rc.Webhook.Port]; !ok { + r := routeManager.GetRoute() + if _, ok := helper.ActiveServers[r.Webhook.Port]; !ok { s := &server{ mux: http.NewServeMux(), } - rc.Webhook.mux = s.mux - rc.Webhook.srv = &http.Server{ - Addr: ":" + fmt.Sprintf("%s", rc.Webhook.Port), + r.Webhook.mux = s.mux + r.Webhook.srv = &http.Server{ + Addr: ":" + fmt.Sprintf("%s", r.Webhook.Port), Handler: s, } errChan := make(chan error, 1) - helper.ActiveServers[rc.Webhook.Port] = &activeServer{ + helper.ActiveServers[r.Webhook.Port] = &activeServer{ srv: s.mux, errChan: errChan, } @@ -150,12 +156,12 @@ func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) { // start http server go func() { var err error - if rc.Webhook.ServerCertPath == "" || rc.Webhook.ServerKeyPath == "" { - err = rc.Webhook.srv.ListenAndServe() + if r.Webhook.ServerCertPath == "" || r.Webhook.ServerKeyPath == "" { + err = r.Webhook.srv.ListenAndServe() } else { - err = rc.Webhook.srv.ListenAndServeTLS(rc.Webhook.ServerCertPath, rc.Webhook.ServerKeyPath) + err = r.Webhook.srv.ListenAndServeTLS(r.Webhook.ServerCertPath, r.Webhook.ServerKeyPath) } - rc.Log.Error().Err(err).Str("event-source", rc.EventSource.Name).Str("port", rc.Webhook.Port).Msg("http server stopped") + r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Str("port", r.Webhook.Port).Msg("http server stopped") if err != nil { errChan <- err } @@ -165,76 +171,87 @@ func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) { } // activateRoute activates route -func (rc *RouteConfig) activateRoute(helper *WebhookHelper) { - helper.RouteActivateChan <- rc +func activateRoute(routeManager RouteManager, helper *WebhookHelper) { + r := routeManager.GetRoute() + helper.RouteActivateChan <- routeManager - <-rc.StartCh + <-r.StartCh - if rc.Webhook.mux == nil { + if r.Webhook.mux == nil { helper.Mutex.Lock() - rc.Webhook.mux = helper.ActiveServers[rc.Webhook.Port].srv + r.Webhook.mux = helper.ActiveServers[r.Webhook.Port].srv helper.Mutex.Unlock() } - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("adding route handler") - if _, ok := helper.ActiveEndpoints[rc.Webhook.Endpoint]; !ok { - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{ + r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("adding route handler") + + if _, ok := helper.ActiveEndpoints[r.Webhook.Endpoint]; !ok { + helper.ActiveEndpoints[r.Webhook.Endpoint] = &Endpoint{ Active: true, DataCh: make(chan []byte), } - rc.Webhook.mux.HandleFunc(rc.Webhook.Endpoint, func(writer http.ResponseWriter, request *http.Request) { - rc.RouteActiveHandler(writer, request, rc) - }) + r.Webhook.mux.HandleFunc(r.Webhook.Endpoint, routeManager.RouteHandler) } - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true + helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("route handler added") + r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("route handler added") } -func (rc *RouteConfig) processChannels(helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error { +func processChannels(routeManager RouteManager, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error { + r := routeManager.GetRoute() + for { select { - case data := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh: - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("new event received, dispatching to gateway client") + case data := <-helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh: + r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("new event received, dispatching to gateway client") err := eventStream.Send(&gateways.Event{ - Name: rc.EventSource.Name, + Name: r.EventSource.Name, Payload: data, }) if err != nil { - rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to send event") + r.Logger.Error().Err(err).Str("event-source-name", r.EventSource.Name).Msg("failed to send event") return err } case <-eventStream.Context().Done(): - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("connection is closed by client") - helper.RouteDeactivateChan <- rc + r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("connection is closed by client") + helper.RouteDeactivateChan <- routeManager return nil // this error indicates that the server has stopped running - case err := <-helper.ActiveServers[rc.Webhook.Port].errChan: + case err := <-helper.ActiveServers[r.Webhook.Port].errChan: return err } } } -func DefaultPostActivate(rc *RouteConfig) error { - return nil -} +func ProcessRoute(routeManager RouteManager, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error { + r := routeManager.GetRoute() -func DefaultPostStop(rc *RouteConfig) error { - return nil -} + r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("validating the route") + if err := validateRoute(routeManager.GetRoute()); err != nil { + r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred validating route") + return err + } + + r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("activating the route") + activateRoute(routeManager, helper) -func ProcessRoute(rc *RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error { - rc.activateRoute(helper) - if err := rc.PostActivate(rc); err != nil { + r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post start") + if err := routeManager.PostStart(); err != nil { + r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post start") return err } - if err := rc.processChannels(helper, eventStream); err != nil { + + r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("processing channels") + if err := processChannels(routeManager, helper, eventStream); err != nil { + r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in process channel") return err } - if err := rc.PostStop(rc); err != nil { - rc.Log.Error().Err(err).Msg("error occurred while executing post stop logic") + + r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post stop") + if err := routeManager.PostStop(); err != nil { + r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post stop") } return nil } @@ -258,6 +275,25 @@ func ValidateWebhook(w *Webhook) error { return nil } +func validateRoute(r *Route) error { + if r == nil { + return fmt.Errorf("route can't be nil") + } + if r.Webhook == nil { + return fmt.Errorf("webhook can't be nil") + } + if r.StartCh == nil { + return fmt.Errorf("start channel can't be nil") + } + if r.EventSource == nil { + return fmt.Errorf("event source can't be nil") + } + if r.Logger == nil { + return fmt.Errorf("logger can't be nil") + } + return nil +} + func FormatWebhookEndpoint(endpoint string) string { if !strings.HasPrefix(endpoint, "/") { return fmt.Sprintf("/%s", endpoint) diff --git a/gateways/common/webhook_test.go b/gateways/common/webhook_test.go index d0c79c0b3a..931a0c86b6 100644 --- a/gateways/common/webhook_test.go +++ b/gateways/common/webhook_test.go @@ -26,30 +26,15 @@ import ( "github.com/smartystreets/goconvey/convey" ) -func TestDefaultPostActivate(t *testing.T) { - convey.Convey("Given a route configuration, default post activate should be a no-op", t, func() { - rc := GetFakeRouteConfig() - err := DefaultPostActivate(rc) - convey.So(err, convey.ShouldBeNil) - }) -} - -func TestDefaultPostStop(t *testing.T) { - convey.Convey("Given a route configuration, default post stop should be a no-op", t, func() { - rc := GetFakeRouteConfig() - err := DefaultPostStop(rc) - convey.So(err, convey.ShouldBeNil) - }) +var rc = &FakeRouteConfig{ + route: GetFakeRoute(), } func TestProcessRoute(t *testing.T) { convey.Convey("Given a route configuration", t, func() { convey.Convey("Activate the route configuration", func() { - rc := GetFakeRouteConfig() - rc.Webhook.mux = http.NewServeMux() - rc.PostActivate = DefaultPostActivate - rc.PostStop = DefaultPostStop + rc.route.Webhook.mux = http.NewServeMux() ctx, cancel := context.WithCancel(context.Background()) fgs := &FakeGRPCStream{ @@ -57,10 +42,10 @@ func TestProcessRoute(t *testing.T) { } helper := NewWebhookHelper() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{ + helper.ActiveEndpoints[rc.route.Webhook.Endpoint] = &Endpoint{ DataCh: make(chan []byte), } - helper.ActiveServers[rc.Webhook.Port] = &activeServer{ + helper.ActiveServers[rc.route.Webhook.Port] = &activeServer{ errChan: make(chan error), } @@ -73,7 +58,7 @@ func TestProcessRoute(t *testing.T) { <-helper.RouteActivateChan }() go func() { - rc.StartCh <- struct{}{} + rc.route.StartCh <- struct{}{} }() go func() { time.Sleep(3 * time.Second) @@ -93,16 +78,15 @@ func TestProcessRoute(t *testing.T) { func TestProcessRouteChannels(t *testing.T) { convey.Convey("Given a route configuration", t, func() { convey.Convey("Stop server stream", func() { - rc := GetFakeRouteConfig() ctx, cancel := context.WithCancel(context.Background()) fgs := &FakeGRPCStream{ Ctx: ctx, } helper := NewWebhookHelper() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{ + helper.ActiveEndpoints[rc.route.Webhook.Endpoint] = &Endpoint{ DataCh: make(chan []byte), } - helper.ActiveServers[rc.Webhook.Port] = &activeServer{ + helper.ActiveServers[rc.route.Webhook.Port] = &activeServer{ errChan: make(chan error), } errCh := make(chan error) @@ -110,31 +94,30 @@ func TestProcessRouteChannels(t *testing.T) { <-helper.RouteDeactivateChan }() go func() { - errCh <- rc.processChannels(helper, fgs) + errCh <- processChannels(rc, helper, fgs) }() cancel() err := <-errCh convey.So(err, convey.ShouldBeNil) }) convey.Convey("Handle error", func() { - rc := GetFakeRouteConfig() fgs := &FakeGRPCStream{ Ctx: context.Background(), } helper := NewWebhookHelper() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{ + helper.ActiveEndpoints[rc.route.Webhook.Endpoint] = &Endpoint{ DataCh: make(chan []byte), } - helper.ActiveServers[rc.Webhook.Port] = &activeServer{ + helper.ActiveServers[rc.route.Webhook.Port] = &activeServer{ errChan: make(chan error), } errCh := make(chan error) err := fmt.Errorf("error") go func() { - helper.ActiveServers[rc.Webhook.Port].errChan <- err + helper.ActiveServers[rc.route.Webhook.Port].errChan <- err }() go func() { - errCh <- rc.processChannels(helper, fgs) + errCh <- processChannels(rc, helper, fgs) }() newErr := <-errCh convey.So(newErr.Error(), convey.ShouldEqual, err.Error()) diff --git a/gateways/community/aws-sns/config.go b/gateways/community/aws-sns/config.go index 64c2832336..54f678a034 100644 --- a/gateways/community/aws-sns/config.go +++ b/gateways/community/aws-sns/config.go @@ -20,11 +20,12 @@ import ( "time" "github.com/argoproj/argo-events/gateways/common" - "k8s.io/client-go/kubernetes" - + gwcommon "github.com/argoproj/argo-events/gateways/common" + snslib "github.com/aws/aws-sdk-go/service/sns" "github.com/ghodss/yaml" "github.com/rs/zerolog" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" ) const ( @@ -45,6 +46,16 @@ type SNSEventSourceExecutor struct { Namespace string } +// RouteConfig contains information for a route +type RouteConfig struct { + Route *gwcommon.Route + snses *snsEventSource + session *snslib.SNS + subscriptionArn *string + clientset kubernetes.Interface + namespace string +} + // Json http notifications // SNS posts those to your http url endpoint if http is selected as delivery method. // http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-subscription-confirmation-json @@ -65,8 +76,8 @@ type httpNotification struct { UnsubscribeURL string `json:"UnsubscribeURL,omitempty"` // Only for notifications } -// snsConfig contains configuration to subscribe to SNS topic -type snsConfig struct { +// snsEventSource contains configuration to subscribe to SNS topic +type snsEventSource struct { // Hook defines a webhook. Hook *common.Webhook `json:"hook"` TopicArn string `json:"topicArn"` @@ -76,10 +87,10 @@ type snsConfig struct { } func parseEventSource(es string) (interface{}, error) { - var n *snsConfig - err := yaml.Unmarshal([]byte(es), &n) + var ses *snsEventSource + err := yaml.Unmarshal([]byte(es), &ses) if err != nil { return nil, err } - return n, nil + return ses, nil } diff --git a/gateways/community/aws-sns/config_test.go b/gateways/community/aws-sns/config_test.go index e1c07c570f..22a39fe169 100644 --- a/gateways/community/aws-sns/config_test.go +++ b/gateways/community/aws-sns/config_test.go @@ -41,7 +41,7 @@ func TestParseConfig(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*snsConfig) + _, ok := ps.(*snsEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/aws-sns/start.go b/gateways/community/aws-sns/start.go index 7d1723e69b..368f24eca0 100644 --- a/gateways/community/aws-sns/start.go +++ b/gateways/community/aws-sns/start.go @@ -28,12 +28,6 @@ import ( "github.com/ghodss/yaml" ) -const ( - labelSNSConfig = "snsConfig" - labelSNSSession = "snsSession" - labelSubscriptionArn = "subscriptionArn" -) - var ( helper = gwcommon.NewWebhookHelper() ) @@ -42,106 +36,107 @@ func init() { go gwcommon.InitRouteChannels(helper) } -// RouteActiveHandler handles new routes -func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { - var response string +// GetRoute returns the route +func (rc *RouteConfig) GetRoute() *gwcommon.Route { + return rc.Route +} + +// RouteHandler handles new routes +func (rc *RouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { + r := rc.Route - logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). - Str("port", rc.Webhook.Port). + logger := r.Logger.With(). + Str("event-source", r.EventSource.Name). + Str("endpoint", r.Webhook.Endpoint). + Str("port", r.Webhook.Port). Str("http-method", request.Method).Logger() + logger.Info().Msg("request received") - if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { - response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) + if !helper.ActiveEndpoints[r.Webhook.Endpoint].Active { logger.Info().Msg("endpoint is not active") - common.SendErrorResponse(writer, response) + common.SendErrorResponse(writer, "") return } body, err := ioutil.ReadAll(request.Body) if err != nil { logger.Error().Err(err).Msg("failed to parse request body") - common.SendErrorResponse(writer, "failed to parse request") + common.SendErrorResponse(writer, "") return } var snspayload *httpNotification err = yaml.Unmarshal(body, &snspayload) if err != nil { - logger.Error().Err(err).Msg("failed to convert request payload into snsConfig payload") - common.SendErrorResponse(writer, "failed to marshal request") + logger.Error().Err(err).Msg("failed to convert request payload into sns event source payload") + common.SendErrorResponse(writer, "") return } - sc := rc.Configs[labelSNSConfig].(*snsConfig) - switch snspayload.Type { case messageTypeSubscriptionConfirmation: - awsSession := rc.Configs[labelSNSSession].(*snslib.SNS) + awsSession := rc.session out, err := awsSession.ConfirmSubscription(&snslib.ConfirmSubscriptionInput{ - TopicArn: &sc.TopicArn, + TopicArn: &rc.snses.TopicArn, Token: &snspayload.Token, }) if err != nil { logger.Error().Err(err).Msg("failed to send confirmation response to amazon") - common.SendErrorResponse(writer, "failed to confirm subscription") + common.SendErrorResponse(writer, "") return } - rc.Configs[labelSubscriptionArn] = out.SubscriptionArn + rc.subscriptionArn = out.SubscriptionArn case messageTypeNotification: - helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body + helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh <- body } - response = "request successfully processed" - logger.Info().Msg(response) - common.SendSuccessResponse(writer, response) + logger.Info().Msg("request successfully processed") } -// PostActivate subscribes to the sns topic -func (ese *SNSEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error { - logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). - Str("port", rc.Webhook.Port).Logger() +// PostStart subscribes to the sns topic +func (rc *RouteConfig) PostStart() error { + r := rc.Route - sc := rc.Configs[labelSNSConfig].(*snsConfig) + logger := r.Logger.With(). + Str("event-source", r.EventSource.Name). + Str("endpoint", r.Webhook.Endpoint). + Str("port", r.Webhook.Port). + Str("topic-arn", rc.snses.TopicArn). + Logger() + logger.Info().Msg("subscribing to sns topic") - creds, err := gwcommon.GetAWSCreds(ese.Clientset, ese.Namespace, sc.AccessKey, sc.SecretKey) + sc := rc.snses + creds, err := gwcommon.GetAWSCreds(rc.clientset, rc.namespace, sc.AccessKey, sc.SecretKey) if err != nil { - logger.Error().Err(err).Msg("failed to get aws credentials") + return fmt.Errorf("failed to get aws credentials. err: %+v", err) } awsSession, err := gwcommon.GetAWSSession(creds, sc.Region) if err != nil { - logger.Error().Err(err).Msg("failed to create new session") - return err + return fmt.Errorf("failed to create aws session. err: %+v", err) } - logger.Info().Msg("subscribing to sns topic") - - snsSession := snslib.New(awsSession) - rc.Configs[labelSNSSession] = snsSession + rc.session = snslib.New(awsSession) formattedUrl := gwcommon.GenerateFormattedURL(sc.Hook) - - if _, err := snsSession.Subscribe(&snslib.SubscribeInput{ + if _, err := rc.session.Subscribe(&snslib.SubscribeInput{ Endpoint: &formattedUrl, Protocol: &snsProtocol, TopicArn: &sc.TopicArn, }); err != nil { - logger.Error().Err(err).Msg("failed to send subscribe request") - return err + return fmt.Errorf("failed to send subscribe request. err: %+v", err) } return nil } // PostStop unsubscribes from the sns topic -func PostStop(rc *gwcommon.RouteConfig) error { - awsSession := rc.Configs[labelSNSSession].(*snslib.SNS) - if _, err := awsSession.Unsubscribe(&snslib.UnsubscribeInput{ - SubscriptionArn: rc.Configs[labelSubscriptionArn].(*string), +func (rc *RouteConfig) PostStop() error { + if _, err := rc.session.Unsubscribe(&snslib.UnsubscribeInput{ + SubscriptionArn: rc.subscriptionArn, }); err != nil { - rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to unsubscribe") - return err + return fmt.Errorf("failed to unsubscribe. err: %+v", err) } return nil } @@ -156,18 +151,17 @@ func (ese *SNSEventSourceExecutor) StartEventSource(eventSource *gateways.EventS ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to parse event source") return err } - sc := config.(*snsConfig) - - return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ - Webhook: sc.Hook, - Configs: map[string]interface{}{ - labelSNSConfig: sc, + sc := config.(*snsEventSource) + + return gwcommon.ProcessRoute(&RouteConfig{ + Route: &gwcommon.Route{ + Logger: &ese.Log, + EventSource: eventSource, + StartCh: make(chan struct{}), + Webhook: sc.Hook, }, - Log: ese.Log, - EventSource: eventSource, - PostActivate: ese.PostActivate, - PostStop: PostStop, - RouteActiveHandler: RouteActiveHandler, - StartCh: make(chan struct{}), + snses: sc, + namespace: ese.Namespace, + clientset: ese.Clientset, }, helper, eventStream) } diff --git a/gateways/community/aws-sns/start_test.go b/gateways/community/aws-sns/start_test.go index 46dd2f0c97..92df16d68f 100644 --- a/gateways/community/aws-sns/start_test.go +++ b/gateways/community/aws-sns/start_test.go @@ -18,22 +18,28 @@ package aws_sns import ( "bytes" - "github.com/argoproj/argo-events/common" + "io/ioutil" + "net/http" + "testing" + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/aws/aws-sdk-go/aws/credentials" snslib "github.com/aws/aws-sdk-go/service/sns" "github.com/ghodss/yaml" "github.com/smartystreets/goconvey/convey" - "io/ioutil" "k8s.io/client-go/kubernetes/fake" - "net/http" - "testing" ) func TestAWSSNS(t *testing.T) { convey.Convey("Given an route configuration", t, func() { - rc := gwcommon.GetFakeRouteConfig() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &gwcommon.Endpoint{ + rc := &RouteConfig{ + Route: gwcommon.GetFakeRoute(), + namespace: "fake", + clientset: fake.NewSimpleClientset(), + } + r := rc.Route + + helper.ActiveEndpoints[r.Webhook.Endpoint] = &gwcommon.Endpoint{ DataCh: make(chan []byte), } writer := &gwcommon.FakeHttpWriter{} @@ -46,19 +52,19 @@ func TestAWSSNS(t *testing.T) { convey.So(err, convey.ShouldBeNil) snsSession := snslib.New(awsSession) - rc.Configs[labelSNSSession] = snsSession - rc.Configs[labelSubscriptionArn] = &subscriptionArn + rc.session = snsSession + rc.subscriptionArn = &subscriptionArn convey.Convey("handle the inactive route", func() { - RouteActiveHandler(writer, &http.Request{}, rc) + rc.RouteHandler(writer, &http.Request{}) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) }) ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true - rc.Configs[labelSNSConfig] = ps.(*snsConfig) + helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true + rc.snses = ps.(*snsEventSource) convey.Convey("handle the active route", func() { payload := httpNotification{ @@ -69,37 +75,35 @@ func TestAWSSNS(t *testing.T) { payloadBytes, err := yaml.Marshal(payload) convey.So(err, convey.ShouldBeNil) - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewBuffer(payloadBytes)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) - convey.So(string(writer.Payload), convey.ShouldEqual, "failed to confirm subscription") + + dataCh := make(chan []byte) go func() { - <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh + data := <-helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh + dataCh <- data }() payload.Type = messageTypeNotification payloadBytes, err = yaml.Marshal(payload) convey.So(err, convey.ShouldBeNil) - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewBuffer(payloadBytes)), - }, rc) - convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) + }) + data := <-dataCh + convey.So(data, convey.ShouldNotBeNil) }) convey.Convey("Run post activate", func() { - ese := SNSEventSourceExecutor{ - Namespace: "fake", - Clientset: fake.NewSimpleClientset(), - Log: common.GetLoggerContext(common.LoggerConf()).Logger(), - } - err := ese.PostActivate(rc) + err := rc.PostStart() convey.So(err, convey.ShouldNotBeNil) }) convey.Convey("Run post stop", func() { - err = PostStop(rc) + err = rc.PostStop() convey.So(err, convey.ShouldNotBeNil) }) }) diff --git a/gateways/community/aws-sns/validate.go b/gateways/community/aws-sns/validate.go index 63b79e2ba0..9c3dac5938 100644 --- a/gateways/community/aws-sns/validate.go +++ b/gateways/community/aws-sns/validate.go @@ -29,7 +29,7 @@ func (ese *SNSEventSourceExecutor) ValidateEventSource(ctx context.Context, es * } func validateSNSConfig(config interface{}) error { - sc := config.(*snsConfig) + sc := config.(*snsEventSource) if sc == nil { return gwcommon.ErrNilEventSource } diff --git a/gateways/community/aws-sqs/config.go b/gateways/community/aws-sqs/config.go index a83dd2bcac..80eca23ac7 100644 --- a/gateways/community/aws-sqs/config.go +++ b/gateways/community/aws-sqs/config.go @@ -32,8 +32,8 @@ type SQSEventSourceExecutor struct { Namespace string } -// sqs contains information to listen to AWS SQS -type sqs struct { +// sqsEventSource contains information to listen to AWS SQS +type sqsEventSource struct { // AccessKey refers K8 secret containing aws access key AccessKey *corev1.SecretKeySelector `json:"accessKey"` @@ -52,7 +52,7 @@ type sqs struct { } func parseEventSource(es string) (interface{}, error) { - var n *sqs + var n *sqsEventSource err := yaml.Unmarshal([]byte(es), &n) if err != nil { return nil, err diff --git a/gateways/community/aws-sqs/config_test.go b/gateways/community/aws-sqs/config_test.go index 5d6929dfe8..aab41b8735 100644 --- a/gateways/community/aws-sqs/config_test.go +++ b/gateways/community/aws-sqs/config_test.go @@ -35,11 +35,11 @@ waitTimeSeconds: 10 ` func TestParseConfig(t *testing.T) { - convey.Convey("Given a aws-sqs event source, parse it", t, func() { + convey.Convey("Given a aws-sqsEventSource event source, parse it", t, func() { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*sqs) + _, ok := ps.(*sqsEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/aws-sqs/start.go b/gateways/community/aws-sqs/start.go index df44dfdee4..b564aa8ac9 100644 --- a/gateways/community/aws-sqs/start.go +++ b/gateways/community/aws-sqs/start.go @@ -29,7 +29,6 @@ func (ese *SQSEventSourceExecutor) StartEventSource(eventSource *gateways.EventS config, err := parseEventSource(eventSource.Data) if err != nil { ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to parse event source") - return err } @@ -37,13 +36,13 @@ func (ese *SQSEventSourceExecutor) StartEventSource(eventSource *gateways.EventS errorCh := make(chan error) doneCh := make(chan struct{}, 1) - go ese.listenEvents(config.(*sqs), eventSource, dataCh, errorCh, doneCh) + go ese.listenEvents(config.(*sqsEventSource), eventSource, dataCh, errorCh, doneCh) return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, &ese.Log) } // listenEvents fires an event when interval completes and item is processed from queue. -func (ese *SQSEventSourceExecutor) listenEvents(s *sqs, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { +func (ese *SQSEventSourceExecutor) listenEvents(s *sqsEventSource, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { defer gateways.Recover(eventSource.Name) creds, err := gwcommon.GetAWSCreds(ese.Clientset, ese.Namespace, s.AccessKey, s.SecretKey) diff --git a/gateways/community/aws-sqs/start_test.go b/gateways/community/aws-sqs/start_test.go index 499b311f5d..b7f992aa5d 100644 --- a/gateways/community/aws-sqs/start_test.go +++ b/gateways/community/aws-sqs/start_test.go @@ -47,7 +47,7 @@ func TestListenEvents(t *testing.T) { errorCh2 <- err }() - ese.listenEvents(ps.(*sqs), &gateways.EventSource{ + ese.listenEvents(ps.(*sqsEventSource), &gateways.EventSource{ Name: "fake", Data: es, Id: "1234", diff --git a/gateways/community/aws-sqs/validate.go b/gateways/community/aws-sqs/validate.go index 75de366fe5..5f5043dd68 100644 --- a/gateways/community/aws-sqs/validate.go +++ b/gateways/community/aws-sqs/validate.go @@ -30,7 +30,7 @@ func (ese *SQSEventSourceExecutor) ValidateEventSource(ctx context.Context, es * } func validateSQSConfig(config interface{}) error { - sc := config.(*sqs) + sc := config.(*sqsEventSource) if sc == nil { return gwcommon.ErrNilEventSource } diff --git a/gateways/community/aws-sqs/validate_test.go b/gateways/community/aws-sqs/validate_test.go index 0d34145e5d..31cf0538c6 100644 --- a/gateways/community/aws-sqs/validate_test.go +++ b/gateways/community/aws-sqs/validate_test.go @@ -50,7 +50,7 @@ secretKey: ) func TestSQSEventSourceExecutor_ValidateEventSource(t *testing.T) { - convey.Convey("Given a valid sqs event source spec, parse it and make sure no error occurs", t, func() { + convey.Convey("Given a valid sqsEventSource event source spec, parse it and make sure no error occurs", t, func() { ese := &SQSEventSourceExecutor{} valid, _ := ese.ValidateEventSource(context.Background(), &gateways.EventSource{ Name: configKey, @@ -61,7 +61,7 @@ func TestSQSEventSourceExecutor_ValidateEventSource(t *testing.T) { convey.So(valid.IsValid, convey.ShouldBeTrue) }) - convey.Convey("Given an invalid sqs event source spec, parse it and make sure error occurs", t, func() { + convey.Convey("Given an invalid sqsEventSource event source spec, parse it and make sure error occurs", t, func() { ese := &SQSEventSourceExecutor{} valid, _ := ese.ValidateEventSource(context.Background(), &gateways.EventSource{ Data: invalidConfig, diff --git a/gateways/community/gcp-pubsub/config.go b/gateways/community/gcp-pubsub/config.go index 68c8564b18..7021dbdbb6 100644 --- a/gateways/community/gcp-pubsub/config.go +++ b/gateways/community/gcp-pubsub/config.go @@ -26,8 +26,8 @@ type GcpPubSubEventSourceExecutor struct { Log zerolog.Logger } -// pubSubConfig contains configuration to subscribe to GCP PubSub topic -type pubSubConfig struct { +// pubSubEventSource contains configuration to subscribe to GCP PubSub topic +type pubSubEventSource struct { // ProjectID is the unique identifier for your project on GCP ProjectID string `json:"projectID"` // Topic on which a subscription will be created @@ -37,7 +37,7 @@ type pubSubConfig struct { } func parseEventSource(es string) (interface{}, error) { - var n *pubSubConfig + var n *pubSubEventSource err := yaml.Unmarshal([]byte(es), &n) if err != nil { return nil, err diff --git a/gateways/community/gcp-pubsub/config_test.go b/gateways/community/gcp-pubsub/config_test.go index 2eb280bf48..f816c79722 100644 --- a/gateways/community/gcp-pubsub/config_test.go +++ b/gateways/community/gcp-pubsub/config_test.go @@ -31,7 +31,7 @@ func TestParseConfig(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*pubSubConfig) + _, ok := ps.(*pubSubEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/gcp-pubsub/start.go b/gateways/community/gcp-pubsub/start.go index 14f00aa9f6..d2934ade04 100644 --- a/gateways/community/gcp-pubsub/start.go +++ b/gateways/community/gcp-pubsub/start.go @@ -34,7 +34,7 @@ func (ese *GcpPubSubEventSourceExecutor) StartEventSource(eventSource *gateways. ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to parse event source") return err } - sc := config.(*pubSubConfig) + sc := config.(*pubSubEventSource) ctx := eventStream.Context() @@ -47,7 +47,7 @@ func (ese *GcpPubSubEventSourceExecutor) StartEventSource(eventSource *gateways. return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, &ese.Log) } -func (ese *GcpPubSubEventSourceExecutor) listenEvents(ctx context.Context, sc *pubSubConfig, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { +func (ese *GcpPubSubEventSourceExecutor) listenEvents(ctx context.Context, sc *pubSubEventSource, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { // Create a new topic with the given name. logger := ese.Log.With().Str("event-source", eventSource.Name).Str("topic", sc.Topic).Logger() diff --git a/gateways/community/gcp-pubsub/start_test.go b/gateways/community/gcp-pubsub/start_test.go index ea21d8c23c..be06b0ebe1 100644 --- a/gateways/community/gcp-pubsub/start_test.go +++ b/gateways/community/gcp-pubsub/start_test.go @@ -29,7 +29,7 @@ func TestListenEvents(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - psc := ps.(*pubSubConfig) + psc := ps.(*pubSubEventSource) ese := &GcpPubSubEventSourceExecutor{ Log: common.GetLoggerContext(common.LoggerConf()).Logger(), diff --git a/gateways/community/gcp-pubsub/validate.go b/gateways/community/gcp-pubsub/validate.go index 1d87719451..f81c167e43 100644 --- a/gateways/community/gcp-pubsub/validate.go +++ b/gateways/community/gcp-pubsub/validate.go @@ -31,7 +31,7 @@ func (ese *GcpPubSubEventSourceExecutor) ValidateEventSource(ctx context.Context } func validatePubSubConfig(config interface{}) error { - sc := config.(*pubSubConfig) + sc := config.(*pubSubEventSource) if sc == nil { return gwcommon.ErrNilEventSource } diff --git a/gateways/community/github/config.go b/gateways/community/github/config.go index 22a5610089..08ee5c9801 100644 --- a/gateways/community/github/config.go +++ b/gateways/community/github/config.go @@ -18,7 +18,9 @@ package github import ( "github.com/argoproj/argo-events/gateways/common" + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" + "github.com/google/go-github/github" "github.com/rs/zerolog" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" @@ -33,8 +35,20 @@ type GithubEventSourceExecutor struct { Namespace string } -// githubConfig contains information to setup a github project integration -type githubConfig struct { +// RouteConfig contains information about the route +type RouteConfig struct { + route *gwcommon.Route + ges *githubEventSource + client *github.Client + hook *github.Hook + clientset kubernetes.Interface + namespace string +} + +// githubEventSource contains information to setup a github project integration +type githubEventSource struct { + // Webhook ID + Id int64 `json:"id"` // Webhook Hook *common.Webhook `json:"hook"` // GitHub owner name i.e. argoproj @@ -66,7 +80,7 @@ type cred struct { // parseEventSource parses a configuration of gateway func parseEventSource(config string) (interface{}, error) { - var g *githubConfig + var g *githubEventSource err := yaml.Unmarshal([]byte(config), &g) if err != nil { return nil, err diff --git a/gateways/community/github/config_test.go b/gateways/community/github/config_test.go index dde443535d..08bb2e6c1f 100644 --- a/gateways/community/github/config_test.go +++ b/gateways/community/github/config_test.go @@ -22,6 +22,7 @@ import ( ) var es = ` +id: 1234 hook: endpoint: "/push" port: "12000" @@ -40,7 +41,7 @@ func TestParseConfig(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*githubConfig) + _, ok := ps.(*githubEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/github/start.go b/gateways/community/github/start.go index 3d6b035ace..3e0df19b0b 100644 --- a/gateways/community/github/start.go +++ b/gateways/community/github/start.go @@ -32,12 +32,6 @@ import ( corev1 "k8s.io/api/core/v1" ) -const ( - labelGithubConfig = "config" - labelGithubClient = "client" - labelWebhook = "hook" -) - const ( githubEventHeader = "X-GitHub-Event" githubDeliveryHeader = "X-GitHub-Delivery" @@ -52,8 +46,8 @@ func init() { } // getCredentials for github -func (ese *GithubEventSourceExecutor) getCredentials(gs *corev1.SecretKeySelector) (*cred, error) { - token, err := store.GetSecrets(ese.Clientset, ese.Namespace, gs.Name, gs.Key) +func (rc *RouteConfig) getCredentials(gs *corev1.SecretKeySelector) (*cred, error) { + token, err := store.GetSecrets(rc.clientset, rc.namespace, gs.Name, gs.Key) if err != nil { return nil, err } @@ -62,10 +56,14 @@ func (ese *GithubEventSourceExecutor) getCredentials(gs *corev1.SecretKeySelecto }, nil } -func (ese *GithubEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error { - gc := rc.Configs[labelGithubConfig].(*githubConfig) +func (rc *RouteConfig) GetRoute() *gwcommon.Route { + return rc.route +} - c, err := ese.getCredentials(gc.APIToken) +func (rc *RouteConfig) PostStart() error { + gc := rc.ges + + c, err := rc.getCredentials(gc.APIToken) if err != nil { return fmt.Errorf("failed to rtrieve github credentials. err: %+v", err) } @@ -90,40 +88,39 @@ func (ese *GithubEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) err } if gc.WebHookSecret != nil { - sc, err := ese.getCredentials(gc.WebHookSecret) + sc, err := rc.getCredentials(gc.WebHookSecret) if err != nil { return fmt.Errorf("failed to retrieve webhook secret. err: %+v", err) } hookConfig["secret"] = sc.secret } - hookSetup := &gh.Hook{ + rc.hook = &gh.Hook{ Events: gc.Events, Active: gh.Bool(gc.Active), Config: hookConfig, + ID: &rc.ges.Id, } - rc.Configs[labelWebhook] = hookSetup - client := gh.NewClient(PATTransport.Client()) + rc.client = gh.NewClient(PATTransport.Client()) if gc.GithubBaseURL != "" { baseURL, err := url.Parse(gc.GithubBaseURL) if err != nil { return fmt.Errorf("failed to parse github base url. err: %s", err) } - client.BaseURL = baseURL + rc.client.BaseURL = baseURL } if gc.GithubUploadURL != "" { uploadURL, err := url.Parse(gc.GithubUploadURL) if err != nil { return fmt.Errorf("failed to parse github upload url. err: %s", err) } - client.UploadURL = uploadURL + rc.client.UploadURL = uploadURL } - rc.Configs[labelGithubClient] = client ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - hook, _, err := client.Repositories.CreateHook(ctx, gc.Owner, gc.Repository, hookSetup) + hook, _, err := rc.client.Repositories.CreateHook(ctx, gc.Owner, gc.Repository, rc.hook) if err != nil { // Continue if error is because hook already exists er, ok := err.(*gh.ErrorResponse) @@ -132,22 +129,28 @@ func (ese *GithubEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) err } } - ese.Log.Info().Str("event-source-name", rc.EventSource.Name).Interface("hook-id", *hook.ID).Msg("github hook created") + if hook == nil { + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + hook, _, err = rc.client.Repositories.GetHook(ctx, gc.Owner, gc.Repository, rc.ges.Id) + if err != nil { + return fmt.Errorf("failed to get existing webhook with id %d. err: %+v", rc.ges.Id, err) + } + } + + rc.hook = hook + rc.route.Logger.Info().Str("event-source-name", rc.route.EventSource.Name).Msg("github hook created") return nil } -func PostStop(rc *gwcommon.RouteConfig) error { - gc := rc.Configs[labelGithubConfig].(*githubConfig) - client := rc.Configs[labelGithubClient].(*gh.Client) - hook := rc.Configs[labelWebhook].(*gh.Hook) - +// PostStop runs after event source is stopped +func (rc *RouteConfig) PostStop() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if _, err := client.Repositories.DeleteHook(ctx, gc.Owner, gc.Repository, *hook.ID); err != nil { - rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to delete github hook") - return err + if _, err := rc.client.Repositories.DeleteHook(ctx, rc.ges.Owner, rc.ges.Repository, *rc.hook.ID); err != nil { + return fmt.Errorf("failed to delete hook. err: %+v", err) } - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Interface("hook-id", *hook.ID).Msg("github hook deleted") + rc.route.Logger.Info().Str("event-source-name", rc.route.EventSource.Name).Msg("github hook deleted") return nil } @@ -159,20 +162,20 @@ func (ese *GithubEventSourceExecutor) StartEventSource(eventSource *gateways.Eve config, err := parseEventSource(eventSource.Data) if err != nil { ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to parse event source") + return err } - gc := config.(*githubConfig) + gc := config.(*githubEventSource) - return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ - Webhook: gc.Hook, - Configs: map[string]interface{}{ - labelGithubConfig: gc, + return gwcommon.ProcessRoute(&RouteConfig{ + route: &gwcommon.Route{ + Logger: &ese.Log, + EventSource: eventSource, + Webhook: gc.Hook, + StartCh: make(chan struct{}), }, - Log: ese.Log, - EventSource: eventSource, - PostActivate: ese.PostActivate, - PostStop: PostStop, - RouteActiveHandler: RouteActiveHandler, - StartCh: make(chan struct{}), + clientset: ese.Clientset, + namespace: ese.Namespace, + ges: gc, }, helper, eventStream) } @@ -196,22 +199,24 @@ func parseValidateRequest(r *http.Request, secret []byte) ([]byte, error) { } // routeActiveHandler handles new route -func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { - var response string +func (rc *RouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { + r := rc.route + + logger := r.Logger.With(). + Str("event-source", r.EventSource.Name). + Str("endpoint", r.Webhook.Endpoint). + Str("port", r.Webhook.Port). + Logger() - logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). - Str("port", rc.Webhook.Port). - Str("http-method", request.Method).Logger() logger.Info().Msg("request received") - if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { - response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) + if !helper.ActiveEndpoints[r.Webhook.Endpoint].Active { logger.Info().Msg("endpoint is not active") - common.SendErrorResponse(writer, response) + common.SendErrorResponse(writer, "") return } - hook := rc.Configs[labelWebhook].(*gh.Hook) + hook := rc.hook secret := "" if s, ok := hook.Config["secret"]; ok { secret = s.(string) @@ -219,12 +224,11 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g body, err := parseValidateRequest(request, []byte(secret)) if err != nil { logger.Error().Err(err).Msg("request is not valid event notification") - common.SendErrorResponse(writer, fmt.Sprintf("invalid event notification")) + common.SendErrorResponse(writer, "") return } - helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body - response = "request successfully processed" - logger.Info().Msg(response) - common.SendSuccessResponse(writer, response) + helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh <- body + logger.Info().Msg("request successfully processed") + common.SendSuccessResponse(writer, "") } diff --git a/gateways/community/github/start_test.go b/gateways/community/github/start_test.go index 04751467c4..9206c8d5bf 100644 --- a/gateways/community/github/start_test.go +++ b/gateways/community/github/start_test.go @@ -23,7 +23,6 @@ import ( "net/http" "testing" - "github.com/argoproj/argo-events/common" gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" "github.com/google/go-github/github" @@ -34,10 +33,10 @@ import ( ) var ( - ese = &GithubEventSourceExecutor{ - Clientset: fake.NewSimpleClientset(), - Namespace: "fake", - Log: common.GetLoggerContext(common.LoggerConf()).Logger(), + rc = &RouteConfig{ + route: gwcommon.GetFakeRoute(), + clientset: fake.NewSimpleClientset(), + namespace: "fake", } secretName = "githab-access" @@ -47,10 +46,11 @@ var ( func TestGetCredentials(t *testing.T) { convey.Convey("Given a kubernetes secret, get credentials", t, func() { - secret, err := ese.Clientset.CoreV1().Secrets(ese.Namespace).Create(&corev1.Secret{ + + secret, err := rc.clientset.CoreV1().Secrets(rc.namespace).Create(&corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secretName, - Namespace: ese.Namespace, + Namespace: rc.namespace, }, Data: map[string][]byte{ LabelAccessKey: []byte(accessKey), @@ -61,7 +61,7 @@ func TestGetCredentials(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) - creds, err := ese.getCredentials(ps.(*githubConfig).APIToken) + creds, err := rc.getCredentials(ps.(*githubEventSource).APIToken) convey.So(err, convey.ShouldBeNil) convey.So(creds, convey.ShouldNotBeNil) convey.So(creds.secret, convey.ShouldEqual, "YWNjZXNz") @@ -70,8 +70,8 @@ func TestGetCredentials(t *testing.T) { func TestRouteActiveHandler(t *testing.T) { convey.Convey("Given a route configuration", t, func() { - rc := gwcommon.GetFakeRouteConfig() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &gwcommon.Endpoint{ + r := rc.route + helper.ActiveEndpoints[r.Webhook.Endpoint] = &gwcommon.Endpoint{ DataCh: make(chan []byte), } @@ -79,26 +79,26 @@ func TestRouteActiveHandler(t *testing.T) { writer := &gwcommon.FakeHttpWriter{} ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) - pbytes, err := yaml.Marshal(ps.(*githubConfig)) + pbytes, err := yaml.Marshal(ps.(*githubEventSource)) convey.So(err, convey.ShouldBeNil) - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader(pbytes)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) convey.Convey("Active route should return success", func() { - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true - rc.Configs[labelWebhook] = &github.Hook{ + helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true + rc.hook = &github.Hook{ Config: make(map[string]interface{}), } - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader(pbytes)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) - rc.Configs[labelGithubConfig] = ps.(*githubConfig) - err = ese.PostActivate(rc) + rc.ges = ps.(*githubEventSource) + err = rc.PostStart() convey.So(err, convey.ShouldNotBeNil) }) }) diff --git a/gateways/community/github/validate.go b/gateways/community/github/validate.go index 722fd05498..4812e86498 100644 --- a/gateways/community/github/validate.go +++ b/gateways/community/github/validate.go @@ -26,10 +26,13 @@ func (ese *GithubEventSourceExecutor) ValidateEventSource(ctx context.Context, e } func validateGithub(config interface{}) error { - g := config.(*githubConfig) + g := config.(*githubEventSource) if g == nil { return gwcommon.ErrNilEventSource } + if g.Id == 0 { + return fmt.Errorf("hook id must be not be zero") + } if g.Repository == "" { return fmt.Errorf("repository cannot be empty") } diff --git a/gateways/community/github/validate_test.go b/gateways/community/github/validate_test.go index ac7ed8dc9d..15ba48bf25 100644 --- a/gateways/community/github/validate_test.go +++ b/gateways/community/github/validate_test.go @@ -11,6 +11,7 @@ var ( configKey = "testConfig" configId = "1234" goodConfig = ` +id: 1234 hook: endpoint: "/push" port: "12000" diff --git a/gateways/community/gitlab/config.go b/gateways/community/gitlab/config.go index f74aeedd91..0b8b066272 100644 --- a/gateways/community/gitlab/config.go +++ b/gateways/community/gitlab/config.go @@ -18,8 +18,10 @@ package gitlab import ( "github.com/argoproj/argo-events/gateways/common" + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" "github.com/rs/zerolog" + "github.com/xanzy/go-gitlab" "k8s.io/client-go/kubernetes" ) @@ -32,8 +34,20 @@ type GitlabEventSourceExecutor struct { Namespace string } -// glab contains information to setup a gitlab project integration -type glab struct { +// RouteConfig contains the configuration information for a route +type RouteConfig struct { + route *gwcommon.Route + clientset kubernetes.Interface + client *gitlab.Client + hook *gitlab.ProjectHook + namespace string + ges *gitlabEventSource +} + +// gitlabEventSource contains information to setup a gitlab project integration +type gitlabEventSource struct { + // Webhook Id + Id int `json:"id"` // Webhook Hook *common.Webhook `json:"hook"` // ProjectId is the id of project for which integration needs to setup @@ -65,7 +79,7 @@ type cred struct { // parseEventSource parses an event sources of gateway func parseEventSource(config string) (interface{}, error) { - var g *glab + var g *gitlabEventSource err := yaml.Unmarshal([]byte(config), &g) if err != nil { return nil, err diff --git a/gateways/community/gitlab/config_test.go b/gateways/community/gitlab/config_test.go index 6f88cdef76..8496bda0bd 100644 --- a/gateways/community/gitlab/config_test.go +++ b/gateways/community/gitlab/config_test.go @@ -23,6 +23,7 @@ import ( ) var es = ` +id: 12 hook: endpoint: "/push" port: "12000" @@ -41,7 +42,7 @@ func TestParseConfig(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*glab) + _, ok := ps.(*gitlabEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/gitlab/start.go b/gateways/community/gitlab/start.go index f771806620..66423a9718 100644 --- a/gateways/community/gitlab/start.go +++ b/gateways/community/gitlab/start.go @@ -29,12 +29,6 @@ import ( "github.com/xanzy/go-gitlab" ) -const ( - labelGitlabConfig = "config" - labelGitlabClient = "client" - labelWebhook = "hook" -) - var ( helper = gwcommon.NewWebhookHelper() ) @@ -44,8 +38,8 @@ func init() { } // getCredentials for gitlab -func (ese *GitlabEventSourceExecutor) getCredentials(gs *GitlabSecret) (*cred, error) { - token, err := store.GetSecrets(ese.Clientset, ese.Namespace, gs.Name, gs.Key) +func (rc *RouteConfig) getCredentials(gs *GitlabSecret) (*cred, error) { + token, err := store.GetSecrets(rc.clientset, rc.namespace, gs.Name, gs.Key) if err != nil { return nil, err } @@ -54,88 +48,85 @@ func (ese *GitlabEventSourceExecutor) getCredentials(gs *GitlabSecret) (*cred, e }, nil } -func (ese *GitlabEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error { - gl := rc.Configs[labelGitlabConfig].(*glab) +func (rc *RouteConfig) GetRoute() *gwcommon.Route { + return rc.route +} - c, err := ese.getCredentials(gl.AccessToken) +func (rc *RouteConfig) PostStart() error { + c, err := rc.getCredentials(rc.ges.AccessToken) if err != nil { return fmt.Errorf("failed to get gitlab credentials. err: %+v", err) } - client := gitlab.NewClient(nil, c.token) - if err = client.SetBaseURL(gl.GitlabBaseURL); err != nil { + rc.client = gitlab.NewClient(nil, c.token) + if err = rc.client.SetBaseURL(rc.ges.GitlabBaseURL); err != nil { return fmt.Errorf("failed to set gitlab base url, err: %+v", err) } - rc.Configs[labelGitlabClient] = client - - formattedUrl := gwcommon.GenerateFormattedURL(gl.Hook) + formattedUrl := gwcommon.GenerateFormattedURL(rc.ges.Hook) opt := &gitlab.AddProjectHookOptions{ - URL: &formattedUrl, - Token: &c.token, - EnableSSLVerification: &gl.EnableSSLVerification, + URL: &formattedUrl, + Token: &c.token, + EnableSSLVerification: &rc.ges.EnableSSLVerification, } - elem := reflect.ValueOf(opt).Elem().FieldByName(string(gl.Event)) + elem := reflect.ValueOf(opt).Elem().FieldByName(string(rc.ges.Event)) if ok := elem.IsValid(); !ok { - return fmt.Errorf("unknown event %s", gl.Event) + return fmt.Errorf("unknown event %s", rc.ges.Event) } iev := reflect.New(elem.Type().Elem()) reflect.Indirect(iev).SetBool(true) elem.Set(iev) - hook, _, err := client.Projects.AddProjectHook(gl.ProjectId, opt) + hook, _, err := rc.client.Projects.GetProjectHook(rc.ges.ProjectId, rc.ges.Id) if err != nil { - return fmt.Errorf("failed to add project hook. err: %+v", err) + hook, _, err = rc.client.Projects.AddProjectHook(rc.ges.ProjectId, opt) + if err != nil { + return fmt.Errorf("failed to add project hook. err: %+v", err) + } } - rc.Configs[labelWebhook] = hook - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Interface("hook-id", hook.ID).Msg("gitlab hook created") + rc.hook = hook + rc.route.Logger.Info().Str("event-source-name", rc.route.EventSource.Name).Msg("gitlab hook created") return nil } -func PostStop(rc *gwcommon.RouteConfig) error { - gl := rc.Configs[labelGitlabConfig].(*glab) - client := rc.Configs[labelGitlabClient].(*gitlab.Client) - hook := rc.Configs[labelWebhook].(*gitlab.ProjectHook) - - if _, err := client.Projects.DeleteProjectHook(gl.ProjectId, hook.ID); err != nil { - rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Interface("hook-id", hook.ID).Msg("failed to delete gitlab hook") - return err +func (rc *RouteConfig) PostStop() error { + if _, err := rc.client.Projects.DeleteProjectHook(rc.ges.ProjectId, rc.hook.ID); err != nil { + return fmt.Errorf("failed to delete hook. err: %+v", err) } - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Interface("hook-id", hook.ID).Msg("gitlab hook deleted") + rc.route.Logger.Info().Str("event-source-name", rc.route.EventSource.Name).Msg("gitlab hook deleted") return nil } // routeActiveHandler handles new route -func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { - var response string +func (rc *RouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { + logger := rc.route.Logger.With(). + Str("event-source", rc.route.EventSource.Name). + Str("endpoint", rc.route.Webhook.Endpoint). + Str("port", rc.route.Webhook.Port). + Logger() - logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). - Str("port", rc.Webhook.Port). - Str("http-method", request.Method).Logger() logger.Info().Msg("request received") - if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { - response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) + if !helper.ActiveEndpoints[rc.route.Webhook.Endpoint].Active { logger.Info().Msg("endpoint is not active") - common.SendErrorResponse(writer, response) + common.SendErrorResponse(writer, "") return } body, err := ioutil.ReadAll(request.Body) if err != nil { logger.Error().Err(err).Msg("failed to parse request body") - common.SendErrorResponse(writer, fmt.Sprintf("failed to parse request. err: %+v", err)) + common.SendErrorResponse(writer, "") return } - helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body - response = "request successfully processed" - logger.Info().Msg(response) - common.SendSuccessResponse(writer, response) + helper.ActiveEndpoints[rc.route.Webhook.Endpoint].DataCh <- body + logger.Info().Msg("request successfully processed") + common.SendSuccessResponse(writer, "") } // StartEventSource starts an event source @@ -148,18 +139,17 @@ func (ese *GitlabEventSourceExecutor) StartEventSource(eventSource *gateways.Eve ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to parse event source") return err } - gl := config.(*glab) - - return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ - Webhook: gl.Hook, - Configs: map[string]interface{}{ - labelGitlabConfig: gl, + gl := config.(*gitlabEventSource) + + return gwcommon.ProcessRoute(&RouteConfig{ + route: &gwcommon.Route{ + EventSource: eventSource, + Logger: &ese.Log, + Webhook: gl.Hook, + StartCh: make(chan struct{}), }, - Log: ese.Log, - EventSource: eventSource, - PostActivate: ese.PostActivate, - PostStop: PostStop, - RouteActiveHandler: RouteActiveHandler, - StartCh: make(chan struct{}), + namespace: ese.Namespace, + clientset: ese.Clientset, + ges: gl, }, helper, eventStream) } diff --git a/gateways/community/gitlab/start_test.go b/gateways/community/gitlab/start_test.go index b75716ffdf..4c27de9979 100644 --- a/gateways/community/gitlab/start_test.go +++ b/gateways/community/gitlab/start_test.go @@ -18,14 +18,13 @@ package gitlab import ( "bytes" + "github.com/xanzy/go-gitlab" "io/ioutil" "net/http" "testing" - "github.com/argoproj/argo-events/common" gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" - "github.com/google/go-github/github" "github.com/smartystreets/goconvey/convey" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,10 +32,10 @@ import ( ) var ( - ese = &GitlabEventSourceExecutor{ - Clientset: fake.NewSimpleClientset(), - Namespace: "fake", - Log: common.GetLoggerContext(common.LoggerConf()).Logger(), + rc = &RouteConfig{ + route: gwcommon.GetFakeRoute(), + clientset: fake.NewSimpleClientset(), + namespace: "fake", } secretName = "gitlab-access" @@ -46,10 +45,10 @@ var ( func TestGetCredentials(t *testing.T) { convey.Convey("Given a kubernetes secret, get credentials", t, func() { - secret, err := ese.Clientset.CoreV1().Secrets(ese.Namespace).Create(&corev1.Secret{ + secret, err := rc.clientset.CoreV1().Secrets(rc.namespace).Create(&corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secretName, - Namespace: ese.Namespace, + Namespace: rc.namespace, }, Data: map[string][]byte{ LabelAccessKey: []byte(accessKey), @@ -60,7 +59,7 @@ func TestGetCredentials(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) - creds, err := ese.getCredentials(ps.(*glab).AccessToken) + creds, err := rc.getCredentials(ps.(*gitlabEventSource).AccessToken) convey.So(err, convey.ShouldBeNil) convey.So(creds, convey.ShouldNotBeNil) convey.So(creds.token, convey.ShouldEqual, "YWNjZXNz") @@ -69,8 +68,7 @@ func TestGetCredentials(t *testing.T) { func TestRouteActiveHandler(t *testing.T) { convey.Convey("Given a route configuration", t, func() { - rc := gwcommon.GetFakeRouteConfig() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &gwcommon.Endpoint{ + helper.ActiveEndpoints[rc.route.Webhook.Endpoint] = &gwcommon.Endpoint{ DataCh: make(chan []byte), } @@ -78,33 +76,34 @@ func TestRouteActiveHandler(t *testing.T) { writer := &gwcommon.FakeHttpWriter{} ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) - pbytes, err := yaml.Marshal(ps.(*glab)) + pbytes, err := yaml.Marshal(ps.(*gitlabEventSource)) convey.So(err, convey.ShouldBeNil) - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader(pbytes)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) convey.Convey("Active route should return success", func() { - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true - rc.Configs[labelWebhook] = &github.Hook{ - Config: make(map[string]interface{}), + helper.ActiveEndpoints[rc.route.Webhook.Endpoint].Active = true + rc.hook = &gitlab.ProjectHook{ + URL: "fake", + PushEvents: true, } dataCh := make(chan []byte) go func() { - resp := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh + resp := <-helper.ActiveEndpoints[rc.route.Webhook.Endpoint].DataCh dataCh <- resp }() - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader(pbytes)), - }, rc) + }) data := <-dataCh convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) convey.So(string(data), convey.ShouldEqual, string(pbytes)) - rc.Configs[labelGitlabConfig] = ps.(*glab) - err = ese.PostActivate(rc) + rc.ges = ps.(*gitlabEventSource) + err = rc.PostStart() convey.So(err, convey.ShouldNotBeNil) }) }) diff --git a/gateways/community/gitlab/validate.go b/gateways/community/gitlab/validate.go index 48193aa4e3..8e5dd0e556 100644 --- a/gateways/community/gitlab/validate.go +++ b/gateways/community/gitlab/validate.go @@ -27,10 +27,13 @@ func (ese *GitlabEventSourceExecutor) ValidateEventSource(ctx context.Context, e } func validateGitlab(config interface{}) error { - g := config.(*glab) + g := config.(*gitlabEventSource) if g == nil { return gwcommon.ErrNilEventSource } + if g.Id == 0 { + return fmt.Errorf("hook id must be not be zero") + } if g.ProjectId == "" { return fmt.Errorf("project id can't be empty") } diff --git a/gateways/community/gitlab/validate_test.go b/gateways/community/gitlab/validate_test.go index e8cd87ef29..fa3e331ec1 100644 --- a/gateways/community/gitlab/validate_test.go +++ b/gateways/community/gitlab/validate_test.go @@ -27,6 +27,7 @@ var ( configKey = "testConfig" configId = "1234" validConfig = ` +id: 1234 hook: endpoint: "/push" port: "12000" diff --git a/gateways/community/slack/config.go b/gateways/community/slack/config.go index 7467f21a40..6087f0b86e 100644 --- a/gateways/community/slack/config.go +++ b/gateways/community/slack/config.go @@ -18,6 +18,7 @@ package slack import ( "github.com/argoproj/argo-events/gateways/common" + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" "github.com/rs/zerolog" corev1 "k8s.io/api/core/v1" @@ -33,8 +34,15 @@ type SlackEventSourceExecutor struct { Log zerolog.Logger } -// slackConfig -type slackConfig struct { +type RouteConfig struct { + route *gwcommon.Route + ses *slackEventSource + token string + clientset kubernetes.Interface + namespace string +} + +type slackEventSource struct { // Token for URL verification handshake Token *corev1.SecretKeySelector `json:"token"` // Webhook @@ -42,7 +50,7 @@ type slackConfig struct { } func parseEventSource(es string) (interface{}, error) { - var n *slackConfig + var n *slackEventSource err := yaml.Unmarshal([]byte(es), &n) if err != nil { return nil, err diff --git a/gateways/community/slack/config_test.go b/gateways/community/slack/config_test.go index d61fe38be9..b07e62fdd7 100644 --- a/gateways/community/slack/config_test.go +++ b/gateways/community/slack/config_test.go @@ -36,7 +36,7 @@ func TestParseConfig(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*slackConfig) + _, ok := ps.(*slackEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/slack/start.go b/gateways/community/slack/start.go index a1048b66c4..2fe9479f9d 100644 --- a/gateways/community/slack/start.go +++ b/gateways/community/slack/start.go @@ -19,7 +19,6 @@ package slack import ( "bytes" "encoding/json" - "fmt" "net/http" "github.com/argoproj/argo-events/common" @@ -29,10 +28,6 @@ import ( "github.com/nlopes/slack/slackevents" ) -const ( - labelSlackToken = "slackToken" -) - var ( helper = gwcommon.NewWebhookHelper() ) @@ -41,36 +36,41 @@ func init() { go gwcommon.InitRouteChannels(helper) } -// RouteActiveHandler handles new route -func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { - var response string +func (rc *RouteConfig) GetRoute() *gwcommon.Route { + return rc.route +} + +// RouteHandler handles new route +func (rc *RouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { + r := rc.route + + logger := r.Logger.With(). + Str("event-source", r.EventSource.Name). + Str("endpoint", r.Webhook.Endpoint). + Str("port", r.Webhook.Port). + Str("http-method", request.Method). + Logger() - logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). - Str("port", rc.Webhook.Port). - Str("http-method", request.Method).Logger() logger.Info().Msg("request received") - if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { - response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) - logger.Info().Msg("endpoint is not active") - common.SendErrorResponse(writer, response) + if !helper.ActiveEndpoints[r.Webhook.Endpoint].Active { + logger.Warn().Msg("endpoint is not active") + common.SendErrorResponse(writer, "") return } var buf bytes.Buffer if _, err := buf.ReadFrom(request.Body); err != nil { logger.Error().Err(err).Msg("failed to parse request body") - common.SendInternalErrorResponse(writer, fmt.Sprintf("failed to parse request. err: %+v", err)) + common.SendInternalErrorResponse(writer, "") return } body := buf.String() - token := rc.Configs[labelSlackToken] - eventsAPIEvent, e := slackevents.ParseEvent(json.RawMessage(body), slackevents.OptionVerifyToken(&slackevents.TokenComparator{VerificationToken: token.(string)})) + eventsAPIEvent, e := slackevents.ParseEvent(json.RawMessage(body), slackevents.OptionVerifyToken(&slackevents.TokenComparator{VerificationToken: rc.token})) if e != nil { - response = "failed to extract event" - logger.Error().Msg(response) - common.SendInternalErrorResponse(writer, "failed to extract event") + logger.Error().Msg("failed to extract event") + common.SendInternalErrorResponse(writer, "") return } @@ -78,29 +78,37 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g var r *slackevents.ChallengeResponse err := json.Unmarshal([]byte(body), &r) if err != nil { - response = "failed to verify the challenge" - logger.Error().Msg(response) - common.SendInternalErrorResponse(writer, response) + logger.Error().Msg("failed to verify the challenge") + common.SendInternalErrorResponse(writer, "") return } writer.Header().Set("Content-Type", "text") - writer.Write([]byte(r.Challenge)) + if _, err := writer.Write([]byte(r.Challenge)); err != nil { + logger.Error().Err(err).Msg("failed to write the response for url verification") + // don't return, we want to keep this running to give user chance to retry + } } if eventsAPIEvent.Type == slackevents.CallbackEvent { data, err := json.Marshal(eventsAPIEvent.InnerEvent.Data) if err != nil { - response = "failed to marshal event data" - logger.Error().Err(err).Msg(response) - common.SendInternalErrorResponse(writer, response) + logger.Error().Err(err).Msg("failed to marshal event data") + common.SendInternalErrorResponse(writer, "") return } - helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- data + helper.ActiveEndpoints[rc.route.Webhook.Endpoint].DataCh <- data } - response = "request successfully processed" - logger.Info().Msg(response) - common.SendSuccessResponse(writer, response) + logger.Info().Msg("request successfully processed") + common.SendSuccessResponse(writer, "") +} + +func (rc *RouteConfig) PostStart() error { + return nil +} + +func (rc *RouteConfig) PostStop() error { + return nil } // StartEventSource starts a event source @@ -115,24 +123,25 @@ func (ese *SlackEventSourceExecutor) StartEventSource(eventSource *gateways.Even logger.Error().Err(err).Msg("failed to parse event source") return err } - sc := config.(*slackConfig) - token, err := store.GetSecrets(ese.Clientset, ese.Namespace, sc.Token.Name, sc.Token.Key) + ses := config.(*slackEventSource) + + token, err := store.GetSecrets(ese.Clientset, ese.Namespace, ses.Token.Name, ses.Token.Key) if err != nil { logger.Error().Err(err).Msg("failed to retrieve token") return err } - return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ - Webhook: sc.Hook, - Configs: map[string]interface{}{ - labelSlackToken: token, + return gwcommon.ProcessRoute(&RouteConfig{ + route: &gwcommon.Route{ + Logger: &ese.Log, + StartCh: make(chan struct{}), + Webhook: ses.Hook, + EventSource: eventSource, }, - Log: ese.Log, - EventSource: eventSource, - PostActivate: gwcommon.DefaultPostActivate, - PostStop: gwcommon.DefaultPostStop, - RouteActiveHandler: RouteActiveHandler, - StartCh: make(chan struct{}), + token: token, + clientset: ese.Clientset, + namespace: ese.Namespace, + ses: ses, }, helper, eventStream) } diff --git a/gateways/community/slack/start_test.go b/gateways/community/slack/start_test.go index 1443654074..158cbcf11c 100644 --- a/gateways/community/slack/start_test.go +++ b/gateways/community/slack/start_test.go @@ -19,32 +19,37 @@ package slack import ( "bytes" "encoding/json" - "github.com/ghodss/yaml" - "github.com/nlopes/slack/slackevents" "io/ioutil" "net/http" "testing" gwcommon "github.com/argoproj/argo-events/gateways/common" + "github.com/ghodss/yaml" + "github.com/nlopes/slack/slackevents" "github.com/smartystreets/goconvey/convey" + "k8s.io/client-go/kubernetes/fake" ) func TestRouteActiveHandler(t *testing.T) { convey.Convey("Given a route configuration", t, func() { - rc := gwcommon.GetFakeRouteConfig() + rc := &RouteConfig{ + route: gwcommon.GetFakeRoute(), + clientset: fake.NewSimpleClientset(), + namespace: "fake", + } - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &gwcommon.Endpoint{ + helper.ActiveEndpoints[rc.route.Webhook.Endpoint] = &gwcommon.Endpoint{ DataCh: make(chan []byte), } convey.Convey("Inactive route should return 404", func() { writer := &gwcommon.FakeHttpWriter{} - RouteActiveHandler(writer, &http.Request{}, rc) + rc.RouteHandler(writer, &http.Request{}) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) }) - rc.Configs[labelSlackToken] = "Jhj5dZrVaK7ZwHHjRyZWjbDl" - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true + rc.token = "Jhj5dZrVaK7ZwHHjRyZWjbDl" + helper.ActiveEndpoints[rc.route.Webhook.Endpoint].Active = true convey.Convey("Test url verification request", func() { writer := &gwcommon.FakeHttpWriter{} @@ -56,9 +61,9 @@ func TestRouteActiveHandler(t *testing.T) { payload, err := yaml.Marshal(urlVer) convey.So(err, convey.ShouldBeNil) convey.So(payload, convey.ShouldNotBeNil) - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader(payload)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusInternalServerError) }) @@ -90,12 +95,12 @@ func TestRouteActiveHandler(t *testing.T) { convey.So(err, convey.ShouldBeNil) go func() { - <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh + <-helper.ActiveEndpoints[rc.route.Webhook.Endpoint].DataCh }() - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewBuffer(payload)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusInternalServerError) }) diff --git a/gateways/community/slack/validate.go b/gateways/community/slack/validate.go index eceac34931..5c54cee246 100644 --- a/gateways/community/slack/validate.go +++ b/gateways/community/slack/validate.go @@ -30,7 +30,7 @@ func (ese *SlackEventSourceExecutor) ValidateEventSource(ctx context.Context, es } func validateSlack(config interface{}) error { - sc := config.(*slackConfig) + sc := config.(*slackEventSource) if sc == nil { return gwcommon.ErrNilEventSource } diff --git a/gateways/community/storagegrid/config.go b/gateways/community/storagegrid/config.go index 1fa565cce8..52ad9f5942 100644 --- a/gateways/community/storagegrid/config.go +++ b/gateways/community/storagegrid/config.go @@ -21,7 +21,7 @@ import ( "time" "github.com/argoproj/argo-events/gateways/common" - + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" "github.com/rs/zerolog" ) @@ -31,8 +31,13 @@ type StorageGridEventSourceExecutor struct { Log zerolog.Logger } -// storageGrid contains configuration for storage grid sns -type storageGrid struct { +type RouteConfig struct { + route *gwcommon.Route + sges *storageGridEventSource +} + +// storageGridEventSource contains configuration for storage grid sns +type storageGridEventSource struct { // Webhook Hook *common.Webhook `json:"hook"` @@ -98,7 +103,7 @@ type storageGridNotification struct { } func parseEventSource(eventSource string) (interface{}, error) { - var s *storageGrid + var s *storageGridEventSource err := yaml.Unmarshal([]byte(eventSource), &s) if err != nil { return nil, err diff --git a/gateways/community/storagegrid/config_test.go b/gateways/community/storagegrid/config_test.go index e8c88b27e2..dac18610eb 100644 --- a/gateways/community/storagegrid/config_test.go +++ b/gateways/community/storagegrid/config_test.go @@ -38,7 +38,7 @@ func TestParseConfig(t *testing.T) { ps, err := parseEventSource(es) convey.So(err, convey.ShouldBeNil) convey.So(ps, convey.ShouldNotBeNil) - _, ok := ps.(*storageGrid) + _, ok := ps.(*storageGridEventSource) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/storagegrid/start.go b/gateways/community/storagegrid/start.go index a84b49aa2b..cd169f644b 100644 --- a/gateways/community/storagegrid/start.go +++ b/gateways/community/storagegrid/start.go @@ -24,17 +24,12 @@ import ( "strings" "github.com/argoproj/argo-events/common" - "github.com/argoproj/argo-events/gateways" gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/joncalhoun/qson" "github.com/satori/go.uuid" ) -const ( - LabelStorageGridConfig = "storageGridConfig" -) - var ( helper = gwcommon.NewWebhookHelper() @@ -59,7 +54,7 @@ func generateUUID() uuid.UUID { } // filterEvent filters notification based on event filter in a gateway configuration -func filterEvent(notification *storageGridNotification, sg *storageGrid) bool { +func filterEvent(notification *storageGridNotification, sg *storageGridEventSource) bool { if sg.Events == nil { return true } @@ -72,7 +67,7 @@ func filterEvent(notification *storageGridNotification, sg *storageGrid) bool { } // filterName filters object key based on configured prefix and/or suffix -func filterName(notification *storageGridNotification, sg *storageGrid) bool { +func filterName(notification *storageGridNotification, sg *storageGridEventSource) bool { if sg.Filter == nil { return true } @@ -88,6 +83,10 @@ func filterName(notification *storageGridNotification, sg *storageGrid) bool { return true } +func (rc *RouteConfig) GetRoute() *gwcommon.Route { + return rc.route +} + // StartConfig runs a configuration func (ese *StorageGridEventSourceExecutor) StartEventSource(eventSource *gateways.EventSource, eventStream gateways.Eventing_StartEventSourceServer) error { defer gateways.Recover(eventSource.Name) @@ -98,37 +97,46 @@ func (ese *StorageGridEventSourceExecutor) StartEventSource(eventSource *gateway ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to parse event source") return err } - sg := config.(*storageGrid) - - return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ - Configs: map[string]interface{}{ - LabelStorageGridConfig: sg, + sges := config.(*storageGridEventSource) + + return gwcommon.ProcessRoute(&RouteConfig{ + route: &gwcommon.Route{ + Webhook: sges.Hook, + EventSource: eventSource, + Logger: &ese.Log, + StartCh: make(chan struct{}), }, - Webhook: sg.Hook, - Log: ese.Log, - EventSource: eventSource, - PostActivate: gwcommon.DefaultPostActivate, - PostStop: gwcommon.DefaultPostStop, - RouteActiveHandler: RouteActiveHandler, - StartCh: make(chan struct{}), + sges: sges, }, helper, eventStream) } -// routeActiveHandler handles new route -func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { - logger := rc.Log.With().Str("event-source-name", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint).Str("port", rc.Webhook.Port).Str("method", http.MethodHead).Logger() +func (rc *RouteConfig) PostStart() error { + return nil +} + +func (rc *RouteConfig) PostStop() error { + return nil +} - if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { +// RouteHandler handles new route +func (rc *RouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { + logger := rc.route.Logger.With(). + Str("event-source-name", rc.route.EventSource.Name). + Str("endpoint", rc.route.Webhook.Endpoint). + Str("port", rc.route.Webhook.Port). + Logger() + + if !helper.ActiveEndpoints[rc.route.Webhook.Endpoint].Active { logger.Warn().Msg("inactive route") - common.SendErrorResponse(writer, "route is not valid") + common.SendErrorResponse(writer, "") return } - rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("method", http.MethodHead).Msg("received a request") + logger.Info().Str("event-source-name", rc.route.EventSource.Name).Str("method", http.MethodHead).Msg("received a request") body, err := ioutil.ReadAll(request.Body) if err != nil { logger.Error().Err(err).Msg("failed to parse request body") - common.SendErrorResponse(writer, "failed to parse request body") + common.SendErrorResponse(writer, "") return } @@ -159,11 +167,9 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g return } - storageGridConfig := rc.Configs[LabelStorageGridConfig].(*storageGrid) - - if filterEvent(notification, storageGridConfig) && filterName(notification, storageGridConfig) { + if filterEvent(notification, rc.sges) && filterName(notification, rc.sges) { logger.Info().Msg("new event received, dispatching to gateway client") - helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- b + helper.ActiveEndpoints[rc.route.Webhook.Endpoint].DataCh <- b return } diff --git a/gateways/community/storagegrid/start_test.go b/gateways/community/storagegrid/start_test.go index 9a6cec6a43..e0d7e040e5 100644 --- a/gateways/community/storagegrid/start_test.go +++ b/gateways/community/storagegrid/start_test.go @@ -27,7 +27,8 @@ import ( "testing" ) -var notification = ` +var ( + notification = ` { "Action": "Publish", "Message": { @@ -70,11 +71,14 @@ var notification = ` "Version": "2010-03-31" } ` + rc = &RouteConfig{ + route: gwcommon.GetFakeRoute(), + } +) func TestRouteActiveHandler(t *testing.T) { convey.Convey("Given a route configuration", t, func() { - rc := gwcommon.GetFakeRouteConfig() - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &gwcommon.Endpoint{ + helper.ActiveEndpoints[rc.route.Webhook.Endpoint] = &gwcommon.Endpoint{ DataCh: make(chan []byte), } @@ -83,29 +87,28 @@ func TestRouteActiveHandler(t *testing.T) { writer := &gwcommon.FakeHttpWriter{} convey.Convey("Inactive route should return error", func() { - pbytes, err := yaml.Marshal(ps.(*storageGrid)) + pbytes, err := yaml.Marshal(ps.(*storageGridEventSource)) convey.So(err, convey.ShouldBeNil) - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader(pbytes)), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) }) convey.Convey("Active route should return success", func() { - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true - rc.Configs[LabelStorageGridConfig] = ps.(*storageGrid) + helper.ActiveEndpoints[rc.route.Webhook.Endpoint].Active = true + rc.sges = ps.(*storageGridEventSource) dataCh := make(chan []byte) go func() { - resp := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh + resp := <-helper.ActiveEndpoints[rc.route.Webhook.Endpoint].DataCh dataCh <- resp }() - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader([]byte(notification))), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) }) - }) } @@ -126,7 +129,7 @@ func TestFilterEvent(t *testing.T) { convey.So(err, convey.ShouldBeNil) convey.So(sg, convey.ShouldNotBeNil) - ok := filterEvent(sg, ps.(*storageGrid)) + ok := filterEvent(sg, ps.(*storageGridEventSource)) convey.So(ok, convey.ShouldEqual, true) }) } @@ -140,7 +143,7 @@ func TestFilterName(t *testing.T) { convey.So(err, convey.ShouldBeNil) convey.So(sg, convey.ShouldNotBeNil) - ok := filterName(sg, ps.(*storageGrid)) + ok := filterName(sg, ps.(*storageGridEventSource)) convey.So(ok, convey.ShouldEqual, true) }) } diff --git a/gateways/community/storagegrid/validate.go b/gateways/community/storagegrid/validate.go index 2be908877c..e23bd84908 100644 --- a/gateways/community/storagegrid/validate.go +++ b/gateways/community/storagegrid/validate.go @@ -28,7 +28,7 @@ func (ese *StorageGridEventSourceExecutor) ValidateEventSource(ctx context.Conte } func validateStorageGrid(config interface{}) error { - sg := config.(*storageGrid) + sg := config.(*storageGridEventSource) if sg == nil { return gwcommon.ErrNilEventSource } diff --git a/gateways/core/calendar/README.md b/gateways/core/calendar/README.md index 3811b1fc76..ef5e2a7fad 100644 --- a/gateways/core/calendar/README.md +++ b/gateways/core/calendar/README.md @@ -14,7 +14,7 @@ An entry in the gateway configmap corresponds to [this](https://github.com/argop The [gateway configmap](../../../examples/gateways/calendar-gateway-configmap.yaml) contains three event source configurations, -The `interval` configuration will basically generate an event after every 55 seconds. +The `interval` configuration will basically generate an event after every 10 seconds. The `schedule` configuration will generate an event every 30 minutes past an hour diff --git a/gateways/core/webhook/config.go b/gateways/core/webhook/config.go index 2290331502..08729beab6 100644 --- a/gateways/core/webhook/config.go +++ b/gateways/core/webhook/config.go @@ -18,6 +18,7 @@ package webhook import ( "github.com/argoproj/argo-events/gateways/common" + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" "github.com/rs/zerolog" ) @@ -27,6 +28,10 @@ type WebhookEventSourceExecutor struct { Log zerolog.Logger } +type RouteConfig struct { + Route *gwcommon.Route +} + func parseEventSource(es string) (interface{}, error) { var n *common.Webhook err := yaml.Unmarshal([]byte(es), &n) diff --git a/gateways/core/webhook/start.go b/gateways/core/webhook/start.go index a7d3e88d9e..03f337045c 100644 --- a/gateways/core/webhook/start.go +++ b/gateways/core/webhook/start.go @@ -34,25 +34,31 @@ func init() { go gwcommon.InitRouteChannels(helper) } -// routeActiveHandler handles new route -func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { +func (rc *RouteConfig) GetRoute() *gwcommon.Route { + return rc.Route +} + +// RouteHandler handles new route +func (rc *RouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) { var response string - logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). - Str("port", rc.Webhook.Port). + r := rc.Route + + logger := r.Logger.With().Str("event-source", r.EventSource.Name).Str("endpoint", r.Webhook.Endpoint). + Str("port", r.Webhook.Port). Str("http-method", request.Method).Logger() logger.Info().Msg("request received") - if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { - response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) + if !helper.ActiveEndpoints[r.Webhook.Endpoint].Active { + response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", r.Webhook.Endpoint, r.Webhook.Method) logger.Info().Msg("endpoint is not active") common.SendErrorResponse(writer, response) return } - if rc.Webhook.Method != request.Method { - logger.Warn().Str("expected", rc.Webhook.Method).Str("actual", request.Method).Msg("method mismatch") - common.SendErrorResponse(writer, fmt.Sprintf("the method %s is not defined for endpoint %s", rc.Webhook.Method, rc.Webhook.Endpoint)) + if r.Webhook.Method != request.Method { + logger.Warn().Str("expected", r.Webhook.Method).Str("actual", request.Method).Msg("method mismatch") + common.SendErrorResponse(writer, fmt.Sprintf("the method %s is not defined for endpoint %s", r.Webhook.Method, r.Webhook.Endpoint)) return } @@ -63,12 +69,20 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g return } - helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body + helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh <- body response = "request successfully processed" logger.Info().Msg(response) common.SendSuccessResponse(writer, response) } +func (rc *RouteConfig) PostStart() error { + return nil +} + +func (rc *RouteConfig) PostStop() error { + return nil +} + // StartEventSource starts a event source func (ese *WebhookEventSourceExecutor) StartEventSource(eventSource *gateways.EventSource, eventStream gateways.Eventing_StartEventSourceServer) error { defer gateways.Recover(eventSource.Name) @@ -82,13 +96,12 @@ func (ese *WebhookEventSourceExecutor) StartEventSource(eventSource *gateways.Ev h := config.(*gwcommon.Webhook) h.Endpoint = gwcommon.FormatWebhookEndpoint(h.Endpoint) - return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ - Webhook: h, - Log: ese.Log, - EventSource: eventSource, - PostActivate: gwcommon.DefaultPostActivate, - PostStop: gwcommon.DefaultPostStop, - RouteActiveHandler: RouteActiveHandler, - StartCh: make(chan struct{}), + return gwcommon.ProcessRoute(&RouteConfig{ + Route: &gwcommon.Route{ + Logger: &ese.Log, + EventSource: eventSource, + StartCh: make(chan struct{}), + Webhook: h, + }, }, helper, eventStream) } diff --git a/gateways/core/webhook/start_test.go b/gateways/core/webhook/start_test.go index 2758a1e485..3c1025b692 100644 --- a/gateways/core/webhook/start_test.go +++ b/gateways/core/webhook/start_test.go @@ -27,44 +27,47 @@ import ( func TestRouteActiveHandler(t *testing.T) { convey.Convey("Given a route configuration", t, func() { - rc := gwcommon.GetFakeRouteConfig() - rc.Webhook.Method = http.MethodGet - helper.ActiveEndpoints[rc.Webhook.Endpoint] = &gwcommon.Endpoint{ + rc := &RouteConfig{ + Route: gwcommon.GetFakeRoute(), + } + r := rc.Route + r.Webhook.Method = http.MethodGet + helper.ActiveEndpoints[r.Webhook.Endpoint] = &gwcommon.Endpoint{ DataCh: make(chan []byte), } writer := &gwcommon.FakeHttpWriter{} convey.Convey("Inactive route should return error", func() { - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader([]byte("hello"))), - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) }) - helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true + helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true convey.Convey("Active route with correct method should return success", func() { dataCh := make(chan []byte) go func() { - resp := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh + resp := <-helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh dataCh <- resp }() - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader([]byte("fake notification"))), Method: http.MethodGet, - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) data := <-dataCh convey.So(string(data), convey.ShouldEqual, "fake notification") }) convey.Convey("Active route with incorrect method should return failure", func() { - RouteActiveHandler(writer, &http.Request{ + rc.RouteHandler(writer, &http.Request{ Body: ioutil.NopCloser(bytes.NewReader([]byte("fake notification"))), Method: http.MethodHead, - }, rc) + }) convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusBadRequest) }) }) diff --git a/gateways/event-sources.go b/gateways/event-sources.go index 5868c119c4..b558f84d07 100644 --- a/gateways/event-sources.go +++ b/gateways/event-sources.go @@ -172,6 +172,7 @@ func (gc *GatewayConfig) startEventSources(eventSources map[string]*EventSourceC Data: eventSource.Data.Config, }) if err != nil { + gc.Log.Error().Err(err).Str("event-source-name", eventSource.Data.Src).Msg("error occurred while starting event source") gc.StatusCh <- EventSourceStatus{ Phase: v1alpha1.NodePhaseError, Message: "failed_to_receive_event_stream", diff --git a/gateways/gateway.go b/gateways/gateway.go index 33278c58dd..56655e4285 100644 --- a/gateways/gateway.go +++ b/gateways/gateway.go @@ -18,12 +18,12 @@ package gateways import ( "fmt" - "net" - "os" - "github.com/argoproj/argo-events/common" zlog "github.com/rs/zerolog" "google.golang.org/grpc" + "net" + "os" + "runtime/debug" ) // StartGateway start a gateway @@ -50,6 +50,7 @@ func StartGateway(es EventingServer) { func Recover(eventSource string) { if r := recover(); r != nil { fmt.Printf("recovered event source %s from error. recover: %v", eventSource, r) + debug.PrintStack() } } diff --git a/gateways/gateway_test.go b/gateways/gateway_test.go index 9cbb0f565e..4362c06b53 100644 --- a/gateways/gateway_test.go +++ b/gateways/gateway_test.go @@ -61,13 +61,6 @@ func (f *FakeGRPCStream) RecvMsg(m interface{}) error { return nil } -func TestRecover(t *testing.T) { - convey.Convey("Recover from panic", t, func() { - defer Recover("fake") - panic("fake panic") - }) -} - func TestHandleEventsFromEventSource(t *testing.T) { convey.Convey("Given a gateway server, handle events from an event source", t, func() { dataCh := make(chan []byte)