Skip to content

Commit

Permalink
Merge 30eaa29 into a913daf
Browse files Browse the repository at this point in the history
  • Loading branch information
VaibhavPage committed Mar 20, 2019
2 parents a913daf + 30eaa29 commit f7ce9f1
Show file tree
Hide file tree
Showing 59 changed files with 660 additions and 1,034 deletions.
16 changes: 3 additions & 13 deletions Makefile
Expand Up @@ -17,7 +17,7 @@ override LDFLAGS += \
# docker image publishing options
DOCKER_PUSH?=true
IMAGE_NAMESPACE?=argoproj
IMAGE_TAG?=v0.8
IMAGE_TAG?=latest

ifeq (${DOCKER_PUSH},true)
ifndef IMAGE_NAMESPACE
Expand All @@ -35,9 +35,9 @@ endif

# Build the project images
.DELETE_ON_ERROR:
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-client-linux webhook-linux calendar-linux resource-linux artifact-linux file-linux nats-linux kafka-linux amqp-linux mqtt-linux storage-grid-linux github-linux hdfs-linux gitlab-linux sns-linux sqs-linux pubsub-linux trello-linux
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-client-linux webhook-linux calendar-linux resource-linux artifact-linux file-linux nats-linux kafka-linux amqp-linux mqtt-linux storage-grid-linux github-linux hdfs-linux gitlab-linux sns-linux sqs-linux pubsub-linux

all-images: sensor-image sensor-controller-image gateway-controller-image gateway-client-image webhook-image calendar-image resource-image artifact-image file-image nats-image kafka-image amqp-image mqtt-image storage-grid-image github-image gitlab-image sns-image pubsub-image hdfs-image sqs-image trello-image
all-images: sensor-image sensor-controller-image gateway-controller-image gateway-client-image webhook-image calendar-image resource-image artifact-image file-image nats-image kafka-image amqp-image mqtt-image storage-grid-image github-image gitlab-image sns-image pubsub-image hdfs-image sqs-image

all-controller-images: sensor-controller-image gateway-controller-image

Expand Down Expand Up @@ -263,16 +263,6 @@ sqs-image: sqs-linux
docker build -t $(IMAGE_PREFIX)aws-sqs-gateway:$(IMAGE_TAG) -f ./gateways/community/aws-sqs/Dockerfile .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)aws-sqs-gateway:$(IMAGE_TAG) ; fi

trello:
go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/trello-gateway ./gateways/community/trello/cmd

trello-linux:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 make trello

trello-image: trello-linux
docker build -t $(IMAGE_PREFIX)trello-gateway:$(IMAGE_TAG) -f ./gateways/community/trello/Dockerfile .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)trello-gateway:$(IMAGE_TAG) ; fi

test:
go test $(shell go list ./... | grep -v /vendor/) -race -short -v

Expand Down
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.8.0
0.8.3
1 change: 1 addition & 0 deletions examples/gateways/github-configmap.yaml
Expand Up @@ -4,6 +4,7 @@ metadata:
name: github-gateway-configmap
data:
project_1: |-
id: 1234
owner: "owner-example"
repository: "repo-example"
hook:
Expand Down
1 change: 1 addition & 0 deletions examples/gateways/gitlab-configmap.yaml
Expand Up @@ -4,6 +4,7 @@ metadata:
name: gitlab-gateway-configmap
data:
project_1: |-
id: 1234
projectId: "1"
hook:
endpoint: "/push"
Expand Down
27 changes: 23 additions & 4 deletions gateways/common/fake.go
Expand Up @@ -32,16 +32,35 @@ func (f *FakeHttpWriter) WriteHeader(status int) {
f.HeaderStatus = status
}

func GetFakeRouteConfig() *RouteConfig {
return &RouteConfig{
type FakeRouteConfig struct {
route *Route
}

func (f *FakeRouteConfig) GetRoute() *Route {
return f.route
}

func (f *FakeRouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) {
}

func (f *FakeRouteConfig) PostStart() error {
return nil
}

func (f *FakeRouteConfig) PostStop() error {
return nil
}

func GetFakeRoute() *Route {
logger := common.GetLoggerContext(common.LoggerConf()).Logger()
return &Route{
Webhook: Hook,
EventSource: &gateways.EventSource{
Name: "fake-event-source",
Data: "hello",
Id: "123",
},
Log: common.GetLoggerContext(common.LoggerConf()).Logger(),
Configs: make(map[string]interface{}),
Logger: &logger,
StartCh: make(chan struct{}),
}
}
Expand Down
158 changes: 97 additions & 61 deletions gateways/common/webhook.go
Expand Up @@ -57,9 +57,9 @@ type WebhookHelper struct {
// ActiveEndpoints keep track of endpoints that are already registered with server and their status active or inactive
ActiveEndpoints map[string]*Endpoint
// RouteActivateChan handles assigning new route to server.
RouteActivateChan chan *RouteConfig
RouteActivateChan chan RouteManager
// RouteDeactivateChan handles deactivating existing route
RouteDeactivateChan chan *RouteConfig
RouteDeactivateChan chan RouteManager
}

// HTTP Muxer
Expand All @@ -73,16 +73,20 @@ type activeServer struct {
errChan chan error
}

// RouteConfig contains configuration about an http route
type RouteConfig struct {
Webhook *Webhook
Configs map[string]interface{}
EventSource *gateways.EventSource
Log zerolog.Logger
StartCh chan struct{}
RouteActiveHandler func(writer http.ResponseWriter, request *http.Request, rc *RouteConfig)
PostActivate func(rc *RouteConfig) error
PostStop func(rc *RouteConfig) error
// Route contains common information for a route
type Route struct {
Webhook *Webhook
Logger *zerolog.Logger
StartCh chan struct{}
EventSource *gateways.EventSource
}

// RouteManager is an interface to manage the configuration for a route
type RouteManager interface {
GetRoute() *Route
RouteHandler(writer http.ResponseWriter, request *http.Request)
PostStart() error
PostStop() error
}

// endpoint contains state of an http endpoint
Expand All @@ -93,14 +97,14 @@ type Endpoint struct {
DataCh chan []byte
}

// NewWebhookHelper returns new webhook helper
// NewWebhookHelper returns new Webhook helper
func NewWebhookHelper() *WebhookHelper {
return &WebhookHelper{
ActiveEndpoints: make(map[string]*Endpoint),
ActiveServers: make(map[string]*activeServer),
Mutex: sync.Mutex{},
RouteActivateChan: make(chan *RouteConfig),
RouteDeactivateChan: make(chan *RouteConfig),
RouteActivateChan: make(chan RouteManager),
RouteDeactivateChan: make(chan RouteManager),
}
}

Expand All @@ -110,11 +114,12 @@ func InitRouteChannels(helper *WebhookHelper) {
select {
case config := <-helper.RouteActivateChan:
// start server if it has not been started on this port
config.startHttpServer(helper)
config.StartCh <- struct{}{}
startHttpServer(config, helper)
startCh := config.GetRoute().StartCh
startCh <- struct{}{}

case config := <-helper.RouteDeactivateChan:
webhook := config.Webhook
webhook := config.GetRoute().Webhook
_, ok := helper.ActiveServers[webhook.Port]
if ok {
helper.ActiveEndpoints[webhook.Endpoint].Active = false
Expand All @@ -129,33 +134,34 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// starts a http server
func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
func startHttpServer(routeManager RouteManager, helper *WebhookHelper) {
// start a http server only if no other configuration previously started the server on given port
helper.Mutex.Lock()
if _, ok := helper.ActiveServers[rc.Webhook.Port]; !ok {
r := routeManager.GetRoute()
if _, ok := helper.ActiveServers[r.Webhook.Port]; !ok {
s := &server{
mux: http.NewServeMux(),
}
rc.Webhook.mux = s.mux
rc.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", rc.Webhook.Port),
r.Webhook.mux = s.mux
r.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", r.Webhook.Port),
Handler: s,
}
errChan := make(chan error, 1)
helper.ActiveServers[rc.Webhook.Port] = &activeServer{
helper.ActiveServers[r.Webhook.Port] = &activeServer{
srv: s.mux,
errChan: errChan,
}

// start http server
go func() {
var err error
if rc.Webhook.ServerCertPath == "" || rc.Webhook.ServerKeyPath == "" {
err = rc.Webhook.srv.ListenAndServe()
if r.Webhook.ServerCertPath == "" || r.Webhook.ServerKeyPath == "" {
err = r.Webhook.srv.ListenAndServe()
} else {
err = rc.Webhook.srv.ListenAndServeTLS(rc.Webhook.ServerCertPath, rc.Webhook.ServerKeyPath)
err = r.Webhook.srv.ListenAndServeTLS(r.Webhook.ServerCertPath, r.Webhook.ServerKeyPath)
}
rc.Log.Error().Err(err).Str("event-source", rc.EventSource.Name).Str("port", rc.Webhook.Port).Msg("http server stopped")
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Str("port", r.Webhook.Port).Msg("http server stopped")
if err != nil {
errChan <- err
}
Expand All @@ -165,76 +171,87 @@ func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
}

// activateRoute activates route
func (rc *RouteConfig) activateRoute(helper *WebhookHelper) {
helper.RouteActivateChan <- rc
func activateRoute(routeManager RouteManager, helper *WebhookHelper) {
r := routeManager.GetRoute()
helper.RouteActivateChan <- routeManager

<-rc.StartCh
<-r.StartCh

if rc.Webhook.mux == nil {
if r.Webhook.mux == nil {
helper.Mutex.Lock()
rc.Webhook.mux = helper.ActiveServers[rc.Webhook.Port].srv
r.Webhook.mux = helper.ActiveServers[r.Webhook.Port].srv
helper.Mutex.Unlock()
}

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("adding route handler")
if _, ok := helper.ActiveEndpoints[rc.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("adding route handler")

if _, ok := helper.ActiveEndpoints[r.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[r.Webhook.Endpoint] = &Endpoint{
Active: true,
DataCh: make(chan []byte),
}
rc.Webhook.mux.HandleFunc(rc.Webhook.Endpoint, func(writer http.ResponseWriter, request *http.Request) {
rc.RouteActiveHandler(writer, request, rc)
})
r.Webhook.mux.HandleFunc(r.Webhook.Endpoint, routeManager.RouteHandler)
}
helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true
helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("route handler added")
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("route handler added")
}

func (rc *RouteConfig) processChannels(helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
func processChannels(routeManager RouteManager, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
r := routeManager.GetRoute()

for {
select {
case data := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh:
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("new event received, dispatching to gateway client")
case data := <-helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh:
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("new event received, dispatching to gateway client")
err := eventStream.Send(&gateways.Event{
Name: rc.EventSource.Name,
Name: r.EventSource.Name,
Payload: data,
})
if err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to send event")
r.Logger.Error().Err(err).Str("event-source-name", r.EventSource.Name).Msg("failed to send event")
return err
}

case <-eventStream.Context().Done():
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("connection is closed by client")
helper.RouteDeactivateChan <- rc
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("connection is closed by client")
helper.RouteDeactivateChan <- routeManager
return nil

// this error indicates that the server has stopped running
case err := <-helper.ActiveServers[rc.Webhook.Port].errChan:
case err := <-helper.ActiveServers[r.Webhook.Port].errChan:
return err
}
}
}

func DefaultPostActivate(rc *RouteConfig) error {
return nil
}
func ProcessRoute(routeManager RouteManager, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
r := routeManager.GetRoute()

func DefaultPostStop(rc *RouteConfig) error {
return nil
}
r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("validating the route")
if err := validateRoute(routeManager.GetRoute()); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred validating route")
return err
}

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("activating the route")
activateRoute(routeManager, helper)

func ProcessRoute(rc *RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
rc.activateRoute(helper)
if err := rc.PostActivate(rc); err != nil {
r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post start")
if err := routeManager.PostStart(); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post start")
return err
}
if err := rc.processChannels(helper, eventStream); err != nil {

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("processing channels")
if err := processChannels(routeManager, helper, eventStream); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in process channel")
return err
}
if err := rc.PostStop(rc); err != nil {
rc.Log.Error().Err(err).Msg("error occurred while executing post stop logic")

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post stop")
if err := routeManager.PostStop(); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post stop")
}
return nil
}
Expand All @@ -258,6 +275,25 @@ func ValidateWebhook(w *Webhook) error {
return nil
}

func validateRoute(r *Route) error {
if r == nil {
return fmt.Errorf("route can't be nil")
}
if r.Webhook == nil {
return fmt.Errorf("webhook can't be nil")
}
if r.StartCh == nil {
return fmt.Errorf("start channel can't be nil")
}
if r.EventSource == nil {
return fmt.Errorf("event source can't be nil")
}
if r.Logger == nil {
return fmt.Errorf("logger can't be nil")
}
return nil
}

func FormatWebhookEndpoint(endpoint string) string {
if !strings.HasPrefix(endpoint, "/") {
return fmt.Sprintf("/%s", endpoint)
Expand Down

0 comments on commit f7ce9f1

Please sign in to comment.