Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CREATE TABLE operations (
tx_id UUID NOT NULL,
optype VARCHAR(64) NOT NULL,
opstatus VARCHAR(64) NOT NULL,
member VARCHAR(1024),
member VARCHAR(1024),
plugin VARCHAR(64) NOT NULL,
backend_id VARCHAR(256) NOT NULL,
created BIGINT NOT NULL,
Expand All @@ -18,6 +18,6 @@ CREATE TABLE operations (
CREATE UNIQUE INDEX operations_id ON operations(id);
CREATE INDEX operations_created ON operations(created);
CREATE INDEX operations_backend ON operations(backend_id);
CREATE INDEX operations_namespace ON operations(namespace);
CREATE INDEX operations_tx ON operations(tx_id);

COMMIT;
COMMIT;
5 changes: 2 additions & 3 deletions db/migrations/sqlite/000008_create_operations_table.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ CREATE TABLE operations (
tx_id UUID NOT NULL,
optype VARCHAR(64) NOT NULL,
opstatus VARCHAR(64) NOT NULL,
member VARCHAR(1024),
member VARCHAR(1024),
plugin VARCHAR(64) NOT NULL,
backend_id VARCHAR(256) NOT NULL,
created BIGINT NOT NULL,
Expand All @@ -17,5 +17,4 @@ CREATE TABLE operations (
CREATE UNIQUE INDEX operations_id ON operations(id);
CREATE INDEX operations_created ON operations(created);
CREATE INDEX operations_backend ON operations(backend_id);
CREATE INDEX operations_namespace ON operations(namespace);

CREATE INDEX operations_tx ON operations(tx_id);
15 changes: 15 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4306,6 +4306,9 @@ paths:
properties:
author:
type: string
config:
additionalProperties: {}
type: object
connector:
type: string
created: {}
Expand Down Expand Up @@ -4368,6 +4371,9 @@ paths:
properties:
author:
type: string
config:
additionalProperties: {}
type: object
name:
type: string
symbol:
Expand All @@ -4386,6 +4392,9 @@ paths:
properties:
author:
type: string
config:
additionalProperties: {}
type: object
connector:
type: string
created: {}
Expand Down Expand Up @@ -4416,6 +4425,9 @@ paths:
properties:
author:
type: string
config:
additionalProperties: {}
type: object
connector:
type: string
created: {}
Expand Down Expand Up @@ -4480,6 +4492,9 @@ paths:
properties:
author:
type: string
config:
additionalProperties: {}
type: object
connector:
type: string
created: {}
Expand Down
52 changes: 44 additions & 8 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package assets

import (
"context"
"fmt"

"github.com/hyperledger/firefly/internal/broadcast"
"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/i18n"
Expand All @@ -37,9 +39,10 @@ type Manager interface {
GetTokenPools(ctx context.Context, ns, typeName string, filter database.AndFilter) ([]*fftypes.TokenPool, *database.FilterResult, error)
GetTokenPool(ctx context.Context, ns, typeName, name string) (*fftypes.TokenPool, error)
GetTokenAccounts(ctx context.Context, ns, typeName, name string, filter database.AndFilter) ([]*fftypes.TokenAccount, *database.FilterResult, error)
ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error

// Bound token callbacks
TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error
TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error

Start() error
WaitStop()
Expand All @@ -51,12 +54,13 @@ type assetManager struct {
identity identity.Plugin
data data.Manager
syncasync syncasync.Bridge
broadcast broadcast.Manager
tokens map[string]tokens.Plugin
retry retry.Retry
txhelper txcommon.Helper
}

func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, ti map[string]tokens.Plugin) (Manager, error) {
func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, ti map[string]tokens.Plugin) (Manager, error) {
if di == nil || ii == nil || sa == nil || ti == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
Expand All @@ -66,6 +70,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin
identity: ii,
data: dm,
syncasync: sa,
broadcast: bm,
tokens: ti,
retry: retry.Retry{
InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay),
Expand All @@ -86,6 +91,30 @@ func (am *assetManager) selectTokenPlugin(ctx context.Context, name string) (tok
return nil, i18n.NewError(ctx, i18n.MsgUnknownTokensPlugin, name)
}

func addTokenPoolCreateInputs(op *fftypes.Operation, pool *fftypes.TokenPool) {
op.Input = fftypes.JSONObject{
"id": pool.ID.String(),
"namespace": pool.Namespace,
"name": pool.Name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note that we think this is where a custom JSON input payload could be added in, that's opaque to FireFly core and allows individual Token Connectors to define additional inputs.

For example, the existing contract address of an ERC20/ERC721.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that is exposed as config now on the firefly-tokens-erc1155 API. I could add that to the accepted inputs for the route, store it here, and pass it through to the /pool API (all opaque to FireFly).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a commit for this

"config": pool.Config,
}
}

func retrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, pool *fftypes.TokenPool) (err error) {
input := &op.Input
pool.ID, err = fftypes.ParseUUID(ctx, input.GetString("id"))
if err != nil {
return err
}
pool.Namespace = input.GetString("namespace")
pool.Name = input.GetString("name")
if pool.Namespace == "" || pool.Name == "" {
return fmt.Errorf("namespace or name missing from inputs")
}
pool.Config = input.GetObject("config")
return nil
}

func (am *assetManager) CreateTokenPool(ctx context.Context, ns string, typeName string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error) {
return am.CreateTokenPoolWithID(ctx, ns, fftypes.NewUUID(), typeName, pool, waitConfirm)
}
Expand Down Expand Up @@ -132,6 +161,13 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id
return nil, err
}

pool.ID = id
pool.Namespace = ns
pool.TX = fftypes.TransactionRef{
ID: tx.ID,
Type: tx.Subject.Type,
}

op := fftypes.NewTXOperation(
plugin,
ns,
Expand All @@ -140,17 +176,12 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id
fftypes.OpTypeTokensCreatePool,
fftypes.OpStatusPending,
author.Identifier)
addTokenPoolCreateInputs(op, pool)
err = am.database.UpsertOperation(ctx, op, false)
if err != nil {
return nil, err
}

pool.ID = id
pool.Namespace = ns
pool.TX = fftypes.TransactionRef{
ID: tx.ID,
Type: tx.Subject.Type,
}
return pool, plugin.CreateTokenPool(ctx, op.ID, author, pool)
}

Expand Down Expand Up @@ -189,6 +220,11 @@ func (am *assetManager) GetTokenAccounts(ctx context.Context, ns, typeName, name
return am.database.GetTokenAccounts(ctx, filter.Condition(filter.Builder().Eq("protocolid", pool.ProtocolID)))
}

func (am *assetManager) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error {
// TODO: validate that the given token pool was created with the given protocolTxId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think it would be good to have a high level architecture picture of the flow, and what this validation means.
Not a requirement for this PR obviously, but something for the docs when we get to them.

I know you and I have had a chance to talk about the flow... and it really feels significant to me.
How the broadcast and the token are linked together and verified, and how all nodes in the network become aware of the existence of the token (existing, or newly deployed) via the exchanges.

Copy link
Contributor Author

@awrichar awrichar Sep 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attached a very high level picture of the flow I'm envisioning, but I've realized that I'm framing the question to the connector as "did Ethereum transaction 0xabc create pool F1", and the connector actually doesn't have the means to answer that.

We've hit this before, but ethconnect currently doesn't give a way to look up the events emitted from a particular transaction. I think it may be possible by decoding the output of eth_getTransactionReceipt, but I'm not quite sure how difficult it would be.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great. Think it would be good to add them to #218

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be possible by decoding the output of eth_getTransactionReceipt

The event stream code is processing these same binary logs, so it should certainly be possible to (just like you did for transaction inputs) allow the logs to be processed: https://github.com/hyperledger/firefly-ethconnect/blob/e708f38f5eac8ad2c4a6b084577c3954e75e7488/internal/events/logprocessor.go#L110-L181

The sticky bit will be making sure the code knows how to find the right event ABI. So the most practical and easy to implement version of the API, would probably be to add a /retrieve as a peer to /subscribe under the event on the REST ABI. A lot like you did for the method input parameters.

https://github.com/hyperledger/firefly-ethconnect/blob/8545f1c09d5fc9829508dac9a08d8c57210079eb/internal/contractgateway/rest2eth.go#L312

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tips - this will definitely be a work item that needs more exploration. Opened #222 to track it further.

return nil
}

func (am *assetManager) Start() error {
return nil
}
Expand Down
14 changes: 12 additions & 2 deletions internal/assets/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/identitymocks"
Expand All @@ -40,12 +41,13 @@ func newTestAssets(t *testing.T) (*assetManager, func()) {
mii := &identitymocks.Plugin{}
mdm := &datamocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mti := &tokenmocks.Plugin{}
mti.On("Name").Return("ut_tokens").Maybe()
defaultIdentity := &fftypes.Identity{Identifier: "UTNodeID", OnChain: "0x12345"}
mii.On("Resolve", mock.Anything, "UTNodeID").Return(defaultIdentity, nil).Maybe()
ctx, cancel := context.WithCancel(context.Background())
a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, map[string]tokens.Plugin{"magic-tokens": mti})
a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, mbm, map[string]tokens.Plugin{"magic-tokens": mti})
rag := mdi.On("RunAsGroup", ctx, mock.Anything).Maybe()
rag.RunFn = func(a mock.Arguments) {
rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))}
Expand All @@ -55,7 +57,7 @@ func newTestAssets(t *testing.T) (*assetManager, func()) {
}

func TestInitFail(t *testing.T) {
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil)
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil)
assert.Regexp(t, "FF10128", err)
}

Expand Down Expand Up @@ -289,3 +291,11 @@ func TestGetTokenAccountsBadPool(t *testing.T) {
_, _, err := am.GetTokenAccounts(context.Background(), "ns1", "magic-tokens", "test", f)
assert.EqualError(t, err, "pop")
}

func TestValidateTokenPoolTx(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

err := am.ValidateTokenPoolTx(context.Background(), nil, "")
assert.NoError(t, err)
}
107 changes: 60 additions & 47 deletions internal/assets/token_pool_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,82 @@
package assets

import (
"context"

"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
"github.com/hyperledger/firefly/pkg/tokens"
)

func (am *assetManager) persistTokenPoolTransaction(ctx context.Context, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) {
if pool.ID == nil || pool.TX.ID == nil {
log.L(ctx).Errorf("Invalid token pool '%s'. Missing ID (%v) or transaction ID (%v)", pool.ID, pool.ID, pool.TX.ID)
return false, nil // this is not retryable
func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error {
// Find a matching operation within this transaction
fb := database.OperationQueryFactory.NewFilter(am.ctx)
filter := fb.And(
fb.Eq("tx", tx),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I note there is not an index on the tx_id in the SQL database impl:

CREATE UNIQUE INDEX operations_id ON operations(id);
CREATE INDEX operations_created ON operations(created);
CREATE INDEX operations_backend ON operations(backend_id);
CREATE INDEX operations_namespace ON operations(namespace);

I think that's a straight omission, because we have a first class query by TX in the REST model:

Path: "namespaces/{ns}/transactions/{txnid}/operations",

... I am a little worried we're over indexing this critical table... so maybe worth working out if we need all those existing indexes. The namespace index looks potentially superfluous to me.

Copy link
Contributor Author

@awrichar awrichar Sep 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have these URIs in the REST model:

/namespaces/{ns}/operations
/namespaces/{ns}/messages/{msgid}/operations
/namespaces/{ns}/transactions/{txnid}/operations

So all 3 are filtered by namespace, and the latter 2 are also filtered by transaction ID.
This PR adds 2 more queries by transaction ID, and DX has one query by backend ID.
I'm not sure why we have an index on "created" - maybe to support the UI?

My take: seems that the indexes most important to the internal functioning of FireFly are on ID, transaction ID, and backend ID. I'd vote to drop indexes on created and namespace - they're probably just there to support routes for inspection/debugging, but those paths don't seem performance critical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's drop just the namespace one for now... the reason for created is that the UI will provide rich filtering and counting on all its panels. To make this less problematic for the DB, each panel is scoped to a time period like "last 24h". So I would expect a realistic scenario to be that all traffic is on a single namespace at whatever TPS, and then the UI asks "show me a paginated view for everything for the last 24h"... I'm worried without an index, that means a whole table scan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... overall I'm sure we'll be revisiting indexes

fb.Eq("type", fftypes.OpTypeTokensCreatePool),
)
operations, _, err := am.database.GetOperations(am.ctx, filter)
if err != nil || len(operations) == 0 {
log.L(am.ctx).Debugf("Token pool transaction '%s' ignored, as it did not match an operation submitted by this node", tx)
return nil
}

pool := &fftypes.TokenPoolAnnouncement{
TokenPool: fftypes.TokenPool{
Type: tokenType,
ProtocolID: protocolID,
Author: signingIdentity,
},
ProtocolTxID: protocolTxID,
}
return am.txhelper.PersistTransaction(ctx, &fftypes.Transaction{
ID: pool.TX.ID,
err = retrieveTokenPoolCreateInputs(am.ctx, operations[0], &pool.TokenPool)
if err != nil {
log.L(am.ctx).Errorf("Error retrieving pool info from transaction '%s' (%s) - ignoring: %v", tx, err, operations[0].Input)
return nil
}

// Update the transaction with the info received (but leave transaction as "pending").
// At this point we are the only node in the network that knows about this transaction object.
// Our local token connector has performed whatever actions it needs to perform, to give us
// enough information to distribute to all other token connectors in the network.
// (e.g. details of a newly created token instance or an existing one)
transaction := &fftypes.Transaction{
ID: tx,
Status: fftypes.OpStatusPending,
Subject: fftypes.TransactionSubject{
Namespace: pool.Namespace,
Type: pool.TX.Type,
Type: fftypes.TransactionTypeTokenPool,
Signer: signingIdentity,
Reference: pool.ID,
},
ProtocolID: protocolTxID,
Info: additionalInfo,
})
}

func (am *assetManager) persistTokenPool(ctx context.Context, pool *fftypes.TokenPool) (valid bool, err error) {
l := log.L(ctx)
if err := fftypes.ValidateFFNameField(ctx, pool.Name, "name"); err != nil {
l.Errorf("Invalid token pool '%s' - invalid name '%s': %a", pool.ID, pool.Name, err)
return false, nil // This is not retryable
}
err = am.database.UpsertTokenPool(ctx, pool)
if err != nil {
if err == database.IDMismatch {
log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID)
return false, nil // This is not retryable
pool.TX.ID = transaction.ID
pool.TX.Type = transaction.Subject.Type

// Add a new operation for the announcement
Copy link
Contributor Author

@awrichar awrichar Sep 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure if we want an operation for this leg of the transaction or not (ie tracking the state of the broadcast announcement). I went ahead and added one, and I'm using it primarily to determine if the broadcast initiated with me or with another node. But I'm not positive this is correct/necessary. Open to thoughts on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's really nice that if you list the operations on the transaction, you'll see that it triggered the broadcast on this node.

op := fftypes.NewTXOperation(
tk,
pool.Namespace,
tx,
"",
fftypes.OpTypeTokensAnnouncePool,
fftypes.OpStatusPending,
signingIdentity)

var valid bool
err = am.retry.Do(am.ctx, "persist token pool transaction", func(attempt int) (bool, error) {
valid, err = am.txhelper.PersistTransaction(am.ctx, transaction)
if valid && err == nil {
err = am.database.UpsertOperation(am.ctx, op, false)
}
l.Errorf("Failed to insert token pool '%s': %s", pool.ID, err)
return false, err // a persistence failure here is considered retryable (so returned)
return err != nil, err
})
if !valid || err != nil {
return err
}
return true, nil
}

func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error {
return am.retry.Do(am.ctx, "persist token pool", func(attempt int) (bool, error) {
err := am.database.RunAsGroup(am.ctx, func(ctx context.Context) error {
valid, err := am.persistTokenPoolTransaction(ctx, pool, signingIdentity, protocolTxID, additionalInfo)
if valid && err == nil {
valid, err = am.persistTokenPool(ctx, pool)
}
if err != nil {
return err
}
if !valid {
log.L(ctx).Warnf("Token pool rejected id=%s author=%s", pool.ID, signingIdentity)
event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID)
return am.database.InsertEvent(ctx, event)
}
log.L(ctx).Infof("Token pool created id=%s author=%s", pool.ID, signingIdentity)
event := fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID)
return am.database.InsertEvent(ctx, event)
})
return err != nil, err // retry indefinitely (until context closes)
})
// Announce the details of the new token pool
_, err = am.broadcast.BroadcastTokenPool(am.ctx, pool.Namespace, pool, false)
return err
}
Loading