Skip to content

Commit

Permalink
Optimized message sharing selection
Browse files Browse the repository at this point in the history
  • Loading branch information
jalextowle committed Oct 18, 2019
1 parent 8b99f4e commit ed72041
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 27 deletions.
9 changes: 9 additions & 0 deletions core/core.go
Expand Up @@ -137,6 +137,11 @@ type App struct {
snapshotExpirationWatcher *expirationwatch.Watcher
muIdToSnapshotInfo sync.Mutex
idToSnapshotInfo map[string]snapshotInfo
messageHandler *MessageHandler
}

type MessageHandler struct {
nextOffset int
}

func New(config Config) (*App, error) {
Expand Down Expand Up @@ -234,6 +239,9 @@ func New(config Config) (*App, error) {
if err != nil {
return nil, err
}
messageHandler := &MessageHandler{
nextOffset: 0,
}

app := &App{
config: config,
Expand All @@ -249,6 +257,7 @@ func New(config Config) (*App, error) {
meshMessageJSONSchema: meshMessageJSONSchema,
snapshotExpirationWatcher: snapshotExpirationWatcher,
idToSnapshotInfo: map[string]snapshotInfo{},
messageHandler: messageHandler,
}

log.WithFields(map[string]interface{}{
Expand Down
60 changes: 33 additions & 27 deletions core/message_handler.go
Expand Up @@ -12,20 +12,11 @@ import (
// Ensure that App implements p2p.MessageHandler.
var _ p2p.MessageHandler = &App{}

var nextOffset = 0;

func Min(a int, b int) int {
func min(a int, b int) int {
if a < b {
return a;
return a
}
return b;
}

func Max(a int, b int) int {
if a > b {
return a;
}
return b;
return b
}

func (app *App) GetMessagesToShare(max int) ([][]byte, error) {
Expand All @@ -46,22 +37,37 @@ func (app *App) GetMessagesToShare(max int) ([][]byte, error) {
if count == 0 {
return nil, nil
}
// The offset will be the value closest to `nextOffset` such that the
// maximum number of orders that can be shared are selected.
offset := Min(nextOffset, count)
if count - offset < max {
offset = Max(count - max, 0)
}
// Calculate the next offset and wrap back to 0 if the next offset is larger
// than or equal to count.
nextOffset += max
if nextOffset >= count {
nextOffset = 0
}

// Select up to the maximum number of orders starting at the offset that was
// calculated the last time this was called with `app`.
offset := min(app.messageHandler.nextOffset, count)
var selectedOrders []*meshdb.Order
err = ordersSnapshot.NewQuery(notRemovedFilter).Offset(offset).Max(max).Run(&selectedOrders)
if err != nil {
return nil, err
if offset != count {
err = ordersSnapshot.NewQuery(notRemovedFilter).Offset(offset).Max(max).Run(&selectedOrders)
if err != nil {
return nil, err
}
}

// If more orders can be shared than were selected, append the maximum amount of
// unique (in this round) orders that can be added to the selected orders without
// exceeding the maximum number to share.
overflow := min(max-len(selectedOrders), offset)
if overflow > 0 {
var overflowSelectedOrders []*meshdb.Order
err = ordersSnapshot.NewQuery(notRemovedFilter).Offset(0).Max(overflow).Run(&overflowSelectedOrders)
if err != nil {
return nil, err
}
selectedOrders = append(selectedOrders, overflowSelectedOrders...)
app.messageHandler.nextOffset = overflow
} else {
// Calculate the next offset and wrap back to 0 if the next offset is larger
// than or equal to count.
app.messageHandler.nextOffset += max
if app.messageHandler.nextOffset >= count {
app.messageHandler.nextOffset = 0
}
}

log.WithFields(map[string]interface{}{
Expand Down

0 comments on commit ed72041

Please sign in to comment.