Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- No down migration for this one
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;

ALTER TABLE contractlisteners ALTER COLUMN location DROP NOT NULL;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- No down migration for this one
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE contractlisteners RENAME COLUMN location TO location_old;
ALTER TABLE contractlisteners ADD COLUMN location TEXT;
UPDATE contractlisteners SET location = location_old;
ALTER TABLE contractlisteners DROP COLUMN location_old;
11 changes: 7 additions & 4 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,10 +742,13 @@ func encodeContractLocation(ctx context.Context, location *Location) (result *ff
return result, err
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error {
location, err := parseContractLocation(ctx, listener.Location)
if err != nil {
return err
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) (err error) {
var location *Location
if listener.Location != nil {
location, err = parseContractLocation(ctx, listener.Location)
if err != nil {
return err
}
}
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,43 @@ func TestAddSubscription(t *testing.T) {
assert.NoError(t, err)
}

func TestAddSubscriptionWithoutLocation(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()
e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListenerInput{
ContractListener: core.ContractListener{
Event: &core.FFISerializedEvent{
FFIEventDefinition: fftypes.FFIEventDefinition{
Name: "Changed",
Params: fftypes.FFIParams{
{
Name: "value",
Schema: fftypes.JSONAnyPtr(`{"type": "string", "details": {"type": "string"}}`),
},
},
},
},
Options: &core.ContractListenerOptions{
FirstEvent: string(core.SubOptsFirstEventOldest),
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)

assert.NoError(t, err)
}

func TestAddSubscriptionBadParamDetails(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
Expand Down
14 changes: 9 additions & 5 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,16 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati
fromBlock = "latest"
}
sub := subscription{
Name: subName,
Stream: stream,
FromBlock: fromBlock,
EthCompatAddress: location.Address,
EthCompatEvent: abi,
Name: subName,
Stream: stream,
FromBlock: fromBlock,
EthCompatEvent: abi,
}

if location != nil {
sub.EthCompatAddress = location.Address
}

res, err := s.client.R().
SetContext(ctx).
SetBody(&sub).
Expand Down
6 changes: 5 additions & 1 deletion internal/blockchain/fabric/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,15 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati
Signer: s.signer,
Stream: stream,
Filter: eventFilter{
ChaincodeID: location.Chaincode,
EventFilter: event,
},
FromBlock: fromBlock,
}

if location.Chaincode != "" {
sub.Filter.ChaincodeID = location.Chaincode
}

res, err := s.client.R().
SetContext(ctx).
SetBody(&sub).
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,16 +794,16 @@ func (f *Fabric) NormalizeContractLocation(ctx context.Context, location *fftype
}

func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) {
if location == nil {
return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'channel' not set")
}
fabricLocation := Location{}
if err := json.Unmarshal(location.Bytes(), &fabricLocation); err != nil {
return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, err)
}
if fabricLocation.Channel == "" {
return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'channel' not set")
}
if fabricLocation.Chaincode == "" {
return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'chaincode' not set")
}
return &fabricLocation, nil
}

Expand Down
73 changes: 73 additions & 0 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,67 @@ func TestAddSubscription(t *testing.T) {
assert.NoError(t, err)
}

func TestAddSubscriptionNoChannel(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListenerInput{
ContractListener: core.ContractListener{
Location: fftypes.JSONAnyPtr(fftypes.JSONObject{
"chaincode": "mycode",
}.String()),
Event: &core.FFISerializedEvent{},
Options: &core.ContractListenerOptions{
FirstEvent: string(core.SubOptsFirstEventOldest),
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
func(req *http.Request) (*http.Response, error) {
var body map[string]interface{}
json.NewDecoder(req.Body).Decode(&body)
assert.Equal(t, "0", body["fromBlock"])
return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req)
})

err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10310.*channel", err)
}

func TestAddSubscriptionNoLocation(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListenerInput{
ContractListener: core.ContractListener{
Event: &core.FFISerializedEvent{},
Options: &core.ContractListenerOptions{
FirstEvent: string(core.SubOptsFirstEventOldest),
},
},
}

err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10310.*channel", err)
}

func TestAddSubscriptionBadLocation(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
Expand Down Expand Up @@ -2173,6 +2234,18 @@ func TestNormalizeContractLocation(t *testing.T) {
assert.NoError(t, err)
}

func TestNormalizeContractLocationNoChannel(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
location := &Location{
Chaincode: "simplestorage",
}
locationBytes, err := json.Marshal(location)
assert.NoError(t, err)
_, err = e.NormalizeContractLocation(context.Background(), fftypes.JSONAnyPtrBytes(locationBytes))
assert.Regexp(t, "FF10310.*channel", err)
}

func TestValidateNoContractLocationChaincode(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
Expand Down
7 changes: 5 additions & 2 deletions internal/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,11 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co
if err := fftypes.ValidateFFNameField(ctx, listener.Topic, "topic"); err != nil {
return nil, err
}
if listener.Location, err = cm.blockchain.NormalizeContractLocation(ctx, listener.Location); err != nil {
return nil, err

if listener.Location != nil {
if listener.Location, err = cm.blockchain.NormalizeContractLocation(ctx, listener.Location); err != nil {
return nil, err
}
}

if listener.Options == nil {
Expand Down
37 changes: 37 additions & 0 deletions internal/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,43 @@ func TestAddContractListenerInline(t *testing.T) {
mdi.AssertExpectations(t)
}

func TestAddContractListenerNoLocationOK(t *testing.T) {
cm := newTestContractManager()
mbi := cm.blockchain.(*blockchainmocks.Plugin)
mdi := cm.database.(*databasemocks.Plugin)

sub := &core.ContractListenerInput{
ContractListener: core.ContractListener{
Event: &core.FFISerializedEvent{
FFIEventDefinition: fftypes.FFIEventDefinition{
Name: "changed",
Params: fftypes.FFIParams{
{
Name: "value",
Schema: fftypes.JSONAnyPtr(`{"type": "integer"}`),
},
},
},
},
Options: &core.ContractListenerOptions{},
Topic: "test-topic",
},
}

mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed")
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), sub).Return(nil)
mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil)

result, err := cm.AddContractListener(context.Background(), sub)
assert.NoError(t, err)
assert.NotNil(t, result.ID)
assert.NotNil(t, result.Event)

mbi.AssertExpectations(t)
mdi.AssertExpectations(t)
}

func TestAddContractListenerByEventPath(t *testing.T) {
cm := newTestContractManager()
mbi := cm.blockchain.(*blockchainmocks.Plugin)
Expand Down