diff --git a/cmd/tx-storm/acc.go b/cmd/tx-storm/acc.go index 0d33d2b3f..d882ca056 100644 --- a/cmd/tx-storm/acc.go +++ b/cmd/tx-storm/acc.go @@ -6,7 +6,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - ethparams "github.com/ethereum/go-ethereum/params" "github.com/Fantom-foundation/go-lachesis/crypto" "github.com/Fantom-foundation/go-lachesis/lachesis/params" @@ -32,7 +31,7 @@ func MakeAcc(n uint) *Acc { } } -func (a *Acc) TransactionTo(b *Acc, nonce uint, amount *big.Int) *types.Transaction { +func (a *Acc) TransactionTo(b *Acc, nonce uint, amount *big.Int, chainId uint) *types.Transaction { tx := types.NewTransaction( uint64(nonce), *b.Addr, @@ -44,7 +43,7 @@ func (a *Acc) TransactionTo(b *Acc, nonce uint, amount *big.Int) *types.Transact signed, err := types.SignTx( tx, - types.NewEIP155Signer(ethparams.AllEthashProtocolChanges.ChainID), + types.NewEIP155Signer(big.NewInt(int64(chainId))), a.Key, ) if err != nil { diff --git a/cmd/tx-storm/flags.go b/cmd/tx-storm/flags.go index ee55009ad..1d57a87dc 100644 --- a/cmd/tx-storm/flags.go +++ b/cmd/tx-storm/flags.go @@ -8,6 +8,17 @@ import ( "gopkg.in/urfave/cli.v1" ) +var BlockChainIDFlag = cli.IntFlag{ + Name: "chain-id", + Usage: "chain id for sign transactions", + Value: 4003, // Chain id for fakenet +} + +func getChainId(ctx *cli.Context) (chainid uint) { + chainid = uint(ctx.GlobalInt(BlockChainIDFlag.Name)) + return +} + var AccsStartFlag = cli.IntFlag{ Name: "accs-start", Usage: "offset of predefined fake accounts", diff --git a/cmd/tx-storm/main.go b/cmd/tx-storm/main.go index 7a46f5b8e..c8a983004 100644 --- a/cmd/tx-storm/main.go +++ b/cmd/tx-storm/main.go @@ -29,6 +29,7 @@ var ( func init() { // Flags. flags = []cli.Flag{ + BlockChainIDFlag, NumberFlag, AccsStartFlag, AccsCountFlag, @@ -80,8 +81,9 @@ func generatorMain(ctx *cli.Context) error { num, ofTotal := getNumber(ctx) maxTxnsPerSec := getTxnsRate(ctx) accsFrom, accsCount := getTestAccs(ctx) + chainId := getChainId(ctx) - tt := newThreads(url, num, ofTotal, maxTxnsPerSec, accsFrom, accsCount) + tt := newThreads(url, num, ofTotal, maxTxnsPerSec, accsFrom, accsCount, chainId) tt.SetName("Threads") tt.Start() diff --git a/cmd/tx-storm/threads.go b/cmd/tx-storm/threads.go index f589de11f..2b2680b68 100644 --- a/cmd/tx-storm/threads.go +++ b/cmd/tx-storm/threads.go @@ -33,6 +33,7 @@ func newThreads( num, ofTotal uint, maxTxnsPerSec uint, accsFrom, accsCount uint, + chainId uint, ) *threads { if num >= ofTotal { panic("num is a generator number of total generators count") @@ -56,7 +57,7 @@ func newThreads( tt.Log.Info("Will use", "accounts", accs, "from", offset, "to", accs+offset) for i := range tt.generators { - tt.generators[i] = newTxGenerator(uint(i), accsOnThread, offset) + tt.generators[i] = newTxGenerator(uint(i), accsOnThread, offset, chainId) offset += accsOnThread tt.generators[i].SetName(fmt.Sprintf("Generator%d", i)) } diff --git a/cmd/tx-storm/txgen.go b/cmd/tx-storm/txgen.go index 3c7947c39..2cf96204a 100644 --- a/cmd/tx-storm/txgen.go +++ b/cmd/tx-storm/txgen.go @@ -16,6 +16,8 @@ type Transaction struct { } type generator struct { + chainId uint + accs []*Acc offset uint @@ -30,8 +32,10 @@ type generator struct { logger.Instance } -func newTxGenerator(num, accs, offset uint) *generator { +func newTxGenerator(num, accs, offset uint, chainId uint) *generator { g := &generator{ + chainId: chainId, + accs: make([]*Acc, accs), offset: offset, @@ -118,7 +122,7 @@ func (g *generator) generate(position uint) *Transaction { amount := big.NewInt(1e6) tx := &Transaction{ - Raw: from.TransactionTo(to, nonce, amount), + Raw: from.TransactionTo(to, nonce, amount, g.chainId), Info: meta.NewInfo(a, b), } diff --git a/cmd/tx-storm/txgen_test.go b/cmd/tx-storm/txgen_test.go index 32fcb39fa..5b1ec3df8 100644 --- a/cmd/tx-storm/txgen_test.go +++ b/cmd/tx-storm/txgen_test.go @@ -6,7 +6,7 @@ import ( func TestGenerator(t *testing.T) { t.Skip("example only") - g := newTxGenerator(0, 20, 0) + g := newTxGenerator(0, 20, 0, 4003) for i := 0; i < 2*len(g.accs); i++ { tx := g.Yield() t.Log(tx.Info.String(), tx.Raw.Nonce(), tx.Raw.Value()) diff --git a/docker/Dockerfile.lachesis b/docker/Dockerfile.lachesis index 911a41bed..ca036df46 100644 --- a/docker/Dockerfile.lachesis +++ b/docker/Dockerfile.lachesis @@ -1,7 +1,7 @@ FROM golang:1.13-alpine as builder RUN apk add --no-cache make gcc musl-dev linux-headers git - + WORKDIR /go/go-lachesis COPY . . diff --git a/docker/Makefile b/docker/Makefile index 572039cad..cf1dbf3ed 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -20,4 +20,4 @@ tx-storm: -f docker/Dockerfile.tx-storm -t "tx-storm:$(TAG)" . test_accs.json: - ./_test_accs.sh \ No newline at end of file + ./_test_accs.sh diff --git a/docker/txstorm-on.sh b/docker/txstorm-on.sh index ea5f169de..386da9fac 100755 --- a/docker/txstorm-on.sh +++ b/docker/txstorm-on.sh @@ -10,7 +10,6 @@ do NODE=node$i NAME=txgen$i - i=$((i+1)) docker run -d --rm \ --net=${NETWORK} --name=${NAME} \ --cpus=${LIMIT_CPU} --blkio-weight=${LIMIT_IO} \ diff --git a/ethapi/api.go b/ethapi/api.go index d705f3401..f197e1054 100644 --- a/ethapi/api.go +++ b/ethapi/api.go @@ -49,6 +49,7 @@ import ( "github.com/Fantom-foundation/go-lachesis/inter" "github.com/Fantom-foundation/go-lachesis/inter/idx" lachesisparams "github.com/Fantom-foundation/go-lachesis/lachesis/params" + "github.com/Fantom-foundation/go-lachesis/utils" ) const ( @@ -381,6 +382,24 @@ func (s *PrivateAccountAPI) SendTransaction(ctx context.Context, args SendTxArgs return SubmitTransaction(ctx, s.b, signed) } +// SendTrustedTransaction will create a trusted transaction from the given arguments and +// tries to sign it with the key associated with args.To. If the given passwd isn't +// able to decrypt the key it fails. +func (s *PrivateAccountAPI) SendTrustedTransaction(ctx context.Context, args SendTxArgs, passwd string) (common.Hash, error) { + if args.Nonce == nil { + // Hold the addresse's mutex around signing to prevent concurrent assignment of + // the same nonce to multiple accounts. + s.nonceLock.LockAddr(args.From) + defer s.nonceLock.UnlockAddr(args.From) + } + signed, err := s.signTransaction(ctx, &args, passwd) + if err != nil { + log.Warn("Failed transaction send attempt", "from", args.From, "to", args.To, "value", args.Value.ToInt(), "err", err) + return common.Hash{}, err + } + return SubmitTransaction(ctx, s.b, signed, true) +} + // SignTransaction will create a transaction from the given arguments and // tries to sign it with the key associated with args.To. If the given passwd isn't // able to decrypt the key it fails. The transaction is returned in RLP-form, not broadcast @@ -1514,8 +1533,10 @@ func (args *SendTxArgs) toTransaction() *types.Transaction { } // SubmitTransaction is a helper function that submits tx to txPool and logs a message. -func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) { - if err := b.SendTx(ctx, tx); err != nil { +func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction, flags ...bool) (common.Hash, error) { + trusted := utils.ParseFlag(flags, 0, false) + + if err := b.SendTx(ctx, tx, trusted); err != nil { return common.Hash{}, err } if tx.To() == nil { diff --git a/ethapi/api_account_test.go b/ethapi/api_account_test.go index 69fde7b4c..461c77914 100644 --- a/ethapi/api_account_test.go +++ b/ethapi/api_account_test.go @@ -217,7 +217,7 @@ func TestPrivateAccountAPI_SignAndSendTransaction(t *testing.T) { d := uint64(1) _, _ = api.UnlockAccount(ctx, key, "1234", &d) - b.EXPECT().SendTx(ctx, gomock.Any()). + b.EXPECT().SendTx(ctx, gomock.Any(), false). Return(nil). Times(1) @@ -251,7 +251,7 @@ func TestPrivateAccountAPI_SendTransaction(t *testing.T) { b.EXPECT().GetPoolNonce(ctx, gomock.Any()). Return(uint64(1), nil). AnyTimes() - b.EXPECT().SendTx(ctx, gomock.Any()). + b.EXPECT().SendTx(ctx, gomock.Any(), false). Return(nil). Times(2) // TODO: why 2 ? diff --git a/ethapi/api_transaction_pool_test.go b/ethapi/api_transaction_pool_test.go index d2df7f34f..74ac32d0f 100644 --- a/ethapi/api_transaction_pool_test.go +++ b/ethapi/api_transaction_pool_test.go @@ -487,7 +487,7 @@ func TestPublicTransactionPoolAPI_SendTransaction(t *testing.T) { b.EXPECT().GetPoolNonce(gomock.Any(), gomock.Any()). Return(uint64(1), nil). AnyTimes() - b.EXPECT().SendTx(ctx, gomock.Any()). + b.EXPECT().SendTx(ctx, gomock.Any(), false). Return(nil). Times(2) @@ -525,7 +525,7 @@ func TestPublicTransactionPoolAPI_SendRawTransaction(t *testing.T) { ctx := context.TODO() b := newTestBackend(t) - b.EXPECT().SendTx(ctx, gomock.Any()). + b.EXPECT().SendTx(ctx, gomock.Any(), false). Return(nil). Times(1) diff --git a/ethapi/backend.go b/ethapi/backend.go index 3f9778af9..c4d44d612 100644 --- a/ethapi/backend.go +++ b/ethapi/backend.go @@ -74,7 +74,7 @@ type Backend interface { GetEVM(ctx context.Context, msg evmcore.Message, state *state.StateDB, header *evmcore.EvmHeader) (*vm.EVM, func() error, error) // Transaction pool API - SendTx(ctx context.Context, signedTx *types.Transaction) error + SendTx(ctx context.Context, signedTx *types.Transaction, flags ...bool) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/ethapi/backend_mock_test.go b/ethapi/backend_mock_test.go index 996955201..3ea6537a7 100644 --- a/ethapi/backend_mock_test.go +++ b/ethapi/backend_mock_test.go @@ -271,17 +271,22 @@ func (mr *MockBackendMockRecorder) GetEVM(ctx, msg, state, header interface{}) * } // SendTx mocks base method -func (m *MockBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { +func (m *MockBackend) SendTx(ctx context.Context, signedTx *types.Transaction, flags ...bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendTx", ctx, signedTx) + varargs := []interface{}{ctx, signedTx} + for _, a := range flags { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendTx", varargs...) ret0, _ := ret[0].(error) return ret0 } // SendTx indicates an expected call of SendTx -func (mr *MockBackendMockRecorder) SendTx(ctx, signedTx interface{}) *gomock.Call { +func (mr *MockBackendMockRecorder) SendTx(ctx, signedTx interface{}, flags ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTx", reflect.TypeOf((*MockBackend)(nil).SendTx), ctx, signedTx) + varargs := append([]interface{}{ctx, signedTx}, flags...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTx", reflect.TypeOf((*MockBackend)(nil).SendTx), varargs...) } // GetTransaction mocks base method diff --git a/evmcore/tx_pool.go b/evmcore/tx_pool.go index 860190a04..16dbd744c 100644 --- a/evmcore/tx_pool.go +++ b/evmcore/tx_pool.go @@ -38,6 +38,14 @@ import ( "github.com/Fantom-foundation/go-lachesis/tracing" ) +type txType uint8 + +const ( + txRemote txType = iota + txLocal + txTrusted +) + const ( // chainHeadChanSize is the size of channel listening to ChainHeadNotify. chainHeadChanSize = 10 @@ -107,6 +115,7 @@ var ( pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil) queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil) localCounter = metrics.NewRegisteredCounter("txpool/local", nil) + trustedCounter = metrics.NewRegisteredCounter("txpool/trusted", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -117,6 +126,7 @@ const ( TxStatusQueued TxStatusPending TxStatusIncluded + TxStatusTrusted ) // stateReader provides the state of blockchain and current gas limit to do @@ -244,6 +254,7 @@ type TxPool struct { beats map[common.Address]time.Time // Last heartbeat from each known account all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price + trusted map[common.Address]*txList // Trusted transactions (by trx hash) - executed without timeouts chainHeadCh chan ChainHeadNotify chainHeadSub notify.Subscription @@ -274,6 +285,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain state pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), + trusted: make(map[common.Address]*txList), all: newTxLookup(), chainHeadCh: make(chan ChainHeadNotify, chainHeadChanSize), reqResetCh: make(chan *txpoolResetRequest), @@ -452,16 +464,14 @@ func (pool *TxPool) Stats() (int, int) { // stats retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions. -func (pool *TxPool) stats() (int, int) { - pending := 0 +func (pool *TxPool) stats() (pending int, queued int) { for _, list := range pool.pending { pending += list.Len() } - queued := 0 for _, list := range pool.queue { queued += list.Len() } - return pending, queued + return } // Content retrieves the data content of the transaction pool, returning all the @@ -495,6 +505,29 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } +// Trusted retrieves all currently processable transactions, grouped by origin +// account and sorted by nonce. The returned transaction set is a copy and can be +// freely modified by calling code. +func (pool *TxPool) Trusted(clear bool) (map[common.Address]types.Transactions, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + trusted := make(map[common.Address]types.Transactions) + for addr, list := range pool.trusted { + for _, tx := range list.Flatten() { + if trusted[addr] == nil { + trusted[addr] = make(types.Transactions, 0, 1) + } + trusted[addr] = append(trusted[addr], tx) + } + } + if clear { + pool.trusted = make(map[common.Address]*txList) + } + + return trusted, nil +} + // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() @@ -521,7 +554,7 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). -func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { +func (pool *TxPool) validateTx(tx *types.Transaction, t txType) error { // Heuristic limit, reject transactions over 32KB to prevent DOS attacks if tx.Size() > 32*1024 { return ErrOversizedData @@ -541,7 +574,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrInvalidSender } // Drop non-local transactions under our own minimal accepted gas price - local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network + local := t != txRemote || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } @@ -578,7 +611,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // If a newly added transaction is marked as local, its sending account will be // whitelisted, preventing any associated transaction from being dropped out of the pool // due to pricing constraints. -func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { +func (pool *TxPool) add(tx *types.Transaction, t txType) (replaced bool, err error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { @@ -587,7 +620,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } // If the transaction fails basic validation, discard it - if err := pool.validateTx(tx, local); err != nil { + if err := pool.validateTx(tx, t); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxMeter.Mark(1) return false, err @@ -596,7 +629,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it - if !local && pool.priced.Underpriced(tx, pool.locals) { + if t == txRemote && pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) underpricedTxMeter.Mark(1) return false, ErrUnderpriced @@ -634,19 +667,19 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } // New transaction isn't replacing a pending one, push into queue - replaced, err = pool.enqueueTx(hash, tx) + replaced, err = pool.enqueueTx(hash, tx, t) if err != nil { return false, err } // Mark local addresses and journal local transactions - if local { + if t == txLocal { if !pool.locals.contains(from) { log.Info("Setting new local account", "address", from) pool.locals.add(from) } } - if local || pool.locals.contains(from) { + if t == txLocal || pool.locals.contains(from) { localCounter.Inc(1) } pool.journalTx(from, tx) @@ -658,13 +691,18 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) { +func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, t txType) (bool, error) { // Try to insert the transaction into the future queue + queue := pool.queue + if t == txTrusted { + queue = pool.trusted + } from, _ := types.Sender(pool.signer, tx) // already validated - if pool.queue[from] == nil { - pool.queue[from] = newTxList(false) + if queue[from] == nil { + queue[from] = newTxList(false) } - inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump) + inserted, old := queue[from].Add(tx, pool.config.PriceBump) + if !inserted { // An older transaction was better, discard this queuedDiscardMeter.Mark(1) @@ -746,7 +784,11 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // This method is used to add transactions from the RPC API and performs synchronous pool // reorganization and event propagation. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { - return pool.addTxs(txs, !pool.config.NoLocals, true) + t := txLocal + if pool.config.NoLocals { + t = txRemote + } + return pool.addTxs(txs, true, t) } // AddLocal enqueues a single local transaction into the pool if it is valid. This is @@ -756,18 +798,33 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error { return errs[0] } +// AddTrusteds enqueues a batch of transactions into the pool if they are valid. +// +// This method is used to add transactions from the RPC API and performs synchronous pool +// reorganization and event propagation. +func (pool *TxPool) AddTrusteds(txs []*types.Transaction) []error { + return pool.addTxs(txs, true, txTrusted) +} + +// AddTrusted enqueues a single local transaction into the pool if it is valid. This is +// a convenience wrapper aroundd AddTrustedTxs. +func (pool *TxPool) AddTrusted(tx *types.Transaction) error { + errs := pool.AddTrusteds([]*types.Transaction{tx}) + return errs[0] +} + // AddRemotes enqueues a batch of transactions into the pool if they are valid. If the // senders are not among the locally tracked ones, full pricing constraints will apply. // // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { - return pool.addTxs(txs, false, false) + return pool.addTxs(txs, false, txRemote) } // This is like AddRemotes, but waits for pool reorganization. Tests use this method. func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error { - return pool.addTxs(txs, false, true) + return pool.addTxs(txs, true, txRemote) } // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. @@ -786,14 +843,14 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { } // addTxs attempts to queue a batch of transactions if they are valid. -func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { +func (pool *TxPool) addTxs(txs []*types.Transaction, synced bool, t txType) []error { // Cache senders in transactions before obtaining lock (pool.signer is immutable) for _, tx := range txs { types.Sender(pool.signer, tx) } pool.mu.Lock() - errs, dirtyAddrs := pool.addTxsLocked(txs, local) + errs, dirtyAddrs := pool.addTxsLocked(txs, t) pool.mu.Unlock() // NOTE: all txs tracing @@ -806,7 +863,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { */ done := pool.requestPromoteExecutables(dirtyAddrs) - if sync { + if synced { <-done } return errs @@ -814,11 +871,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // addTxsLocked attempts to queue a batch of transactions if they are valid. // The transaction pool lock must be held. -func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, t txType) ([]error, *accountSet) { dirty := newAccountSet(pool.signer) errs := make([]error, len(txs)) for i, tx := range txs { - replaced, err := pool.add(tx, local) + replaced, err := pool.add(tx, t) errs[i] = err if err == nil && !replaced { dirty.addTx(tx) @@ -828,7 +885,7 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, return errs, dirty } -// Status returns the status (unknown/pending/queued) of a batch of transactions +// Status returns the status (unknown/pending/queued/trusted) of a batch of transactions // identified by their hashes. func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { pool.mu.RLock() @@ -840,6 +897,8 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { from, _ := types.Sender(pool.signer, tx) // already validated if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil { status[i] = TxStatusPending + } else if pool.trusted[from] != nil && pool.trusted[from].txs.items[tx.Nonce()] != nil { + status[i] = TxStatusTrusted } else { status[i] = TxStatusQueued } @@ -881,7 +940,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } // Postpone any invalidated transactions for _, tx := range invalids { - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx.Hash(), tx, txRemote) } // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) @@ -900,6 +959,15 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { delete(pool.queue, addr) } } + if trusted := pool.trusted[addr]; trusted != nil { + if removed, _ := trusted.Remove(tx); removed { + // Reduce the queued counter + trustedCounter.Dec(1) + } + if trusted.Empty() { + delete(pool.trusted, addr) + } + } } // requestPromoteExecutables requests a pool reset to the new head block. @@ -1147,7 +1215,7 @@ func (pool *TxPool) reset(oldHead, newHead *EvmHeader) { // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) senderCacher.recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + pool.addTxsLocked(reinject, txRemote) } // promoteExecutables moves transactions that have become processable from the @@ -1375,7 +1443,7 @@ func (pool *TxPool) demoteUnexecutables() { for _, tx := range invalids { hash := tx.Hash() log.Trace("Demoting pending transaction", "hash", hash) - pool.enqueueTx(hash, tx) + pool.enqueueTx(hash, tx, txRemote) } pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) if pool.locals.contains(addr) { @@ -1387,7 +1455,7 @@ func (pool *TxPool) demoteUnexecutables() { for _, tx := range gapped { hash := tx.Hash() log.Error("Demoting invalidated transaction", "hash", hash) - pool.enqueueTx(hash, tx) + pool.enqueueTx(hash, tx, txRemote) } pendingCounter.Dec(int64(len(gapped))) } diff --git a/evmcore/tx_pool_test.go b/evmcore/tx_pool_test.go index 3e98517f7..d26d20303 100644 --- a/evmcore/tx_pool_test.go +++ b/evmcore/tx_pool_test.go @@ -304,7 +304,7 @@ func TestTransactionQueue(t *testing.T) { pool.currentState.AddBalance(from, big.NewInt(1000)) <-pool.requestReset(nil, nil) - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx.Hash(), tx, txRemote) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) @@ -313,7 +313,7 @@ func TestTransactionQueue(t *testing.T) { tx = transaction(1, 100, key) from, _ = deriveSender(tx) pool.currentState.SetNonce(from, 2) - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx.Hash(), tx, txRemote) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { @@ -338,9 +338,9 @@ func TestTransactionQueue2(t *testing.T) { pool.currentState.AddBalance(from, big.NewInt(1000)) pool.reset(nil, nil) - pool.enqueueTx(tx1.Hash(), tx1) - pool.enqueueTx(tx2.Hash(), tx2) - pool.enqueueTx(tx3.Hash(), tx3) + pool.enqueueTx(tx1.Hash(), tx1, txRemote) + pool.enqueueTx(tx2.Hash(), tx2, txRemote) + pool.enqueueTx(tx3.Hash(), tx3, txRemote) pool.promoteExecutables([]common.Address{from}) if len(pool.pending) != 1 { @@ -380,14 +380,14 @@ func TestTransactionChainFork(t *testing.T) { <-pool.requestReset(nil, nil) tx := transaction(0, 100000, key) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx, txRemote); err != nil { t.Error("didn't expect error", err) } pool.removeTx(tx.Hash(), true) // reset the pool's internal state <-pool.requestReset(nil, nil) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx, txRemote); err != nil { t.Error("didn't expect error", err) } } @@ -410,10 +410,10 @@ func TestTransactionDoubleNonce(t *testing.T) { tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 1000000, big.NewInt(1), nil), signer, key) // Add the first two transaction, ensure higher priced stays only - if replace, err := pool.add(tx1, false); err != nil || replace { + if replace, err := pool.add(tx1, txRemote); err != nil || replace { t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace) } - if replace, err := pool.add(tx2, false); err != nil || !replace { + if replace, err := pool.add(tx2, txRemote); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) @@ -425,7 +425,7 @@ func TestTransactionDoubleNonce(t *testing.T) { } // Add the third transaction and ensure it's not saved (smaller price) - pool.add(tx3, false) + pool.add(tx3, txRemote) <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -449,7 +449,7 @@ func TestTransactionMissingNonce(t *testing.T) { addr := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(addr, big.NewInt(100000000000000)) tx := transaction(1, 100000, key) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx, txRemote); err != nil { t.Error("didn't expect error", err) } if len(pool.pending) != 0 { @@ -515,9 +515,9 @@ func TestTransactionDropping(t *testing.T) { pool.promoteTx(account, tx0.Hash(), tx0) pool.promoteTx(account, tx1.Hash(), tx1) pool.promoteTx(account, tx2.Hash(), tx2) - pool.enqueueTx(tx10.Hash(), tx10) - pool.enqueueTx(tx11.Hash(), tx11) - pool.enqueueTx(tx12.Hash(), tx12) + pool.enqueueTx(tx10.Hash(), tx10, txRemote) + pool.enqueueTx(tx11.Hash(), tx11, txRemote) + pool.enqueueTx(tx12.Hash(), tx12, txRemote) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { @@ -1844,7 +1844,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(uint64(1+i), 100000, key) - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx.Hash(), tx, txRemote) } // Benchmark the speed of pool validation b.ResetTimer() diff --git a/go.sum b/go.sum index 02539c20b..dc912f462 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ= github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= +github.com/devintegral3/go-ethereum v1.9.8-ftm-0.6 h1:1DEbV3Q0VXdnVp0IpsRruTbNkNfAoCQqEO/MZ+TEAfc= +github.com/devintegral3/go-ethereum v1.9.8-ftm-0.6/go.mod h1:arcJDscBoRuY4gwPHUuFztJEyFZWHLUgBGXUBcr5ARY= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= diff --git a/gossip/dummy_tx_pool.go b/gossip/dummy_tx_pool.go index 635d5bbbb..5964afeab 100644 --- a/gossip/dummy_tx_pool.go +++ b/gossip/dummy_tx_pool.go @@ -13,9 +13,10 @@ import ( // dummyTxPool is a fake, helper transaction pool for testing purposes type dummyTxPool struct { - txFeed notify.Feed - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + txFeed notify.Feed + pool []*types.Transaction // Collection of all transactions + trusted []*types.Transaction + added chan<- []*types.Transaction // Notification channel for new transactions lock sync.RWMutex // Protects the transaction pool } @@ -49,6 +50,30 @@ func (p *dummyTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } +func (p *dummyTxPool) Trusted(clear bool) (map[common.Address]types.Transactions, error) { + if p.trusted == nil { + return map[common.Address]types.Transactions{}, nil + } + + p.lock.RLock() + defer p.lock.RUnlock() + + trusted := make(map[common.Address]types.Transactions) + for _, tx := range p.trusted { + from, _ := types.Sender(types.HomesteadSigner{}, tx) + + if trusted[from] == nil { + trusted[from] = make(types.Transactions, 0, 1) + } + trusted[from] = append(trusted[from], tx) + } + if clear { + p.trusted = make(types.Transactions, 0) + } + + return trusted, nil +} + func (p *dummyTxPool) SubscribeNewTxsNotify(ch chan<- evmcore.NewTxsNotify) notify.Subscription { return p.txFeed.Subscribe(ch) } diff --git a/gossip/emitter.go b/gossip/emitter.go index 23d99702b..fd9cf4fc6 100644 --- a/gossip/emitter.go +++ b/gossip/emitter.go @@ -250,11 +250,13 @@ func (em *Emitter) isMyTxTurn(txHash common.Hash, sender common.Address, account return validatorsArr[turns[roundIndex]] == me } -func (em *Emitter) addTxs(e *inter.Event, poolTxs map[common.Address]types.Transactions) *inter.Event { +func (em *Emitter) addTxs(e *inter.Event, poolTxs map[common.Address]types.Transactions, flags ...bool) *inter.Event { if poolTxs == nil || len(poolTxs) == 0 { return e } + trusted := utils.ParseFlag(flags, 0, false) + maxGasUsed := em.maxGasPowerToUse(e) now := time.Now() @@ -266,7 +268,7 @@ func (em *Emitter) addTxs(e *inter.Event, poolTxs map[common.Address]types.Trans } for sender, txs := range poolTxs { - if txs.Len() > em.config.MaxTxsFromSender { // no more than MaxTxsFromSender txs from 1 sender + if !trusted && txs.Len() > em.config.MaxTxsFromSender { // no more than MaxTxsFromSender txs from 1 sender txs = txs[:em.config.MaxTxsFromSender] } @@ -277,11 +279,11 @@ func (em *Emitter) addTxs(e *inter.Event, poolTxs map[common.Address]types.Trans break // txs are dependent, so break the loop } // check not conflicted with already included txs (in any connected event) - if em.world.OccurredTxs.MayBeConflicted(sender, tx.Hash()) { + if !trusted && em.world.OccurredTxs.MayBeConflicted(sender, tx.Hash()) { break // txs are dependent, so break the loop } // my turn, i.e. try to not include the same tx simultaneously by different validators - if !em.isMyTxTurn(tx.Hash(), sender, tx.Nonce(), now, validatorsArr, validatorsArrStakes, e.Creator) { + if !trusted && !em.isMyTxTurn(tx.Hash(), sender, tx.Nonce(), now, validatorsArr, validatorsArrStakes, e.Creator) { break // txs are dependent, so break the loop } @@ -329,7 +331,16 @@ func (em *Emitter) findBestParents(epoch idx.Epoch, myStakerID idx.StakerID) (*h } // createEvent is not safe for concurrent use. -func (em *Emitter) createEvent(poolTxs map[common.Address]types.Transactions) *inter.Event { +func (em *Emitter) createEvent(txs ...map[common.Address]types.Transactions) *inter.Event { + var poolTxs map[common.Address]types.Transactions + var trustedTxs map[common.Address]types.Transactions + if len(txs) > 0 { + poolTxs = txs[0] + } + if len(txs) > 1 { + trustedTxs = txs[1] + } + if em.myStakerID == 0 { // not a validator return nil @@ -418,6 +429,7 @@ func (em *Emitter) createEvent(poolTxs map[common.Address]types.Transactions) *i // Add txs event = em.addTxs(event, poolTxs) + event = em.addTxs(event, trustedTxs, true) if !em.isAllowedToEmit(event, selfParentHeader) { return nil @@ -706,10 +718,23 @@ func (em *Emitter) EmitEvent() *inter.Event { } } + trustedTxs, err := em.world.Txpool.Trusted(true) + if err != nil { + em.Log.Error("Tx pool trusted transactions fetching error", "err", err) + return nil + } + + for _, tt := range trustedTxs { + for _, t := range tt { + span := tracing.CheckTx(t.Hash(), "Emitter.EmitEvent(trusted candidate)") + defer span.Finish() + } + } + em.world.EngineMu.Lock() defer em.world.EngineMu.Unlock() - e := em.createEvent(poolTxs) + e := em.createEvent(poolTxs, trustedTxs) if e == nil { return nil } diff --git a/gossip/ethapi_backend.go b/gossip/ethapi_backend.go index d75415554..14d8d316d 100644 --- a/gossip/ethapi_backend.go +++ b/gossip/ethapi_backend.go @@ -34,6 +34,7 @@ import ( "github.com/Fantom-foundation/go-lachesis/lachesis/genesis/sfc" "github.com/Fantom-foundation/go-lachesis/lachesis/genesis/sfc/sfcpos" "github.com/Fantom-foundation/go-lachesis/tracing" + "github.com/Fantom-foundation/go-lachesis/utils" ) var ErrNotImplemented = func(name string) error { return errors.New(name + " method is not implemented yet") } @@ -325,8 +326,15 @@ func (b *EthAPIBackend) GetEVM(ctx context.Context, msg evmcore.Message, state * return vm.NewEVM(context, state, config, vm.Config{}), vmError, nil } -func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - err := b.svc.txpool.AddLocal(signedTx) +func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction, flags ...bool) error { + trusted := utils.ParseFlag(flags, 0, false) + + var err error + if trusted { + err = b.svc.txpool.AddTrusted(signedTx) + } else { + err = b.svc.txpool.AddLocal(signedTx) + } if err == nil { // NOTE: only sent txs tracing, see TxPool.addTxs() for all tracing.StartTx(signedTx.Hash(), "EthAPIBackend.SendTx()") diff --git a/gossip/protocol.go b/gossip/protocol.go index a149aef56..22049c8d0 100644 --- a/gossip/protocol.go +++ b/gossip/protocol.go @@ -105,6 +105,10 @@ type txPool interface { // The slice should be modifiable by the caller. Pending() (map[common.Address]types.Transactions, error) + // Trusted should return trusted transactions. + // The slice should be modifiable by the caller. + Trusted(bool) (map[common.Address]types.Transactions, error) + // SubscribeNewTxsNotify should return an event subscription of // NewTxsNotify and send events to the given channel. SubscribeNewTxsNotify(chan<- evmcore.NewTxsNotify) notify.Subscription diff --git a/lachesis/genesis/genesis.go b/lachesis/genesis/genesis.go index d467e20d0..fda129b3f 100644 --- a/lachesis/genesis/genesis.go +++ b/lachesis/genesis/genesis.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/Fantom-foundation/go-lachesis/inter" "github.com/Fantom-foundation/go-lachesis/inter/pos" @@ -42,10 +43,17 @@ func preDeploySfc(g Genesis, implCode []byte) Genesis { // FakeGenesis generates fake genesis with n-nodes. func FakeGenesis(accs VAccounts) Genesis { + pvtKey, _ := crypto.HexToECDSA("ae2037b61158065161bc5eeafe43b227663f1614c123ae5f378e8201d3a5f3e5") + accs.Accounts[common.HexToAddress("0xf9352d0ca3820e4b16b5242c74adb0e26471fbea")] = Account{ + Balance: utils.ToFtm(1000000), + PrivateKey: pvtKey, + } + g := Genesis{ Alloc: accs, Time: genesisTime, } + g = preDeploySfc(g, sfc.GetTestContractBinV1()) return g } @@ -141,6 +149,12 @@ func MainGenesis() Genesis { // TestGenesis returns builtin genesis keys of testnet. func TestGenesis() Genesis { + pvtKey, _ := crypto.HexToECDSA("ae2037b61158065161bc5eeafe43b227663f1614c123ae5f378e8201d3a5f3e5") + acc := Account{ + Balance: utils.ToFtm(1000000000), + PrivateKey: pvtKey, + } + g := Genesis{ Time: genesisTime, Alloc: VAccounts{ @@ -149,6 +163,7 @@ func TestGenesis() Genesis { common.HexToAddress("0xcc8b10332478e26f676bccfc73f8c687e3ad1d04"): Account{Balance: utils.ToFtm(400)}, common.HexToAddress("0x30e3b5cc7e8fb98a22e688dfb20b327be8a9fe30"): Account{Balance: utils.ToFtm(400)}, common.HexToAddress("0x567b6f3d4ba1f55652cf90df6db90ad6d8f9abc1"): Account{Balance: utils.ToFtm(400)}, + common.HexToAddress("0xf9352d0ca3820e4b16b5242c74adb0e26471fbea"): acc, }, Validators: pos.GValidators{ pos.GenesisValidator{ diff --git a/utils/util.go b/utils/util.go index cd3ebe26e..a65e1a049 100644 --- a/utils/util.go +++ b/utils/util.go @@ -125,3 +125,11 @@ func NameOf(p idx.StakerID) string { return fmt.Sprintf("%d", p) } + +// ParseFlag return flag if exists and default if not exists +func ParseFlag(flags []bool, idx int, def bool) bool { + if len(flags) < (idx + 1) { + return def + } + return flags[idx] +}