Skip to content

Commit

Permalink
Merge 60777d6 into 45bf2f0
Browse files Browse the repository at this point in the history
  • Loading branch information
magaldima committed Mar 11, 2019
2 parents 45bf2f0 + 60777d6 commit 9365e0d
Show file tree
Hide file tree
Showing 21 changed files with 114 additions and 95 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.1.2
0.8.0
2 changes: 1 addition & 1 deletion common/common.go
Expand Up @@ -125,7 +125,7 @@ const (
// LabelGatewayEventSourceID is the label for gateway configuration ID
LabelGatewayEventSourceID = "event-source-id"

// Server Connection Timeout, 5 seconds
// Server Connection Timeout, 10 seconds
ServerConnTimeout = 10
)

Expand Down
1 change: 1 addition & 0 deletions controllers/gateway/informer.go
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gateway

import (
Expand Down
8 changes: 4 additions & 4 deletions controllers/gateway/state.go
Expand Up @@ -9,7 +9,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

// persist the updates to the gateway resource
// PersistUpdates of the gateway resource
func PersistUpdates(client gwclient.Interface, gw *v1alpha1.Gateway, log *zerolog.Logger) (*v1alpha1.Gateway, error) {
gatewayClient := client.ArgoprojV1alpha1().Gateways(gw.ObjectMeta.Namespace)

Expand All @@ -23,7 +23,7 @@ func PersistUpdates(client gwclient.Interface, gw *v1alpha1.Gateway, log *zerolo
return oldgw, err
}
log.Info().Msg("re-applying updates on latest version and retrying update")
err = ReapplyUpdate(client, gw)
err = ReapplyUpdates(client, gw)
if err != nil {
log.Error().Err(err).Msg("failed to re-apply update")
return oldgw, err
Expand All @@ -33,8 +33,8 @@ func PersistUpdates(client gwclient.Interface, gw *v1alpha1.Gateway, log *zerolo
return gw, nil
}

// reapply the updates to gateway resource
func ReapplyUpdate(client gwclient.Interface, gw *v1alpha1.Gateway) error {
// ReapplyUpdates to gateway resource
func ReapplyUpdates(client gwclient.Interface, gw *v1alpha1.Gateway) error {
return wait.ExponentialBackoff(common.DefaultRetry, func() (bool, error) {
gatewayClient := client.ArgoprojV1alpha1().Gateways(gw.Namespace)
g, err := gatewayClient.Update(gw)
Expand Down
2 changes: 1 addition & 1 deletion gateways/common/doc.go
Expand Up @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// This package contains structs and methods that are shared across different gateways.
// Package common contains structs and methods that are shared across different gateways.
package common
7 changes: 4 additions & 3 deletions gateways/common/webhook.go
Expand Up @@ -42,9 +42,10 @@ type Webhook struct {
ServerCertPath string `json:"serverCertPath,omitempty" protobuf:"bytes,4,opt,name=serverCertPath"`
// ServerKeyPath refers the file that contains private key
ServerKeyPath string `json:"serverKeyPath,omitempty" protobuf:"bytes,5,opt,name=serverKeyPath"`

// srv holds reference to http server
srv *http.Server `json:"srv,omitempty"`
mux *http.ServeMux `json:"mux,omitempty"`
srv *http.Server
mux *http.ServeMux
}

// WebhookHelper is a helper struct
Expand Down Expand Up @@ -72,7 +73,7 @@ type activeServer struct {
errChan chan error
}

// RouteConfig contains configuration about a http route
// RouteConfig contains configuration about an http route
type RouteConfig struct {
Webhook *Webhook
Configs map[string]interface{}
Expand Down
7 changes: 4 additions & 3 deletions gateways/community/aws-sns/config.go
Expand Up @@ -17,18 +17,19 @@ limitations under the License.
package aws_sns

import (
"time"

"github.com/argoproj/argo-events/gateways/common"
"k8s.io/client-go/kubernetes"
"time"

"github.com/ghodss/yaml"
"github.com/rs/zerolog"
corev1 "k8s.io/api/core/v1"
)

const (
MESSAGE_TYPE_SUBSCRIPTION_CONFIRMATION = "SubscriptionConfirmation"
MESSAGE_TYPE_NOTIFICATION = "Notification"
messageTypeSubscriptionConfirmation = "SubscriptionConfirmation"
messageTypeNotification = "Notification"
)

var (
Expand Down
33 changes: 17 additions & 16 deletions gateways/community/aws-sns/start.go
Expand Up @@ -33,9 +33,9 @@ import (
)

const (
LabelSNSConfig = "snsConfig"
LabelSNSSession = "snsSession"
LabelSubscriptionArn = "subscriptionArn"
labelSNSConfig = "snsConfig"
labelSNSSession = "snsSession"
labelSubscriptionArn = "subscriptionArn"
)

var (
Expand All @@ -46,7 +46,7 @@ func init() {
go gwcommon.InitRouteChannels(helper)
}

// routeActiveHandler handles new route
// RouteActiveHandler handles new routes
func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) {
var response string

Expand Down Expand Up @@ -76,11 +76,11 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g
return
}

sc := rc.Configs[LabelSNSConfig].(*snsConfig)
sc := rc.Configs[labelSNSConfig].(*snsConfig)

switch snspayload.Type {
case MESSAGE_TYPE_SUBSCRIPTION_CONFIRMATION:
awsSession := rc.Configs[LabelSNSSession].(*snslib.SNS)
case messageTypeSubscriptionConfirmation:
awsSession := rc.Configs[labelSNSSession].(*snslib.SNS)
out, err := awsSession.ConfirmSubscription(&snslib.ConfirmSubscriptionInput{
TopicArn: &sc.TopicArn,
Token: &snspayload.Token,
Expand All @@ -89,9 +89,9 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g
logger.Error().Err(err).Msg("failed to send confirmation response to amazon")
return
}
rc.Configs[LabelSubscriptionArn] = out.SubscriptionArn
rc.Configs[labelSubscriptionArn] = out.SubscriptionArn

case MESSAGE_TYPE_NOTIFICATION:
case messageTypeNotification:
helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body
}

Expand All @@ -100,11 +100,12 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g
common.SendSuccessResponse(writer, response)
}

// PostActivate subscribes to the sns topic
func (ese *SNSEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error {
logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint).
Str("port", rc.Webhook.Port).Logger()

sc := rc.Configs[LabelSNSConfig].(*snsConfig)
sc := rc.Configs[labelSNSConfig].(*snsConfig)
// retrieve access key id and secret access key
accessKey, err := store.GetSecrets(ese.Clientset, ese.Namespace, sc.AccessKey.Name, sc.AccessKey.Key)
if err != nil {
Expand Down Expand Up @@ -135,7 +136,7 @@ func (ese *SNSEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error
}

snsSession := snslib.New(awsSession)
rc.Configs[LabelSNSSession] = snsSession
rc.Configs[labelSNSSession] = snsSession

logger.Info().Msg("subscribing to snsConfig topic")
if _, err := snsSession.Subscribe(&snslib.SubscribeInput{
Expand All @@ -150,19 +151,19 @@ func (ese *SNSEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error
return nil
}

// PostStop unsubscribes the topic
// PostStop unsubscribes from the sns topic
func PostStop(rc *gwcommon.RouteConfig) error {
awsSession := rc.Configs[LabelSNSSession].(*snslib.SNS)
awsSession := rc.Configs[labelSNSSession].(*snslib.SNS)
if _, err := awsSession.Unsubscribe(&snslib.UnsubscribeInput{
SubscriptionArn: rc.Configs[LabelSubscriptionArn].(*string),
SubscriptionArn: rc.Configs[labelSubscriptionArn].(*string),
}); err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to unsubscribe")
return err
}
return nil
}

// StartConfig runs a configuration
// StartEventSource starts an SNS event source
func (ese *SNSEventSourceExecutor) StartEventSource(eventSource *gateways.EventSource, eventStream gateways.Eventing_StartEventSourceServer) error {
defer gateways.Recover(eventSource.Name)

Expand All @@ -177,7 +178,7 @@ func (ese *SNSEventSourceExecutor) StartEventSource(eventSource *gateways.EventS
return gwcommon.ProcessRoute(&gwcommon.RouteConfig{
Webhook: sc.Hook,
Configs: map[string]interface{}{
LabelSNSConfig: sc,
labelSNSConfig: sc,
},
Log: ese.Log,
EventSource: eventSource,
Expand Down
8 changes: 2 additions & 6 deletions gateways/community/gcp-pubsub/start.go
Expand Up @@ -17,17 +17,13 @@ limitations under the License.
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"

"cloud.google.com/go/pubsub"
"github.com/argoproj/argo-events/gateways"
"google.golang.org/api/option"
)

const (
// LabelGcpPubSubConfig is the label name of the GCP PubSub Config
LabelGcpPubSubConfig = "pubSubConfig"
)

// StartEventSource starts the GCP PubSub Gateway
func (ese *GcpPubSubEventSourceExecutor) StartEventSource(eventSource *gateways.EventSource, eventStream gateways.Eventing_StartEventSourceServer) error {
defer gateways.Recover(eventSource.Name)
Expand Down
41 changes: 21 additions & 20 deletions gateways/community/github/start.go
Expand Up @@ -37,15 +37,15 @@ import (
)

const (
LabelGithubConfig = "config"
LabelGithubClient = "client"
LabelWebhook = "hook"
labelGithubConfig = "config"
labelGithubClient = "client"
labelWebhook = "hook"
)

const (
GithubSignatureHeader = "x-hub-signature"
GithubEventHeader = "x-github-event"
GithubDeliveryHeader = "x-github-delivery"
githubSignatureHeader = "x-hub-signature"
githubEventHeader = "x-github-event"
githubDeliveryHeader = "x-github-delivery"
)

var (
Expand All @@ -68,7 +68,7 @@ func (ese *GithubEventSourceExecutor) getCredentials(gs *corev1.SecretKeySelecto
}

func (ese *GithubEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error {
gc := rc.Configs[LabelGithubConfig].(*githubConfig)
gc := rc.Configs[labelGithubConfig].(*githubConfig)

c, err := ese.getCredentials(gc.APIToken)
if err != nil {
Expand Down Expand Up @@ -109,26 +109,27 @@ func (ese *GithubEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) err
}

client := gh.NewClient(PATTransport.Client())
rc.Configs[LabelGithubClient] = client

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
rc.Configs[labelGithubClient] = client

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
hook, _, err := client.Repositories.CreateHook(ctx, gc.Owner, gc.Repository, hookSetup)
if err != nil {
return fmt.Errorf("failed to create webhook. err: %+v", err)
}
rc.Configs[LabelWebhook] = hook
rc.Configs[labelWebhook] = hook

ese.Log.Info().Str("event-source-name", rc.EventSource.Name).Interface("hook-id", *hook.ID).Msg("github hook created")
return nil
}

func PostStop(rc *gwcommon.RouteConfig) error {
gc := rc.Configs[LabelGithubConfig].(*githubConfig)
client := rc.Configs[LabelGithubClient].(*gh.Client)
hook := rc.Configs[LabelWebhook].(*gh.Hook)
gc := rc.Configs[labelGithubConfig].(*githubConfig)
client := rc.Configs[labelGithubClient].(*gh.Client)
hook := rc.Configs[labelWebhook].(*gh.Hook)

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := client.Repositories.DeleteHook(ctx, gc.Owner, gc.Repository, *hook.ID); err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to delete github hook")
return err
Expand All @@ -151,7 +152,7 @@ func (ese *GithubEventSourceExecutor) StartEventSource(eventSource *gateways.Eve
return gwcommon.ProcessRoute(&gwcommon.RouteConfig{
Webhook: gc.Hook,
Configs: map[string]interface{}{
LabelGithubConfig: gc,
labelGithubConfig: gc,
},
Log: ese.Log,
EventSource: eventSource,
Expand Down Expand Up @@ -183,16 +184,16 @@ func verifySignature(secret []byte, signature string, body []byte) bool {
}

func validatePayload(secret []byte, headers http.Header, body []byte) error {
signature := headers.Get(GithubSignatureHeader)
signature := headers.Get(githubSignatureHeader)
if len(signature) == 0 {
return errors.New("no x-hub-signature header found")
}

if event := headers.Get(GithubEventHeader); len(event) == 0 {
if event := headers.Get(githubEventHeader); len(event) == 0 {
return errors.New("no x-github-event header found")
}

if id := headers.Get(GithubDeliveryHeader); len(id) == 0 {
if id := headers.Get(githubDeliveryHeader); len(id) == 0 {
return errors.New("no x-github-delivery header found")
}

Expand Down Expand Up @@ -225,7 +226,7 @@ func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *g
return
}

hook := rc.Configs[LabelWebhook].(*gh.Hook)
hook := rc.Configs[labelWebhook].(*gh.Hook)
if secret, ok := hook.Config["secret"]; ok {
if err := validatePayload([]byte(secret.(string)), request.Header, body); err != nil {
logger.Error().Err(err).Msg("request is not valid event notification")
Expand Down
20 changes: 10 additions & 10 deletions gateways/community/gitlab/start.go
Expand Up @@ -30,9 +30,9 @@ import (
)

const (
LabelGitlabConfig = "config"
LabelGitlabClient = "client"
LabelWebhook = "hook"
labelGitlabConfig = "config"
labelGitlabClient = "client"
labelWebhook = "hook"
)

var (
Expand All @@ -55,7 +55,7 @@ func (ese *GitlabEventSourceExecutor) getCredentials(gs *GitlabSecret) (*cred, e
}

func (ese *GitlabEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error {
gl := rc.Configs[LabelGitlabConfig].(*glab)
gl := rc.Configs[labelGitlabConfig].(*glab)

c, err := ese.getCredentials(gl.AccessToken)
if err != nil {
Expand All @@ -67,7 +67,7 @@ func (ese *GitlabEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) err
return fmt.Errorf("failed to set gitlab base url, err: %+v", err)
}

rc.Configs[LabelGitlabClient] = client
rc.Configs[labelGitlabClient] = client

formattedUrl := gwcommon.GenerateFormattedURL(gl.Hook)

Expand All @@ -90,16 +90,16 @@ func (ese *GitlabEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) err
if err != nil {
return fmt.Errorf("failed to add project hook. err: %+v", err)
}
rc.Configs[LabelWebhook] = hook
rc.Configs[labelWebhook] = hook

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Interface("hook-id", hook.ID).Msg("gitlab hook created")
return nil
}

func PostStop(rc *gwcommon.RouteConfig) error {
gl := rc.Configs[LabelGitlabConfig].(*glab)
client := rc.Configs[LabelGitlabClient].(*gitlab.Client)
hook := rc.Configs[LabelWebhook].(*gitlab.ProjectHook)
gl := rc.Configs[labelGitlabConfig].(*glab)
client := rc.Configs[labelGitlabClient].(*gitlab.Client)
hook := rc.Configs[labelWebhook].(*gitlab.ProjectHook)

if _, err := client.Projects.DeleteProjectHook(gl.ProjectId, hook.ID); err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Interface("hook-id", hook.ID).Msg("failed to delete gitlab hook")
Expand Down Expand Up @@ -153,7 +153,7 @@ func (ese *GitlabEventSourceExecutor) StartEventSource(eventSource *gateways.Eve
return gwcommon.ProcessRoute(&gwcommon.RouteConfig{
Webhook: gl.Hook,
Configs: map[string]interface{}{
LabelGitlabConfig: gl,
labelGitlabConfig: gl,
},
Log: ese.Log,
EventSource: eventSource,
Expand Down

0 comments on commit 9365e0d

Please sign in to comment.