Skip to content

Commit

Permalink
Merge pull request #14342 from cockroachdb/spencerkimball/cmd-queue-t…
Browse files Browse the repository at this point in the history
…imestamps

storage: augment command queue with timestamps for non-interfering reads
  • Loading branch information
spencerkimball committed Mar 24, 2017
2 parents a358892 + 81ce3af commit 6c1b3f1
Show file tree
Hide file tree
Showing 4 changed files with 471 additions and 112 deletions.
132 changes: 99 additions & 33 deletions pkg/storage/command_queue.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand All @@ -33,7 +34,7 @@ import (
// executing commands. New commands affecting keys or key ranges must
// wait on already-executing commands which overlap their key range.
//
// Before executing, a command invokes GetWait() to acquire a slice of
// Before executing, a command invokes getWait() to acquire a slice of
// channels belonging to overlapping commands which are already
// running. Each channel is waited on by the caller for confirmation
// that all overlapping, pending commands have completed and the
Expand All @@ -42,7 +43,7 @@ import (
// After waiting, a command is added to the queue's already-executing
// set via add(). add accepts a parameter indicating whether the
// command is read-only. Read-only commands don't need to wait on other
// read-only commands, so the channels returned via GetWait() don't
// read-only commands, so the channels returned via getWait() don't
// include read-only on read-only overlapping commands as an
// optimization.
//
Expand All @@ -55,8 +56,8 @@ type CommandQueue struct {
reads interval.Tree
writes interval.Tree
idAlloc int64
wRg, rwRg interval.RangeGroup // avoids allocating in GetWait
oHeap overlapHeap // avoids allocating in GetWait
wRg, rwRg interval.RangeGroup // avoids allocating in getWait
oHeap overlapHeap // avoids allocating in getWait
overlaps []*cmd // avoids allocating in getOverlaps

coveringOptimization bool // if true, use covering span optimization
Expand All @@ -71,12 +72,13 @@ type CommandQueue struct {
}

type cmd struct {
id int64
key interval.Range
readOnly bool
expanded bool // have the children been added
pending chan struct{} // closed when complete
children []cmd
id int64
key interval.Range
readOnly bool
timestamp hlc.Timestamp
expanded bool // have the children been added
pending chan struct{} // closed when complete
children []cmd
}

// ID implements interval.Interface.
Expand Down Expand Up @@ -195,13 +197,19 @@ func (cq *CommandQueue) expand(c *cmd, isInserted bool) bool {
return true
}

// GetWait returns a slice of the pending channels of executing commands which
// overlap the specified key ranges. If an end key is empty, it only affects
// the start key. The caller should call wg.Wait() to wait for confirmation
// that all gating commands have completed or failed, and then call add() to
// add the keys to the command queue. readOnly is true if the requester is a
// read-only command; false for read-write.
func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-chan struct{}) {
// getWait returns a slice of the pending channels of executing
// commands which overlap the specified key ranges. The caller should
// call getWait() to fetch the required wait channels. The caller
// should then invoke add() to add the keys to the command queue and
// then wait for confirmation that all gating commands have completed
// or failed. readOnly is true if the requester is a read-only
// command; false for read-write. The provided timestamp, if non-zero,
// is used to allow reads to proceed if they are at earlier timestamps
// than pending writes, and writes to proceed if they are at later
// timestamps than pending reads.
func (cq *CommandQueue) getWait(
readOnly bool, timestamp hlc.Timestamp, spans []roachpb.Span,
) (chans []<-chan struct{}) {
prepareSpans(spans)

for i := 0; i < len(spans); i++ {
Expand All @@ -214,7 +222,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
Start: interval.Comparable(start),
End: interval.Comparable(end),
}
overlaps := cq.getOverlaps(readOnly, newCmdRange.Start, newCmdRange.End)
overlaps := cq.getOverlaps(readOnly, timestamp, newCmdRange.Start, newCmdRange.End)

// Check to see if any of the overlapping entries are "covering"
// entries. If we encounter a covering entry, we remove it from the
Expand Down Expand Up @@ -260,7 +268,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// Instead of having each command establish explicit dependencies on all previous
// overlapping commands, each command only needs to establish explicit dependencies
// on the set of overlapping commands closest to the new command that together span
// the new commands overlapped range. Following this strategy, the other dependencies
// the new command's overlapped range. Following this strategy, the other dependencies
// will be implicitly enforced, which reduces memory utilization and synchronization
// costs.
//
Expand All @@ -269,7 +277,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// "later" read won't wait for the earlier read to complete). However, if that read is
// covered by a "later" write, we don't need to wait because writes can't be reordered.
//
// Two example of how this logic works are shown below. Notice in the first example how
// Two examples of how this logic works are shown below. Notice in the first example how
// the overlapping reads do not establish dependencies on each other, and can therefore
// be reordered. Also notice in the second example that once read command 4 overlaps
// a "later" write, it no longer needs to be a dependency for the new write command 5.
Expand All @@ -288,24 +296,64 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// | | | | |
// cmd 5 [W]: ==================== ====================
//
// Things get more interesting with timestamps:
// -------------------------------------------
// - For a read-only command, overlaps will include only writes which have occurred
// with earlier timestamps. Because writes all must depend on each other, things
// work as expected.
//
// - Write commands overlap both reads and writes. The writes that a write command
// overlaps will depend reliably on each other if they in turn overlap. However, reads
// that a write command overlaps may not in turn be depended on by overlapping writes,
// if the reads have earlier timestamps. This means that writes don't necessarily
// subsume overlapping reads.
//
// We solve this problem by always including read commands with timestamps less than
// the latest write timestamp seen so far, which guarantees that we will wait on any
// reads which might not be depended on by writes with higher IDs. Similarly, we
// include write commands with timestamps greater than or equal to the earliest
// read timestamp seen so far.
//
// TODO(spencer): this mechanism is a blunt instrument and will lead to reads rarely
// being consolidated because of range group overlaps.
maxWriteTS, minReadTS := hlc.Timestamp{}, hlc.MaxTimestamp
cq.oHeap.Init(overlaps)
for enclosed := false; cq.oHeap.Len() > 0 && !enclosed; {
cmd := cq.oHeap.PopOverlap()
keyRange := cmd.key
cmdHasTimestamp := cmd.timestamp != hlc.Timestamp{}
mustWait := false

if cmd.readOnly {
if cmdHasTimestamp {
if cmd.timestamp.Less(minReadTS) {
minReadTS = cmd.timestamp
}
if cmd.timestamp.Less(maxWriteTS) {
mustWait = true
}
}
// If the current overlap is a read (meaning we're a write because other reads will
// be filtered out if we're a read as well), we only need to wait if the write RangeGroup
// doesn't already overlap the read. Otherwise, we know that this current read is a dependent
// itself to a command already accounted for in out write RangeGroup. Either way, we need to add
// itself to a command already accounted for in our write RangeGroup. Either way, we need to add
// this current command to the combined RangeGroup.
cq.rwRg.Add(keyRange)
if !cq.wRg.Overlaps(keyRange) {
if mustWait || !cq.wRg.Overlaps(keyRange) {
if cmd.pending == nil {
cmd.pending = make(chan struct{})
}
chans = append(chans, cmd.pending)
}
} else {
if cmdHasTimestamp {
if maxWriteTS.Less(cmd.timestamp) {
maxWriteTS = cmd.timestamp
}
if minReadTS.Less(cmd.timestamp) {
mustWait = true
}
}
// If the current overlap is a write, pick which RangeGroup will be used to determine necessary
// dependencies based on if we are a read or write.
overlapRg := cq.wRg
Expand All @@ -323,7 +371,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// any other reads or writes in its future. If it is overlapping, we know there was already a
// dependency established with a dependent of the current overlap, meaning we already established
// an implicit transitive dependency to the current overlap.
if !overlapRg.Overlaps(keyRange) {
if mustWait || !overlapRg.Overlaps(keyRange) {
if cmd.pending == nil {
cmd.pending = make(chan struct{})
}
Expand Down Expand Up @@ -367,26 +415,42 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-

// getOverlaps returns a slice of values which overlap the specified
// interval. The slice is only valid until the next call to getOverlaps.
func (cq *CommandQueue) getOverlaps(readOnly bool, start, end []byte) []*cmd {
func (cq *CommandQueue) getOverlaps(
readOnly bool, timestamp hlc.Timestamp, start, end []byte,
) []*cmd {
rng := interval.Range{
Start: interval.Comparable(start),
End: interval.Comparable(end),
}
if !readOnly {
cq.reads.DoMatching(cq.doOverlaps, rng)
cq.reads.DoMatching(func(i interval.Interface) bool {
c := i.(*cmd)
// Writes only wait on equal or later reads (we always wait
// if the pending read didn't have a timestamp specified).
if (c.timestamp == hlc.Timestamp{}) || !c.timestamp.Less(timestamp) {
cq.overlaps = append(cq.overlaps, c)
}
return false
}, rng)
}
cq.writes.DoMatching(cq.doOverlaps, rng)
// Both reads and writes must wait on other writes, depending on timestamps.
cq.writes.DoMatching(func(i interval.Interface) bool {
c := i.(*cmd)
// Writes always wait on other writes. Reads must wait on writes
// which occur at the same or an earlier timestamp. Note that
// timestamps for write commands may be pushed forward by the
// timestamp cache. This is fine because it doesn't matter how far
// forward the timestamp is pushed if it's already ahead of this read.
if !readOnly || (timestamp == hlc.Timestamp{}) || !timestamp.Less(c.timestamp) {
cq.overlaps = append(cq.overlaps, c)
}
return false
}, rng)
overlaps := cq.overlaps
cq.overlaps = cq.overlaps[:0]
return overlaps
}

// doOverlaps is the visitor callback handed to interval.Tree.DoMatching
// by getOverlaps: it records each matching command in cq.overlaps.
func (cq *CommandQueue) doOverlaps(i interval.Interface) bool {
// Tree entries are asserted to be *cmd; any other type would panic here.
c := i.(*cmd)
cq.overlaps = append(cq.overlaps, c)
// NOTE(review): false presumably tells DoMatching to keep iterating over
// the remaining matches — confirm against the interval package.
return false
}

// overlapHeap is a max-heap of *cmd values, sorting the elements
// in decreasing cmd.id order.
type overlapHeap []*cmd
Expand Down Expand Up @@ -434,7 +498,7 @@ func (o *overlapHeap) PopOverlap() *cmd {
//
// add should be invoked after waiting on already-executing, overlapping
// commands via the WaitGroup initialized through getWait().
func (cq *CommandQueue) add(readOnly bool, spans []roachpb.Span) *cmd {
func (cq *CommandQueue) add(readOnly bool, timestamp hlc.Timestamp, spans []roachpb.Span) *cmd {
if len(spans) == 0 {
return nil
}
Expand Down Expand Up @@ -474,6 +538,7 @@ func (cq *CommandQueue) add(readOnly bool, spans []roachpb.Span) *cmd {
End: interval.Comparable(maxKey),
}
cmd.readOnly = readOnly
cmd.timestamp = timestamp
cmd.expanded = false

if len(spans) > 1 {
Expand All @@ -487,6 +552,7 @@ func (cq *CommandQueue) add(readOnly bool, spans []roachpb.Span) *cmd {
End: interval.Comparable(span.EndKey),
}
child.readOnly = readOnly
child.timestamp = timestamp
child.expanded = true
}
}
Expand Down

0 comments on commit 6c1b3f1

Please sign in to comment.