Skip to content

Commit

Permalink
les: handler separation
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Aug 12, 2019
1 parent c9cdf14 commit eb55adf
Show file tree
Hide file tree
Showing 31 changed files with 2,352 additions and 2,517 deletions.
28 changes: 28 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
badBlockLimit = 10
Expand Down Expand Up @@ -155,6 +156,7 @@ type BlockChain struct {
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing

quit chan struct{} // blockchain quit channel
Expand Down Expand Up @@ -189,6 +191,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyRLPCache, _ := lru.New(bodyCacheLimit)
receiptsCache, _ := lru.New(receiptsCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
txLookupCache, _ := lru.New(txLookupCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)

Expand All @@ -204,6 +207,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
Expand Down Expand Up @@ -440,6 +444,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

return bc.loadLastState()
Expand Down Expand Up @@ -921,6 +926,7 @@ func (bc *BlockChain) truncateAncient(head uint64) error {
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

log.Info("Rewind ancient data", "number", head)
Expand Down Expand Up @@ -2146,6 +2152,28 @@ func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
return bc.hc.GetHeaderByNumber(number)
}

// GetBlockNumber retrieves the block number belonging to the given hash
// from the cache or database
func (bc *BlockChain) GetBlockNumber(hash common.Hash) *uint64 {
return bc.hc.GetBlockNumber(hash)
}

// GetTransactionLookup retrieves the lookup associate with the given transaction
// hash from the cache or database.
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) *rawdb.LegacyTxLookupEntry {
// Short circuit if the txloop already in the cache, retrieve otherwise
if lookup, exist := bc.txLookupCache.Get(hash); exist {
return lookup.(*rawdb.LegacyTxLookupEntry)
}
tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
if tx == nil {
return nil
}
lookup := &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex}
bc.txLookupCache.Add(hash, lookup)
return lookup
}

// Config retrieves the chain's fork configuration.
func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig }

Expand Down
14 changes: 5 additions & 9 deletions les/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,11 @@ var (
// PrivateLightAPI provides an API to access the LES light server or light client.
type PrivateLightAPI struct {
backend *lesCommons
reg *checkpointOracle
}

// NewPrivateLightAPI creates a new LES service API.
func NewPrivateLightAPI(backend *lesCommons, reg *checkpointOracle) *PrivateLightAPI {
return &PrivateLightAPI{
backend: backend,
reg: reg,
}
func NewPrivateLightAPI(backend *lesCommons) *PrivateLightAPI {
return &PrivateLightAPI{backend: backend}
}

// LatestCheckpoint returns the latest local checkpoint package.
Expand Down Expand Up @@ -67,7 +63,7 @@ func (api *PrivateLightAPI) LatestCheckpoint() ([4]string, error) {
// result[2], 32 bytes hex encoded latest section bloom trie root hash
func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {
var res [3]string
cp := api.backend.getLocalCheckpoint(index)
cp := api.backend.localCheckpoint(index)
if cp.Empty() {
return res, errNoCheckpoint
}
Expand All @@ -77,8 +73,8 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {

// GetCheckpointContractAddress returns the contract contract address in hex format.
func (api *PrivateLightAPI) GetCheckpointContractAddress() (string, error) {
if api.reg == nil {
if api.backend.oracle == nil {
return "", errNotActivated
}
return api.reg.config.Address.Hex(), nil
return api.backend.oracle.config.Address.Hex(), nil
}
2 changes: 1 addition & 1 deletion les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (b *LesApiBackend) CurrentBlock() *types.Block {
}

func (b *LesApiBackend) SetHead(number uint64) {
b.eth.protocolManager.downloader.Cancel()
b.eth.handler.downloader.Cancel()
b.eth.blockchain.SetHead(number)
}

Expand Down
18 changes: 5 additions & 13 deletions les/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,16 @@ func TestCapacityAPI10(t *testing.T) {
// while connected and going back and forth between free and priority mode with
// the supplied API calls is also thoroughly tested.
func testCapacityAPI(t *testing.T, clientCount int) {
// Skip test if no data dir specified
if testServerDataDir == "" {
// Skip test if no data dir specified
return
}

for !testSim(t, 1, clientCount, []string{testServerDataDir}, nil, func(ctx context.Context, net *simulations.Network, servers []*simulations.Node, clients []*simulations.Node) bool {
if len(servers) != 1 {
t.Fatalf("Invalid number of servers: %d", len(servers))
}
server := servers[0]

clientRpcClients := make([]*rpc.Client, len(clients))

serverRpcClient, err := server.Client()
if err != nil {
t.Fatalf("Failed to obtain rpc client: %v", err)
Expand All @@ -105,13 +102,13 @@ func testCapacityAPI(t *testing.T, clientCount int) {
}
freeIdx := rand.Intn(len(clients))

clientRpcClients := make([]*rpc.Client, len(clients))
for i, client := range clients {
var err error
clientRpcClients[i], err = client.Client()
if err != nil {
t.Fatalf("Failed to obtain rpc client: %v", err)
}

t.Log("connecting client", i)
if i != freeIdx {
setCapacity(ctx, t, serverRpcClient, client.ID(), testCap/uint64(len(clients)))
Expand All @@ -138,21 +135,22 @@ func testCapacityAPI(t *testing.T, clientCount int) {

reqCount := make([]uint64, len(clientRpcClients))

// Send light request like crazy.
for i, c := range clientRpcClients {
wg.Add(1)
i, c := i, c
go func() {
defer wg.Done()

queue := make(chan struct{}, 100)
reqCount[i] = 0
for {
select {
case queue <- struct{}{}:
select {
case <-stop:
wg.Done()
return
case <-ctx.Done():
wg.Done()
return
default:
wg.Add(1)
Expand All @@ -169,10 +167,8 @@ func testCapacityAPI(t *testing.T, clientCount int) {
}()
}
case <-stop:
wg.Done()
return
case <-ctx.Done():
wg.Done()
return
}
}
Expand Down Expand Up @@ -313,12 +309,10 @@ func getHead(ctx context.Context, t *testing.T, client *rpc.Client) (uint64, com
}

func testRequest(ctx context.Context, t *testing.T, client *rpc.Client) bool {
//res := make(map[string]interface{})
var res string
var addr common.Address
rand.Read(addr[:])
c, _ := context.WithTimeout(ctx, time.Second*12)
// if err := client.CallContext(ctx, &res, "eth_getProof", addr, nil, "latest"); err != nil {
err := client.CallContext(c, &res, "eth_getBalance", addr, "latest")
if err != nil {
t.Log("request error:", err)
Expand Down Expand Up @@ -418,7 +412,6 @@ func NewNetwork() (*simulations.Network, func(), error) {
adapterTeardown()
net.Shutdown()
}

return net, teardown, nil
}

Expand Down Expand Up @@ -516,7 +509,6 @@ func newLesServerService(ctx *adapters.ServiceContext) (node.Service, error) {
if err != nil {
return nil, err
}

server, err := NewLesServer(ethereum, &config)
if err != nil {
return nil, err
Expand Down
47 changes: 24 additions & 23 deletions les/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
// requestBenchmark is an interface for different randomized request generators
type requestBenchmark interface {
// init initializes the generator for generating the given number of randomized requests
init(pm *ProtocolManager, count int) error
init(h *serverHandler, count int) error
// request initiates sending a single request to the given peer
request(peer *peer, index int) error
}
Expand All @@ -52,10 +52,10 @@ type benchmarkBlockHeaders struct {
hashes []common.Hash
}

func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {
d := int64(b.amount-1) * int64(b.skip+1)
b.offset = 0
b.randMax = pm.blockchain.CurrentHeader().Number.Int64() + 1 - d
b.randMax = h.blockchain.CurrentHeader().Number.Int64() + 1 - d
if b.randMax < 0 {
return fmt.Errorf("chain is too short")
}
Expand All @@ -65,7 +65,7 @@ func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
if b.byHash {
b.hashes = make([]common.Hash, count)
for i := range b.hashes {
b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
}
}
return nil
Expand All @@ -85,11 +85,11 @@ type benchmarkBodiesOrReceipts struct {
hashes []common.Hash
}

func (b *benchmarkBodiesOrReceipts) init(pm *ProtocolManager, count int) error {
randMax := pm.blockchain.CurrentHeader().Number.Int64() + 1
func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {
randMax := h.blockchain.CurrentHeader().Number.Int64() + 1
b.hashes = make([]common.Hash, count)
for i := range b.hashes {
b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(rand.Int63n(randMax)))
b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(rand.Int63n(randMax)))
}
return nil
}
Expand All @@ -108,8 +108,8 @@ type benchmarkProofsOrCode struct {
headHash common.Hash
}

func (b *benchmarkProofsOrCode) init(pm *ProtocolManager, count int) error {
b.headHash = pm.blockchain.CurrentHeader().Hash()
func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error {
b.headHash = h.blockchain.CurrentHeader().Hash()
return nil
}

Expand All @@ -130,11 +130,11 @@ type benchmarkHelperTrie struct {
sectionCount, headNum uint64
}

func (b *benchmarkHelperTrie) init(pm *ProtocolManager, count int) error {
func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error {
if b.bloom {
b.sectionCount, b.headNum, _ = pm.server.bloomTrieIndexer.Sections()
b.sectionCount, b.headNum, _ = h.server.bloomTrieIndexer.Sections()
} else {
b.sectionCount, _, _ = pm.server.chtIndexer.Sections()
b.sectionCount, _, _ = h.server.chtIndexer.Sections()
b.headNum = b.sectionCount*params.CHTFrequency - 1
}
if b.sectionCount == 0 {
Expand Down Expand Up @@ -170,7 +170,7 @@ type benchmarkTxSend struct {
txs types.Transactions
}

func (b *benchmarkTxSend) init(pm *ProtocolManager, count int) error {
func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
signer := types.NewEIP155Signer(big.NewInt(18))
Expand All @@ -196,7 +196,7 @@ func (b *benchmarkTxSend) request(peer *peer, index int) error {
// benchmarkTxStatus implements requestBenchmark
type benchmarkTxStatus struct{}

func (b *benchmarkTxStatus) init(pm *ProtocolManager, count int) error {
func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
return nil
}

Expand All @@ -217,7 +217,7 @@ type benchmarkSetup struct {

// runBenchmark runs a benchmark cycle for all benchmark types in the specified
// number of passes
func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
func (h *serverHandler) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
setup := make([]*benchmarkSetup, len(benchmarks))
for i, b := range benchmarks {
setup[i] = &benchmarkSetup{req: b}
Expand All @@ -239,7 +239,7 @@ func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount
if next.totalTime > 0 {
count = int(uint64(next.totalCount) * uint64(targetTime) / uint64(next.totalTime))
}
if err := pm.measure(next, count); err != nil {
if err := h.measure(next, count); err != nil {
next.err = err
}
}
Expand Down Expand Up @@ -275,14 +275,15 @@ func (m *meteredPipe) WriteMsg(msg p2p.Msg) error {

// measure runs a benchmark for a single type in a single pass, with the given
// number of requests
func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
clientPipe, serverPipe := p2p.MsgPipe()
clientMeteredPipe := &meteredPipe{rw: clientPipe}
serverMeteredPipe := &meteredPipe{rw: serverPipe}
var id enode.ID
rand.Read(id[:])
clientPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
serverPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe)

clientPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
serverPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
serverPeer.sendQueue = newExecQueue(count)
serverPeer.announceType = announceTypeNone
serverPeer.fcCosts = make(requestCostTable)
Expand All @@ -291,10 +292,10 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
serverPeer.fcCosts[code] = c
}
serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
serverPeer.fcClient = flowcontrol.NewClientNode(pm.server.fcManager, serverPeer.fcParams)
serverPeer.fcClient = flowcontrol.NewClientNode(h.server.fcManager, serverPeer.fcParams)
defer serverPeer.fcClient.Disconnect()

if err := setup.req.init(pm, count); err != nil {
if err := setup.req.init(h, count); err != nil {
return err
}

Expand All @@ -311,7 +312,7 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
}()
go func() {
for i := 0; i < count; i++ {
if err := pm.handleMsg(serverPeer); err != nil {
if err := h.handleMsg(serverPeer); err != nil {
errCh <- err
return
}
Expand All @@ -336,7 +337,7 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
if err != nil {
return err
}
case <-pm.quitSync:
case <-h.closeCh:
clientPipe.Close()
serverPipe.Close()
return fmt.Errorf("Benchmark cancelled")
Expand Down
3 changes: 2 additions & 1 deletion les/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ const (
func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
go func() {
defer eth.wg.Done()
for {
select {
case <-eth.shutdownChan:
case <-eth.closeCh:
return

case request := <-eth.bloomRequests:
Expand Down
Loading

0 comments on commit eb55adf

Please sign in to comment.