Skip to content

Commit

Permalink
storage: augment command queue with timestamps for non-interfering reads
Browse files Browse the repository at this point in the history
Previously, the command queue considered any two commands to overlap if the
key ranges overlapped, regardless of whether the timestamps on the two
commands guaranteed they were non-overlapping.

Consider the case of a read over key ranges "a" - "z" at timestamp t=1s.
If a write arrives for key "c" at timestamp > t=1s, then it should be
free to proceed regardless of the disposition of the read because the
read cannot affect the write's timestamp.

Similarly, if a write is active for key "c" at timestamp t=1s, a read
that arrives for key range "a" - "z" should be free to proceed as long
as its timestamp is < t=1s.

Note that local keys do not allow non-interference between earlier reads
and overlapping writes.

Fixes #14298
  • Loading branch information
spencerkimball committed Mar 23, 2017
1 parent bd92fbd commit b7e956b
Show file tree
Hide file tree
Showing 4 changed files with 429 additions and 83 deletions.
121 changes: 93 additions & 28 deletions pkg/storage/command_queue.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand All @@ -33,7 +34,7 @@ import (
// executing commands. New commands affecting keys or key ranges must
// wait on already-executing commands which overlap their key range.
//
// Before executing, a command invokes GetWait() to acquire a slice of
// Before executing, a command invokes getWait() to acquire a slice of
// channels belonging to overlapping commands which are already
// running. Each channel is waited on by the caller for confirmation
// that all overlapping, pending commands have completed and the
Expand All @@ -42,7 +43,7 @@ import (
// After waiting, a command is added to the queue's already-executing
// set via add(). add accepts a parameter indicating whether the
// command is read-only. Read-only commands don't need to wait on other
// read-only commands, so the channels returned via GetWait() don't
// read-only commands, so the channels returned via getWait() don't
// include read-only on read-only overlapping commands as an
// optimization.
//
Expand All @@ -55,8 +56,8 @@ type CommandQueue struct {
reads interval.Tree
writes interval.Tree
idAlloc int64
wRg, rwRg interval.RangeGroup // avoids allocating in GetWait
oHeap overlapHeap // avoids allocating in GetWait
wRg, rwRg interval.RangeGroup // avoids allocating in getWait
oHeap overlapHeap // avoids allocating in getWait
overlaps []*cmd // avoids allocating in getOverlaps

coveringOptimization bool // if true, use covering span optimization
Expand All @@ -71,12 +72,13 @@ type CommandQueue struct {
}

type cmd struct {
id int64
key interval.Range
readOnly bool
expanded bool // have the children been added
pending chan struct{} // closed when complete
children []cmd
id int64
key interval.Range
readOnly bool
timestamp hlc.Timestamp
expanded bool // have the children been added
pending chan struct{} // closed when complete
children []cmd
}

// ID implements interval.Interface.
Expand Down Expand Up @@ -195,13 +197,15 @@ func (cq *CommandQueue) expand(c *cmd, isInserted bool) bool {
return true
}

// GetWait returns a slice of the pending channels of executing commands which
// getWait returns a slice of the pending channels of executing commands which
// overlap the specified key ranges. If an end key is empty, it only affects
// the start key. The caller should call wg.Wait() to wait for confirmation
// that all gating commands have completed or failed, and then call add() to
// add the keys to the command queue. readOnly is true if the requester is a
// read-only command; false for read-write.
func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-chan struct{}) {
func (cq *CommandQueue) getWait(
readOnly bool, timestamp hlc.Timestamp, spans []roachpb.Span,
) (chans []<-chan struct{}) {
prepareSpans(spans)

for i := 0; i < len(spans); i++ {
Expand All @@ -214,7 +218,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
Start: interval.Comparable(start),
End: interval.Comparable(end),
}
overlaps := cq.getOverlaps(readOnly, newCmdRange.Start, newCmdRange.End)
overlaps := cq.getOverlaps(readOnly, timestamp, newCmdRange.Start, newCmdRange.End)

// Check to see if any of the overlapping entries are "covering"
// entries. If we encounter a covering entry, we remove it from the
Expand Down Expand Up @@ -260,7 +264,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// Instead of having each command establish explicit dependencies on all previous
// overlapping commands, each command only needs to establish explicit dependencies
// on the set of overlapping commands closest to the new command that together span
// the new commands overlapped range. Following this strategy, the other dependencies
// the new command's overlapped range. Following this strategy, the other dependencies
// will be implicitly enforced, which reduces memory utilization and synchronization
// costs.
//
Expand All @@ -269,7 +273,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// "later" read won't wait for the earlier read to complete). However, if that read is
// covered by a "later" write, we don't need to wait because writes can't be reordered.
//
// Two example of how this logic works are shown below. Notice in the first example how
// Two examples of how this logic works are shown below. Notice in the first example how
// the overlapping reads do not establish dependencies on each other, and can therefore
// be reordered. Also notice in the second example that once read command 4 overlaps
// a "later" write, it no longer needs to be a dependency for the new write command 5.
Expand All @@ -288,24 +292,63 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// | | | | |
// cmd 5 [W]: ==================== ====================
//
// Things get more interesting with timestamps:
// -------------------------------------------
// - For a read-only command, overlaps will include only writes which have occurred
// with earlier timestamps. Because writes all must depend on each other, things
// work as expected.
//
// - Write commands overlap both reads and writes. The writes that a write command
// overlaps will depend reliably on each other if they in turn overlap. However, reads
// that a write command overlaps may not in turn be depended on by overlapping writes,
// if the reads have earlier timestamps. This means that writes don't necessarily
// subsume overlapping reads.
//
// We solve this problem by always including read commands with timestamps less than
// the latest write timestamp seen so far, which guarantees that we will wait on any
// reads which might not be dependend on by writes with higher command IDs. Similarly,
// we include write commands with timestamps greater than or equal to the earliest
// read timestamp seen so far.
//
// TODO(spencer): this mechanism is a blunt instrument and will lead to reads rarely
// being consolidated because of range group overlaps.
maxWriteTS, minReadTS := hlc.Timestamp{}, hlc.MaxTimestamp
cq.oHeap.Init(overlaps)
for enclosed := false; cq.oHeap.Len() > 0 && !enclosed; {
cmd := cq.oHeap.PopOverlap()
keyRange := cmd.key
mustWait := false

if cmd.readOnly {
if (cmd.timestamp != hlc.Timestamp{}) {
if cmd.timestamp.Less(minReadTS) {
minReadTS = cmd.timestamp
}
if cmd.timestamp.Less(maxWriteTS) {
mustWait = true
}
}
// If the current overlap is a read (meaning we're a write because other reads will
// be filtered out if we're a read as well), we only need to wait if the write RangeGroup
// doesn't already overlap the read. Otherwise, we know that this current read is a dependent
// itself to a command already accounted for in out write RangeGroup. Either way, we need to add
// itself to a command already accounted for in our write RangeGroup. Either way, we need to add
// this current command to the combined RangeGroup.
cq.rwRg.Add(keyRange)
if !cq.wRg.Overlaps(keyRange) {
if !cq.wRg.Overlaps(keyRange) || mustWait {
if cmd.pending == nil {
cmd.pending = make(chan struct{})
}
chans = append(chans, cmd.pending)
}
} else {
if (cmd.timestamp != hlc.Timestamp{}) {
if maxWriteTS.Less(cmd.timestamp) {
maxWriteTS = cmd.timestamp
}
if minReadTS.Less(cmd.timestamp) {
mustWait = true
}
}
// If the current overlap is a write, pick which RangeGroup will be used to determine necessary
// dependencies based on if we are a read or write.
overlapRg := cq.wRg
Expand All @@ -323,7 +366,7 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-
// any other reads or writes in its future. If it is overlapping, we know there was already a
// dependency established with a dependent of the current overlap, meaning we already established
// an implicit transitive dependency to the current overlap.
if !overlapRg.Overlaps(keyRange) {
if !overlapRg.Overlaps(keyRange) || mustWait {
if cmd.pending == nil {
cmd.pending = make(chan struct{})
}
Expand Down Expand Up @@ -367,26 +410,44 @@ func (cq *CommandQueue) getWait(readOnly bool, spans []roachpb.Span) (chans []<-

// getOverlaps returns a slice of values which overlap the specified
// interval. The slice is only valid until the next call to GetOverlaps.
func (cq *CommandQueue) getOverlaps(readOnly bool, start, end []byte) []*cmd {
func (cq *CommandQueue) getOverlaps(
readOnly bool, timestamp hlc.Timestamp, start, end []byte,
) []*cmd {
rng := interval.Range{
Start: interval.Comparable(start),
End: interval.Comparable(end),
}
if !readOnly {
cq.reads.DoMatching(cq.doOverlaps, rng)
cq.reads.DoMatching(func(i interval.Interface) bool {
c := i.(*cmd)
// Writes only wait on equal or later reads (we always wait
// if the pending read didn't have a timestamp specified).
if (c.timestamp == hlc.Timestamp{}) || (timestamp == hlc.Timestamp{}) ||
!c.timestamp.Less(timestamp) {
cq.overlaps = append(cq.overlaps, c)
}
return false
}, rng)
}
cq.writes.DoMatching(cq.doOverlaps, rng)
// Both reads and writes must wait on other writes, depending on timestamps.
cq.writes.DoMatching(func(i interval.Interface) bool {
c := i.(*cmd)
// Writes always wait on other writes. Reads must wait on writes
// which occur at the same or an earlier timestamp. Note that
// timestamps for write commands may be pushed forward by the
// timestamp cache. This is fine because it doesn't matter how far
// forward the timestamp is pushed if it's already ahead of this read.
if !readOnly || (c.timestamp == hlc.Timestamp{}) ||
(timestamp == hlc.Timestamp{}) || !timestamp.Less(c.timestamp) {
cq.overlaps = append(cq.overlaps, c)
}
return false
}, rng)
overlaps := cq.overlaps
cq.overlaps = cq.overlaps[:0]
return overlaps
}

func (cq *CommandQueue) doOverlaps(i interval.Interface) bool {
c := i.(*cmd)
cq.overlaps = append(cq.overlaps, c)
return false
}

// overlapHeap is a max-heap of cache.Overlaps, sorting the elements
// in decreasing Value.(*cmd).id order.
type overlapHeap []*cmd
Expand Down Expand Up @@ -434,7 +495,9 @@ func (o *overlapHeap) PopOverlap() *cmd {
//
// add should be invoked after waiting on already-executing, overlapping
// commands via the WaitGroup initialized through getWait().
func (cq *CommandQueue) add(readOnly bool, spans []roachpb.Span) *cmd {
func (cq *CommandQueue) add(
readOnly bool, timestamp hlc.Timestamp, spans []roachpb.Span,
) *cmd {
if len(spans) == 0 {
return nil
}
Expand Down Expand Up @@ -474,6 +537,7 @@ func (cq *CommandQueue) add(readOnly bool, spans []roachpb.Span) *cmd {
End: interval.Comparable(maxKey),
}
cmd.readOnly = readOnly
cmd.timestamp = timestamp
cmd.expanded = false

if len(spans) > 1 {
Expand All @@ -487,6 +551,7 @@ func (cq *CommandQueue) add(readOnly bool, spans []roachpb.Span) *cmd {
End: interval.Comparable(span.EndKey),
}
child.readOnly = readOnly
child.timestamp = timestamp
child.expanded = true
}
}
Expand Down

0 comments on commit b7e956b

Please sign in to comment.