Skip to content

Commit

Permalink
internal/server/ship_processor.go: refactor out table delta row procc…
Browse files Browse the repository at this point in the history
…essing to its own function
  • Loading branch information
pnx committed May 17, 2024
1 parent cc754ee commit 8bd3736
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions internal/server/ship_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,36 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh
return act
}

func (processor *ShipProcessor) proccessDeltaRows(logger *log.Entry, table_name string, rows []ship.Row) []message.TableDeltaRow {
out := []message.TableDeltaRow{}
for _, row := range rows {

msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}

if processor.shipABI != nil {

v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), table_name)
if err == nil {
v, err := parseTableDeltaData(v)
if err == nil {
msg.Data = v
} else {
logger.WithError(err).Error("Failed to parse table delta data")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
out = append(out, msg)
}
return out
}

// Callback function called by shipclient.Stream when a new block arrives.
func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0) {
block := ship.SignedBlock{}
Expand Down Expand Up @@ -254,42 +284,14 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
if err := blockResult.Deltas.Unpack(&deltas); err != nil {
mainLogger.WithError(err).Error("Failed to unpack table deltas")
} else {
logger := mainLogger.WithField("type", "table_delta").Dup()
for _, delta := range deltas {

logger := mainLogger.WithField("type", "table_delta").Dup()

rows := []message.TableDeltaRow{}
for _, row := range delta.V0.Rows {

msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}

if processor.shipABI != nil {

v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), delta.V0.Name)
if err == nil {
v, err := parseTableDeltaData(v)
if err == nil {
msg.Data = v
} else {
logger.WithError(err).Error("Failed to parse table delta data")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
rows = append(rows, msg)
}

msg := message.TableDelta{
BlockNum: blockNumber,
Timestamp: timestamp,
Name: delta.V0.Name,
Rows: rows,
Rows: processor.proccessDeltaRows(logger, delta.V0.Name, delta.V0.Rows),
}

if err := processor.queue.PostTableDelta(msg); err != nil {
Expand Down

0 comments on commit 8bd3736

Please sign in to comment.