Skip to content

Commit

Permalink
modify transaction process (#465)
Browse files Browse the repository at this point in the history
* format slaveconn in masterbackend
  • Loading branch information
mask-pp committed Nov 12, 2019
1 parent 96b5201 commit 07d51af
Show file tree
Hide file tree
Showing 21 changed files with 201 additions and 241 deletions.
7 changes: 6 additions & 1 deletion cluster/master/api_backend.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/QuarkChain/goquarkchain/core/types"
"github.com/QuarkChain/goquarkchain/p2p"
qrpc "github.com/QuarkChain/goquarkchain/rpc"
"github.com/QuarkChain/goquarkchain/serialize"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -76,7 +77,11 @@ func (s *QKCMasterBackend) AddTransaction(tx *types.Transaction) error {
if err != nil {
return err
}
go s.protocolManager.BroadcastTransactions(fullShardId, []*types.Transaction{tx}, "")
data, err := serialize.SerializeToBytes(&p2p.NewTransactionList{TransactionList: []*types.Transaction{tx}})
if err != nil {
return err
}
go s.protocolManager.BroadcastTransactions(&rpc.P2PRedirectRequest{Branch: fullShardId, Data: data}, "")
return nil
}

Expand Down
95 changes: 23 additions & 72 deletions cluster/master/handle.go
Expand Up @@ -225,30 +225,7 @@ func (pm *ProtocolManager) handleMsg(peer *Peer) error {
return pm.HandleNewMinorTip(qkcMsg.MetaData.Branch, &tip, peer)

case qkcMsg.Op == p2p.NewTransactionListMsg:
var trans p2p.NewTransactionList
if err := serialize.DeserializeFromBytes(qkcMsg.Data, &trans); err != nil {
return err
}
if qkcMsg.MetaData.Branch != 0 {
return pm.HandleNewTransactionListRequest(peer.id, qkcMsg.RpcID, qkcMsg.MetaData.Branch, &trans)
}
branchTxMap := make(map[uint32][]*types.Transaction)
for _, tx := range trans.TransactionList {
fromShardSize, err := pm.clusterConfig.Quarkchain.GetShardSizeByChainId(tx.EvmTx.FromChainID())
if err != nil {
return err
}
if err := tx.EvmTx.SetFromShardSize(fromShardSize); err != nil {
return err
}
branchTxMap[tx.EvmTx.FromFullShardId()] = append(branchTxMap[tx.EvmTx.FromFullShardId()], tx)
}
// todo make them run in Parallelized
for branch, list := range branchTxMap {
if err := pm.HandleNewTransactionListRequest(peer.id, qkcMsg.RpcID, branch, &p2p.NewTransactionList{TransactionList: list}); err != nil {
return err
}
}
return pm.HandleNewTransactionListRequest(peer.id, qkcMsg.RpcID, qkcMsg.MetaData.Branch, qkcMsg.Data)

case qkcMsg.Op == p2p.NewBlockMinorMsg:
var newBlockMinor p2p.NewBlockMinor
Expand Down Expand Up @@ -604,57 +581,31 @@ func (pm *ProtocolManager) HandleGetRootBlockHeaderListWithSkipRequest(peerId st
return &p2p.GetRootBlockHeaderListResponse{RootTip: rTip, BlockHeaderList: headerlist}, nil
}

func (pm *ProtocolManager) HandleNewTransactionListRequest(peerId string, rpcId uint64, branch uint32, request *p2p.NewTransactionList) error {
req := &rpc.NewTransactionList{
TransactionList: request.TransactionList,
PeerID: peerId,
func (pm *ProtocolManager) HandleNewTransactionListRequest(peerId string, rpcId uint64, branch uint32, data []byte) error {
req := &rpc.P2PRedirectRequest{
Branch: branch,
Data: data,
PeerID: peerId,
}
var clients []rpc.ISlaveConn
if branch == 0 {
clients = pm.slaveConns.GetSlaveConns()
} else {
clients = pm.slaveConns.GetSlaveConnsById(branch)
}
clients := pm.slaveConns.GetSlaveConnsById(branch)
if len(clients) == 0 {
return fmt.Errorf("invalid branch %d for rpc request %d", rpcId, branch)
}
go func() {
var hashList []common.Hash
sameResponse := true
// todo make the client call in Parallelized
for _, client := range clients {
result, err := client.AddTransactions(req)
if err != nil {
log.Error("addTransaction err", "branch", branch, "HandleNewTransactionListRequest failed with error: ", err.Error())
return
}
if hashList == nil {
if result != nil {
hashList = result.Hashes
}
} else if len(hashList) != len(result.Hashes) {
sameResponse = false
} else {
for i := 0; i < len(hashList); i++ {
if hashList[i] != result.Hashes[i] {
sameResponse = false
break
}
}
}
}

if !sameResponse {
panic("same shard in different slave is inconsistent")
}
if len(hashList) > 0 {
tx2broadcast := make([]*types.Transaction, 0, len(request.TransactionList))
for _, tx := range request.TransactionList {
for _, hash := range hashList {
if tx.Hash() == hash {
tx2broadcast = append(tx2broadcast, tx)
break
}
}
for _, client := range clients {
conn := client
go func() {
err := conn.AddTransactions(req)
if err != nil {
log.Error("addTransaction err", "peerID", peerId, "branch", branch, "HandleNewTransactionListRequest failed with error: ", err.Error())
}
pm.BroadcastTransactions(branch, tx2broadcast, peerId)
}
}()
}()
}

return nil
}
Expand Down Expand Up @@ -747,13 +698,13 @@ func (pm *ProtocolManager) tipBroadcastLoop() {
}
}

func (pm *ProtocolManager) BroadcastTransactions(branch uint32, txs []*types.Transaction, sourcePeerId string) {
func (pm *ProtocolManager) BroadcastTransactions(txs *rpc.P2PRedirectRequest, sourcePeerId string) {
for _, peer := range pm.peers.Peers() {
if peer.id != sourcePeerId {
peer.AsyncSendTransactions(branch, txs)
peer.AsyncSendTransactions(txs)
}
}
log.Trace("Announced transaction", "count", len(txs), "recipients", pm.peers.Len()-1)
log.Trace("Announced transaction", "recipients", pm.peers.Len()-1)
}

// syncer is responsible for periodically synchronising with the network, both
Expand Down
19 changes: 10 additions & 9 deletions cluster/master/handler_test.go
Expand Up @@ -421,11 +421,8 @@ func TestBroadcastTransactions(t *testing.T) {
defer ctrl.Finish()
fakeConnMngr := newFakeConnManager(1, ctrl)
pm, _ := newTestProtocolManagerMust(t, 15, nil, NewFakeSynchronizer(1), fakeConnMngr)
txs := newTestTransactionList(10)
hashList := make([]common.Hash, 0, len(txs))
for _, tx := range txs {
hashList = append(hashList, tx.Hash())
}
txsBranch, err := newTestTransactionList(10)
assert.NoError(t, err)
peer, err := newTestPeer("peer", int(qkcconfig.P2PProtocolVersion), pm, true)
assert.NoError(t, err)

Expand All @@ -434,12 +431,12 @@ func TestBroadcastTransactions(t *testing.T) {

for _, conn := range fakeConnMngr.GetSlaveConns() {
conn.(*mock_master.MockISlaveConn).EXPECT().
AddTransactions(gomock.Any()).DoAndReturn(func(request *rpc.NewTransactionList) (*rpc.HashList, error) {
AddTransactions(gomock.Any()).DoAndReturn(func(request *rpc.P2PRedirectRequest) error {
errc <- nil
return &rpc.HashList{Hashes: hashList}, nil
return nil
}).AnyTimes()
}
err = clientPeer.SendTransactions(2, txs)
err = clientPeer.SendTransactions(txsBranch)
if err != nil {
t.Errorf("make message failed: %v", err.Error())
}
Expand Down Expand Up @@ -581,7 +578,11 @@ func ExpectMsg(r p2p.MsgReader, op p2p.P2PCommandOp, metadata p2p.Metadata, cont
return &qkcMsg, fmt.Errorf("MetaData miss match: got %d, want %d", qkcMsg.MetaData.Branch, metadata.Branch)
}

contentEnc, err := p2p.Encrypt(metadata, op, qkcMsg.RpcID, content)
cmdBytes, err := serialize.SerializeToBytes(content)
if err != nil {
return &qkcMsg, err
}
contentEnc, err := p2p.Encrypt(metadata, op, qkcMsg.RpcID, cmdBytes)
if err != nil {
panic("content encode error: " + err.Error())
}
Expand Down
8 changes: 6 additions & 2 deletions cluster/master/helper_test.go
Expand Up @@ -133,14 +133,18 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
return p.txFeed.Subscribe(ch)
}

func newTestTransactionList(count int) []*types.Transaction {
func newTestTransactionList(count int) (*rpc.P2PRedirectRequest, error) {
key, _ := crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8")
txs := make([]*types.Transaction, 0, count)
for i := 0; i < count; i++ {
tx := newTestTransaction(key, uint64(i), 100)
txs = append(txs, tx)
}
return txs
data, err := serialize.SerializeToBytes(&p2p.NewTransactionList{TransactionList: txs})
if err != nil {
return nil, err
}
return &rpc.P2PRedirectRequest{Branch: 0, Data: data}, nil
}

// newTestTransaction create a new dummy transaction.
Expand Down
4 changes: 2 additions & 2 deletions cluster/master/master_grpc.go
Expand Up @@ -69,11 +69,11 @@ func (m *MasterServerSideOp) BroadcastNewTip(ctx context.Context, req *rpc.Reque
}

func (m *MasterServerSideOp) BroadcastTransactions(ctx context.Context, req *rpc.Request) (*rpc.Response, error) {
broadcastTxsReq := new(rpc.BroadcastTransactions)
broadcastTxsReq := new(rpc.P2PRedirectRequest)
if err := serialize.DeserializeFromBytes(req.Data, broadcastTxsReq); err != nil {
return nil, err
}
m.p2pApi.BroadcastTransactions(broadcastTxsReq.Branch, broadcastTxsReq.Txs, broadcastTxsReq.PeerID)
m.p2pApi.BroadcastTransactions(broadcastTxsReq, broadcastTxsReq.PeerID)
return &rpc.Response{
RpcId: req.RpcId,
}, nil
Expand Down
7 changes: 3 additions & 4 deletions cluster/master/p2papi.go
Expand Up @@ -33,12 +33,11 @@ func (api *PrivateP2PAPI) BroadcastMinorBlock(branch uint32, block *types.MinorB

// BroadcastTransactions only be called when run performance test which the txs
// are created by shard itself, so broadcast to all the peer
func (api *PrivateP2PAPI) BroadcastTransactions(branch uint32, txs []*types.Transaction, peerID string) {
func (api *PrivateP2PAPI) BroadcastTransactions(txsBatch *rpc.P2PRedirectRequest, peerID string) {
for _, peer := range api.peers.Peers() {
if peer.id == peerID {
continue
if peer.id != peerID {
peer.AsyncSendTransactions(txsBatch)
}
peer.AsyncSendTransactions(branch, txs)
}
}

Expand Down
39 changes: 16 additions & 23 deletions cluster/master/peer.go
Expand Up @@ -5,15 +5,16 @@ package master
import (
"errors"
"fmt"
"github.com/QuarkChain/goquarkchain/p2p/nodefilter"
"io/ioutil"
"math/big"
"sync"
"time"

"github.com/QuarkChain/goquarkchain/cluster/rpc"
qkcom "github.com/QuarkChain/goquarkchain/common"
"github.com/QuarkChain/goquarkchain/core/types"
"github.com/QuarkChain/goquarkchain/p2p"
"github.com/QuarkChain/goquarkchain/p2p/nodefilter"
"github.com/QuarkChain/goquarkchain/serialize"
"github.com/ethereum/go-ethereum/common"
)
Expand Down Expand Up @@ -49,11 +50,6 @@ type newMinorBlock struct {
block *types.MinorBlock
}

type newTxs struct {
branch uint32
txs []*types.Transaction
}

type newTip struct {
branch uint32
tip *p2p.Tip
Expand All @@ -78,10 +74,10 @@ type Peer struct {

lock sync.RWMutex
chanLock sync.RWMutex
queuedTxs chan newTxs // Queue of transactions to broadcast to the peer
queuedMinorBlock chan newMinorBlock // Queue of blocks to broadcast to the peer
queuedTip chan newTip // Queue of Tips to announce to the peer
term chan struct{} // Termination channel to stop the broadcaster
queuedTxs chan *rpc.P2PRedirectRequest // Queue of transactions to broadcast to the peer
queuedMinorBlock chan newMinorBlock // Queue of blocks to broadcast to the peer
queuedTip chan newTip // Queue of Tips to announce to the peer
term chan struct{} // Termination channel to stop the broadcaster
chans map[uint64]chan interface{}
}

Expand All @@ -92,7 +88,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
version: version,
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
head: &peerHead{nil, make(map[uint32]*p2p.Tip)},
queuedTxs: make(chan newTxs, maxQueuedTxs),
queuedTxs: make(chan *rpc.P2PRedirectRequest, maxQueuedTxs),
queuedMinorBlock: make(chan newMinorBlock, maxQueuedMinorBlocks),
queuedTip: make(chan newTip, maxQueuedTips),
term: make(chan struct{}),
Expand All @@ -107,12 +103,12 @@ func (p *Peer) broadcast() {
for {
select {
case nTxs := <-p.queuedTxs:
if err := p.SendTransactions(nTxs.branch, nTxs.txs); err != nil {
if err := p.SendTransactions(nTxs); err != nil {
p.Log().Error("Broadcast transactions failed",
"count", len(nTxs.txs), "branch", nTxs.branch, "error", err.Error())
"peerID", nTxs.PeerID, "branch", nTxs.Branch, "error", err.Error())
return
}
p.Log().Trace("Broadcast transactions", "count", len(nTxs.txs), "branch", nTxs.branch)
p.Log().Trace("Broadcast transactions", "peerID", nTxs.PeerID, "branch", nTxs.Branch)

case nBlock := <-p.queuedMinorBlock:
if err := p.SendNewMinorBlock(nBlock.branch, nBlock.block); err != nil {
Expand Down Expand Up @@ -197,11 +193,8 @@ func (p *Peer) PeerID() string {

// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *Peer) SendTransactions(branch uint32, txs []*types.Transaction) error {
data := p2p.NewTransactionList{}
data.TransactionList = txs

msg, err := p2p.MakeMsg(p2p.NewTransactionListMsg, 0, p2p.Metadata{Branch: branch}, data)
func (p *Peer) SendTransactions(p2pTxs *rpc.P2PRedirectRequest) error {
msg, err := p2p.MakeMsgWithSerializedData(p2p.NewTransactionListMsg, 0, p2p.Metadata{Branch: p2pTxs.Branch}, p2pTxs.Data)
if err != nil {
return err
}
Expand All @@ -210,12 +203,12 @@ func (p *Peer) SendTransactions(branch uint32, txs []*types.Transaction) error {

// AsyncSendTransactions queues list of transactions propagation to a remote
// peer. If the peer's broadcast queue is full, the event is silently dropped.
func (p *Peer) AsyncSendTransactions(branch uint32, txs []*types.Transaction) {
func (p *Peer) AsyncSendTransactions(txs *rpc.P2PRedirectRequest) {
select {
case p.queuedTxs <- newTxs{branch: branch, txs: txs}:
p.Log().Debug("add transaction to broadcast queue", "count", len(txs))
case p.queuedTxs <- txs:
p.Log().Debug("add transaction to broadcast queue", "peerID", txs.PeerID, "branch", txs.Branch)
default:
p.Log().Debug("Dropping transaction propagation", "count", len(txs))
p.Log().Debug("Dropping transaction propagation", "peerID", txs.PeerID, "branch", txs.Branch)
}
}

Expand Down
17 changes: 5 additions & 12 deletions cluster/master/slave_connection.go
Expand Up @@ -580,23 +580,16 @@ func (s *SlaveConnection) GenTx(numTxPerShard, xShardPercent uint32, tx *types.T
return nil
}

func (s *SlaveConnection) AddTransactions(request *rpc.NewTransactionList) (*rpc.HashList, error) {
var (
rsp = new(rpc.HashList)
res = new(rpc.Response)
)
func (s *SlaveConnection) AddTransactions(request *rpc.P2PRedirectRequest) error {
bytes, err := serialize.SerializeToBytes(request)
if err != nil {
return nil, err
return err
}
res, err = s.client.Call(s.target, &rpc.Request{Op: rpc.OpAddTransactions, Data: bytes})
_, err = s.client.Call(s.target, &rpc.Request{Op: rpc.OpAddTransactions, Data: bytes})
if err != nil {
return nil, err
}
if err = serialize.DeserializeFromBytes(res.Data, rsp); err != nil {
return nil, err
return err
}
return rsp, nil
return nil
}

func (s *SlaveConnection) GetMinorBlocks(request *rpc.GetMinorBlockListRequest) (*p2p.GetMinorBlockListResponse, error) {
Expand Down
12 changes: 4 additions & 8 deletions cluster/rpc/grpc_types.go
Expand Up @@ -282,10 +282,6 @@ type AddTransactionRequest struct {
Tx *types.Transaction `json:"tx" gencodec:"required"`
}

type HashList struct {
Hashes []common.Hash `json:"hash_list" gencodec:"required" bytesizeofslicelen:"4"`
}

// slave -> master
/*
Notify master about a successfully added minro block.
Expand Down Expand Up @@ -416,8 +412,8 @@ type GetRootChainStakesResponse struct {
Signer *account.Recipient `json:"signer" gencodec:"required"`
}

//NewTransactionList new transaction list
type NewTransactionList struct {
TransactionList []*types.Transaction `bytesizeofslicelen:"4"`
PeerID string `json:"peerid" gencodec:"required"`
type P2PRedirectRequest struct {
PeerID string `json:"peerid" gencodec:"required"`
Branch uint32
Data []byte `json:"data" gencodec:"required" bytesizeofslicelen:"4"` // *p2p.NewTransactionList
}

0 comments on commit 07d51af

Please sign in to comment.