Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,13 @@ nav_order: 2
|size|The maximum number of messages that can be packed into a batch|`int`|`<nil>`
|timeout|The timeout to wait for a batch to fill, before sending|[`time.Duration`](https://pkg.go.dev/time#Duration)|`<nil>`

## cache.blockchain

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|size|Size of blockchain cache|`string`|`<nil>`
|ttl|Time to live for blockchain cache items|`string`|`<nil>`

## cors

|Key|Description|Type|Default Value|
Expand Down
31 changes: 22 additions & 9 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,20 @@ func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, act
return nil
}

func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error {
for _, cb := range cb.handlers {
// Send the event to all handlers and let them match it to a contract listener
// TODO: can we push more listener/namespace knowledge down to this layer?
if err := cb.BlockchainEvent(event); err != nil {
return err
func (cb *callbacks) BlockchainEvent(ctx context.Context, namespace string, event *blockchain.EventWithSubscription) error {
if namespace == "" {
// Older token subscriptions don't populate namespace, so deliver the event to every handler
for _, cb := range cb.handlers {
// Send the event to all handlers and let them match it to a contract listener
if err := cb.BlockchainEvent(event); err != nil {
return err
}
}
} else {
if handler, ok := cb.handlers[namespace]; ok {
return handler.BlockchainEvent(event)
}
log.L(ctx).Errorf("No handler found for blockchain event on namespace '%s'", namespace)
}
return nil
}
Expand Down Expand Up @@ -207,7 +214,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
return err
}

e.streams = &streamManager{client: e.client}
e.streams = newStreamManager(e.client)
batchSize := ethconnectConf.GetUint(EthconnectConfigBatchSize)
batchTimeout := uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())
stream, err := e.streams.ensureEventStream(e.ctx, e.topic, batchSize, batchTimeout)
Expand Down Expand Up @@ -434,9 +441,15 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JS
}

func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
subName, err := e.streams.getSubscriptionName(ctx, msgJSON.GetString("subId"))
if err != nil {
return err
}

namespace := e.streams.getNamespaceFromSubName(subName)
event := e.parseBlockchainEvent(ctx, msgJSON)
if event != nil {
err = e.callbacks.BlockchainEvent(&blockchain.EventWithSubscription{
err = e.callbacks.BlockchainEvent(ctx, namespace, &blockchain.EventWithSubscription{
Event: *event,
Subscription: msgJSON.GetString("subId"),
})
Expand Down Expand Up @@ -773,7 +786,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}

subName := fmt.Sprintf("ff-sub-%s", listener.ID)
subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
firstEvent := string(core.SubOptsFirstEventNewest)
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
Expand Down
266 changes: 259 additions & 7 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ func TestDeleteSubscriptionFail(t *testing.T) {
assert.Regexp(t, "FF10111", err)
}

func TestHandleMessageContractEvent(t *testing.T) {
func TestHandleMessageContractEventOldSubscription(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
Expand All @@ -1773,14 +1773,23 @@ func TestHandleMessageContractEvent(t *testing.T) {
]`)

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}},
}
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub2", Stream: "es12345", Name: "ff-sub-1132312312312",
}))

e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}
e.subs = map[string]subscriptionInfo{}
e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{
namespace: "ns1",
version: 1,
}
e.streams = newStreamManager(e.client)

em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool {
assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID)
Expand Down Expand Up @@ -1819,7 +1828,7 @@ func TestHandleMessageContractEvent(t *testing.T) {
em.AssertExpectations(t)
}

func TestHandleMessageContractEventError(t *testing.T) {
func TestHandleMessageContractEventErrorOldSubscription(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
Expand All @@ -1839,14 +1848,257 @@ func TestHandleMessageContractEventError(t *testing.T) {
]`)

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}},
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub2", Stream: "es12345", Name: "ff-sub-1132312312312",
}))

e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}
e.subs = map[string]subscriptionInfo{}
e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{
namespace: "ns1",
version: 1,
}
e.streams = newStreamManager(e.client)
em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop"))

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.EqualError(t, err, "pop")

em.AssertExpectations(t)
}

func TestHandleMessageContractEventWithNamespace(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"transactionIndex": "0x0",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628",
"data": {
"from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"value": "1"
},
"subId": "sub2",
"signature": "Changed(address,uint256)",
"logIndex": "50",
"timestamp": "1640811383"
},
{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"transactionIndex": "0x0",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72638",
"data": {
"from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"value": "1"
},
"subId": "sub2",
"signature": "Changed(address,uint256)",
"logIndex": "50",
"timestamp": "1640811384"
}
]`)

em := &blockchainmocks.Callbacks{}
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub2", Stream: "es12345", Name: "ff-sub-ns1-1132312312312",
}))

e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}
e.subs = map[string]subscriptionInfo{}
e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{
namespace: "ns1",
version: 1,
}
e.streams = newStreamManager(e.client)

em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool {
assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID)
return true
})).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)

ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription)
assert.Equal(t, "sub2", ev.Subscription)
assert.Equal(t, "Changed", ev.Event.Name)

outputs := fftypes.JSONObject{
"from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"value": "1",
}
assert.Equal(t, outputs, ev.Event.Output)

info := fftypes.JSONObject{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"logIndex": "50",
"signature": "Changed(address,uint256)",
"subId": "sub2",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628",
"transactionIndex": "0x0",
"timestamp": "1640811383",
}
assert.Equal(t, info, ev.Event.Info)

em.AssertExpectations(t)
}

func TestHandleMessageContractEventNoNamespaceHandlers(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"transactionIndex": "0x0",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628",
"data": {
"from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"value": "1"
},
"subId": "sub2",
"signature": "Changed(address,uint256)",
"logIndex": "50",
"timestamp": "1640811383"
}
]`)

em := &blockchainmocks.Callbacks{}
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub2", Stream: "es12345", Name: "ff-sub-ns1-1132312312312",
}))

e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns2": em}}
e.subs = map[string]subscriptionInfo{}
e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{
namespace: "ns1",
version: 1,
}
e.streams = newStreamManager(e.client)

em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool {
assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID)
assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID)
return true
})).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageContractEventSubNameError(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"transactionIndex": "0x0",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628",
"data": {
"from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"value": "1"
},
"subId": "sub2",
"signature": "Changed(address,uint256)",
"logIndex": "50",
"timestamp": "1640811383"
}
]`)
em := &blockchainmocks.Callbacks{}

e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2",
httpmock.NewJsonResponderOrPanic(500, ethError{Error: "pop"}))

e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}
e.subs = map[string]subscriptionInfo{}
e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{
namespace: "ns1",
version: 1,
}
e.streams = newStreamManager(e.client)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.Regexp(t, "FF10111", err)

em.AssertExpectations(t)
}

func TestHandleMessageContractEventError(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"transactionIndex": "0x0",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628",
"data": {
"from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"value": "1"
},
"subId": "sub2",
"signature": "Changed(address,uint256)",
"logIndex": "50",
"timestamp": "1640811383"
}
]`)

em := &blockchainmocks.Callbacks{}

e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub2", Stream: "es12345", Name: "ff-sub-ns1-1132312312312",
}))

e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}
e.subs = map[string]subscriptionInfo{}
e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{
namespace: "ns1",
version: 1,
}
e.streams = newStreamManager(e.client)

em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop"))

Expand Down
Loading