Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent recheckTx (#163) #221

Merged
merged 4 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,24 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
)
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes {
req := types.ToRequestCheckTx(params)
reqRes := NewReqRes(req)

app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) {
res := types.ToResponseCheckTx(r)
app.Callback(req, res)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()

// Notify reqRes listener if set
if cb := reqRes.GetCallback(); cb != nil {
cb(res)
}
})

return reqRes
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
Expand Down Expand Up @@ -246,7 +258,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
res := app.Application.CheckTxSync(req)
return &res, nil
}

Expand Down Expand Up @@ -350,6 +362,7 @@ func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRe
func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()
return reqRes
}
10 changes: 9 additions & 1 deletion abci/example/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
if app.serial {
if len(req.Tx) > 8 {
return types.ResponseCheckTx{
Expand Down
10 changes: 9 additions & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}

Expand Down
8 changes: 6 additions & 2 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t
return app.app.DeliverTx(req)
}

func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTx(req)
func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTxSync(req)
}

func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
app.app.CheckTxAsync(req, callback)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
Expand Down
51 changes: 29 additions & 22 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
context "golang.org/x/net/context"
)

type CheckTxCallback func(ResponseCheckTx)

// Application is an interface that enables any finite, deterministic state machine
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
Expand All @@ -15,7 +17,8 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

Expand Down Expand Up @@ -57,10 +60,22 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx {
return ResponseDeliverTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx {
func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) {
callback(ResponseCheckTx{Code: CodeTypeOK})
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx {
return ResponseEndRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) Commit() ResponseCommit {
return ResponseCommit{}
}
Expand All @@ -81,14 +96,6 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock {
return ResponseEndBlock{}
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx {
return ResponseEndRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) ListSnapshots(req RequestListSnapshots) ResponseListSnapshots {
return ResponseListSnapshots{}
}
Expand Down Expand Up @@ -136,7 +143,18 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx
}

func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
res := app.app.CheckTx(*req)
res := app.app.CheckTxSync(*req)
return &res, nil
}

func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) (
*ResponseBeginRecheckTx, error) {
res := app.app.BeginRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) {
res := app.app.EndRecheckTx(*req)
return &res, nil
}

Expand Down Expand Up @@ -165,17 +183,6 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock)
return &res, nil
}

func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) (
*ResponseBeginRecheckTx, error) {
res := app.app.BeginRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) {
res := app.app.EndRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) ListSnapshots(
ctx context.Context, req *RequestListSnapshots) (*ResponseListSnapshots, error) {
res := app.app.ListSnapshots(*req)
Expand Down
6 changes: 5 additions & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}

func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}

func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(abci.ResponseCheckTx{})
}

func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
Expand Down
16 changes: 12 additions & 4 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func deliverTxsRange(cs *State, start, end int) {
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
_, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{})
if err != nil {
panic(fmt.Sprintf("Error after CheckTx: %v", err))
}
Expand Down Expand Up @@ -168,13 +168,13 @@ func TestMempoolRmBadTx(t *testing.T) {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
Expand Down Expand Up @@ -240,7 +240,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons
return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return app.checkTx(req)
}

func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
app.mempoolTxCountMtx.Lock()
defer app.mempoolTxCountMtx.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -532,7 +532,7 @@ func waitForAndValidateBlock(
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
_, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{})
assert.Nil(t, err)
}
}, css)
Expand Down
5 changes: 4 additions & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ var _ mempl.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
return nil, nil
}
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error {
return nil
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
Expand Down
14 changes: 7 additions & 7 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func sendTxs(ctx context.Context, cs *State) {
return
default:
tx := []byte{byte(i)}
if err := assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{}); err != nil {
panic(err)
}
i++
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1)
require.NoError(t, err)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down Expand Up @@ -423,14 +423,14 @@ func TestSimulateValidatorsChange(t *testing.T) {
newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2)
require.NoError(t, err)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3)
require.NoError(t, err)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
ensureNewProposal(proposalCh, height, round)

removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)

rs = css[0].GetRoundState()
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down
Loading