diff --git a/common/common.go b/common/common.go index 12c304b0c9..fd6ba475ad 100644 --- a/common/common.go +++ b/common/common.go @@ -87,6 +87,8 @@ const ( EnvVarGatewayServerPort = "GATEWAY_SERVER_PORT" // ProcessorPort is the default port for the gateway event processor server to run on. GatewayProcessorPort = "9300" + //LabelGatewayName is the label for gateway name + LabelGatewayName = "gateway-name" ) const ( @@ -108,8 +110,6 @@ const ( LabelOwnerName = "owner-name" // LabelObjectName is the label for object name LabelObjectName = "object-name" - //LabelGatewayName is the label for gateway name - LabelGatewayName = "gateway-name" ) // various supported media types diff --git a/controllers/sensor/resource.go b/controllers/sensor/resource.go index cc68763c7c..2496995634 100644 --- a/controllers/sensor/resource.go +++ b/controllers/sensor/resource.go @@ -54,7 +54,8 @@ func (ctx *sensorContext) generateServiceSpec() *corev1.Service { }, Type: corev1.ServiceTypeClusterIP, Selector: map[string]string{ - common.LabelOwnerName: ctx.sensor.Name, + common.LabelSensorName: ctx.sensor.Name, + common.LabelOwnerName: ctx.sensor.Name, }, }, } @@ -88,6 +89,7 @@ func (ctx *sensorContext) makeDeploymentSpec() (*appv1.DeploymentSpec, error) { replicas := int32(1) labels := map[string]string{ + common.LabelSensorName: ctx.sensor.Name, common.LabelObjectName: ctx.sensor.Name, } sensorContainer := corev1.Container{ diff --git a/gateways/client/cloud-events.go b/gateways/client/cloud-events.go index 5e7cf50b3c..07da867c66 100644 --- a/gateways/client/cloud-events.go +++ b/gateways/client/cloud-events.go @@ -17,15 +17,18 @@ limitations under the License. package main import ( - "context" + "bytes" + "encoding/json" "fmt" + "net/http" "time" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" cloudevents "github.com/cloudevents/sdk-go" - cloudeventsnats "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/nats" "github.com/google/uuid" + "github.com/nats-io/go-nats" + "github.com/sirupsen/logrus" ) // updateSubscriberClients updates the active clients for event subscribers @@ -34,51 +37,21 @@ func (gatewayContext *GatewayContext) updateSubscriberClients() { return } - if gatewayContext.httpSubscribers == nil { - gatewayContext.httpSubscribers = make(map[string]cloudevents.Client) - } if gatewayContext.natsSubscribers == nil { - gatewayContext.natsSubscribers = make(map[string]cloudevents.Client) - } - - // http subscribers - for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.HTTP { - if _, ok := gatewayContext.httpSubscribers[subscriber]; !ok { - t, err := cloudevents.NewHTTPTransport( - cloudevents.WithTarget(subscriber), - cloudevents.WithEncoding(cloudevents.HTTPBinaryV03), - ) - if err != nil { - gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a transport") - continue - } - - client, err := cloudevents.NewClient(t) - if err != nil { - gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a client") - continue - } - gatewayContext.logger.WithField("subscriber", subscriber).Infoln("added a client for the subscriber") - gatewayContext.httpSubscribers[subscriber] = client - } + gatewayContext.natsSubscribers = make(map[string]*nats.Conn) } // nats subscribers for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.NATS { if _, ok := gatewayContext.natsSubscribers[subscriber.Name]; !ok { - t, err := cloudeventsnats.New(subscriber.ServerURL, subscriber.Subject) + conn, err := nats.Connect(subscriber.ServerURL) if err != nil { - gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a transport") + gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to connect to subscriber") continue } - client, err := cloudevents.NewClient(t) - if err != nil { - gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a client") - continue - } gatewayContext.logger.WithField("subscriber", subscriber).Infoln("added a client for the subscriber") - gatewayContext.natsSubscribers[subscriber.Name] = client + gatewayContext.natsSubscribers[subscriber.Name] = conn } } } @@ -100,37 +73,53 @@ func (gatewayContext *GatewayContext) dispatchEvent(gatewayEvent *gateways.Event completeSuccess := true + eventBody, err := json.Marshal(cloudEvent) + if err != nil { + logger.WithError(err).Errorln("failed to marshal the event") + return err + } + // http subscribers for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.HTTP { - client, ok := gatewayContext.httpSubscribers[subscriber] - if !ok { - gatewayContext.logger.WithField("subscriber", subscriber).Warnln("unable to send event. no client found for the subscriber") + request, err := http.NewRequest(http.MethodPost, subscriber, bytes.NewReader(eventBody)) + if err != nil { + logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to construct http request for the event") completeSuccess = false continue } - _, _, err := client.Send(context.Background(), *cloudEvent) + response, err := gatewayContext.httpClient.Do(request) if err != nil { - logger.WithError(err).WithField("target", subscriber).Warnln("failed to send the event") + logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to send http request for the event") completeSuccess = false continue } + + logger.WithFields(logrus.Fields{ + "status": response.Status, + "subscriber": subscriber, + }).Infoln("successfully sent event to the subscriber") } // NATS subscribers for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.NATS { - client, ok := gatewayContext.natsSubscribers[subscriber.Name] + conn, ok := gatewayContext.natsSubscribers[subscriber.Name] if !ok { gatewayContext.logger.WithField("subscriber", subscriber).Warnln("unable to send event. no client found for the subscriber") completeSuccess = false continue } - if _, _, err := client.Send(context.Background(), *cloudEvent); err != nil { - logger.WithError(err).WithField("target", subscriber).Warnln("failed to send the event") + if err := conn.Publish(subscriber.Subject, eventBody); err != nil { + logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to publish the event") completeSuccess = false continue } + + logger.WithFields(logrus.Fields{ + "subscriber": subscriber.Name, + "subject": subscriber.Subject, + }).Infoln("successfully published event on the subject") } response := "dispatched event" diff --git a/gateways/client/context.go b/gateways/client/context.go index 6da7bd2947..994c5cfc85 100644 --- a/gateways/client/context.go +++ b/gateways/client/context.go @@ -18,6 +18,7 @@ package main import ( "context" + "net/http" "os" "github.com/argoproj/argo-events/common" @@ -25,7 +26,7 @@ import ( "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" eventsourceClientset "github.com/argoproj/argo-events/pkg/client/eventsources/clientset/versioned" gwclientset "github.com/argoproj/argo-events/pkg/client/gateway/clientset/versioned" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/nats-io/go-nats" "github.com/sirupsen/logrus" "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -60,10 +61,10 @@ type GatewayContext struct { controllerInstanceID string // statusCh is used to communicate the status of an event source statusCh chan notification - // httpSubscribers holds the active clients for HTTP subscribers - httpSubscribers map[string]cloudevents.Client + // http client to send cloud events to subscribers + httpClient *http.Client // natsSubscribers holds the active clients for NATS subscribers - natsSubscribers map[string]cloudevents.Client + natsSubscribers map[string]*nats.Conn } // EventSourceContext contains information of a event source for gateway to run. @@ -130,8 +131,8 @@ func NewGatewayContext() *GatewayContext { controllerInstanceID: controllerInstanceID, serverPort: serverPort, statusCh: make(chan notification), - httpSubscribers: make(map[string]cloudevents.Client), - natsSubscribers: make(map[string]cloudevents.Client), + httpClient: &http.Client{}, + natsSubscribers: make(map[string]*nats.Conn), } return gatewayConfig diff --git a/go.mod b/go.mod index 5e5c734bba..5643c097c8 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.1 // indirect github.com/nats-io/gnatsd v1.4.1 // indirect github.com/nats-io/go-nats v1.7.2 - github.com/nats-io/nats.go v1.9.2 // indirect + github.com/nats-io/nats.go v1.10.0 // indirect github.com/nicksnyder/go-i18n v1.10.1-0.20190510212457-b280125b035a // indirect github.com/nlopes/slack v0.6.1-0.20200219171353-c05e07b0a5de github.com/nsqio/go-nsq v1.0.8 diff --git a/go.sum b/go.sum index 25c951c64b..6486a9fe21 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,7 @@ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbt github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/GoogleCloudPlatform/k8s-cloud-provider v0.0.0-20190822182118-27a4ced34534/go.mod h1:iroGtC8B3tQiqtds1l+mgk/BBOrxbqjH+eUfFQYRc14= github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= @@ -141,10 +142,12 @@ github.com/argoproj/pkg v0.0.0-20200319004004-f46beff7cd54 h1:hDn02iEkh5EUl4TJfO github.com/argoproj/pkg v0.0.0-20200319004004-f46beff7cd54/go.mod h1:2EZ44RG/CcgtPTwrRR0apOc7oU6UIw8GjCUJWZ8X3bM= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/aslakhellesoy/gox v1.0.100/go.mod h1:AJl542QsKKG96COVsv0N74HHzVQgDIQPceVUh1aeU2M= github.com/auth0/go-jwt-middleware v0.0.0-20170425171159-5493cabe49f7/go.mod h1:LWMyo4iOLWXHGdBki7NIht1kHru/0wM179h+d3g8ATM= github.com/aws/aws-sdk-go v1.16.26/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.30.7 h1:IaXfqtioP6p9SFAnNfsqdNczbR5UNbYqvcZUSsCAdTY= @@ -174,9 +177,18 @@ github.com/cheekybits/genny v0.0.0-20170328200008-9127e812e1e9/go.mod h1:+tQajlR github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudevents/sdk-go v1.1.2 h1:mg/7d+BzubBPrPpH1bdeF85BQZYV85j7Ljqat3+m+qE= github.com/cloudevents/sdk-go v1.1.2/go.mod h1:ss+jWJ88wypiewnPEzChSBzTYXGpdcILoN9YHk8uhTQ= +github.com/cloudevents/sdk-go v2.0.0-RC2+incompatible h1:kvbGsd1C2g/Q32dbi47oVYS0f5ELkgzGc3hAuc0ntoo= +github.com/cloudevents/sdk-go v2.0.0+incompatible h1:iKHDTCTPmNB+VRVxfBZS6wc0TKjn9ZNRrSCUA38heb8= +github.com/cloudevents/sdk-go v2.0.0+incompatible/go.mod h1:xV7GfuhjnJoK6+2MgCk3kfkoO4YRIuARdY3UpSwGz+U= +github.com/cloudevents/sdk-go/v2 v2.0.0-RC2 h1:XXqj/WXjOWhxUR8/+Ovn5YtSuIE83uOD6Gy3vUnBdUQ= +github.com/cloudevents/sdk-go/v2 v2.0.0-RC2/go.mod h1:f6d2RzSysHwhr4EsysDapUIWyJOFKqIhDisATXEa6Wk= +github.com/cloudevents/sdk-go/v2 v2.0.0 h1:AUdGJwaSUnA+VvepKqgjy6XDkPcf0hf/3L7icEs1ibs= +github.com/cloudevents/sdk-go/v2 v2.0.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU= github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21 h1:tuijfIjZyjZaHq9xDUh0tNitwXshJpbLkqMOJv4H3do= github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21/go.mod h1:po7NpZ/QiTKzBKyrsEAxwnTamCoh8uDk/egRpQ7siIc= github.com/clusterhq/flocker-go v0.0.0-20160920122132-2b8b7259d313/go.mod h1:P1wt9Z3DP8O6W3rvwCt0REIlshg1InHImaLW0t3ObY0= @@ -204,6 +216,11 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cucumber/gherkin-go/v11 v11.0.0/go.mod h1:CX33k2XU2qog4e+TFjOValoq6mIUq0DmVccZs238R9w= +github.com/cucumber/godog v0.9.0/go.mod h1:roWCHkpeK6UTOyIRRl7IR+fgfBeZ4vZR7OSq2J/NbM4= +github.com/cucumber/messages-go/v10 v10.0.1/go.mod h1:kA5T38CBlBbYLU12TIrJ4fk4wSkVVOgyh7Enyy8WnSg= +github.com/cucumber/messages-go/v10 v10.0.3/go.mod h1:9jMZ2Y8ZxjLY6TG2+x344nt5rXstVVDYSdS5ySfI1WY= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -369,9 +386,11 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/gobwas/glob v0.2.4-0.20181002190808-e7a84e9525fe h1:zn8tqiUbec4wR94o7Qj3LZCAT6uGobhEgnDRg6isG5U= github.com/gobwas/glob v0.2.4-0.20181002190808-e7a84e9525fe/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/godbus/dbus v0.0.0-20181101234600-2ff6f7ffd60f/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= @@ -480,23 +499,33 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/go-retryablehttp v0.6.5 h1:wzmq39Tu6c45XuV1dIGZEfWkoR+IofdEXWDranjjwmc= github.com/hashicorp/go-retryablehttp v0.6.5/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-version v1.0.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible/go.mod h1:bB9ly3RchcQqsQ9CpyaQwvva7RS5ytVoSoholZQON6o= github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7UkZt1i4FQeQy0R2T8GLUwQhOP5M1gBhy4= github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e h1:0aewS5NTyxftZHSnFaJmWE5oCCrj4DyEXkAiMa1iZJM= @@ -576,7 +605,10 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= +github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/libopenstorage/openstorage v1.0.0/go.mod h1:Sp1sIObHjat1BeXhfMqLZ14wnOzEhNx2YQedreMcUyc= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= @@ -633,6 +665,7 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk= github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9Gns0u4= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20180220230111-00c29f56e238/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.3.0 h1:iDwIio/3gk2QtLLEsqU5lInaMzos0hDTz8a6lazSFVw= @@ -664,21 +697,30 @@ github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1 github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= +github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA= +github.com/nats-io/nats-server v1.4.1/go.mod h1:c8f/fHd2B6Hgms3LtCaI7y6pC4WD1f4SUxcCud5vhBc= github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= +github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg= +github.com/nats-io/nats-streaming-server v0.17.0/go.mod h1:ewPBEsmp62Znl3dcRsYtlcfwudxHEdYMtYqUQSt4fE0= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nats.go v1.9.2 h1:oDeERm3NcZVrPpdR/JpGdWHMv3oJ8yY30YwxKq+DU2s= github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= +github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= +github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac= github.com/nbutton23/zxcvbn-go v0.0.0-20160627004424-a22cb81b2ecd/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= github.com/nbutton23/zxcvbn-go v0.0.0-20171102151520-eafdab6b0663/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= github.com/nicksnyder/go-i18n v1.10.1-0.20190510212457-b280125b035a h1:WsVgYECoTBctNmskVv/BZ8gh/TWP1xJf61PSW9HBdRY= github.com/nicksnyder/go-i18n v1.10.1-0.20190510212457-b280125b035a/go.mod h1:e4Di5xjP9oTVrC6y3C7C0HoSYXjSbhh/dU0eUV32nB4= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nlopes/slack v0.6.1-0.20200219171353-c05e07b0a5de h1:ri0yWD9OmI9MQfsv4qnS/WaqheJ5z7pM36M+TW7xLcs= github.com/nlopes/slack v0.6.1-0.20200219171353-c05e07b0a5de/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk= github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk= @@ -690,6 +732,7 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= @@ -707,6 +750,7 @@ github.com/opencontainers/runc v1.0.0-rc9/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rm github.com/opencontainers/runtime-spec v1.0.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.3.1-0.20190929122143-5215b1806f52/go.mod h1:+BLncwf63G4dgOzykXAxcmnFlUaOlkDdmw/CqsW6pjs= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pelletier/go-toml v1.1.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -746,6 +790,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= +github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI= github.com/quobyte/api v0.1.2/go.mod h1:jL7lIHrmqQ7yh05OJ+eEEdHr0u/kmT1Ff9iHd+4H6VI= github.com/radovskyb/watcher v1.0.7 h1:AYePLih6dpmS32vlHfhCeli8127LzkIgwJGcwwe8tUE= @@ -844,11 +890,13 @@ github.com/tidwall/sjson v1.1.1 h1:7h1vk049Jnd5EH9NyzNiEuwYW4b5qgreBbqRC19AS3U= github.com/tidwall/sjson v1.1.1/go.mod h1:yvVuSnpEQv5cYIrO+AT6kw4QVfd5SDZoGIS7/5+fZFs= github.com/timakin/bodyclose v0.0.0-20190721030226-87058b9bfcec/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ultraware/funlen v0.0.1/go.mod h1:Dp4UiAus7Wdb9KUZsYWZEWiRzGuM2kXM1lPbfaF6xhA= github.com/ultraware/funlen v0.0.2/go.mod h1:Dp4UiAus7Wdb9KUZsYWZEWiRzGuM2kXM1lPbfaF6xhA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= github.com/valyala/quicktemplate v1.1.1/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4= @@ -920,6 +968,7 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79 h1:IaQbIIB2X/Mp/DKctl6ROxz1KyMlKp4uyvL6+kQ7C88= @@ -1037,6 +1086,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502175342-a43fa875dd82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1198,6 +1248,8 @@ gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= diff --git a/sensors/listener.go b/sensors/listener.go index 13e5fc7df7..a12559a484 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -18,13 +18,17 @@ package sensors import ( "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" "github.com/argoproj/argo-events/sensors/dependencies" "github.com/argoproj/argo-events/sensors/types" cloudevents "github.com/cloudevents/sdk-go" - cloudeventsnats "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/nats" + "github.com/nats-io/go-nats" "github.com/pkg/errors" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,13 +47,11 @@ func (sensorCtx *SensorContext) ListenEvents() error { go sensorCtx.syncSensor(context.Background()) errCh := make(chan error) - httpCtx, httpCancel := context.WithCancel(context.Background()) - natsCtx, natsCancel := context.WithCancel(context.Background()) // listen events over http if sensorCtx.Sensor.Spec.Subscription.HTTP != nil { go func() { - if err := sensorCtx.listenEventsOverHTTP(httpCtx); err != nil { + if err := sensorCtx.listenEventsOverHTTP(); err != nil { errCh <- errors.Wrap(err, "failed to listen events over HTTP subscription") } }() @@ -58,7 +60,7 @@ func (sensorCtx *SensorContext) ListenEvents() error { // listen events over nats if sensorCtx.Sensor.Spec.Subscription.NATS != nil { go func() { - if err := sensorCtx.listenEventsOverNATS(natsCtx); err != nil { + if err := sensorCtx.listenEventsOverNATS(); err != nil { errCh <- errors.Wrap(err, "failed to listen events over NATS subscription") } }() @@ -67,38 +69,36 @@ func (sensorCtx *SensorContext) ListenEvents() error { err := <-errCh sensorCtx.Logger.WithError(err).Errorln("subscription failure. stopping sensor operations") - httpCancel() - natsCancel() - return nil } // listenEventsOverHTTP listens to events over HTTP -func (sensorCtx *SensorContext) listenEventsOverHTTP(ctx context.Context) error { +func (sensorCtx *SensorContext) listenEventsOverHTTP() error { port := sensorCtx.Sensor.Spec.Subscription.HTTP.Port if port == 0 { port = common.SensorServerPort } - t, err := cloudevents.NewHTTPTransport( - cloudevents.WithPort(port), - cloudevents.WithPath("/"), - ) - if err != nil { - return err - } - - client, err := cloudevents.NewClient(t) - if err != nil { - return err - } - sensorCtx.Logger.WithFields(logrus.Fields{ "port": port, "endpoint": "/", - }).Infoln("starting HTTP cloudevents receiver") + }).Infoln("starting HTTP events receiver") + + http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + eventBody, err := ioutil.ReadAll(request.Body) + if err != nil { + writer.WriteHeader(http.StatusBadRequest) + writer.Write([]byte("failed to parse the event")) + return + } + if err := sensorCtx.handleEvent(eventBody); err != nil { + writer.WriteHeader(http.StatusInternalServerError) + writer.Write([]byte("failed to handle the event")) + return + } + }) - if err := client.StartReceiver(ctx, sensorCtx.handleEvent); err != nil { + if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil { return err } @@ -106,24 +106,27 @@ func (sensorCtx *SensorContext) listenEventsOverHTTP(ctx context.Context) error } // listenEventsOverNATS listens to events over NATS -func (sensorCtx *SensorContext) listenEventsOverNATS(ctx context.Context) error { +func (sensorCtx *SensorContext) listenEventsOverNATS() error { subscription := sensorCtx.Sensor.Spec.Subscription.NATS - t, err := cloudeventsnats.New(subscription.ServerURL, subscription.Subject) - if err != nil { - return err - } - client, err := cloudevents.NewClient(t) + conn, err := nats.Connect(subscription.ServerURL) if err != nil { return err } - sensorCtx.Logger.WithFields(logrus.Fields{ + logger := sensorCtx.Logger.WithFields(logrus.Fields{ "url": subscription.ServerURL, "subject": subscription.Subject, - }).Infoln("starting NATS cloudevents receiver") + }) + + logger.Infoln("starting NATS events subscriber") - if err := client.StartReceiver(ctx, sensorCtx.handleEvent); err != nil { + _, err = conn.Subscribe(subscription.Subject, func(msg *nats.Msg) { + if err := sensorCtx.handleEvent(msg.Data); err != nil { + logger.WithError(err).Errorln("failed to process the event") + } + }) + if err != nil { return err } @@ -150,8 +153,13 @@ func cloudEventConverter(event *cloudevents.Event) (*v1alpha1.Event, error) { } // handleEvent handles a cloudevent, validates and sends it over internal event notification queue -func (sensorCtx *SensorContext) handleEvent(ctx context.Context, event cloudevents.Event) error { - internalEvent, err := cloudEventConverter(&event) +func (sensorCtx *SensorContext) handleEvent(eventBody []byte) error { + var event *cloudevents.Event + if err := json.Unmarshal(eventBody, &event); err != nil { + return err + } + + internalEvent, err := cloudEventConverter(event) if err != nil { return errors.Wrap(err, "failed to parse the cloudevent") } diff --git a/sensors/listener_test.go b/sensors/listener_test.go index e185e532ee..6957c6945d 100644 --- a/sensors/listener_test.go +++ b/sensors/listener_test.go @@ -17,7 +17,7 @@ limitations under the License. package sensors import ( - "context" + "encoding/json" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" "github.com/argoproj/argo-events/sensors/types" @@ -78,7 +78,11 @@ func TestHandleEvent(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { test.updateFunc() - result := sensorCtx.handleEvent(context.Background(), event) + eventBody, err := json.Marshal(&event) + if err != nil { + assert.Fail(t, err.Error()) + } + result := sensorCtx.handleEvent(eventBody) assert.Equal(t, test.result, result) }) }