Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (psql *Postgres) Init(ctx context.Context, config config.Section) error {
return psql.SQLCommon.Init(ctx, psql, config, capabilities)
}

func (psql *Postgres) SetHandler(handler database.Callbacks) {
psql.SQLCommon.SetHandler(handler)
func (psql *Postgres) SetHandler(namespace string, handler database.Callbacks) {
psql.SQLCommon.SetHandler(namespace, handler)
}

func (psql *Postgres) Name() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/database/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func TestPostgresProvider(t *testing.T) {
psql := &Postgres{}
psql.SetHandler(&databasemocks.Callbacks{})
psql.SetHandler("ns", &databasemocks.Callbacks{})
config := config.RootSection("unittest")
psql.InitConfig(config)
config.Set(sqlcommon.SQLConfDatasourceURL, "!bad connection")
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/contractapis_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestContractAPIDBFailUpdate(t *testing.T) {
func TestUpsertContractAPIIDMismatch(t *testing.T) {
s, db := newMockProvider().init()
callbacks := &databasemocks.Callbacks{}
s.SetHandler(callbacks)
s.SetHandler("ns1", callbacks)
apiID := fftypes.NewUUID()
api := &core.ContractAPI{
ID: apiID,
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/provider_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newMockProvider() *mockProvider {
// init is a convenience to init for tests that aren't testing init itself
func (mp *mockProvider) init() (*mockProvider, sqlmock.Sqlmock) {
_ = mp.Init(context.Background(), mp, mp.config, mp.capabilities)
mp.SetHandler(mp.callbacks)
mp.SetHandler(database.GlobalHandler, mp.callbacks)
return mp, mp.mdb
}

Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/provider_sqlitego_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newSQLiteTestProvider(t *testing.T) (*sqliteGoTestProvider, func()) {

err = tp.Init(context.Background(), tp, tp.config, tp.capabilities)
assert.NoError(tp.t, err)
tp.SetHandler(tp.callbacks)
tp.SetHandler(database.GlobalHandler, tp.callbacks)

return tp, func() {
tp.Close()
Expand Down
31 changes: 23 additions & 8 deletions internal/database/sqlcommon/sqlcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,47 @@ type SQLCommon struct {
}

type callbacks struct {
handlers []database.Callbacks
handlers map[string]database.Callbacks
}

func (cb *callbacks) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) {
for _, cb := range cb.handlers {
if cb, ok := cb.handlers[ns]; ok {
cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence)
}
if cb, ok := cb.handlers[database.GlobalHandler]; ok {
cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence)
}
}

func (cb *callbacks) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) {
for _, cb := range cb.handlers {
if cb, ok := cb.handlers[ns]; ok {
cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence)
}
if cb, ok := cb.handlers[database.GlobalHandler]; ok {
cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence)
}
}

func (cb *callbacks) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) {
for _, cb := range cb.handlers {
if cb, ok := cb.handlers[ns]; ok {
cb.UUIDCollectionNSEvent(resType, eventType, ns, id)
}
if cb, ok := cb.handlers[database.GlobalHandler]; ok {
cb.UUIDCollectionNSEvent(resType, eventType, ns, id)
}
}

func (cb *callbacks) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) {
for _, cb := range cb.handlers {
if cb, ok := cb.handlers[database.GlobalHandler]; ok {
cb.UUIDCollectionEvent(resType, eventType, id)
}
}

func (cb *callbacks) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) {
for _, cb := range cb.handlers {
if cb, ok := cb.handlers[ns]; ok {
cb.HashCollectionNSEvent(resType, eventType, ns, hash)
}
if cb, ok := cb.handlers[database.GlobalHandler]; ok {
cb.HashCollectionNSEvent(resType, eventType, ns, hash)
}
}
Expand Down Expand Up @@ -130,8 +142,11 @@ func (s *SQLCommon) Init(ctx context.Context, provider Provider, config config.S
return nil
}

func (s *SQLCommon) SetHandler(handler database.Callbacks) {
s.callbacks.handlers = append(s.callbacks.handlers, handler)
func (s *SQLCommon) SetHandler(namespace string, handler database.Callbacks) {
if s.callbacks.handlers == nil {
s.callbacks.handlers = make(map[string]database.Callbacks)
}
s.callbacks.handlers[namespace] = handler
}

func (s *SQLCommon) Capabilities() *database.Capabilities { return s.capabilities }
Expand Down
25 changes: 25 additions & 0 deletions internal/database/sqlcommon/sqlcommon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/golang-migrate/migrate/v4"
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/hyperledger/firefly/pkg/database"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -361,3 +363,26 @@ func TestInsertTxRowsIncompleteReturn(t *testing.T) {
err = s.insertTxRows(ctx, "table1", tx, sb, nil, []int64{1, 2}, false)
assert.Regexp(t, "FF10116", err)
}

func TestNamespaceCallbacks(t *testing.T) {
tcb := &databasemocks.Callbacks{}
cb := callbacks{
handlers: map[string]database.Callbacks{
"ns1": tcb,
},
}
id := fftypes.NewUUID()
hash := fftypes.NewRandB32()

tcb.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", id, int64(1)).Return()
tcb.On("OrderedCollectionNSEvent", database.CollectionPins, core.ChangeEventTypeCreated, "ns1", int64(1)).Return()
tcb.On("UUIDCollectionNSEvent", database.CollectionOperations, core.ChangeEventTypeCreated, "ns1", id).Return()
tcb.On("UUIDCollectionEvent", database.CollectionNamespaces, core.ChangeEventTypeCreated, id).Return()
tcb.On("HashCollectionNSEvent", database.CollectionGroups, core.ChangeEventTypeUpdated, "ns1", hash).Return()

cb.OrderedUUIDCollectionNSEvent(database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", id, 1)
cb.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, "ns1", 1)
cb.UUIDCollectionNSEvent(database.CollectionOperations, core.ChangeEventTypeCreated, "ns1", id)
cb.UUIDCollectionEvent(database.CollectionNamespaces, core.ChangeEventTypeCreated, id)
cb.HashCollectionNSEvent(database.CollectionGroups, core.ChangeEventTypeUpdated, "ns1", hash)
}
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/tokenpool_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestUpsertTokenPoolFailCommit(t *testing.T) {
func TestUpsertTokenPoolUpdateIDMismatch(t *testing.T) {
s, db := newMockProvider().init()
callbacks := &databasemocks.Callbacks{}
s.SetHandler(callbacks)
s.SetHandler("ns1", callbacks)
poolID := fftypes.NewUUID()
pool := &core.TokenPool{
ID: poolID,
Expand Down
4 changes: 2 additions & 2 deletions internal/database/sqlite3/sqlite3.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (sqlite *SQLite3) Init(ctx context.Context, config config.Section) error {
return sqlite.SQLCommon.Init(ctx, sqlite, config, capabilities)
}

func (sqlite *SQLite3) SetHandler(handler database.Callbacks) {
sqlite.SQLCommon.SetHandler(handler)
func (sqlite *SQLite3) SetHandler(namespace string, handler database.Callbacks) {
sqlite.SQLCommon.SetHandler(namespace, handler)
}

func (sqlite *SQLite3) Name() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlite3/sqlite3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

func TestSQLite3GoProvider(t *testing.T) {
sqlite := &SQLite3{}
sqlite.SetHandler(&databasemocks.Callbacks{})
sqlite.SetHandler("ns", &databasemocks.Callbacks{})
config := config.RootSection("unittest")
sqlite.InitConfig(config)
config.Set(sqlcommon.SQLConfDatasourceURL, "!wrong://")
Expand Down
37 changes: 25 additions & 12 deletions internal/dataexchange/ffdx/dxevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package ffdx

import (
"strings"
"encoding/json"
"fmt"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
Expand Down Expand Up @@ -92,6 +93,7 @@ func (e *dxEvent) TransferResult() *dataexchange.TransferResult {
}

func (h *FFDX) dispatchEvent(msg *wsEvent) {
var namespace string
var err error
e := &dxEvent{ffdx: h, id: msg.EventID, requestID: msg.RequestID}
switch msg.Type {
Expand Down Expand Up @@ -129,10 +131,21 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {
},
}
case messageReceived:
e.dxType = dataexchange.DXEventTypeMessageReceived
e.messageReceived = &dataexchange.MessageReceived{
PeerID: msg.Sender,
Data: []byte(msg.Message),
// De-serialize the transport wrapper
var wrapper *core.TransportWrapper
err = json.Unmarshal([]byte(msg.Message), &wrapper)
switch {
case err != nil:
err = fmt.Errorf("invalid transmission from peer '%s': %s", msg.Sender, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why these aren't translated errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error is just logged below (this method never returns errors). I could still localize if you think there's value.

case wrapper.Batch == nil:
err = fmt.Errorf("invalid transmission from peer '%s': nil batch", msg.Sender)
default:
namespace = wrapper.Batch.Namespace
e.dxType = dataexchange.DXEventTypeMessageReceived
e.messageReceived = &dataexchange.MessageReceived{
PeerID: msg.Sender,
Transport: wrapper,
}
}
case blobFailed:
e.dxType = dataexchange.DXEventTypeTransferResult
Expand Down Expand Up @@ -161,14 +174,10 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {
var hash *fftypes.Bytes32
hash, err = fftypes.ParseBytes32(h.ctx, msg.Hash)
if err == nil {
var ns string
pathParts := strings.Split(msg.Path, "/")
if len(pathParts) >= 2 {
ns = pathParts[len(pathParts)-2]
}
_, namespace, _ = splitBlobPath(msg.Path)
e.dxType = dataexchange.DXEventTypePrivateBlobReceived
e.privateBlobReceived = &dataexchange.PrivateBlobReceived{
Namespace: ns,
Namespace: namespace,
PeerID: msg.Sender,
Hash: *hash,
Size: msg.Size,
Expand All @@ -188,11 +197,15 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {
default:
err = i18n.NewError(h.ctx, coremsgs.MsgUnexpectedDXMessageType, msg.Type)
}

// If we couldn't dispatch the event we received, we still ack it
if err != nil {
log.L(h.ctx).Warnf("Failed to dispatch DX event: %s", err)
e.Ack()
} else {
h.callbacks.DXEvent(e)
if namespace == "" && msg.RequestID != "" {
namespace, _, _ = core.ParseNamespacedOpID(h.ctx, msg.RequestID)
}
h.callbacks.DXEvent(h.ctx, namespace, e)
}
}
38 changes: 30 additions & 8 deletions internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"sync"

"github.com/go-resty/resty/v2"
Expand Down Expand Up @@ -50,15 +51,36 @@ type FFDX struct {
}

type callbacks struct {
handlers []dataexchange.Callbacks
handlers map[string]dataexchange.Callbacks
}

func (cb *callbacks) DXEvent(event dataexchange.DXEvent) {
for _, cb := range cb.handlers {
cb.DXEvent(event)
func (cb *callbacks) DXEvent(ctx context.Context, namespace string, event dataexchange.DXEvent) {
if handler, ok := cb.handlers[namespace]; ok {
handler.DXEvent(event)
} else {
log.L(ctx).Errorf("unknown namespace on event '%s'", event.EventID())
event.Ack()
}
}

func splitLast(s string, sep string) (string, string) {
split := strings.LastIndex(s, sep)
if split == -1 {
return "", s
}
return s[:split], s[split+1:]
}

func splitBlobPath(path string) (prefix, namespace, id string) {
path, id = splitLast(path, "/")
path, namespace = splitLast(path, "/")
return path, namespace, id
}

func joinBlobPath(namespace, id string) string {
return fmt.Sprintf("%s/%s", namespace, id)
}

const (
dxHTTPHeaderHash = "dx-hash"
dxHTTPHeaderSize = "dx-size"
Expand Down Expand Up @@ -121,7 +143,7 @@ func (h *FFDX) Name() string {
func (h *FFDX) Init(ctx context.Context, config config.Section) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "https")
h.ackChannel = make(chan *ack)

h.callbacks.handlers = make(map[string]dataexchange.Callbacks)
h.needsInit = config.GetBool(DataExchangeInitEnabled)

if config.GetString(ffresty.HTTPConfigURL) == "" {
Expand All @@ -148,8 +170,8 @@ func (h *FFDX) SetNodes(nodes []fftypes.JSONObject) {
h.nodes = nodes
}

func (h *FFDX) SetHandler(handler dataexchange.Callbacks) {
h.callbacks.handlers = append(h.callbacks.handlers, handler)
func (h *FFDX) SetHandler(namespace string, handler dataexchange.Callbacks) {
h.callbacks.handlers[namespace] = handler
}

func (h *FFDX) Start() error {
Expand Down Expand Up @@ -227,7 +249,7 @@ func (h *FFDX) AddPeer(ctx context.Context, peer fftypes.JSONObject) (err error)
}

func (h *FFDX) UploadBlob(ctx context.Context, ns string, id fftypes.UUID, content io.Reader) (payloadRef string, hash *fftypes.Bytes32, size int64, err error) {
payloadRef = fmt.Sprintf("%s/%s", ns, &id)
payloadRef = joinBlobPath(ns, id.String())
var upload uploadBlob
res, err := h.client.R().SetContext(ctx).
SetFileReader("file", id.String(), content).
Expand Down
Loading