diff --git a/docs/reference/config.md b/docs/reference/config.md index 01f5542573..4425795a17 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -531,6 +531,47 @@ nav_order: 2 |default|The default event transport for new subscriptions|`string`|`` |enabled|Which event interface plugins are enabled|`boolean`|`` +## events.webhooks + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` +|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s` +|headers|Adds custom headers to HTTP requests|`map[string]string`|`` +|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms` +|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100` +|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` +|tlsHandshakeTimeout|The maximum amount of time to wait for a successful TLS handshake|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s` + +## events.webhooks.auth + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|password|Password|`string`|`` +|username|Username|`string`|`` + +## events.webhooks.proxy + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|url|Optional HTTP proxy server to connect through|`string`|`` + +## events.webhooks.retry + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|count|The maximum number of times to retry|`int`|`5` +|enabled|Enables retries|`boolean`|`false` +|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms` +|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` + +## events.websockets + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|readBufferSize|WebSocket read buffer size|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb` +|writeBufferSize|WebSocket write buffer size|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb` + ## histograms |Key|Description|Type|Default Value| diff --git a/docs/reference/types/_includes/subscription_description.md b/docs/reference/types/_includes/subscription_description.md index e38fddf826..2a21ff1fdf 100644 --- a/docs/reference/types/_includes/subscription_description.md +++ b/docs/reference/types/_includes/subscription_description.md @@ -167,8 +167,12 @@ allowing you to customize your HTTP requests as follows: - Set the HTTP request details: - Method, URL, query, headers and input body -- Wait for a successful `2xx` HTTP code from the back-end service, before - acknowledging (default). +- Wait for a invocation of the back-end service, before acknowledging + - To retry requests to your Webhook on a non-`2xx` HTTP status code + or other error, then you should enable and configure + [events.webhooks.retry](../../config.html#eventswebhooksretry) + - The event is acknowledged once the request (with any retries), is + completed - regardless of whether the outcome was a success or failure. - Use `fastack` to acknowledge against FireFly immediately and make multiple parallel calls to the HTTP API in a fire-and-forget fashion. - Set the HTTP request details dynamically from `message_confirmed` events: diff --git a/internal/coremsgs/en_config_descriptions.go b/internal/coremsgs/en_config_descriptions.go index 58b3fb69f6..de083d3836 100644 --- a/internal/coremsgs/en_config_descriptions.go +++ b/internal/coremsgs/en_config_descriptions.go @@ -351,4 +351,9 @@ var ( ConfigPluginsAuth = ffc("config.plugins.auth", "Authorization plugin configuration", i18n.MapStringStringType) ConfigPluginsAuthName = ffc("config.plugins.auth[].name", "The name of the auth plugin to use", i18n.StringType) ConfigPluginsAuthType = ffc("config.plugins.auth[].type", "The type of the auth plugin to use", i18n.StringType) + + ConfigPluginsEventSystemReadAhead = ffc("config.events.system.readAhead", "", i18n.IgnoredType) + ConfigPluginsEventWebhooksURL = ffc("config.events.webhooks.url", "", i18n.IgnoredType) + ConfigPluginsEventWebSocketsReadBufferSize = ffc("config.events.websockets.readBufferSize", "WebSocket read buffer size", i18n.ByteSizeType) + ConfigPluginsEventWebSocketsWriteBufferSize = ffc("config.events.websockets.writeBufferSize", "WebSocket write buffer size", i18n.ByteSizeType) ) diff --git a/internal/events/eifactory/factory.go b/internal/events/eifactory/factory.go index 635765b472..d4a65e7ef6 100644 --- a/internal/events/eifactory/factory.go +++ b/internal/events/eifactory/factory.go @@ -19,6 +19,7 @@ package eifactory import ( "context" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/events/system" @@ -41,6 +42,12 @@ func init() { } } +func InitConfig(config config.Section) { + for name, plugin := range pluginsByName { + plugin.InitConfig(config.SubSection(name)) + } +} + func GetPlugin(ctx context.Context, pluginType string) (events.Plugin, error) { plugin, ok := pluginsByName[pluginType] if !ok { diff --git a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index 0cb284af62..4e534f97e7 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -63,12 +63,13 @@ type whResponse struct { func (wh *WebHooks) Name() string { return "webhooks" } func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) { + connID := fftypes.ShortID() *wh = WebHooks{ - ctx: ctx, + ctx: log.WithLogField(ctx, "webhook", wh.connID), capabilities: &events.Capabilities{}, callbacks: make(map[string]events.Callbacks), client: ffresty.New(ctx, config), - connID: fftypes.ShortID(), + connID: connID, } return nil } @@ -85,7 +86,9 @@ func (wh *WebHooks) Capabilities() *events.Capabilities { func (wh *WebHooks) buildRequest(options fftypes.JSONObject, firstData fftypes.JSONObject) (req *whRequest, err error) { req = &whRequest{ - r: wh.client.R().SetDoNotParseResponse(true), + r: wh.client.R(). + SetDoNotParseResponse(true). + SetContext(wh.ctx), url: options.GetString("url"), method: options.GetString("method"), forceJSON: options.GetBool("json"), @@ -223,8 +226,10 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli } } + log.L(wh.ctx).Debugf("Webhook-> %s %s event %s on subscription %s", req.method, req.url, event.ID, sub.ID) resp, err := req.r.Execute(req.method, req.url) if err != nil { + log.L(wh.ctx).Errorf("Webhook<- %s %s event %s on subscription %s failed: %s", req.method, req.url, event.ID, sub.ID, err) return nil, nil, err } defer func() { _ = resp.RawBody().Close() }() @@ -233,6 +238,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli Status: resp.StatusCode(), Headers: fftypes.JSONObject{}, } + log.L(wh.ctx).Infof("Webhook<- %s %s event %s on subscription %s returned %d", req.method, req.url, event.ID, sub.ID, res.Status) header := resp.Header() for h := range header { res.Headers[h] = header.Get(h) @@ -265,7 +271,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli return req, res, nil } -func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error { +func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray, fastAck bool) error { req, res, gwErr := wh.attemptRequest(sub, event, data) if gwErr != nil { // Generate a bad-gateway error response - we always want to send something back, @@ -292,6 +298,7 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription txType = fftypes.FFEnum(strings.ToLower(req.replyTx)) } if cb, ok := wh.callbacks[sub.Namespace]; ok { + log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID) cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ ID: event.ID, Rejected: false, @@ -313,6 +320,14 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription }, }) } + } else if !fastAck { + if cb, ok := wh.callbacks[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + }) + } } return nil } @@ -340,13 +355,22 @@ func (wh *WebHooks) DeliveryRequest(connID string, sub *core.Subscription, event } // In fastack mode we drive calls in parallel to the backend, immediately acknowledging the event - if sub.Options.TransportOptions().GetBool("fastack") { + // NOTE: We cannot use this with reply mode, as when we're sending a reply the `DeliveryResponse` + // callback must include the reply in-line. + if !reply && sub.Options.TransportOptions().GetBool("fastack") { + if cb, ok := wh.callbacks[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + }) + } go func() { - err := wh.doDelivery(connID, reply, sub, event, data) + err := wh.doDelivery(connID, reply, sub, event, data, true) log.L(wh.ctx).Warnf("Webhook delivery failed in fastack mode for event '%s': %s", event.ID, err) }() return nil } - return wh.doDelivery(connID, reply, sub, event, data) + return wh.doDelivery(connID, reply, sub, event, data, false) } diff --git a/internal/events/webhooks/webhooks_test.go b/internal/events/webhooks/webhooks_test.go index 2a70c43837..690beda474 100644 --- a/internal/events/webhooks/webhooks_test.go +++ b/internal/events/webhooks/webhooks_test.go @@ -360,7 +360,11 @@ func TestRequestNoBodyNoReply(t *testing.T) { dataID := fftypes.NewUUID() groupHash := fftypes.NewRandB32() - sub := &core.Subscription{} + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } to := sub.Options.TransportOptions() to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr()) event := &core.EventDelivery{ @@ -380,7 +384,8 @@ func TestRequestNoBodyNoReply(t *testing.T) { }, }, Subscription: core.SubscriptionRef{ - ID: sub.ID, + ID: sub.ID, + Namespace: "ns1", }, } data := &core.Data{ @@ -390,9 +395,16 @@ func TestRequestNoBodyNoReply(t *testing.T) { }`), } + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) + mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { + return !response.Rejected + })).Return(nil) + err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{data}) assert.NoError(t, err) assert.True(t, called) + + mcb.AssertExpectations(t) } func TestRequestReplyEmptyData(t *testing.T) { @@ -638,7 +650,7 @@ func TestRequestReplyDataArrayError(t *testing.T) { mcb.AssertExpectations(t) } -func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) { +func TestWebhookFailFastAsk(t *testing.T) { wh, cancel := newTestWebHooks(t) defer cancel() @@ -652,7 +664,6 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) { Namespace: "ns1", }, } - sub.Options.TransportOptions()["reply"] = true sub.Options.TransportOptions()["fastack"] = true event := &core.EventDelivery{ EnrichedEvent: core.EnrichedEvent{ @@ -673,17 +684,11 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) { waiter := make(chan struct{}) mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) - dr := mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { - assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) - assert.Nil(t, response.Reply.Message.Header.Group) - assert.Equal(t, core.MessageTypeBroadcast, response.Reply.Message.Header.Type) - assert.Equal(t, float64(502), response.Reply.InlineData[0].Value.JSONObject()["status"]) - assert.Regexp(t, "FF10242", response.Reply.InlineData[0].Value.JSONObject().GetObject("body")["error"]) - return true - })).Return(nil) - dr.RunFn = func(a mock.Arguments) { - close(waiter) - } + mcb.On("DeliveryResponse", mock.Anything, mock.Anything). + Return(nil). + Run(func(a mock.Arguments) { + close(waiter) + }) err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{ {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value1"`)}, diff --git a/internal/namespace/manager.go b/internal/namespace/manager.go index a2fb61c739..c1a5b34507 100644 --- a/internal/namespace/manager.go +++ b/internal/namespace/manager.go @@ -190,6 +190,9 @@ func NewNamespaceManager(withDefaults bool) Manager { tifactory.InitConfig(tokensConfig) authfactory.InitConfigArray(authConfig) + // Events still live at the root of the config + eifactory.InitConfig(config.RootSection("events")) + return nm }