Skip to content

Commit

Permalink
les: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Aug 15, 2019
1 parent 8304db4 commit 9bd903c
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 26 deletions.
8 changes: 4 additions & 4 deletions les/costtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ type costTracker struct {
stats map[uint64][]uint64 // Used for testing purpose.

// TestHooks
disableRealCost bool // Disable real cost evaluation for testing purpose.
costListHook func() RequestCostList // Return customized cost table for testing purpose.
testing bool // Disable real cost evaluation for testing purpose.
testCostList RequestCostList // Customized cost table for testing purpose.
}

// newCostTracker creates a cost tracker and loads the cost factor statistics from the database.
Expand Down Expand Up @@ -269,8 +269,8 @@ func (ct *costTracker) gfLoop() {
for {
select {
case r := <-ct.reqInfoCh:
requestServedTimer.Update(time.Duration(r.servingTime))
requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
requestServedMeter.Mark(int64(r.servingTime))
requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))

now := mclock.Now()
Expand Down
8 changes: 5 additions & 3 deletions les/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type distReq struct {
sentChn chan distPeer
element *list.Element
waitForPeers mclock.AbsTime
enterQueue time.Time
enterQueue mclock.AbsTime
}

// newRequestDistributor creates a new request distributor
Expand Down Expand Up @@ -146,7 +146,7 @@ func (d *requestDistributor) loop() {
send := req.request(peer)
if send != nil {
peer.queueSend(send)
requestSendDelay.UpdateSince(req.enterQueue)
requestSendDelay.Update(time.Duration(d.clock.Now() - req.enterQueue))
}
chn <- peer
close(chn)
Expand Down Expand Up @@ -256,7 +256,9 @@ func (d *requestDistributor) queue(r *distReq) chan distPeer {
r.reqOrder = d.lastReqOrder
r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers)
}
r.enterQueue = time.Now()
// Assign the timestamp when the request is queued no matter it's
// a new one or re-queued one.
r.enterQueue = d.clock.Now()

back := d.reqQueue.Back()
if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
Expand Down
2 changes: 1 addition & 1 deletion les/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func TestStopResumeLes3(t *testing.T) {
server, tearDown := newServerEnv(t, 0, 3, nil, true, true, testBufLimit/10)
defer tearDown()

server.handler.server.costTracker.disableRealCost = true
server.handler.server.costTracker.testing = true

var (
reqID uint64
Expand Down
4 changes: 2 additions & 2 deletions les/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ var (
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)

requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servingTime", nil)
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
requestServedMeter = metrics.NewRegisteredMeter("les/server/req/servingTime", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/estimatedTime", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))

recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
Expand Down
12 changes: 5 additions & 7 deletions les/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,6 @@ func (p *peer) updateCapacity(cap uint64) {
}

func (p *peer) responseID() uint64 {
p.responseLock.Lock()
defer p.responseLock.Unlock()

p.responseCount += 1
return p.responseCount
}
Expand Down Expand Up @@ -607,10 +604,11 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
send = send.add("flowControl/BL", server.defParams.BufLimit)
send = send.add("flowControl/MRR", server.defParams.MinRecharge)

costList := server.costTracker.makeCostList(server.costTracker.globalFactor())
// Generate some fake cost list for testing purpose.
if server.costTracker.costListHook != nil {
costList = server.costTracker.costListHook()
var costList RequestCostList
if server.costTracker.testCostList != nil {
costList = server.costTracker.testCostList
} else {
costList = server.costTracker.makeCostList(server.costTracker.globalFactor())
}
send = send.add("flowControl/MRC", costList)
p.fcCosts = costList.decode(ProtocolLengths[uint(p.version)])
Expand Down
6 changes: 5 additions & 1 deletion les/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (h *serverHandler) handleMsg(p *peer) error {
factor := h.server.costTracker.globalFactor()
if factor < 0.001 {
factor = 1
p.Log().Error("Invalid global cost factor", "factor", factor)
}
maxTime := uint64(float64(maxCost) / factor)
task = h.server.servingQueue.newTask(p, maxTime, priority)
Expand All @@ -215,6 +216,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
}
// sendResponse sends back the response and updates the flow control statistic.
sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
p.responseLock.Lock()
defer p.responseLock.Unlock()

// Short circuit if the client is already frozen.
if p.isFrozen() {
realCost := h.server.costTracker.realCost(servingTime, msg.Size, 0)
Expand All @@ -228,7 +232,7 @@ func (h *serverHandler) handleMsg(p *peer) error {
}
realCost := h.server.costTracker.realCost(servingTime, msg.Size, replySize)
// Assign a fake cost for testing purpose.
if h.server.costTracker.disableRealCost {
if h.server.costTracker.testing {
realCost = maxCost
}
bv := p.fcClient.RequestProcessed(reqID, respId, maxCost, realCost)
Expand Down
15 changes: 7 additions & 8 deletions les/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
fcManager: flowcontrol.NewClientManager(nil, clock),
}
server.costTracker, server.freeCapacity = newCostTracker(db, server.config)
server.costTracker.costListHook = func() RequestCostList { return testCostList(0) } // Disable flow control mechanism.
server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
if server.oracle != nil {
server.oracle.start(simulation)
Expand Down Expand Up @@ -323,17 +323,16 @@ func newTestPeer(t *testing.T, name string, version int, handler *serverHandler,
}
// Execute any implicitly requested handshakes and return
if shake {
// Customize the cost table generation function.
// Customize the cost table if required.
if testCost != 0 {
handler.server.costTracker.costListHook = func() RequestCostList { return testCostList(testCost) }
handler.server.costTracker.testCostList = testCostList(testCost)
}
var (
genesis = handler.blockchain.Genesis()
head = handler.blockchain.CurrentHeader()
td = handler.blockchain.GetTd(head.Hash(), head.Number.Uint64())
costList = handler.server.costTracker.costListHook()
genesis = handler.blockchain.Genesis()
head = handler.blockchain.CurrentHeader()
td = handler.blockchain.GetTd(head.Hash(), head.Number.Uint64())
)
tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), costList)
tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), testCostList(testCost))
}
return tp, errCh
}
Expand Down

0 comments on commit 9bd903c

Please sign in to comment.