diff --git a/db/migrations/postgres/000100_allow_null_listener_location.down.sql b/db/migrations/postgres/000100_allow_null_listener_location.down.sql new file mode 100644 index 0000000000..d2fe991c9e --- /dev/null +++ b/db/migrations/postgres/000100_allow_null_listener_location.down.sql @@ -0,0 +1 @@ +-- No down migration for this one diff --git a/db/migrations/postgres/000100_allow_null_listener_location.up.sql b/db/migrations/postgres/000100_allow_null_listener_location.up.sql new file mode 100644 index 0000000000..cd003aebd5 --- /dev/null +++ b/db/migrations/postgres/000100_allow_null_listener_location.up.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE contractlisteners ALTER COLUMN location DROP NOT NULL; + +COMMIT; \ No newline at end of file diff --git a/db/migrations/sqlite/000100_allow_null_listener_location.down.sql b/db/migrations/sqlite/000100_allow_null_listener_location.down.sql new file mode 100644 index 0000000000..d2fe991c9e --- /dev/null +++ b/db/migrations/sqlite/000100_allow_null_listener_location.down.sql @@ -0,0 +1 @@ +-- No down migration for this one diff --git a/db/migrations/sqlite/000100_allow_null_listener_location.up.sql b/db/migrations/sqlite/000100_allow_null_listener_location.up.sql new file mode 100644 index 0000000000..2a87fddee0 --- /dev/null +++ b/db/migrations/sqlite/000100_allow_null_listener_location.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE contractlisteners RENAME COLUMN location TO location_old; +ALTER TABLE contractlisteners ADD COLUMN location TEXT; +UPDATE contractlisteners SET location = location_old; +ALTER TABLE contractlisteners DROP COLUMN location_old; \ No newline at end of file diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 396b89fa96..95ec901593 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -742,10 +742,13 @@ func encodeContractLocation(ctx context.Context, location *Location) (result *ff return result, err } -func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error { - location, err := parseContractLocation(ctx, listener.Location) - if err != nil { - return err +func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) (err error) { + var location *Location + if listener.Location != nil { + location, err = parseContractLocation(ctx, listener.Location) + if err != nil { + return err + } } abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition) if err != nil { diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index ab6d3e06e7..0369b0585c 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1689,6 +1689,43 @@ func TestAddSubscription(t *testing.T) { assert.NoError(t, err) } +func TestAddSubscriptionWithoutLocation(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + e.streamID = "es-1" + e.streams = &streamManager{ + client: e.client, + } + + sub := &core.ContractListenerInput{ + ContractListener: core.ContractListener{ + Event: &core.FFISerializedEvent{ + FFIEventDefinition: fftypes.FFIEventDefinition{ + Name: "Changed", + Params: fftypes.FFIParams{ + { + Name: "value", + Schema: fftypes.JSONAnyPtr(`{"type": "string", "details": {"type": "string"}}`), + }, + }, + }, + }, + Options: &core.ContractListenerOptions{ + FirstEvent: string(core.SubOptsFirstEventOldest), + }, + }, + } + + httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, + httpmock.NewJsonResponderOrPanic(200, &subscription{})) + + err := e.AddContractListener(context.Background(), sub) + + assert.NoError(t, err) +} + func TestAddSubscriptionBadParamDetails(t *testing.T) { e, cancel := newTestEthereum() defer cancel() diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index 0136c66d2e..8852ff9b80 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -184,12 +184,16 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati fromBlock = "latest" } sub := subscription{ - Name: subName, - Stream: stream, - FromBlock: fromBlock, - EthCompatAddress: location.Address, - EthCompatEvent: abi, + Name: subName, + Stream: stream, + FromBlock: fromBlock, + EthCompatEvent: abi, } + + if location != nil { + sub.EthCompatAddress = location.Address + } + res, err := s.client.R(). SetContext(ctx). SetBody(&sub). diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 5f525485fb..53f002e641 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -165,11 +165,15 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati Signer: s.signer, Stream: stream, Filter: eventFilter{ - ChaincodeID: location.Chaincode, EventFilter: event, }, FromBlock: fromBlock, } + + if location.Chaincode != "" { + sub.Filter.ChaincodeID = location.Chaincode + } + res, err := s.client.R(). SetContext(ctx). SetBody(&sub). diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index fc0dbb2abf..1c3527d6f2 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -794,6 +794,9 @@ func (f *Fabric) NormalizeContractLocation(ctx context.Context, location *fftype } func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) { + if location == nil { + return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'channel' not set") + } fabricLocation := Location{} if err := json.Unmarshal(location.Bytes(), &fabricLocation); err != nil { return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, err) @@ -801,9 +804,6 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc if fabricLocation.Channel == "" { return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'channel' not set") } - if fabricLocation.Chaincode == "" { - return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'chaincode' not set") - } return &fabricLocation, nil } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 522f6b4071..e28ab97177 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -1442,6 +1442,67 @@ func TestAddSubscription(t *testing.T) { assert.NoError(t, err) } +func TestAddSubscriptionNoChannel(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + e.streamID = "es-1" + e.streams = &streamManager{ + client: e.client, + } + + sub := &core.ContractListenerInput{ + ContractListener: core.ContractListener{ + Location: fftypes.JSONAnyPtr(fftypes.JSONObject{ + "chaincode": "mycode", + }.String()), + Event: &core.FFISerializedEvent{}, + Options: &core.ContractListenerOptions{ + FirstEvent: string(core.SubOptsFirstEventOldest), + }, + }, + } + + httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, + func(req *http.Request) (*http.Response, error) { + var body map[string]interface{} + json.NewDecoder(req.Body).Decode(&body) + assert.Equal(t, "0", body["fromBlock"]) + return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req) + }) + + err := e.AddContractListener(context.Background(), sub) + + assert.Regexp(t, "FF10310.*channel", err) +} + +func TestAddSubscriptionNoLocation(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + e.streamID = "es-1" + e.streams = &streamManager{ + client: e.client, + } + + sub := &core.ContractListenerInput{ + ContractListener: core.ContractListener{ + Event: &core.FFISerializedEvent{}, + Options: &core.ContractListenerOptions{ + FirstEvent: string(core.SubOptsFirstEventOldest), + }, + }, + } + + err := e.AddContractListener(context.Background(), sub) + + assert.Regexp(t, "FF10310.*channel", err) +} + func TestAddSubscriptionBadLocation(t *testing.T) { e, cancel := newTestFabric() defer cancel() @@ -2173,6 +2234,18 @@ func TestNormalizeContractLocation(t *testing.T) { assert.NoError(t, err) } +func TestNormalizeContractLocationNoChannel(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + location := &Location{ + Chaincode: "simplestorage", + } + locationBytes, err := json.Marshal(location) + assert.NoError(t, err) + _, err = e.NormalizeContractLocation(context.Background(), fftypes.JSONAnyPtrBytes(locationBytes)) + assert.Regexp(t, "FF10310.*channel", err) +} + func TestValidateNoContractLocationChaincode(t *testing.T) { e, cancel := newTestFabric() defer cancel() diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index 3d2388c527..fea27c91b5 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -471,8 +471,11 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co if err := fftypes.ValidateFFNameField(ctx, listener.Topic, "topic"); err != nil { return nil, err } - if listener.Location, err = cm.blockchain.NormalizeContractLocation(ctx, listener.Location); err != nil { - return nil, err + + if listener.Location != nil { + if listener.Location, err = cm.blockchain.NormalizeContractLocation(ctx, listener.Location); err != nil { + return nil, err + } } if listener.Options == nil { diff --git a/internal/contracts/manager_test.go b/internal/contracts/manager_test.go index eaf229e192..cab669030d 100644 --- a/internal/contracts/manager_test.go +++ b/internal/contracts/manager_test.go @@ -632,6 +632,43 @@ func TestAddContractListenerInline(t *testing.T) { mdi.AssertExpectations(t) } +func TestAddContractListenerNoLocationOK(t *testing.T) { + cm := newTestContractManager() + mbi := cm.blockchain.(*blockchainmocks.Plugin) + mdi := cm.database.(*databasemocks.Plugin) + + sub := &core.ContractListenerInput{ + ContractListener: core.ContractListener{ + Event: &core.FFISerializedEvent{ + FFIEventDefinition: fftypes.FFIEventDefinition{ + Name: "changed", + Params: fftypes.FFIParams{ + { + Name: "value", + Schema: fftypes.JSONAnyPtr(`{"type": "integer"}`), + }, + }, + }, + }, + Options: &core.ContractListenerOptions{}, + Topic: "test-topic", + }, + } + + mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed") + mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) + mbi.On("AddContractListener", context.Background(), sub).Return(nil) + mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil) + + result, err := cm.AddContractListener(context.Background(), sub) + assert.NoError(t, err) + assert.NotNil(t, result.ID) + assert.NotNil(t, result.Event) + + mbi.AssertExpectations(t) + mdi.AssertExpectations(t) +} + func TestAddContractListenerByEventPath(t *testing.T) { cm := newTestContractManager() mbi := cm.blockchain.(*blockchainmocks.Plugin)