Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/hyperledger/firefly/internal/config/wsconfig"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/internal/restclient"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand Down Expand Up @@ -62,6 +63,7 @@ type Ethereum struct {
wsconn wsclient.WSClient
closed chan struct{}
addressResolver *addressResolver
metrics metrics.Manager
}

type eventStreamWebsocket struct {
Expand Down Expand Up @@ -138,7 +140,6 @@ type FFIGenerationInput struct {
ABI []ABIElementMarshaling `json:"abi,omitempty"`
}

// var batchPinEvent = "BatchPin"
var addressVerify = regexp.MustCompile("^[0-9a-f]{40}$")

func (e *Ethereum) Name() string {
Expand All @@ -149,13 +150,13 @@ func (e *Ethereum) VerifierType() fftypes.VerifierType {
return fftypes.VerifierTypeEthAddress
}

func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blockchain.Callbacks) (err error) {

func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blockchain.Callbacks, metrics metrics.Manager) (err error) {
ethconnectConf := prefix.SubPrefix(EthconnectConfigKey)
addressResolverConf := prefix.SubPrefix(AddressResolverConfigKey)

e.ctx = log.WithLogField(ctx, "proto", "ethereum")
e.callbacks = callbacks
e.metrics = metrics

if addressResolverConf.GetString(AddressResolverURLTemplate) != "" {
if e.addressResolver, err = newAddressResolver(ctx, addressResolverConf); err != nil {
Expand Down Expand Up @@ -341,6 +342,8 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
Location: e.buildEventLocationString(msgJSON),
Signature: msgJSON.GetString("signature"),
},
}

Expand Down Expand Up @@ -378,6 +381,8 @@ func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSON
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
Location: e.buildEventLocationString(msgJSON),
Signature: msgJSON.GetString("signature"),
},
}

Expand Down Expand Up @@ -409,6 +414,10 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject)
return e.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply)
}

func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string {
return fmt.Sprintf("address=%s", msgJSON.GetString("address"))
}

func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{}) error {
l := log.L(ctx)

Expand Down Expand Up @@ -511,6 +520,9 @@ func (e *Ethereum) NormalizeSigningKey(ctx context.Context, key string) (string,
}

func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey string, abi ABIElementMarshaling, requestID string, input []interface{}) (*resty.Response, error) {
if e.metrics.IsMetricsEnabled() {
e.metrics.BlockchainTransaction(address, abi.Name)
}
body := EthconnectMessageRequest{
Headers: EthconnectMessageHeaders{
Type: "SendTransaction",
Expand All @@ -528,6 +540,9 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey
}

func (e *Ethereum) queryContractMethod(ctx context.Context, address string, abi ABIElementMarshaling, input []interface{}) (*resty.Response, error) {
if e.metrics.IsMetricsEnabled() {
e.metrics.BlockchainQuery(address, abi.Name)
}
body := EthconnectMessageRequest{
Headers: EthconnectMessageHeaders{
Type: "Query",
Expand Down
36 changes: 21 additions & 15 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/restclient"
"github.com/hyperledger/firefly/mocks/blockchainmocks"
"github.com/hyperledger/firefly/mocks/metricsmocks"
"github.com/hyperledger/firefly/mocks/wsmocks"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand Down Expand Up @@ -74,6 +75,10 @@ func newTestEthereum() (*Ethereum, func()) {
ctx, cancel := context.WithCancel(context.Background())
em := &blockchainmocks.Callbacks{}
wsm := &wsmocks.WSClient{}
mm := &metricsmocks.Manager{}
mm.On("IsMetricsEnabled").Return(true)
mm.On("BlockchainTransaction", mock.Anything, mock.Anything).Return(nil)
mm.On("BlockchainQuery", mock.Anything, mock.Anything).Return(nil)
e := &Ethereum{
ctx: ctx,
client: resty.New().SetBaseURL("http://localhost:12345"),
Expand All @@ -83,6 +88,7 @@ func newTestEthereum() (*Ethereum, func()) {
prefixLong: defaultPrefixLong,
callbacks: em,
wsconn: wsm,
metrics: mm,
}
return e, func() {
cancel()
Expand All @@ -97,7 +103,7 @@ func TestInitMissingURL(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
resetConf()
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10138.*url", err)
}

Expand All @@ -106,7 +112,7 @@ func TestInitBadAddressResolver(t *testing.T) {
defer cancel()
resetConf()
utAddressResolverConf.Set(AddressResolverURLTemplate, "{{unclosed}")
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10337.*urlTemplate", err)
}

Expand All @@ -117,7 +123,7 @@ func TestInitMissingInstance(t *testing.T) {
utEthconnectConf.Set(restclient.HTTPConfigURL, "http://localhost:12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10138.*instance", err)
}

Expand All @@ -128,7 +134,7 @@ func TestInitMissingTopic(t *testing.T) {
utEthconnectConf.Set(restclient.HTTPConfigURL, "http://localhost:12345")
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10138.*topic", err)
}

Expand Down Expand Up @@ -169,7 +175,7 @@ func TestInitAllNewStreamsAndWSEvent(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.NoError(t, err)

assert.Equal(t, "ethereum", e.Name())
Expand Down Expand Up @@ -207,7 +213,7 @@ func TestWSInitFail(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10162", err)

}
Expand Down Expand Up @@ -249,7 +255,7 @@ func TestInitAllExistingStreams(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Equal(t, 3, httpmock.GetTotalCallCount())
assert.Equal(t, "es12345", e.initInfo.stream.ID)
Expand Down Expand Up @@ -298,7 +304,7 @@ func TestInitOldInstancePathContracts(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/contracts/firefly")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.NoError(t, err)
assert.Equal(t, e.instancePath, "0x12345")
}
Expand Down Expand Up @@ -332,7 +338,7 @@ func TestInitOldInstancePathInstances(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.NoError(t, err)
assert.Equal(t, e.instancePath, "0x12345")
}
Expand Down Expand Up @@ -369,7 +375,7 @@ func TestInitOldInstancePathError(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/contracts/firefly")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
}
Expand All @@ -393,7 +399,7 @@ func TestStreamQueryError(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -421,7 +427,7 @@ func TestStreamCreateError(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -449,7 +455,7 @@ func TestStreamUpdateError(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -479,7 +485,7 @@ func TestSubQueryError(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -511,7 +517,7 @@ func TestSubQueryCreateError(t *testing.T) {
utEthconnectConf.Set(EthconnectConfigInstancePath, "/instances/0x12345")
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down
6 changes: 4 additions & 2 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/hyperledger/firefly/internal/config/wsconfig"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/internal/restclient"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand Down Expand Up @@ -60,6 +61,7 @@ type Fabric struct {
idCache map[string]*fabIdentity
wsconn wsclient.WSClient
closed chan struct{}
metrics metrics.Manager
}

type eventStreamWebsocket struct {
Expand Down Expand Up @@ -153,13 +155,13 @@ func (f *Fabric) VerifierType() fftypes.VerifierType {
return fftypes.VerifierTypeMSPIdentity
}

func (f *Fabric) Init(ctx context.Context, prefix config.Prefix, callbacks blockchain.Callbacks) (err error) {

func (f *Fabric) Init(ctx context.Context, prefix config.Prefix, callbacks blockchain.Callbacks, metrics metrics.Manager) (err error) {
fabconnectConf := prefix.SubPrefix(FabconnectConfigKey)

f.ctx = log.WithLogField(ctx, "proto", "fabric")
f.callbacks = callbacks
f.idCache = make(map[string]*fabIdentity)
f.metrics = metrics

if fabconnectConf.GetString(restclient.HTTPConfigURL) == "" {
return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "blockchain.fabconnect")
Expand Down
21 changes: 11 additions & 10 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/restclient"
"github.com/hyperledger/firefly/mocks/blockchainmocks"
"github.com/hyperledger/firefly/mocks/metricsmocks"
"github.com/hyperledger/firefly/mocks/wsmocks"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestInitMissingURL(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
resetConf()
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10138.*url", err)
}

Expand All @@ -114,7 +115,7 @@ func TestInitMissingChaincode(t *testing.T) {
utFabconnectConf.Set(restclient.HTTPConfigURL, "http://localhost:12345")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10138.*chaincode", err)
}

Expand All @@ -126,7 +127,7 @@ func TestInitMissingTopic(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigChaincode, "Firefly")
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10138.*topic", err)
}

Expand Down Expand Up @@ -169,7 +170,7 @@ func TestInitAllNewStreamsAndWSEvent(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.NoError(t, err)

assert.Equal(t, "fabric", e.Name())
Expand Down Expand Up @@ -208,7 +209,7 @@ func TestWSInitFail(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})
assert.Regexp(t, "FF10162", err)

}
Expand Down Expand Up @@ -249,7 +250,7 @@ func TestInitAllExistingStreams(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Equal(t, 2, httpmock.GetTotalCallCount())
assert.Equal(t, "es12345", e.initInfo.stream.ID)
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestStreamQueryError(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10284", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -308,7 +309,7 @@ func TestStreamCreateError(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10284", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -339,7 +340,7 @@ func TestSubQueryError(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10284", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -372,7 +373,7 @@ func TestSubQueryCreateError(t *testing.T) {
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{})
err := e.Init(e.ctx, utConfPrefix, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{})

assert.Regexp(t, "FF10284", err)
assert.Regexp(t, "pop", err)
Expand Down
1 change: 1 addition & 0 deletions internal/events/batch_pin_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (em *eventManager) BatchPinComplete(bi blockchain.Plugin, batchPin *blockch
if err := em.persistBlockchainEvent(ctx, chainEvent); err != nil {
return err
}
em.emitBlockchainEventMetric(&batchPin.Event)
private := batchPin.BatchPayloadRef == ""
if err := em.persistContexts(ctx, batchPin, signingKey, private); err != nil {
return err
Expand Down
Loading