Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) {
or.plugins.SharedStorage.Plugin.RegisterListener(&or.bc)

for _, token := range or.plugins.Tokens {
token.Plugin.RegisterListener(&or.bc)
if err := token.Plugin.RegisterListener(or.namespace, &or.bc); err != nil {
return err
}
}

return nil
Expand Down
16 changes: 15 additions & 1 deletion internal/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestInitOK(t *testing.T) {
or.mdx.On("RegisterListener", mock.Anything).Return()
or.mdx.On("SetNodes", mock.Anything).Return()
or.mps.On("RegisterListener", mock.Anything).Return()
or.mti.On("RegisterListener", mock.Anything).Return()
or.mti.On("RegisterListener", "ns", mock.Anything).Return(nil)
err := or.Init(or.ctx, or.cancelCtx)
assert.NoError(t, err)

Expand All @@ -206,6 +206,20 @@ func TestInitOK(t *testing.T) {
assert.Equal(t, or.mnm, or.NetworkMap())
}

func TestInitTokenListenerFail(t *testing.T) {
or := newTestOrchestrator()
defer or.cleanup(t)
or.mdi.On("RegisterListener", mock.Anything).Return()
or.mbi.On("RegisterListener", mock.Anything).Return()
or.mdi.On("GetIdentities", mock.Anything, mock.Anything).Return([]*core.Identity{{}}, nil, nil)
or.mdx.On("RegisterListener", mock.Anything).Return()
or.mdx.On("SetNodes", mock.Anything).Return()
or.mps.On("RegisterListener", mock.Anything).Return()
or.mti.On("RegisterListener", "ns", mock.Anything).Return(fmt.Errorf("pop"))
err := or.Init(or.ctx, or.cancelCtx)
assert.EqualError(t, err, "pop")
}

func TestInitDataexchangeNodesFail(t *testing.T) {
or := newTestOrchestrator()
defer or.cleanup(t)
Expand Down
180 changes: 117 additions & 63 deletions internal/tokens/fftokens/fftokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type FFTokens struct {
}

type callbacks struct {
listeners []tokens.Callbacks
listeners map[string]tokens.Callbacks
}

func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
Expand All @@ -52,28 +52,52 @@ func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState
}
}

func (cb *callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *tokens.TokenPool) error {
for _, cb := range cb.listeners {
if err := cb.TokenPoolCreated(plugin, pool); err != nil {
return err
func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error {
if namespace == "" {
// Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter
// TODO: deprecate this path
for _, cb := range cb.listeners {
if err := cb.TokenPoolCreated(plugin, pool); err != nil {
return err
}
}
} else {
if listener, ok := cb.listeners[namespace]; ok {
return listener.TokenPoolCreated(plugin, pool)
}
}
return nil
}

func (cb *callbacks) TokensTransferred(plugin tokens.Plugin, transfer *tokens.TokenTransfer) error {
for _, cb := range cb.listeners {
if err := cb.TokensTransferred(plugin, transfer); err != nil {
return err
func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error {
if namespace == "" {
// Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter
// TODO: deprecate this path
for _, cb := range cb.listeners {
if err := cb.TokensTransferred(plugin, transfer); err != nil {
return err
}
}
} else {
if listener, ok := cb.listeners[namespace]; ok {
return listener.TokensTransferred(plugin, transfer)
}
}
return nil
}

func (cb *callbacks) TokensApproved(plugin tokens.Plugin, approval *tokens.TokenApproval) error {
for _, cb := range cb.listeners {
if err := cb.TokensApproved(plugin, approval); err != nil {
return err
func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error {
if namespace == "" {
// Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter
// TODO: deprecate this path
for _, cb := range cb.listeners {
if err := cb.TokensApproved(plugin, approval); err != nil {
return err
}
}
} else {
if listener, ok := cb.listeners[namespace]; ok {
return listener.TokensApproved(plugin, approval)
}
}
return nil
Expand All @@ -89,6 +113,7 @@ type msgType string

const (
messageReceipt msgType = "receipt"
messageBatch msgType = "batch"
messageTokenPool msgType = "token-pool"
messageTokenMint msgType = "token-mint"
messageTokenBurn msgType = "token-burn"
Expand All @@ -103,6 +128,10 @@ type tokenData struct {
MessageHash *fftypes.Bytes32 `json:"messageHash,omitempty"`
}

type tokenInit struct {
Namespace string `json:"namespace"`
}

type createPool struct {
Type core.TokenType `json:"type"`
RequestID string `json:"requestId"`
Expand All @@ -114,6 +143,7 @@ type createPool struct {
}

type activatePool struct {
Namespace string `json:"namespace"`
PoolLocator string `json:"poolLocator"`
Config fftypes.JSONObject `json:"config"`
RequestID string `json:"requestId,omitempty"`
Expand Down Expand Up @@ -172,16 +202,15 @@ func (ft *FFTokens) Name() string {
func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section) (err error) {
ft.ctx = log.WithLogField(ctx, "proto", "fftokens")
ft.configuredName = name
ft.capabilities = &tokens.Capabilities{}
ft.callbacks.listeners = make(map[string]tokens.Callbacks)

if config.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "tokens.fftokens")
}

ft.client = ffresty.New(ft.ctx, config)
ft.capabilities = &tokens.Capabilities{}

wsConfig := wsclient.GenerateConfig(config)

if wsConfig.WSKeyPath == "" {
wsConfig.WSKeyPath = "/api/ws"
}
Expand All @@ -196,8 +225,18 @@ func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section
return nil
}

func (ft *FFTokens) RegisterListener(listener tokens.Callbacks) {
ft.callbacks.listeners = append(ft.callbacks.listeners, listener)
func (ft *FFTokens) RegisterListener(namespace string, listener tokens.Callbacks) error {
ft.callbacks.listeners[namespace] = listener

res, err := ft.client.R().SetContext(ft.ctx).
SetBody(&tokenInit{
Namespace: namespace,
}).
Post("/api/v1/init")
if err != nil || !res.IsSuccess() {
return wrapError(ft.ctx, nil, res, err)
}
return nil
}

func (ft *FFTokens) Start() error {
Expand Down Expand Up @@ -230,10 +269,11 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject)
func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) {
tokenType := data.GetString("type")
poolLocator := data.GetString("poolLocator")
standard := data.GetString("standard") // optional
symbol := data.GetString("symbol") // optional
decimals := data.GetInt64("decimals") // optional
info := data.GetObject("info") // optional
standard := data.GetString("standard") // optional
symbol := data.GetString("symbol") // optional
decimals := data.GetInt64("decimals") // optional
info := data.GetObject("info") // optional
namespace := data.GetString("namespace") // optional

// All blockchain items below are optional
blockchainEvent := data.GetObject("blockchain")
Expand Down Expand Up @@ -296,7 +336,7 @@ func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSON
}

// If there's an error dispatching the event, we must return the error and shutdown
return ft.callbacks.TokenPoolCreated(ft, pool)
return ft.callbacks.TokenPoolCreated(namespace, ft, pool)
}

func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransferType, data fftypes.JSONObject) (err error) {
Expand All @@ -308,6 +348,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransfe
value := data.GetString("amount")
tokenIndex := data.GetString("tokenIndex") // optional
uri := data.GetString("uri") // optional
namespace := data.GetString("namespace") // optional

blockchainEvent := data.GetObject("blockchain")
blockchainID := blockchainEvent.GetString("id")
Expand Down Expand Up @@ -384,7 +425,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransfe
}

// If there's an error dispatching the event, we must return the error and shutdown
return ft.callbacks.TokensTransferred(ft, transfer)
return ft.callbacks.TokensTransferred(namespace, ft, transfer)
}

func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONObject) (err error) {
Expand All @@ -394,7 +435,8 @@ func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONOb
poolLocator := data.GetString("poolLocator")
operatorAddress := data.GetString("operator")
approved := data.GetBool("approved")
info := data.GetObject("info") // optional
info := data.GetObject("info") // optional
namespace := data.GetString("namespace") // optional

blockchainEvent := data.GetObject("blockchain")
blockchainID := blockchainEvent.GetString("id")
Expand Down Expand Up @@ -458,7 +500,54 @@ func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONOb
},
}

return ft.callbacks.TokensApproved(ft, approval)
return ft.callbacks.TokensApproved(namespace, ft, approval)
}

func (ft *FFTokens) handleMessage(ctx context.Context, msgBytes []byte) (err error) {
l := log.L(ctx)

var msg wsEvent
if err = json.Unmarshal(msgBytes, &msg); err != nil {
l.Errorf("Message cannot be parsed as JSON: %s\n%s", err, string(msgBytes))
return nil // Swallow this and move on
}

l.Debugf("Received %s event %s", msg.Event, msg.ID)
switch msg.Event {
case messageReceipt:
ft.handleReceipt(ctx, msg.Data)
case messageBatch:
for _, msg := range msg.Data.GetObjectArray("events") {
if err = ft.handleMessage(ctx, []byte(msg.String())); err != nil {
break
}
}
case messageTokenPool:
err = ft.handleTokenPoolCreate(ctx, msg.Data)
case messageTokenMint:
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeMint, msg.Data)
case messageTokenBurn:
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeBurn, msg.Data)
case messageTokenTransfer:
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeTransfer, msg.Data)
case messageTokenApproval:
err = ft.handleTokenApproval(ctx, msg.Data)
default:
l.Errorf("Message unexpected: %s", msg.Event)
}

if err == nil && msg.Event != messageReceipt && msg.ID != "" {
l.Debugf("Sending ack %s", msg.ID)
ack, _ := json.Marshal(fftypes.JSONObject{
"event": "ack",
"data": fftypes.JSONObject{
"id": msg.ID,
},
})
err = ft.wsconn.Send(ctx, ack)
}

return err
}

func (ft *FFTokens) eventLoop() {
Expand All @@ -475,43 +564,7 @@ func (ft *FFTokens) eventLoop() {
l.Debugf("Event loop exiting (receive channel closed)")
return
}

var msg wsEvent
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
l.Errorf("Message cannot be parsed as JSON: %s\n%s", err, string(msgBytes))
continue // Swallow this and move on
}
l.Debugf("Received %s event %s", msg.Event, msg.ID)
switch msg.Event {
case messageReceipt:
ft.handleReceipt(ctx, msg.Data)
case messageTokenPool:
err = ft.handleTokenPoolCreate(ctx, msg.Data)
case messageTokenMint:
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeMint, msg.Data)
case messageTokenBurn:
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeBurn, msg.Data)
case messageTokenTransfer:
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeTransfer, msg.Data)
case messageTokenApproval:
err = ft.handleTokenApproval(ctx, msg.Data)
default:
l.Errorf("Message unexpected: %s", msg.Event)
}

if err == nil && msg.Event != messageReceipt && msg.ID != "" {
l.Debugf("Sending ack %s", msg.ID)
ack, _ := json.Marshal(fftypes.JSONObject{
"event": "ack",
"data": fftypes.JSONObject{
"id": msg.ID,
},
})
err = ft.wsconn.Send(ctx, ack)
}

if err != nil {
if err := ft.handleMessage(ctx, msgBytes); err != nil {
l.Errorf("Event loop exiting: %s", err)
return
}
Expand Down Expand Up @@ -571,6 +624,7 @@ func (ft *FFTokens) ActivateTokenPool(ctx context.Context, nsOpID string, pool *
res, err := ft.client.R().SetContext(ctx).
SetBody(&activatePool{
RequestID: nsOpID,
Namespace: pool.Namespace,
PoolLocator: pool.Locator,
Config: pool.Config,
}).
Expand Down
Loading