Skip to content

Commit

Permalink
Merge branch 'master' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
tamird committed Aug 30, 2016
2 parents 4500007 + 280679f commit 860b83b
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 18 deletions.
4 changes: 2 additions & 2 deletions GLOCKFILE
Expand Up @@ -44,7 +44,7 @@ github.com/cockroachdb/pq 40c6b2414c76cdb84aacc955f79dc844e48ad0c0
github.com/cockroachdb/stress 029c9348806514969d1109a6ae36e521af411ca7
github.com/cockroachdb/yacc 7c99dfd2164a5d23c3f495ffc2e0b72b6379d066
github.com/codahale/hdrhistogram f8ad88b59a584afeee9d334eff879b104439117b
github.com/coreos/etcd fb64c8ccfeb9408b80c3f657a52ec6cad2fdb3f8
github.com/coreos/etcd 48f4a7d037ca8fd276fff09ef068074193b24dfa
github.com/cpuguy83/go-md2man 2724a9c9051aa62e9cca11304e7dd518e9e41599
github.com/davecgh/go-spew 6cf5744a041a0022271cefed95ba843f6d87fd51
github.com/docker/distribution c810308d1bf3051521dd63cc1cdda03be9f11327
Expand Down Expand Up @@ -110,7 +110,7 @@ golang.org/x/sys a646d33e2ee3172a661fc09bca23bb4889a41bc8
golang.org/x/text d69c40b4be55797923cec7457fac7a244d91a9b6
golang.org/x/tools 08b1e0510c4cfc628a52f0ce641d027b4b2cebe0
google.golang.org/appengine e951d3868b377b14f4e60efa3a301532ee3c1ebf
google.golang.org/grpc b7aa4e95cbb5ec9d07c52a7541ce178ef9626316
google.golang.org/grpc 79b7c349179cdd6efd8bac4a1ce7f01b98c16e9b
gopkg.in/check.v1 4f90aeace3a26ad7021961c297b22c42160c7b25
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/yaml.v2 e4d366fc3c7938e2958e662b4258c7a89e1f0e3e
Expand Down
4 changes: 2 additions & 2 deletions build/cockroach.rb
Expand Up @@ -4,8 +4,8 @@ class Cockroach < Formula
desc "Distributed SQL database"
homepage "https://www.cockroachlabs.com"
url "https://github.com/cockroachdb/cockroach.git",
:tag => "beta-20160728",
:revision => "844e419503ab060aa091c40a7126cb6766fb6621"
:tag => "beta-20160829",
:revision => "ce2bc501f35e5a0d5707fd11d88ca28224aa34b9"
head "https://github.com/cockroachdb/cockroach.git"

depends_on "go" => :build
Expand Down
27 changes: 22 additions & 5 deletions gossip/client.go
Expand Up @@ -19,6 +19,7 @@ package gossip
import (
"fmt"
"net"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -82,8 +83,19 @@ func (c *client) start(
) {
stopper.RunWorker(func() {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
var wg sync.WaitGroup
defer func() {
// This closes the outgoing stream, causing any attempt to send or
// receive to return an error.
//
// Note: it is still possible for incoming gossip to be processed after
// this point.
cancel()

// The stream is closed, but there may still be some incoming gossip
// being processed. Wait until that is complete to avoid racing the
// client's removal against the discovery of its remote's node ID.
wg.Wait()
disconnected <- c
}()

Expand Down Expand Up @@ -111,7 +123,7 @@ func (c *client) start(

// Start gossiping.
log.Infof(ctx, "node %d: started gossip client to %s", nodeID, c.addr)
if err := c.gossip(ctx, g, stream, stopper); err != nil {
if err := c.gossip(ctx, g, stream, stopper, &wg); err != nil {
if !grpcutil.IsClosedConnection(err) {
g.mu.Lock()
peerID := c.peerID
Expand Down Expand Up @@ -238,9 +250,9 @@ func (c *client) handleResponse(g *Gossip, reply *Response) error {
// matches an incoming or the client is connecting to itself.
if g.mu.is.NodeID == c.peerID {
return errors.Errorf("stopping outgoing client to node %d (%s); loopback connection", c.peerID, c.addr)
} else if g.hasIncomingLocked(c.peerID) && g.mu.is.NodeID < c.peerID {
} else if g.hasIncomingLocked(c.peerID) && g.mu.is.NodeID > c.peerID {
// To avoid mutual shutdown, we only shutdown our client if our
// node ID is less than the peer's.
// node ID is higher than the peer's.
return errors.Errorf("stopping outgoing client to node %d (%s); already have incoming", c.peerID, c.addr)
}

Expand All @@ -250,7 +262,7 @@ func (c *client) handleResponse(g *Gossip, reply *Response) error {
// gossip loops, sending deltas of the infostore and receiving deltas
// in turn. If an alternate is proposed on response, the client addr
// is modified and method returns for forwarding by caller.
func (c *client) gossip(ctx context.Context, g *Gossip, stream Gossip_GossipClient, stopper *stop.Stopper) error {
func (c *client) gossip(ctx context.Context, g *Gossip, stream Gossip_GossipClient, stopper *stop.Stopper, wg *sync.WaitGroup) error {
sendGossipChan := make(chan struct{}, 1)

// Register a callback for gossip updates.
Expand All @@ -264,7 +276,12 @@ func (c *client) gossip(ctx context.Context, g *Gossip, stream Gossip_GossipClie
defer g.RegisterCallback(".*", updateCallback)()

errCh := make(chan error, 1)
// This wait group is used to allow the caller to wait until gossip
// processing is terminated.
wg.Add(1)
stopper.RunWorker(func() {
defer wg.Done()

errCh <- func() error {
for {
reply, err := stream.Recv()
Expand Down
4 changes: 2 additions & 2 deletions gossip/client_test.go
Expand Up @@ -334,15 +334,15 @@ func TestClientDisconnectRedundant(t *testing.T) {
// Check which of the clients is connected to the other.
ok1 := local.findClient(func(c *client) bool { return c.addr.String() == rAddr.String() }) != nil
ok2 := remote.findClient(func(c *client) bool { return c.addr.String() == lAddr.String() }) != nil
// We expect node 1 to disconnect; if both are still connected,
// We expect node 2 to disconnect; if both are still connected,
// it's possible that node 1 gossiped before node 2 connected, in
// which case we have to gossip from node 1 to trigger the
// disconnect redundant client code.
if ok1 && ok2 {
if err := local.AddInfo("local-key", nil, time.Second); err != nil {
t.Fatal(err)
}
} else if !ok1 && ok2 && verifyServerMaps(local, 1) && verifyServerMaps(remote, 0) {
} else if ok1 && !ok2 && verifyServerMaps(local, 0) && verifyServerMaps(remote, 1) {
return nil
}
return errors.New("local client to remote not yet closed as redundant")
Expand Down
6 changes: 3 additions & 3 deletions scripts/bootstrap-debian.sh
Expand Up @@ -3,9 +3,9 @@
# On a (recent enough) Debian/Ubuntu system, bootstraps a source Go install
# (with improved parallel build patches) and the cockroach repo.

set -euo pipefail
set -euxo pipefail

GOVERSION="1.7"
GOVERSION="${GOVERSION-1.7}"

cd "$(dirname "${0}")"

Expand All @@ -15,7 +15,7 @@ mkdir -p ~/go-bootstrap
curl "https://storage.googleapis.com/golang/go${GOVERSION}.linux-amd64.tar.gz" | tar -C ~/go-bootstrap -xvz --strip=1
curl "https://storage.googleapis.com/golang/go${GOVERSION}.src.tar.gz" | tar -C ~ -xvz

patch -p1 -d ../go < parallelbuilds-go1.7.patch
patch -p1 -d ../go < "parallelbuilds-go${GOVERSION}.patch"

(cd ~/go/src && GOROOT_BOOTSTRAP=~/go-bootstrap ./make.bash)

Expand Down
9 changes: 5 additions & 4 deletions scripts/gceslave.sh → scripts/gceworker.sh
Expand Up @@ -3,17 +3,18 @@
set -euxo pipefail

export CLOUDSDK_CORE_PROJECT=${CLOUDSDK_CORE_PROJECT-${GOOGLE_PROJECT-cockroach-$(id -un)}}
export CLOUDSDK_COMPUTE_ZONE=${GCESLAVE_ZONE-${CLOUDSDK_COMPUTE_ZONE-us-east1-b}}
export CLOUDSDK_COMPUTE_ZONE=${GCEWORKER_ZONE-${CLOUDSDK_COMPUTE_ZONE-us-east1-b}}
GOVERSION=${GOVERSION-1.7}

name=${GCESLAVE_NAME-gceslave}
name=${GCEWORKER_NAME-gceworker$(echo "${GOVERSION}" | tr -d '.')}

cd "$(dirname "${0}")"

case ${1-} in
create)
gcloud compute instances \
create "${name}" \
--machine-type "custom-32-65536" \
--machine-type "custom-32-32768" \
--network "default" \
--maintenance-policy "MIGRATE" \
--image "/debian-cloud/debian-8-jessie-v20160803" \
Expand All @@ -23,7 +24,7 @@ case ${1-} in
sleep 20 # avoid SSH timeout on copy-files

gcloud compute copy-files . "${name}:scripts"
gcloud compute ssh "${name}" ./scripts/bootstrap-debian.sh
gcloud compute ssh "${name}" "GOVERSION=${GOVERSION} ./scripts/bootstrap-debian.sh"
# Install automatic shutdown after ten minutes of operation without a
# logged in user. To disable this, `sudo touch /.active`.
# This is much more intricate than it looks. A few complications which
Expand Down
125 changes: 125 additions & 0 deletions scripts/parallelbuilds-go1.6.patch
@@ -0,0 +1,125 @@
diff --git a/src/cmd/go/build.go b/src/cmd/go/build.go
index f2a2a60..53c962c 100644
--- a/src/cmd/go/build.go
+++ b/src/cmd/go/build.go
@@ -694,6 +694,8 @@ type builder struct {
exec sync.Mutex
readySema chan bool
ready actionQueue
+
+ tasks chan func()
}

// An action represents a single action in the action graph.
@@ -1236,6 +1238,7 @@ func (b *builder) do(root *action) {
}

b.readySema = make(chan bool, len(all))
+ b.tasks = make(chan func(), buildP)

// Initialize per-action execution state.
for _, a := range all {
@@ -1312,6 +1315,8 @@ func (b *builder) do(root *action) {
a := b.ready.pop()
b.exec.Unlock()
handle(a)
+ case task := <-b.tasks:
+ task()
case <-interrupted:
setExitStatus(1)
return
@@ -3141,12 +3146,16 @@ func (b *builder) cgo(p *Package, cgoExe, obj string, pcCFLAGS, pcLDFLAGS, cgofi
staticLibs = []string{"-Wl,--start-group", "-lmingwex", "-lmingw32", "-Wl,--end-group"}
}

+ var tasks []func()
+ var results chan error
+
cflags := stringList(cgoCPPFLAGS, cgoCFLAGS)
for _, cfile := range cfiles {
+ cfile := cfile
ofile := obj + cfile[:len(cfile)-1] + "o"
- if err := b.gcc(p, ofile, cflags, obj+cfile); err != nil {
- return nil, nil, err
- }
+ tasks = append(tasks, func() {
+ results <- b.gcc(p, ofile, cflags, obj+cfile)
+ })
linkobj = append(linkobj, ofile)
if !strings.HasSuffix(ofile, "_cgo_main.o") {
outObj = append(outObj, ofile)
@@ -3154,35 +3163,65 @@ func (b *builder) cgo(p *Package, cgoExe, obj string, pcCFLAGS, pcLDFLAGS, cgofi
}

for _, file := range gccfiles {
+ file := file
ofile := obj + cgoRe.ReplaceAllString(file[:len(file)-1], "_") + "o"
- if err := b.gcc(p, ofile, cflags, file); err != nil {
- return nil, nil, err
- }
+ tasks = append(tasks, func() {
+ results <- b.gcc(p, ofile, cflags, file)
+ })
linkobj = append(linkobj, ofile)
outObj = append(outObj, ofile)
}

cxxflags := stringList(cgoCPPFLAGS, cgoCXXFLAGS)
for _, file := range gxxfiles {
+ file := file
// Append .o to the file, just in case the pkg has file.c and file.cpp
ofile := obj + cgoRe.ReplaceAllString(file, "_") + ".o"
- if err := b.gxx(p, ofile, cxxflags, file); err != nil {
- return nil, nil, err
- }
+ tasks = append(tasks, func() {
+ results <- b.gxx(p, ofile, cxxflags, file)
+ })
linkobj = append(linkobj, ofile)
outObj = append(outObj, ofile)
}

for _, file := range mfiles {
+ file := file
// Append .o to the file, just in case the pkg has file.c and file.m
ofile := obj + cgoRe.ReplaceAllString(file, "_") + ".o"
- if err := b.gcc(p, ofile, cflags, file); err != nil {
- return nil, nil, err
- }
+ tasks = append(tasks, func() {
+ results <- b.gcc(p, ofile, cflags, file)
+ })
linkobj = append(linkobj, ofile)
outObj = append(outObj, ofile)
}

+ // Give the results channel enough capacity so that sending the
+ // result is guaranteed not to block.
+ results = make(chan error, len(tasks))
+
+ // Feed the tasks into the b.tasks channel on a separate goroutine
+ // because the b.tasks channel's limited capacity might cause
+ // sending the task to block.
+ go func() {
+ for _, task := range tasks {
+ b.tasks <- task
+ }
+ }()
+
+ // Loop until we've received results from all of our tasks or an
+ // error occurs.
+ for count := 0; count < len(tasks); {
+ select {
+ case err := <-results:
+ if err != nil {
+ return nil, nil, err
+ }
+ count++
+ case task := <-b.tasks:
+ task()
+ }
+ }
+
linkobj = append(linkobj, p.SysoFiles...)
dynobj := obj + "_cgo_.o"
pie := (goarch == "arm" && goos == "linux") || goos == "android"
85 changes: 85 additions & 0 deletions storage/replica_test.go
Expand Up @@ -1936,6 +1936,91 @@ func TestReplicaCommandQueueInconsistent(t *testing.T) {
// Success.
}

// TestReplicaCommandQueueCancellation verifies that commands which are
// waiting on the command queue do not execute if their context is cancelled.
func TestReplicaCommandQueueCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()
// Intercept commands with matching command IDs and block them.
blockingStart := make(chan struct{})
blockingDone := make(chan struct{})

tc := testContext{}
tsc := TestStoreContext()
tsc.TestingKnobs.TestingCommandFilter =
func(filterArgs storagebase.FilterArgs) *roachpb.Error {
if filterArgs.Hdr.UserPriority == 42 {
blockingStart <- struct{}{}
<-blockingDone
}
return nil
}
tc.StartWithStoreContext(t, tsc)
defer tc.Stop()

defer close(blockingDone) // make sure teardown can happen

startBlockingCmd := func(ctx context.Context, keys ...roachpb.Key) <-chan *roachpb.Error {
done := make(chan *roachpb.Error)

if err := tc.stopper.RunAsyncTask(func() {
ba := roachpb.BatchRequest{
Header: roachpb.Header{
UserPriority: 42,
},
}
for _, key := range keys {
args := putArgs(key, nil)
ba.Add(&args)
}

_, pErr := tc.Sender().Send(ctx, ba)
done <- pErr
}); err != nil {
t.Fatal(err)
}

return done
}

key1 := roachpb.Key("one")
key2 := roachpb.Key("two")

// Wait until the command queue is blocked.
cmd1Done := startBlockingCmd(context.Background(), key1)
<-blockingStart

// Put a cancelled blocking command in the command queue. This command will
// block on the previous command, but will not itself reach the filter since
// its context is cancelled.
ctx, cancel := context.WithCancel(context.Background())
cancel()
cmd2Done := startBlockingCmd(ctx, key1, key2)

// Wait until both commands are in the command queue.
util.SucceedsSoon(t, func() error {
tc.rng.mu.Lock()
chans := tc.rng.mu.cmdQ.getWait(false, roachpb.Span{Key: key1}, roachpb.Span{Key: key2})
tc.rng.mu.Unlock()
if a, e := len(chans), 2; a < e {
return errors.Errorf("%d of %d commands in the command queue", a, e)
}
return nil
})

// Finish the previous command.
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
t.Fatal(pErr)
}

// If this deadlocks, the command has unexpectedly begun executing and was
// trapped in the command filter. Indeed, the absence of such a deadlock is
// what's being tested here.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatal(pErr)
}
}

func SendWrapped(sender client.Sender, ctx context.Context, header roachpb.Header, args roachpb.Request) (roachpb.Response, roachpb.BatchResponse_Header, *roachpb.Error) {
var ba roachpb.BatchRequest
ba.Add(args)
Expand Down

0 comments on commit 860b83b

Please sign in to comment.