Skip to content
224 changes: 182 additions & 42 deletions internal/claimer/claimer.go

Large diffs are not rendered by default.

982 changes: 620 additions & 362 deletions internal/claimer/claimer_test.go

Large diffs are not rendered by default.

269 changes: 238 additions & 31 deletions internal/claimer/side-effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@ package claimer
import (
"context"
"fmt"
"iter"
"math/big"

. "github.com/cartesi/rollups-node/internal/model"
"github.com/cartesi/rollups-node/pkg/contracts/iconsensus"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/cartesi/rollups-node/pkg/ethutil"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

type sideEffects interface {
// database
selectClaimPairsPerApp() (
selectSubmissionClaimPairsPerApp() (
map[common.Address]*ClaimRow,
map[common.Address]*ClaimRow,
error,
)
selectAcceptanceClaimPairsPerApp() (
map[common.Address]*ClaimRow,
map[common.Address]*ClaimRow,
error,
Expand All @@ -26,6 +34,10 @@ type sideEffects interface {
claim *ClaimRow,
txHash common.Hash,
) error
updateEpochWithAcceptedClaim(
claim *ClaimRow,
txHash common.Hash,
) error
updateApplicationState(
appID int64,
state ApplicationState,
Expand All @@ -35,43 +47,73 @@ type sideEffects interface {
// blockchain
findClaimSubmissionEventAndSucc(
claim *ClaimRow,
endBlock *big.Int,
) (
*iconsensus.IConsensus,
*iconsensus.IConsensusClaimSubmission,
*iconsensus.IConsensusClaimSubmission,
error,
)
findClaimAcceptanceEventAndSucc(
claim *ClaimRow,
endBlock *big.Int,
) (
*iconsensus.IConsensus,
*iconsensus.IConsensusClaimAcceptance,
*iconsensus.IConsensusClaimAcceptance,
error,
)
submitClaimToBlockchain(
ic *iconsensus.IConsensus,
claim *ClaimRow,
) (
common.Hash,
error,
)
pollTransaction(txHash common.Hash) (
pollTransaction(
txHash common.Hash,
endBlock *big.Int,
) (
bool,
*types.Receipt,
error,
)
}

func (s *Service) selectClaimPairsPerApp() (
func (s *Service) selectSubmissionClaimPairsPerApp() (
map[common.Address]*ClaimRow,
map[common.Address]*ClaimRow,
error,
) {
computed, accepted, err := s.repository.SelectClaimPairsPerApp(s.Context)
accepted, computed, err := s.repository.SelectSubmissionClaimPairsPerApp(s.Context)
if err != nil {
s.Logger.Error("selectClaimPairsPerApp:failed",
s.Logger.Error("selectSubmissionClaimPairsPerApp:failed",
"error", err)
} else {
s.Logger.Debug("selectClaimPairsPerApp:success",
s.Logger.Debug("selectSubmissionClaimPairsPerApp:success",
"len(computed)", len(computed),
"len(accepted)", len(accepted))
}
return accepted, computed, err
}

func (s *Service) selectAcceptanceClaimPairsPerApp() (
map[common.Address]*ClaimRow,
map[common.Address]*ClaimRow,
error,
) {
accepted, submitted, err := s.repository.SelectAcceptanceClaimPairsPerApp(s.Context)
if err != nil {
s.Logger.Error("selectAcceptanceClaimPairsPerApp:failed",
"error", err)
} else {
s.Logger.Debug("selectAcceptanceClaimPairsPerApp:success",
"len(submitted)", len(submitted),
"len(accepted)", len(accepted))
}
return accepted, submitted, err
}

/* update the database epoch status to CLAIM_SUBMITTED and add a transaction hash */
func (s *Service) updateEpochWithSubmittedClaim(
claim *ClaimRow,
Expand All @@ -95,6 +137,29 @@ func (s *Service) updateEpochWithSubmittedClaim(
return err
}

/* update the database epoch status to CLAIM_SUBMITTED and add a transaction hash */
func (s *Service) updateEpochWithAcceptedClaim(
claim *ClaimRow,
txHash common.Hash,
) error {
err := s.repository.UpdateEpochWithAcceptedClaim(s.Context, claim.ApplicationID, claim.Index)
if err != nil {
s.Logger.Error("updateEpochWithSubmittedClaim:failed",
"appContractAddress", claim.IApplicationAddress,
"hash", claim.ClaimHash,
"last_block", claim.LastBlock,
"txHash", txHash,
"error", err)
} else {
s.Logger.Debug("updateEpochWithSubmittedClaim:success",
"appContractAddress", claim.IApplicationAddress,
"last_block", claim.LastBlock,
"hash", claim.ClaimHash,
"txHash", txHash)
}
return err
}

func (s *Service) updateApplicationState(
appID int64,
state ApplicationState,
Expand All @@ -119,15 +184,16 @@ func (s *Service) updateApplicationState(

func (s *Service) findClaimSubmissionEventAndSucc(
claim *ClaimRow,
endBlock *big.Int,
) (
*iconsensus.IConsensus,
*iconsensus.IConsensusClaimSubmission,
*iconsensus.IConsensusClaimSubmission,
error,
) {
ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim)
ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim, endBlock)
if err != nil {
s.Logger.Error("findClaimSubmissionEventAndSucc:failed",
s.Logger.Debug("findClaimSubmissionEventAndSucc:failed",
"claim", claim,
"error", err)
} else {
Expand Down Expand Up @@ -165,10 +231,10 @@ func (s *Service) submitClaimToBlockchain(
return txHash, err
}

func (s *Service) pollTransaction(txHash common.Hash) (bool, *types.Receipt, error) {
ready, receipt, err := s.PollTransaction(txHash)
func (s *Service) pollTransaction(txHash common.Hash, endBlock *big.Int) (bool, *types.Receipt, error) {
ready, receipt, err := s.PollTransaction(txHash, endBlock)
if err != nil {
s.Logger.Error("PollTransaction:failed",
s.Logger.Debug("PollTransaction:failed",
"tx", txHash,
"error", err)
} else if ready {
Expand All @@ -184,10 +250,27 @@ func (s *Service) pollTransaction(txHash common.Hash) (bool, *types.Receipt, err
return ready, receipt, err
}

func unwrapClaimSubmission(
ic *iconsensus.IConsensus,
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimSubmission,
bool,
error,
) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
}
ev, err := ic.ParseClaimSubmission(*log)
return ev, true, err
}

// scan the event stream for a claimSubmission event that matches claim.
// return this event and its successor
func (s *Service) FindClaimSubmissionEventAndSucc(
claim *ClaimRow,
endBlock *big.Int,
) (
*iconsensus.IConsensus,
*iconsensus.IConsensusClaimSubmission,
Expand All @@ -199,38 +282,158 @@ func (s *Service) FindClaimSubmissionEventAndSucc(
return nil, nil, nil, err
}

it, err := ic.FilterClaimSubmission(&bind.FilterOpts{
Context: s.Context,
Start: claim.LastBlock,
}, nil, []common.Address{claim.IApplicationAddress})
// filter must match:
// - `ClaimSubmission` events
// - submitter == nil (any)
// - appContract == claim.IApplicationAddress
c, err := iconsensus.IConsensusMetaData.GetAbi()
topics, err := abi.MakeTopics(
[]interface{}{c.Events["ClaimSubmission"].ID},
nil,
[]interface{}{claim.IApplicationAddress},
)
if err != nil {
return nil, nil, nil, err
}

it, err := ethutil.ChunkedFilterLogs(s.Context, s.ethConn, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(claim.Epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{claim.IConsensusAddress},
Topics: topics,
})
if err != nil {
return nil, nil, nil, err
}

for it.Next() {
event := it.Event
// pull events instead of iterating
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimSubmission(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()
if claimMatchesEvent(claim, event) {
var succ *iconsensus.IConsensusClaimSubmission = nil
if it.Next() {
succ = it.Event
}
if it.Error() != nil {
return nil, nil, nil, it.Error()

if claimSubmissionMatch(claim, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimSubmission(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
return ic, event, succ, nil
} else if lastBlock > claim.LastBlock {
err = fmt.Errorf("claim not found, searched up to %v", event)
return ic, event, succ, err
} else if lastBlock > claim.Epoch.LastBlock {
err = fmt.Errorf("No matching claim, searched up to %v", event)
return nil, nil, nil, err
}
}
if err := it.Error(); err != nil {
}

func unwrapClaimAcceptance(
ic *iconsensus.IConsensus,
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimAcceptance,
bool,
error,
) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
}
ev, err := ic.ParseClaimAcceptance(*log)
return ev, true, err
}

func (s *Service) findClaimAcceptanceEventAndSucc(
claim *ClaimRow,
endBlock *big.Int,
) (
*iconsensus.IConsensus,
*iconsensus.IConsensusClaimAcceptance,
*iconsensus.IConsensusClaimAcceptance,
error,
) {
ic, curr, next, err := s.FindClaimAcceptanceEventAndSucc(claim, endBlock)
if err != nil {
s.Logger.Debug("findClaimAcceptanceEventAndSucc:failed",
"claim", claim,
"error", err)
} else {
s.Logger.Debug("findClaimAcceptanceEventAndSucc:success",
"claim", claim,
"currEvent", curr,
"nextEvent", next,
)
}
return ic, curr, next, err
}

// scan the event stream for a claimAcceptance event that matches claim.
// return this event and its successor
func (s *Service) FindClaimAcceptanceEventAndSucc(
claim *ClaimRow,
endBlock *big.Int,
) (
*iconsensus.IConsensus,
*iconsensus.IConsensusClaimAcceptance,
*iconsensus.IConsensusClaimAcceptance,
error,
) {
ic, err := iconsensus.NewIConsensus(claim.IConsensusAddress, s.ethConn)
if err != nil {
return nil, nil, nil, err
}

// filter must match:
// - `ClaimAcceptance` events
// - appContract == claim.IApplicationAddress
c, err := iconsensus.IConsensusMetaData.GetAbi()
topics, err := abi.MakeTopics(
[]interface{}{c.Events["ClaimAcceptance"].ID},
[]interface{}{claim.IApplicationAddress},
)
if err != nil {
return nil, nil, nil, err
}
return ic, nil, nil, err

it, err := ethutil.ChunkedFilterLogs(s.Context, s.ethConn, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(claim.Epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{claim.IConsensusAddress},
Topics: topics,
})
if err != nil {
return nil, nil, nil, err
}

// pull events instead of iterating
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimAcceptance(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimAcceptanceMatch(claim, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimAcceptance(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
return ic, event, succ, err
} else if lastBlock > claim.Epoch.LastBlock {
err = fmt.Errorf("No matching claim, searched up to %v", event)
return nil, nil, nil, err
}
}
}

/* poll a transaction hash for its submission status and receipt */
func (s *Service) PollTransaction(txHash common.Hash) (bool, *types.Receipt, error) {
func (s *Service) PollTransaction(txHash common.Hash, endBlock *big.Int) (bool, *types.Receipt, error) {
_, isPending, err := s.ethConn.TransactionByHash(s.Context, txHash)
if err != nil || isPending {
return false, nil, err
Expand All @@ -241,5 +444,9 @@ func (s *Service) PollTransaction(txHash common.Hash) (bool, *types.Receipt, err
return false, nil, err
}

if receipt.BlockNumber.Cmp(endBlock) >= 0 {
return false, receipt, err
}

return receipt.Status == 1, receipt, err
}
Loading
Loading