Skip to content

Commit

Permalink
Merge pull request #4367 from nvanbenschoten/nvanbenschoten/timers
Browse files Browse the repository at this point in the history
Allocate timers outside of loops to avoid repeat allocations
  • Loading branch information
nvanbenschoten committed Feb 15, 2016
2 parents a9545df + 89fe553 commit 96381b5
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 21 deletions.
11 changes: 4 additions & 7 deletions acceptance/cluster/localcluster.go
Expand Up @@ -504,13 +504,10 @@ func (l *LocalCluster) Start() {
func (l *LocalCluster) Assert(t *testing.T) {
const almostZero = 50 * time.Millisecond
filter := func(ch chan Event, wait time.Duration) *Event {
for {
select {
case act := <-ch:
return &act
case <-time.After(wait):
}
break
select {
case act := <-ch:
return &act
case <-time.After(wait):
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion gossip/gossip.go
Expand Up @@ -754,6 +754,8 @@ func (g *Gossip) bootstrap() {
stopper := g.server.stopper

stopper.RunWorker(func() {
var bootstrapTimer util.Timer
defer bootstrapTimer.Stop()
for {
stopper.RunTask(func() {
g.mu.Lock()
Expand All @@ -769,8 +771,10 @@ func (g *Gossip) bootstrap() {
})

// Pause an interval before next possible bootstrap.
bootstrapTimer.Reset(g.bootstrapInterval)
select {
case <-time.After(g.bootstrapInterval):
case <-bootstrapTimer.C:
bootstrapTimer.Read = true
// continue
case <-stopper.ShouldStop():
return
Expand Down
20 changes: 12 additions & 8 deletions kv/send.go
Expand Up @@ -159,8 +159,20 @@ func send(opts SendOptions, replicas ReplicaSlice,
var errors, retryableErrors int

// Wait for completions.
var sendNextTimer util.Timer
defer sendNextTimer.Stop()
for {
sendNextTimer.Reset(opts.SendNextTimeout)
select {
case <-sendNextTimer.C:
sendNextTimer.Read = true
// On successive RPC timeouts, send to additional replicas if available.
if len(orderedClients) > 0 {
sp.LogEvent("timeout, trying next peer")
sendOneFn(&orderedClients[0], opts.Timeout, context, sp, done)
orderedClients = orderedClients[1:]
}

case call := <-done:
if call.Error == nil {
// Verify response data integrity if this is a proto response.
Expand Down Expand Up @@ -207,14 +219,6 @@ func send(opts SendOptions, replicas ReplicaSlice,
sendOneFn(&orderedClients[0], opts.Timeout, context, sp, done)
orderedClients = orderedClients[1:]
}

case <-time.After(opts.SendNextTimeout):
// On successive RPC timeouts, send to additional replicas if available.
if len(orderedClients) > 0 {
sp.LogEvent("timeout, trying next peer")
sendOneFn(&orderedClients[0], opts.Timeout, context, sp, done)
orderedClients = orderedClients[1:]
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion kv/txn_coord_sender.go
Expand Up @@ -222,9 +222,13 @@ func NewTxnCoordSender(wrapped client.Sender, clock *hlc.Clock, linearizable boo
func (tc *TxnCoordSender) startStats() {
res := time.Millisecond // for duration logging resolution
lastNow := tc.clock.PhysicalNow()
var statusLogTimer util.Timer
defer statusLogTimer.Stop()
for {
statusLogTimer.Reset(statusLogInterval)
select {
case <-time.After(statusLogInterval):
case <-statusLogTimer.C:
statusLogTimer.Read = true
if !log.V(1) {
continue
}
Expand Down
6 changes: 5 additions & 1 deletion rpc/client.go
Expand Up @@ -309,6 +309,8 @@ func (c *Client) runHeartbeat(retryOpts retry.Options) {
}

var err = errUnstarted // initial condition
var heartbeatTimer util.Timer
defer heartbeatTimer.Stop()
for {
for r := retry.Start(retryOpts); r.Next(); {
if c.maybeClose(retryOpts.Closer) {
Expand Down Expand Up @@ -344,13 +346,15 @@ func (c *Client) runHeartbeat(retryOpts retry.Options) {

// Wait after the heartbeat so that the first iteration gets a wait-free
// heartbeat attempt.
heartbeatTimer.Reset(c.heartbeatInterval)
select {
case <-c.closer:
return
case <-retryOpts.Closer:
c.close()
return
case <-time.After(c.heartbeatInterval):
case <-heartbeatTimer.C:
heartbeatTimer.Read = true
// TODO(tamird): Perhaps retry more aggressively when the client is unhealthy.
}
}
Expand Down
7 changes: 6 additions & 1 deletion rpc/clock_offset.go
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/stop"
Expand Down Expand Up @@ -152,11 +153,15 @@ func (r *RemoteClockMonitor) MonitorRemoteOffsets(stopper *stop.Stopper) {
if log.V(1) {
log.Infof("monitoring cluster offset")
}
var monitorTimer util.Timer
defer monitorTimer.Stop()
for {
monitorTimer.Reset(monitorInterval)
select {
case <-stopper.ShouldStop():
return
case <-time.After(monitorInterval):
case <-monitorTimer.C:
monitorTimer.Read = true
offsetInterval, err := r.findOffsetInterval()
// By the contract of the hlc, if the value is 0, then safety checking
// of the max offset is disabled. However we may still want to
Expand Down
6 changes: 5 additions & 1 deletion server/raft_transport.go
Expand Up @@ -159,11 +159,15 @@ func (t *rpcTransport) processQueue(nodeID roachpb.NodeID, storeID roachpb.Store
done := make(chan *gorpc.Call, cap(ch))
var req *storage.RaftMessageRequest
protoResp := &storage.RaftMessageResponse{}
var raftIdleTimer util.Timer
defer raftIdleTimer.Stop()
for {
raftIdleTimer.Reset(raftIdleTimeout)
select {
case <-t.rpcContext.Stopper.ShouldStop():
return
case <-time.After(raftIdleTimeout):
case <-raftIdleTimer.C:
raftIdleTimer.Read = true
if log.V(1) {
log.Infof("closing Raft transport to %d due to inactivity", nodeID)
}
Expand Down
7 changes: 6 additions & 1 deletion storage/store_pool.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/stop"
Expand Down Expand Up @@ -186,6 +187,8 @@ func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) {
// heard from in longer than timeUntilStoreDead.
func (sp *StorePool) start(stopper *stop.Stopper) {
stopper.RunWorker(func() {
var timeoutTimer util.Timer
defer timeoutTimer.Stop()
for {
var timeout time.Duration
sp.mu.Lock()
Expand All @@ -210,8 +213,10 @@ func (sp *StorePool) start(stopper *stop.Stopper) {
}
}
sp.mu.Unlock()
timeoutTimer.Reset(timeout)
select {
case <-time.After(timeout):
case <-timeoutTimer.C:
timeoutTimer.Read = true
case <-stopper.ShouldStop():
return
}
Expand Down
79 changes: 79 additions & 0 deletions util/timer.go
@@ -0,0 +1,79 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Nathan VanBenschoten (nvanbenschoten@gmail.com)

package util

import "time"

// The Timer type represents a single event. When the Timer expires,
// the current time will be sent on Timer.C.
//
// This timer implementation is an abstraction around the standard
// library's time.Timer that provides a temporary workaround for the
// issue described in https://github.com/golang/go/issues/14038. As
// such, this timer should only be used when Reset is planned to
// be called continually in a loop. For this Reset pattern to work,
// Timer.Read must be set to true whenever a timestamp is read from
// the Timer.C channel. If Timer.Read is not set to true when the
// channel is read from, the next call to Timer.Reset will deadlock.
// This pattern looks something like:
//
// var timer util.Timer
// defer timer.Stop()
// for {
// timer.Reset(wait)
// switch {
// case <-timer.C:
// timer.Read = true
// ...
// }
// }
//
// Note that unlike the standard library's Timer type, this Timer will
// not begin counting down until Reset is called for the first time, as
// there is no constructor function.
type Timer struct {
*time.Timer
Read bool
}

// Reset changes the timer to expire after duration d and returns
// the new value of the timer. This method includes the fix proposed
// in https://github.com/golang/go/issues/11513#issuecomment-157062583,
// but requires users of Timer to set Timer.Read to true whenever
// they successfully read from the Timer's channel. Reset operates on
// and returns a value so that Timer can be stack allocated.
func (t *Timer) Reset(d time.Duration) {
if t.Timer == nil {
t.Timer = time.NewTimer(d)
return
}
if !t.Timer.Reset(d) && !t.Read {
<-t.C
}
t.Read = false
}

// Stop prevents the Timer from firing. It returns true if the call stops
// the timer, false if the timer has already expired, been stopped previously,
// or had never been initialized with a call to Timer.Reset. Stop does not
// close the channel, to prevent a read from succeeding incorrectly.
func (t *Timer) Stop() bool {
if t.Timer == nil {
return false
}
return t.Timer.Stop()
}
127 changes: 127 additions & 0 deletions util/timer_test.go
@@ -0,0 +1,127 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Nathan VanBenschoten (nvanbenschoten@gmail.com)

package util

import (
"testing"
"time"
)

const timeStep = 10 * time.Millisecond

func TestTimerTimeout(t *testing.T) {
var timer Timer
defer func() {
if stopped := timer.Stop(); stopped {
t.Errorf("expected Stop to return false, got true")
}
}()
timer.Reset(timeStep)

<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerStop(t *testing.T) {
var timer Timer
timer.Reset(timeStep)
if stopped := timer.Stop(); !stopped {
t.Errorf("expected Stop to return true, got false")
}

select {
case <-timer.C:
t.Errorf("expected timer to stop after call to Stop; got timer that was not stopped")
case <-time.After(5 * timeStep):
}
}

func TestTimerUninitializedStopNoop(t *testing.T) {
var timer Timer
if stopped := timer.Stop(); stopped {
t.Errorf("expected Stop to return false when the timer was never reset, got true")
}
}

func TestTimerResetBeforeTimeout(t *testing.T) {
var timer Timer
defer timer.Stop()
timer.Reset(timeStep)

timer.Reset(timeStep)
<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerResetAfterTimeoutAndNoRead(t *testing.T) {
var timer Timer
defer timer.Stop()
timer.Reset(timeStep)

time.Sleep(2 * timeStep)

timer.Reset(timeStep)
<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerResetAfterTimeoutAndRead(t *testing.T) {
var timer Timer
defer timer.Stop()
timer.Reset(timeStep)

<-timer.C
timer.Read = true

timer.Reset(timeStep)
<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerMakesProgressInLoop(t *testing.T) {
var timer Timer
defer timer.Stop()
for i := 0; i < 5; i++ {
timer.Reset(timeStep)
<-timer.C
timer.Read = true
}
}

0 comments on commit 96381b5

Please sign in to comment.