Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,47 @@ nav_order: 2
|default|The default event transport for new subscriptions|`string`|`<nil>`
|enabled|Which event interface plugins are enabled|`boolean`|`<nil>`

## events.webhooks

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|tlsHandshakeTimeout|The maximum amount of time to wait for a successful TLS handshake|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`

## events.webhooks.auth

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|password|Password|`string`|`<nil>`
|username|Username|`string`|`<nil>`

## events.webhooks.proxy

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|url|Optional HTTP proxy server to connect through|`string`|`<nil>`

## events.webhooks.retry

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|count|The maximum number of times to retry|`int`|`5`
|enabled|Enables retries|`boolean`|`false`
|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

## events.websockets

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|readBufferSize|WebSocket read buffer size|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
|writeBufferSize|WebSocket write buffer size|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`

## histograms

|Key|Description|Type|Default Value|
Expand Down
8 changes: 6 additions & 2 deletions docs/reference/types/_includes/subscription_description.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ allowing you to customize your HTTP requests as follows:

- Set the HTTP request details:
- Method, URL, query, headers and input body
- Wait for a successful `2xx` HTTP code from the back-end service, before
acknowledging (default).
- Wait for a invocation of the back-end service, before acknowledging
- To retry requests to your Webhook on a non-`2xx` HTTP status code
or other error, then you should enable and configure
[events.webhooks.retry](../../config.html#eventswebhooksretry)
- The event is acknowledged once the request (with any retries), is
completed - regardless of whether the outcome was a success or failure.
- Use `fastack` to acknowledge against FireFly immediately and make multiple
parallel calls to the HTTP API in a fire-and-forget fashion.
- Set the HTTP request details dynamically from `message_confirmed` events:
Expand Down
5 changes: 5 additions & 0 deletions internal/coremsgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,9 @@ var (
ConfigPluginsAuth = ffc("config.plugins.auth", "Authorization plugin configuration", i18n.MapStringStringType)
ConfigPluginsAuthName = ffc("config.plugins.auth[].name", "The name of the auth plugin to use", i18n.StringType)
ConfigPluginsAuthType = ffc("config.plugins.auth[].type", "The type of the auth plugin to use", i18n.StringType)

ConfigPluginsEventSystemReadAhead = ffc("config.events.system.readAhead", "", i18n.IgnoredType)
ConfigPluginsEventWebhooksURL = ffc("config.events.webhooks.url", "", i18n.IgnoredType)
ConfigPluginsEventWebSocketsReadBufferSize = ffc("config.events.websockets.readBufferSize", "WebSocket read buffer size", i18n.ByteSizeType)
ConfigPluginsEventWebSocketsWriteBufferSize = ffc("config.events.websockets.writeBufferSize", "WebSocket write buffer size", i18n.ByteSizeType)
)
7 changes: 7 additions & 0 deletions internal/events/eifactory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eifactory
import (
"context"

"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/events/system"
Expand All @@ -41,6 +42,12 @@ func init() {
}
}

func InitConfig(config config.Section) {
for name, plugin := range pluginsByName {
plugin.InitConfig(config.SubSection(name))
}
}

func GetPlugin(ctx context.Context, pluginType string) (events.Plugin, error) {
plugin, ok := pluginsByName[pluginType]
if !ok {
Expand Down
38 changes: 31 additions & 7 deletions internal/events/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ type whResponse struct {
func (wh *WebHooks) Name() string { return "webhooks" }

func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) {
connID := fftypes.ShortID()
*wh = WebHooks{
ctx: ctx,
ctx: log.WithLogField(ctx, "webhook", wh.connID),
capabilities: &events.Capabilities{},
callbacks: make(map[string]events.Callbacks),
client: ffresty.New(ctx, config),
connID: fftypes.ShortID(),
connID: connID,
}
return nil
}
Expand All @@ -85,7 +86,9 @@ func (wh *WebHooks) Capabilities() *events.Capabilities {

func (wh *WebHooks) buildRequest(options fftypes.JSONObject, firstData fftypes.JSONObject) (req *whRequest, err error) {
req = &whRequest{
r: wh.client.R().SetDoNotParseResponse(true),
r: wh.client.R().
SetDoNotParseResponse(true).
SetContext(wh.ctx),
url: options.GetString("url"),
method: options.GetString("method"),
forceJSON: options.GetBool("json"),
Expand Down Expand Up @@ -223,8 +226,10 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
}
}

log.L(wh.ctx).Debugf("Webhook-> %s %s event %s on subscription %s", req.method, req.url, event.ID, sub.ID)
resp, err := req.r.Execute(req.method, req.url)
if err != nil {
log.L(wh.ctx).Errorf("Webhook<- %s %s event %s on subscription %s failed: %s", req.method, req.url, event.ID, sub.ID, err)
return nil, nil, err
}
defer func() { _ = resp.RawBody().Close() }()
Expand All @@ -233,6 +238,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
Status: resp.StatusCode(),
Headers: fftypes.JSONObject{},
}
log.L(wh.ctx).Infof("Webhook<- %s %s event %s on subscription %s returned %d", req.method, req.url, event.ID, sub.ID, res.Status)
header := resp.Header()
for h := range header {
res.Headers[h] = header.Get(h)
Expand Down Expand Up @@ -265,7 +271,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
return req, res, nil
}

func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error {
func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray, fastAck bool) error {
req, res, gwErr := wh.attemptRequest(sub, event, data)
if gwErr != nil {
// Generate a bad-gateway error response - we always want to send something back,
Expand All @@ -292,6 +298,7 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
txType = fftypes.FFEnum(strings.ToLower(req.replyTx))
}
if cb, ok := wh.callbacks[sub.Namespace]; ok {
log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Rejected: false,
Expand All @@ -313,6 +320,14 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
},
})
}
} else if !fastAck {
if cb, ok := wh.callbacks[sub.Namespace]; ok {
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Rejected: false,
Subscription: event.Subscription,
})
}
}
return nil
}
Expand Down Expand Up @@ -340,13 +355,22 @@ func (wh *WebHooks) DeliveryRequest(connID string, sub *core.Subscription, event
}

// In fastack mode we drive calls in parallel to the backend, immediately acknowledging the event
if sub.Options.TransportOptions().GetBool("fastack") {
// NOTE: We cannot use this with reply mode, as when we're sending a reply the `DeliveryResponse`
// callback must include the reply in-line.
if !reply && sub.Options.TransportOptions().GetBool("fastack") {
if cb, ok := wh.callbacks[sub.Namespace]; ok {
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Rejected: false,
Subscription: event.Subscription,
})
}
go func() {
err := wh.doDelivery(connID, reply, sub, event, data)
err := wh.doDelivery(connID, reply, sub, event, data, true)
log.L(wh.ctx).Warnf("Webhook delivery failed in fastack mode for event '%s': %s", event.ID, err)
}()
return nil
}

return wh.doDelivery(connID, reply, sub, event, data)
return wh.doDelivery(connID, reply, sub, event, data, false)
}
35 changes: 20 additions & 15 deletions internal/events/webhooks/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,11 @@ func TestRequestNoBodyNoReply(t *testing.T) {

dataID := fftypes.NewUUID()
groupHash := fftypes.NewRandB32()
sub := &core.Subscription{}
sub := &core.Subscription{
SubscriptionRef: core.SubscriptionRef{
Namespace: "ns1",
},
}
to := sub.Options.TransportOptions()
to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr())
event := &core.EventDelivery{
Expand All @@ -380,7 +384,8 @@ func TestRequestNoBodyNoReply(t *testing.T) {
},
},
Subscription: core.SubscriptionRef{
ID: sub.ID,
ID: sub.ID,
Namespace: "ns1",
},
}
data := &core.Data{
Expand All @@ -390,9 +395,16 @@ func TestRequestNoBodyNoReply(t *testing.T) {
}`),
}

mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool {
return !response.Rejected
})).Return(nil)

err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{data})
assert.NoError(t, err)
assert.True(t, called)

mcb.AssertExpectations(t)
}

func TestRequestReplyEmptyData(t *testing.T) {
Expand Down Expand Up @@ -638,7 +650,7 @@ func TestRequestReplyDataArrayError(t *testing.T) {
mcb.AssertExpectations(t)
}

func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
func TestWebhookFailFastAsk(t *testing.T) {
wh, cancel := newTestWebHooks(t)
defer cancel()

Expand All @@ -652,7 +664,6 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
Namespace: "ns1",
},
}
sub.Options.TransportOptions()["reply"] = true
sub.Options.TransportOptions()["fastack"] = true
event := &core.EventDelivery{
EnrichedEvent: core.EnrichedEvent{
Expand All @@ -673,17 +684,11 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {

waiter := make(chan struct{})
mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
dr := mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool {
assert.Equal(t, *msgID, *response.Reply.Message.Header.CID)
assert.Nil(t, response.Reply.Message.Header.Group)
assert.Equal(t, core.MessageTypeBroadcast, response.Reply.Message.Header.Type)
assert.Equal(t, float64(502), response.Reply.InlineData[0].Value.JSONObject()["status"])
assert.Regexp(t, "FF10242", response.Reply.InlineData[0].Value.JSONObject().GetObject("body")["error"])
return true
})).Return(nil)
dr.RunFn = func(a mock.Arguments) {
close(waiter)
}
mcb.On("DeliveryResponse", mock.Anything, mock.Anything).
Return(nil).
Run(func(a mock.Arguments) {
close(waiter)
})

err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{
{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value1"`)},
Expand Down
3 changes: 3 additions & 0 deletions internal/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func NewNamespaceManager(withDefaults bool) Manager {
tifactory.InitConfig(tokensConfig)
authfactory.InitConfigArray(authConfig)

// Events still live at the root of the config
eifactory.InitConfig(config.RootSection("events"))

return nm
}

Expand Down