Skip to content

Commit

Permalink
internal/server/ship_processor.go: refactor to use MessageQueue struct.
Browse files Browse the repository at this point in the history
  • Loading branch information
pnx committed May 12, 2024
1 parent c876875 commit 95365d0
Showing 1 changed file with 23 additions and 75 deletions.
98 changes: 23 additions & 75 deletions internal/server/ship_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/hex"

"github.com/eosswedenorg/thalos/api"
"github.com/eosswedenorg/thalos/api/message"
"github.com/eosswedenorg/thalos/internal/abi"
"github.com/eosswedenorg/thalos/internal/driver"
Expand All @@ -16,19 +15,6 @@ import (
"github.com/pnx/antelope-go/ship"
)

// logDecoratedEncoder decorates a message.Encoder and logs any error.
func logDecoratedEncoder(encoder message.Encoder) message.Encoder {
return func(v interface{}) ([]byte, error) {
payload, err := encoder(v)
if err != nil {
log.WithError(err).
WithField("v", v).
Warn("Failed to encode message")
}
return payload, err
}
}

// A ShipProcessor will consume messages from a ship stream, convert the messages into
// thalos specific ones, encode them and finally post them to an api.Writer
type ShipProcessor struct {
Expand All @@ -38,11 +24,7 @@ type ShipProcessor struct {
// Abi manager used for cacheing
abi *abi.AbiManager

// Writer to send messages to.
writer driver.Writer

// Encoder used to encode messages
encode message.Encoder
queue MessageQueue

// Function for saving state.
saver StateSaver
Expand All @@ -62,10 +44,9 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St
processor := &ShipProcessor{
saver: saver,
abi: abi,
writer: writer,
shipStream: shipStream,
encode: logDecoratedEncoder(codec.Encoder),
syscontract: chain.N("eosio"),
queue: NewMessageQueue(writer, codec.Encoder),
}

loader(&processor.state)
Expand All @@ -85,22 +66,6 @@ func (processor *ShipProcessor) initHandler(abi *chain.Abi) {
processor.shipABI = abi
}

func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool {
err := processor.writer.Write(channel, payload)
if err != nil {
log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
return false
}
return true
}

func (processor *ShipProcessor) encodeQueue(channel api.Channel, v interface{}) bool {
if payload, err := processor.encode(v); err == nil {
return processor.queueMessage(channel, payload)
}
return false
}

// updateAbiFromAction updates the contract abi based on the ship.Action passed.
func (processor *ShipProcessor) updateAbiFromAction(act *chain.Action) error {
set_abi := struct {
Expand Down Expand Up @@ -130,25 +95,6 @@ func (processor *ShipProcessor) GetCurrentBlock() uint32 {
return processor.state.CurrentBlock
}

func (processor *ShipProcessor) broadcastAction(act *message.ActionTrace) {
payload, err := processor.encode(*act)
if err != nil {
log.WithField("act", act).Warn("failed to encode action")
return
}

channels := []api.Channel{
api.ActionChannel{}.Channel(),
api.ActionChannel{Name: act.Name}.Channel(),
api.ActionChannel{Contract: act.Contract}.Channel(),
api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(),
}

for _, channel := range channels {
processor.queueMessage(channel, payload)
}
}

func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNumber uint32, block *ship.SignedBlock, trace *ship.TransactionTraceV0) {
logger := log.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup()

Expand Down Expand Up @@ -178,13 +124,15 @@ func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNum
actMsg.BlockNum = blockNumber
actMsg.Timestamp = timestamp

processor.broadcastAction(actMsg)
processor.queue.PostAction(*actMsg)

transaction.ActionTraces = append(transaction.ActionTraces, *actMsg)
}
}

processor.encodeQueue(api.TransactionChannel, transaction)
if err := processor.queue.PostTransactionTrace(transaction); err != nil {
logger.WithError(err).Error("Failed to post transaction trace")
}
}

func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *ship.ActionTraceV1) *message.ActionTrace {
Expand Down Expand Up @@ -255,14 +203,18 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
// Check to see if we have a microfork and post a message to
// the rollback channel in that case.
if processor.state.CurrentBlock > 0 && blockNumber < processor.state.CurrentBlock {
log.WithField("old_block", processor.state.CurrentBlock).
WithField("new_block", blockResult.ThisBlock.BlockNum).
Warn("Fork detected, old_block is greater than new_block")

processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{
msg := message.RollbackMessage{
OldBlockNum: processor.state.CurrentBlock,
NewBlockNum: blockResult.ThisBlock.BlockNum,
})
}
log.WithField("old_block", msg.OldBlockNum).
WithField("new_block", msg.NewBlockNum).
Warn("Fork detected, old_block is greater than new_block")

if err := processor.queue.PostRollback(msg); err != nil {
log.WithError(err).Error("Failed to write rollback message")
}
}

processor.state.CurrentBlock = blockNumber
Expand All @@ -277,8 +229,9 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
LastIrreversibleBlockNum: blockResult.LastIrreversible.BlockNum,
HeadBlockNum: blockResult.Head.BlockNum,
}

processor.encodeQueue(api.HeartbeatChannel, hb)
if err := processor.queue.PostHeartbeat(hb); err != nil {
log.WithError(err).Error("Failed to write heartbeat message")
}
}

mainLogger := log.WithField("block", blockNumber).Dup()
Expand Down Expand Up @@ -329,25 +282,20 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
rows = append(rows, msg)
}

message := message.TableDelta{
msg := message.TableDelta{
BlockNum: blockNumber,
Timestamp: timestamp,
Name: delta.V0.Name,
Rows: rows,
}

channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.V0.Name}.Channel(),
}

for _, channel := range channels {
processor.encodeQueue(channel, message)
if err := processor.queue.PostTableDelta(msg); err != nil {
logger.WithError(err).Error("Failed to post table delta message")
}
}
}

err := processor.writer.Flush()
err := processor.queue.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")
}
Expand All @@ -360,5 +308,5 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0

// Close closes the writer associated with the processor.
func (processor *ShipProcessor) Close() error {
return processor.writer.Close()
return processor.queue.Close()
}

0 comments on commit 95365d0

Please sign in to comment.