Skip to content

Commit

Permalink
Merge pull request #8913 from RaduBerinde/eventlog-sync
Browse files Browse the repository at this point in the history
log: allow Finishing an EventLog concurrently with events
  • Loading branch information
RaduBerinde committed Aug 30, 2016
2 parents b35e006 + 5e77c62 commit 4500007
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 51 deletions.
6 changes: 2 additions & 4 deletions gossip/gossip.go
Expand Up @@ -65,7 +65,6 @@ import (
"google.golang.org/grpc"

"golang.org/x/net/context"
"golang.org/x/net/trace"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
Expand Down Expand Up @@ -205,8 +204,7 @@ func New(
stopper *stop.Stopper,
registry *metric.Registry,
) *Gossip {
eventLog := trace.NewEventLog("gossip", "gossip")
ctx = log.WithEventLog(ctx, eventLog)
ctx = log.WithEventLog(ctx, "gossip", "gossip")
g := &Gossip{
ctx: ctx,
Connected: make(chan struct{}),
Expand All @@ -224,7 +222,7 @@ func New(
bootstrapAddrs: map[util.UnresolvedAddr]struct{}{},
}
stopper.AddCloser(stop.CloserFn(func() {
eventLog.Finish()
log.FinishEventLog(ctx)
}))

registry.AddMetric(g.outgoing.gauge)
Expand Down
2 changes: 1 addition & 1 deletion storage/gc_queue.go
Expand Up @@ -314,7 +314,7 @@ func (gcq *gcQueue) process(
return err
}

log.Infof(gcq.mu.ctx, "completed with stats %+v", info)
log.Infof(gcq.ctx, "completed with stats %+v", info)

var ba roachpb.BatchRequest
var gcArgs roachpb.GCRequest
Expand Down
57 changes: 25 additions & 32 deletions storage/queue.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/net/trace"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip"
Expand Down Expand Up @@ -175,11 +174,8 @@ type baseQueue struct {
stopped bool
// Some tests in this package disable queues.
disabled bool
eventLog trace.EventLog
// ctx is protected by the mutex because it embeds a reference to the
// eventLog.
ctx context.Context
}
ctx context.Context
}

// makeBaseQueue returns a new instance of baseQueue with the
Expand All @@ -206,11 +202,10 @@ func makeBaseQueue(
bq.mu.Locker = new(syncutil.Mutex)
bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}

bq.mu.eventLog = trace.NewEventLog("queue", name)
bq.mu.ctx = context.TODO()
bq.ctx = context.TODO()
// Prepend [name] to logs.
bq.mu.ctx = log.WithLogTag(bq.mu.ctx, name, nil)
bq.mu.ctx = log.WithEventLog(bq.mu.ctx, bq.mu.eventLog)
bq.ctx = log.WithLogTag(bq.ctx, name, nil)
bq.ctx = log.WithEventLog(bq.ctx, "queue", name)
return bq
}

Expand Down Expand Up @@ -279,20 +274,20 @@ func (bq *baseQueue) MaybeAdd(repl *Replica, now hlc.Timestamp) {
}

if !cfgOk {
log.VEvent(1, bq.mu.ctx, "no system config available. skipping")
log.VEvent(1, bq.ctx, "no system config available. skipping")
return
}

if requiresSplit {
// Range needs to be split due to zone configs, but queue does
// not accept unsplit ranges.
log.VEventf(1, bq.mu.ctx, "%s: split needed; not adding", repl)
log.VEventf(1, bq.ctx, "%s: split needed; not adding", repl)
return
}

should, priority := bq.impl.shouldQueue(now, repl, cfg)
if _, err := bq.addInternal(repl, should, priority); !isExpectedQueueError(err) {
log.Errorf(bq.mu.ctx, "unable to add %s: %s", repl, err)
log.Errorf(bq.ctx, "unable to add %s: %s", repl, err)
}
}

Expand All @@ -319,7 +314,7 @@ func (bq *baseQueue) addInternal(repl *Replica, should bool, priority float64) (
}

if bq.mu.disabled {
log.Event(bq.mu.ctx, "queue disabled")
log.Event(bq.ctx, "queue disabled")
return false, errQueueDisabled
}

Expand All @@ -331,21 +326,21 @@ func (bq *baseQueue) addInternal(repl *Replica, should bool, priority float64) (
item, ok := bq.mu.replicas[repl.RangeID]
if !should {
if ok {
log.Eventf(bq.mu.ctx, "%s: removing", item.value)
log.Eventf(bq.ctx, "%s: removing", item.value)
bq.remove(item)
}
return false, errReplicaNotAddable
} else if ok {
if item.priority != priority {
log.Eventf(bq.mu.ctx, "%s: updating priority: %0.3f -> %0.3f",
log.Eventf(bq.ctx, "%s: updating priority: %0.3f -> %0.3f",
repl, item.priority, priority)
}
// Replica has already been added; update priority.
bq.mu.priorityQ.update(item, priority)
return false, nil
}

log.VEventf(3, bq.mu.ctx, "%s: adding: priority=%0.3f", repl, priority)
log.VEventf(3, bq.ctx, "%s: adding: priority=%0.3f", repl, priority)
item = &replicaItem{value: repl.RangeID, priority: priority}
heap.Push(&bq.mu.priorityQ, item)
bq.mu.replicas[repl.RangeID] = item
Expand All @@ -370,12 +365,11 @@ func (bq *baseQueue) MaybeRemove(repl *Replica) {
defer bq.mu.Unlock()

if bq.mu.stopped {
// The eventLog is no longer available after we stopped.
return
}

if item, ok := bq.mu.replicas[repl.RangeID]; ok {
log.VEventf(3, bq.mu.ctx, "%s: removing", item.value)
log.VEventf(3, bq.ctx, "%s: removing", item.value)
bq.remove(item)
}
}
Expand All @@ -389,9 +383,8 @@ func (bq *baseQueue) processLoop(clock *hlc.Clock, stopper *stop.Stopper) {
defer func() {
bq.mu.Lock()
bq.mu.stopped = true
bq.mu.eventLog.Finish()
bq.mu.ctx = nil
bq.mu.Unlock()
log.FinishEventLog(bq.ctx)
}()

// nextTime is initially nil; we don't start any timers until the queue
Expand Down Expand Up @@ -448,14 +441,14 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
// Load the system config.
cfg, ok := bq.gossip.GetSystemConfig()
if !ok {
log.VEventf(1, bq.mu.ctx, "no system config available. skipping")
log.VEventf(1, bq.ctx, "no system config available. skipping")
return nil
}

if bq.requiresSplit(cfg, repl) {
// Range needs to be split due to zone configs, but queue does
// not accept unsplit ranges.
log.VEventf(3, bq.mu.ctx, "%s: split needed; skipping", repl)
log.VEventf(3, bq.ctx, "%s: split needed; skipping", repl)
return nil
}

Expand All @@ -471,20 +464,20 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
// Create a "fake" get request in order to invoke redirectOnOrAcquireLease.
if err := repl.redirectOnOrAcquireLease(ctx); err != nil {
if _, harmless := err.GetDetail().(*roachpb.NotLeaseHolderError); harmless {
log.VEventf(3, bq.mu.ctx, "%s: not holding lease; skipping", repl)
log.VEventf(3, bq.ctx, "%s: not holding lease; skipping", repl)
return nil
}
return errors.Wrapf(err.GoError(), "%s: could not obtain lease", repl)
}
log.Trace(ctx, "got range lease")
}

log.VEventf(3, bq.mu.ctx, "%s: processing", repl)
log.VEventf(3, bq.ctx, "%s: processing", repl)
start := timeutil.Now()
if err := bq.impl.process(ctx, clock.Now(), repl, cfg); err != nil {
return err
}
log.VEventf(2, bq.mu.ctx, "%s: done: %s", repl, timeutil.Since(start))
log.VEventf(2, bq.ctx, "%s: done: %s", repl, timeutil.Since(start))
log.Trace(ctx, "done")
return nil
}
Expand All @@ -498,7 +491,7 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
func (bq *baseQueue) maybeAddToPurgatory(repl *Replica, err error, clock *hlc.Clock, stopper *stop.Stopper) {
// Check whether the failure is a purgatory error and whether the queue supports it.
if _, ok := err.(purgatoryError); !ok || bq.impl.purgatoryChan() == nil {
log.Errorf(bq.mu.ctx, "on %s: %s", repl, err)
log.Errorf(bq.ctx, "on %s: %s", repl, err)
return
}
bq.mu.Lock()
Expand All @@ -509,7 +502,7 @@ func (bq *baseQueue) maybeAddToPurgatory(repl *Replica, err error, clock *hlc.Cl
return
}

log.Errorf(bq.mu.ctx, "(purgatory) on %s: %s", repl, err)
log.Errorf(bq.ctx, "(purgatory) on %s: %s", repl, err)

item := &replicaItem{value: repl.RangeID}
bq.mu.replicas[repl.RangeID] = item
Expand Down Expand Up @@ -542,7 +535,7 @@ func (bq *baseQueue) maybeAddToPurgatory(repl *Replica, err error, clock *hlc.Cl
for _, id := range ranges {
repl, err := bq.store.GetReplica(id)
if err != nil {
log.Errorf(bq.mu.ctx, "range %s no longer exists on store: %s", id, err)
log.Errorf(bq.ctx, "range %s no longer exists on store: %s", id, err)
return
}
if stopper.RunTask(func() {
Expand All @@ -555,7 +548,7 @@ func (bq *baseQueue) maybeAddToPurgatory(repl *Replica, err error, clock *hlc.Cl
}
bq.mu.Lock()
if len(bq.mu.purgatory) == 0 {
log.Infof(bq.mu.ctx, "purgatory is now empty")
log.Infof(bq.ctx, "purgatory is now empty")
bq.mu.purgatory = nil
bq.mu.Unlock()
return
Expand All @@ -570,7 +563,7 @@ func (bq *baseQueue) maybeAddToPurgatory(repl *Replica, err error, clock *hlc.Cl
}
bq.mu.Unlock()
for errStr, count := range errMap {
log.Errorf(bq.mu.ctx, "%d replicas failing with %q", count, errStr)
log.Errorf(bq.ctx, "%d replicas failing with %q", count, errStr)
}
case <-stopper.ShouldStop():
return
Expand All @@ -595,7 +588,7 @@ func (bq *baseQueue) pop() *Replica {

repl, err := bq.store.GetReplica(item.value)
if err != nil {
log.Errorf(bq.mu.ctx, "range %s no longer exists on store: %s", item.value, err)
log.Errorf(bq.ctx, "range %s no longer exists on store: %s", item.value, err)
return nil
}
return repl
Expand All @@ -619,7 +612,7 @@ func (bq *baseQueue) DrainQueue(clock *hlc.Clock) {
repl := bq.pop()
for repl != nil {
if err := bq.processReplica(repl, clock); err != nil {
log.Errorf(bq.mu.ctx, "failed processing replica %s: %s", repl, err)
log.Errorf(bq.ctx, "failed processing replica %s: %s", repl, err)
}
repl = bq.pop()
}
Expand Down
49 changes: 38 additions & 11 deletions util/log/trace.go
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"

"github.com/cockroachdb/cockroach/util/syncutil"
opentracing "github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"golang.org/x/net/trace"
Expand All @@ -31,40 +32,62 @@ type ctxEventLogKey struct{}

// ctxEventLog is used for contexts to keep track of an EventLog. A pointer to
// it is stored in the context under ctxEventLogKey, so the same instance is
// seen by every user of that context.
type ctxEventLog struct {
// The embedded Mutex guards eventLog so that events can be recorded
// concurrently with FinishEventLog.
syncutil.Mutex
// eventLog is the underlying log. FinishEventLog sets it to nil after
// calling Finish; while it is nil, recorded events are dropped.
eventLog trace.EventLog
}

// WithEventLog embeds a trace.EventLog in the context, causing future logging
// and event calls to go to the EventLog. The current context must not have an
// open span already.
func WithEventLog(ctx context.Context, eventLog trace.EventLog) context.Context {
// withEventLogInternal returns a context derived from ctx that carries the
// given trace.EventLog, so that subsequent logging and event calls made with
// the returned context are recorded in that EventLog.
//
// It panics if ctx already carries an opentracing span: an event log must not
// be attached underneath an open span.
func withEventLogInternal(ctx context.Context, eventLog trace.EventLog) context.Context {
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		panic("event log under span")
	}
	// Store a pointer so the wrapper (and its mutex) is shared rather than
	// copied by whoever retrieves it from the context.
	return context.WithValue(ctx, ctxEventLogKey{}, &ctxEventLog{eventLog: eventLog})
}

// WithEventLog creates a new trace.EventLog with the given family and title
// and returns a context derived from ctx that carries it. Subsequent logging
// and event calls made with the returned context go to that EventLog.
//
// The current context must not have an existing open span. The event log can
// be closed with FinishEventLog.
func WithEventLog(ctx context.Context, family, title string) context.Context {
	el := trace.NewEventLog(family, title)
	return withEventLogInternal(ctx, el)
}

// eventLogFromCtx returns the ctxEventLog stored in ctx under ctxEventLogKey,
// or nil if the context does not carry one.
func eventLogFromCtx(ctx context.Context) *ctxEventLog {
	val := ctx.Value(ctxEventLogKey{})
	if val == nil {
		return nil
	}
	// Only *ctxEventLog values are ever stored under this key, so a plain
	// (panicking) assertion is used rather than the comma-ok form.
	return val.(*ctxEventLog)
}

// FinishEventLog closes the event log carried by ctx (see WithEventLog). It is
// a no-op if ctx carries no event log or if the log was already finished.
//
// It is safe to call concurrently with calls that record events, and events
// may still be recorded afterwards.
func FinishEventLog(ctx context.Context) {
	el := eventLogFromCtx(ctx)
	if el == nil {
		return
	}

	el.Lock()
	if open := el.eventLog; open != nil {
		open.Finish()
		// Clear the reference so later calls see the log as finished.
		el.eventLog = nil
	}
	el.Unlock()
}

var noopTracer opentracing.NoopTracer

// getSpanOrEventLog returns the current Span. If there is no Span, it returns
// the current EventLog. If neither (or the Span is NoopTracer), returns false.
func getSpanOrEventLog(ctx context.Context) (opentracing.Span, trace.EventLog, bool) {
// the current ctxEventLog. If neither (or the Span is NoopTracer), returns
// false.
func getSpanOrEventLog(ctx context.Context) (opentracing.Span, *ctxEventLog, bool) {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
if sp.Tracer() == noopTracer {
return nil, nil, false
}
return sp, nil, true
}
if el := eventLogFromCtx(ctx); el != nil {
return nil, el.eventLog, true
return nil, el, true
}
return nil, nil, false
}
Expand Down Expand Up @@ -101,11 +124,15 @@ func eventInternal(ctx context.Context, isErr, withTags bool, format string, arg
// Tag or Baggage on the span. See #8827 for more discussion.
}
} else {
if isErr {
el.Errorf("%s", msg)
} else {
el.Printf("%s", msg)
el.Lock()
if el.eventLog != nil {
if isErr {
el.eventLog.Errorf("%s", msg)
} else {
el.eventLog.Printf("%s", msg)
}
}
el.Unlock()
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions util/log/trace_test.go
Expand Up @@ -126,7 +126,7 @@ func TestEventLog(t *testing.T) {
Event(ctx, "should-not-show-up")

el := &testingEventLog{}
ctxWithEventLog := WithEventLog(ctx, el)
ctxWithEventLog := withEventLogInternal(ctx, el)

Eventf(ctxWithEventLog, "test%d", 1)
ErrEvent(ctxWithEventLog, "testerr")
Expand All @@ -137,7 +137,10 @@ func TestEventLog(t *testing.T) {
// Events to parent context should still be no-ops.
Event(ctx, "should-not-show-up")

el.Finish()
FinishEventLog(ctxWithEventLog)

// Events after Finish should be ignored.
Errorf(ctxWithEventLog, "should-not-show-up")

expected := "[test1 testerr(err) test2 log errlog1(err) finish]"
if evStr := fmt.Sprint(el.ev); evStr != expected {
Expand All @@ -152,7 +155,7 @@ func TestEventLogAndTrace(t *testing.T) {
Event(ctx, "should-not-show-up")

el := &testingEventLog{}
ctxWithEventLog := WithEventLog(ctx, el)
ctxWithEventLog := withEventLogInternal(ctx, el)

Event(ctxWithEventLog, "test1")
ErrEvent(ctxWithEventLog, "testerr")
Expand Down

0 comments on commit 4500007

Please sign in to comment.