Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor webhook route config #243

Merged
merged 12 commits into from Mar 21, 2019
16 changes: 3 additions & 13 deletions Makefile
Expand Up @@ -17,7 +17,7 @@ override LDFLAGS += \
# docker image publishing options
DOCKER_PUSH?=true
IMAGE_NAMESPACE?=argoproj
IMAGE_TAG?=v0.8
IMAGE_TAG?=latest

ifeq (${DOCKER_PUSH},true)
ifndef IMAGE_NAMESPACE
Expand All @@ -35,9 +35,9 @@ endif

# Build the project images
.DELETE_ON_ERROR:
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-client-linux webhook-linux calendar-linux resource-linux artifact-linux file-linux nats-linux kafka-linux amqp-linux mqtt-linux storage-grid-linux github-linux hdfs-linux gitlab-linux sns-linux sqs-linux pubsub-linux trello-linux
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-client-linux webhook-linux calendar-linux resource-linux artifact-linux file-linux nats-linux kafka-linux amqp-linux mqtt-linux storage-grid-linux github-linux hdfs-linux gitlab-linux sns-linux sqs-linux pubsub-linux
VaibhavPage marked this conversation as resolved.
Show resolved Hide resolved

all-images: sensor-image sensor-controller-image gateway-controller-image gateway-client-image webhook-image calendar-image resource-image artifact-image file-image nats-image kafka-image amqp-image mqtt-image storage-grid-image github-image gitlab-image sns-image pubsub-image hdfs-image sqs-image trello-image
all-images: sensor-image sensor-controller-image gateway-controller-image gateway-client-image webhook-image calendar-image resource-image artifact-image file-image nats-image kafka-image amqp-image mqtt-image storage-grid-image github-image gitlab-image sns-image pubsub-image hdfs-image sqs-image

all-controller-images: sensor-controller-image gateway-controller-image

Expand Down Expand Up @@ -263,16 +263,6 @@ sqs-image: sqs-linux
docker build -t $(IMAGE_PREFIX)aws-sqs-gateway:$(IMAGE_TAG) -f ./gateways/community/aws-sqs/Dockerfile .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)aws-sqs-gateway:$(IMAGE_TAG) ; fi

trello:
go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/trello-gateway ./gateways/community/trello/cmd

trello-linux:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 make trello

trello-image: trello-linux
docker build -t $(IMAGE_PREFIX)trello-gateway:$(IMAGE_TAG) -f ./gateways/community/trello/Dockerfile .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)trello-gateway:$(IMAGE_TAG) ; fi

test:
go test $(shell go list ./... | grep -v /vendor/) -race -short -v

Expand Down
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.8.0
0.8.3
1 change: 1 addition & 0 deletions examples/gateways/github-configmap.yaml
Expand Up @@ -4,6 +4,7 @@ metadata:
name: github-gateway-configmap
data:
project_1: |-
id: 1234
owner: "owner-example"
repository: "repo-example"
hook:
Expand Down
1 change: 1 addition & 0 deletions examples/gateways/gitlab-configmap.yaml
Expand Up @@ -4,6 +4,7 @@ metadata:
name: gitlab-gateway-configmap
data:
project_1: |-
id: 1234
projectId: "1"
hook:
endpoint: "/push"
Expand Down
27 changes: 23 additions & 4 deletions gateways/common/fake.go
Expand Up @@ -32,16 +32,35 @@ func (f *FakeHttpWriter) WriteHeader(status int) {
f.HeaderStatus = status
}

func GetFakeRouteConfig() *RouteConfig {
return &RouteConfig{
type FakeRouteConfig struct {
route *Route
}

func (f *FakeRouteConfig) GetRoute() *Route {
return f.route
}

func (f *FakeRouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) {
}

func (f *FakeRouteConfig) PostStart() error {
return nil
}

func (f *FakeRouteConfig) PostStop() error {
return nil
}

func GetFakeRoute() *Route {
logger := common.GetLoggerContext(common.LoggerConf()).Logger()
return &Route{
Webhook: Hook,
EventSource: &gateways.EventSource{
Name: "fake-event-source",
Data: "hello",
Id: "123",
},
Log: common.GetLoggerContext(common.LoggerConf()).Logger(),
Configs: make(map[string]interface{}),
Logger: &logger,
StartCh: make(chan struct{}),
}
}
Expand Down
154 changes: 95 additions & 59 deletions gateways/common/webhook.go
Expand Up @@ -57,9 +57,9 @@ type WebhookHelper struct {
// ActiveEndpoints keep track of endpoints that are already registered with server and their status active or inactive
ActiveEndpoints map[string]*Endpoint
// RouteActivateChan handles assigning new route to server.
RouteActivateChan chan *RouteConfig
RouteActivateChan chan RouteConfig
// RouteDeactivateChan handles deactivating existing route
RouteDeactivateChan chan *RouteConfig
RouteDeactivateChan chan RouteConfig
}

// HTTP Muxer
Expand All @@ -73,16 +73,20 @@ type activeServer struct {
errChan chan error
}

// RouteConfig contains configuration about an http route
type RouteConfig struct {
Webhook *Webhook
Configs map[string]interface{}
EventSource *gateways.EventSource
Log zerolog.Logger
StartCh chan struct{}
RouteActiveHandler func(writer http.ResponseWriter, request *http.Request, rc *RouteConfig)
PostActivate func(rc *RouteConfig) error
PostStop func(rc *RouteConfig) error
// Route contains common information for a route
type Route struct {
Webhook *Webhook
Logger *zerolog.Logger
StartCh chan struct{}
EventSource *gateways.EventSource
}

// RouteConfig is an interface to manage the configuration for a route
type RouteConfig interface {
VaibhavPage marked this conversation as resolved.
Show resolved Hide resolved
GetRoute() *Route
RouteHandler(writer http.ResponseWriter, request *http.Request)
PostStart() error
PostStop() error
}

// endpoint contains state of an http endpoint
Expand All @@ -93,14 +97,14 @@ type Endpoint struct {
DataCh chan []byte
}

// NewWebhookHelper returns new webhook helper
// NewWebhookHelper returns new Webhook helper
func NewWebhookHelper() *WebhookHelper {
return &WebhookHelper{
ActiveEndpoints: make(map[string]*Endpoint),
ActiveServers: make(map[string]*activeServer),
Mutex: sync.Mutex{},
RouteActivateChan: make(chan *RouteConfig),
RouteDeactivateChan: make(chan *RouteConfig),
RouteActivateChan: make(chan RouteConfig),
RouteDeactivateChan: make(chan RouteConfig),
}
}

Expand All @@ -110,11 +114,12 @@ func InitRouteChannels(helper *WebhookHelper) {
select {
case config := <-helper.RouteActivateChan:
// start server if it has not been started on this port
config.startHttpServer(helper)
config.StartCh <- struct{}{}
startHttpServer(config, helper)
startCh := config.GetRoute().StartCh
startCh <- struct{}{}

case config := <-helper.RouteDeactivateChan:
webhook := config.Webhook
webhook := config.GetRoute().Webhook
_, ok := helper.ActiveServers[webhook.Port]
if ok {
helper.ActiveEndpoints[webhook.Endpoint].Active = false
Expand All @@ -129,33 +134,34 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// starts a http server
func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
func startHttpServer(rc RouteConfig, helper *WebhookHelper) {
// start a http server only if no other configuration previously started the server on given port
helper.Mutex.Lock()
if _, ok := helper.ActiveServers[rc.Webhook.Port]; !ok {
r := rc.GetRoute()
if _, ok := helper.ActiveServers[r.Webhook.Port]; !ok {
s := &server{
mux: http.NewServeMux(),
}
rc.Webhook.mux = s.mux
rc.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", rc.Webhook.Port),
r.Webhook.mux = s.mux
r.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", r.Webhook.Port),
Handler: s,
}
errChan := make(chan error, 1)
helper.ActiveServers[rc.Webhook.Port] = &activeServer{
helper.ActiveServers[r.Webhook.Port] = &activeServer{
srv: s.mux,
errChan: errChan,
}

// start http server
go func() {
var err error
if rc.Webhook.ServerCertPath == "" || rc.Webhook.ServerKeyPath == "" {
err = rc.Webhook.srv.ListenAndServe()
if r.Webhook.ServerCertPath == "" || r.Webhook.ServerKeyPath == "" {
err = r.Webhook.srv.ListenAndServe()
} else {
err = rc.Webhook.srv.ListenAndServeTLS(rc.Webhook.ServerCertPath, rc.Webhook.ServerKeyPath)
err = r.Webhook.srv.ListenAndServeTLS(r.Webhook.ServerCertPath, r.Webhook.ServerKeyPath)
}
rc.Log.Error().Err(err).Str("event-source", rc.EventSource.Name).Str("port", rc.Webhook.Port).Msg("http server stopped")
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Str("port", r.Webhook.Port).Msg("http server stopped")
if err != nil {
errChan <- err
}
Expand All @@ -165,76 +171,87 @@ func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
}

// activateRoute activates route
func (rc *RouteConfig) activateRoute(helper *WebhookHelper) {
func activateRoute(rc RouteConfig, helper *WebhookHelper) {
r := rc.GetRoute()
helper.RouteActivateChan <- rc

<-rc.StartCh
<-r.StartCh

if rc.Webhook.mux == nil {
if r.Webhook.mux == nil {
helper.Mutex.Lock()
rc.Webhook.mux = helper.ActiveServers[rc.Webhook.Port].srv
r.Webhook.mux = helper.ActiveServers[r.Webhook.Port].srv
helper.Mutex.Unlock()
}

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("adding route handler")
if _, ok := helper.ActiveEndpoints[rc.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("adding route handler")

if _, ok := helper.ActiveEndpoints[r.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[r.Webhook.Endpoint] = &Endpoint{
Active: true,
DataCh: make(chan []byte),
}
rc.Webhook.mux.HandleFunc(rc.Webhook.Endpoint, func(writer http.ResponseWriter, request *http.Request) {
rc.RouteActiveHandler(writer, request, rc)
})
r.Webhook.mux.HandleFunc(r.Webhook.Endpoint, rc.RouteHandler)
}
helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true
helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("route handler added")
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("route handler added")
}

func (rc *RouteConfig) processChannels(helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
func processChannels(rc RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
r := rc.GetRoute()

for {
select {
case data := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh:
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("new event received, dispatching to gateway client")
case data := <-helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh:
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("new event received, dispatching to gateway client")
err := eventStream.Send(&gateways.Event{
Name: rc.EventSource.Name,
Name: r.EventSource.Name,
Payload: data,
})
if err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to send event")
r.Logger.Error().Err(err).Str("event-source-name", r.EventSource.Name).Msg("failed to send event")
return err
}

case <-eventStream.Context().Done():
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("connection is closed by client")
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("connection is closed by client")
helper.RouteDeactivateChan <- rc
return nil

// this error indicates that the server has stopped running
case err := <-helper.ActiveServers[rc.Webhook.Port].errChan:
case err := <-helper.ActiveServers[r.Webhook.Port].errChan:
return err
}
}
}

func DefaultPostActivate(rc *RouteConfig) error {
return nil
}
func ProcessRoute(rc RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
r := rc.GetRoute()

func DefaultPostStop(rc *RouteConfig) error {
return nil
}
r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("validating the route")
if err := validateRoute(rc.GetRoute()); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred validating route")
return err
}

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("activating the route")
activateRoute(rc, helper)

func ProcessRoute(rc *RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
rc.activateRoute(helper)
if err := rc.PostActivate(rc); err != nil {
r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post start")
if err := rc.PostStart(); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post start")
return err
}
if err := rc.processChannels(helper, eventStream); err != nil {

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("processing channels")
if err := processChannels(rc, helper, eventStream); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in process channel")
return err
}
if err := rc.PostStop(rc); err != nil {
rc.Log.Error().Err(err).Msg("error occurred while executing post stop logic")

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post stop")
if err := rc.PostStop(); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post stop")
}
return nil
}
Expand All @@ -258,6 +275,25 @@ func ValidateWebhook(w *Webhook) error {
return nil
}

func validateRoute(r *Route) error {
if r == nil {
return fmt.Errorf("route can't be nil")
}
if r.Webhook == nil {
return fmt.Errorf("webhook can't be nil")
}
if r.StartCh == nil {
return fmt.Errorf("start channel can't be nil")
}
if r.EventSource == nil {
return fmt.Errorf("event source can't be nil")
}
if r.Logger == nil {
return fmt.Errorf("logger can't be nil")
}
return nil
}

func FormatWebhookEndpoint(endpoint string) string {
if !strings.HasPrefix(endpoint, "/") {
return fmt.Sprintf("/%s", endpoint)
Expand Down