Skip to content
Permalink
Browse files

Merge #38557 #38654 #38710

38557: exec: protect against unset syncFlowConsumer r=jordanlewis a=asubiotto

This should never happen since it implies that the receiver isn't
connected correctly. These happen when a node sends a SetupFlow request
to a remote node where the spec specifies that the response is on that
remote node. We don't see panics in the row execution engine due to
wrapping the syncFlowConsumer with a copyingRowReceiver, but this state
can cause setupVectorized to panic.

This commit protects against this state pending further investigation.

Release note: None

38654: exec: Handle NULLS in TopK sorter r=rohany a=rohany

This commit fixes NULLs in the TopK sorter by avoiding use
of the vec copy method, which has a bug. Instead, we add
a set method to the vec comparator, and use the templatized
comparator to perform the sets that the TopK sorter needs.

To facilitate this, we add an UnsetNull method to the Nulls
object. However, use of this method results in HasNull()
maybe returning true even if the vector doesn't have nulls.
This behavior already occurs when selection vectors are used.
Based on discussions with @solongordon and @asubiotto, this behavior
is OK, and future PR's will attempt to make this behavior better, and address
the bugs within the Vec Copy method.

38710: errors: fix the formatting with %+v r=knz a=knz

(found by @RaduBerinde; needed to complete #38570)

The new library `github.com/cockroachdb/errors` was not implementing
`%+v` formatting properly for assertion and unimplemented errors.
The wrong implementation was hiding the details of the cause
of these two error types from the formatting logic.

Fixing this bug comprehensively required completing the investigation
of the Go 2 / `xerrors` error proposal. This revealed that the
implementation of `fmt.Formatter` for wrapper errors (a `Format()`
method) is required in all cases, at least until Go's stdlib
learns about `errors.Formatter`. More details at
golang/go#29934 and this commit message: cockroachdb/errors@78b6caa.

This patch bumps the dependency `github.com/cockroachdb/errors` to
pick up the fixes to assertion failures and unimplemented errors.

The new definition of `errors.FormatError()` subsequently required
re-implemening `Format)` for `pgerros.withCandidateCode`, which is
also done here.

Finally, this patch also picks up `errors.As()` and the new
streamlined `fmt.Formatter` / `errors.Formatter` interaction, so this
patch also simplifies a few custom error types in CockroachDB
accordingly.

Release note: None

Co-authored-by: Alfonso Subiotto Marqués <alfonso@cockroachlabs.com>
Co-authored-by: Rohan Yadav <rohany@alumni.cmu.edu>
Co-authored-by: Raphael 'kena' Poss <knz@cockroachlabs.com>
  • Loading branch information
4 people committed Jul 8, 2019
4 parents a9dd587 + 08cc37d + 3c0aa0b + 3f7f688 commit 10a073ca369a9f93e89bfaeb36ffcbde31f7c890

Some generated files are not rendered by default. Learn more.

@@ -33,7 +33,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/workload/movr" // registers workloads
_ "github.com/cockroachdb/cockroach/pkg/workload/tpcc" // registers workloads
_ "github.com/cockroachdb/cockroach/pkg/workload/ycsb" // registers workloads
"github.com/pkg/errors"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
)

@@ -67,8 +67,9 @@ func Main() {
if err := Run(os.Args[1:]); err != nil {
fmt.Fprintf(stderr, "Failed running %q\n", cmdName)
errCode = 1
if ec, ok := errors.Cause(err).(*cliError); ok {
errCode = ec.exitCode
var cliErr *cliError
if errors.As(err, &cliErr) {
errCode = cliErr.exitCode
}
}
os.Exit(errCode)
@@ -96,6 +97,20 @@ type cliError struct {

func (e *cliError) Error() string { return e.cause.Error() }

// Cause implements causer.
func (e *cliError) Cause() error { return e.cause }

// Format implements fmt.Formatter.
func (e *cliError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) }

// FormatError implements errors.Formatter.
func (e *cliError) FormatError(p errors.Printer) error {
if p.Detail() {
p.Printf("error with exit code: %d", e.exitCode)
}
return e.cause
}

// stderr aliases log.OrigStderr; we use an alias here so that tests
// in this package can redirect the output of CLI commands to stdout
// to be captured.
@@ -23,8 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -101,32 +101,32 @@ func MaybeDecorateGRPCError(
}

// Is this an "unable to connect" type of error?
unwrappedErr := errors.Cause(err)

if unwrappedErr == pq.ErrSSLNotSupported {
if errors.Is(err, pq.ErrSSLNotSupported) {
// SQL command failed after establishing a TCP connection
// successfully, but discovering that it cannot use TLS while it
// expected the server supports TLS.
return connInsecureHint()
}

switch wErr := unwrappedErr.(type) {
case *security.Error:
if wErr := (*security.Error)(nil); errors.As(err, &wErr) {
return errors.Errorf("cannot load certificates.\n"+
"Check your certificate settings, set --certs-dir, or use --insecure for insecure clusters.\n\n%v",
unwrappedErr)
err)
}

case *x509.UnknownAuthorityError:
if wErr := (*x509.UnknownAuthorityError)(nil); errors.As(err, &wErr) {
// A SQL connection was attempted with an incorrect CA.
return connSecurityHint()
}

case *initialSQLConnectionError:
if wErr := (*initialSQLConnectionError)(nil); errors.As(err, &wErr) {
// SQL handshake failed after establishing a TCP connection
// successfully, something else than CockroachDB responded, was
// confused and closed the door on us.
return connRefused()
}

case *pq.Error:
if wErr := (*pq.Error)(nil); errors.As(err, &wErr) {
// SQL commands will fail with a pq error but only after
// establishing a TCP connection successfully. So if we got
// here, there was a TCP connection already.
@@ -137,8 +137,9 @@ func MaybeDecorateGRPCError(
}
// Otherwise, there was a regular SQL error. Just report that.
return wErr
}

case *net.OpError:
if wErr := (*net.OpError)(nil); errors.As(err, &wErr) {
// A non-RPC client command was used (e.g. a SQL command) and an
// error occurred early while establishing the TCP connection.

@@ -150,8 +151,9 @@ func MaybeDecorateGRPCError(
return connSecurityHint()
}
return connFailed()
}

case *netutil.InitialHeartbeatFailedError:
if wErr := (*netutil.InitialHeartbeatFailedError)(nil); errors.As(err, &wErr) {
// A GRPC TCP connection was established but there was an early failure.
// Try to distinguish the cases.
msg := wErr.Error()
@@ -177,28 +179,27 @@ func MaybeDecorateGRPCError(
}

// Is it a plain context cancellation (i.e. timeout)?
switch unwrappedErr {
case context.DeadlineExceeded:
return opTimeout()
case context.Canceled:
if errors.IsAny(err,
context.DeadlineExceeded,
context.Canceled) {
return opTimeout()
}

// Is it a GRPC-observed context cancellation (i.e. timeout), a GRPC
// connection error, or a known indication of a too-old server?
if code := status.Code(unwrappedErr); code == codes.DeadlineExceeded {
if code := status.Code(errors.Cause(err)); code == codes.DeadlineExceeded {
return opTimeout()
} else if code == codes.Unimplemented &&
strings.Contains(unwrappedErr.Error(), "unknown method Decommission") ||
strings.Contains(unwrappedErr.Error(), "unknown service cockroach.server.serverpb.Init") {
strings.Contains(err.Error(), "unknown method Decommission") ||
strings.Contains(err.Error(), "unknown service cockroach.server.serverpb.Init") {
return fmt.Errorf(
"incompatible client and server versions (likely server version: v1.0, required: >=v1.1)")
} else if grpcutil.IsClosedConnection(unwrappedErr) {
} else if grpcutil.IsClosedConnection(err) {
return errors.Errorf("connection lost.\n\n%v", err)
}

// Does the server require GSSAPI authentication?
if strings.Contains(unwrappedErr.Error(), "pq: unknown authentication response: 7") {
if strings.Contains(err.Error(), "pq: unknown authentication response: 7") {
return fmt.Errorf(
"server requires GSSAPI authentication for this user.\n" +
"The CockroachDB CLI does not support GSSAPI authentication; use 'psql' instead")
@@ -34,8 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
"github.com/pkg/errors"
)

type sqlConnI interface {
@@ -74,6 +74,20 @@ type initialSQLConnectionError struct {
// Error implements the error interface.
func (i *initialSQLConnectionError) Error() string { return i.err.Error() }

// Cause implements causer.
func (i *initialSQLConnectionError) Cause() error { return i.err }

// Format implements fmt.Formatter.
func (i *initialSQLConnectionError) Format(s fmt.State, verb rune) { errors.FormatError(i, s, verb) }

// FormatError implements errors.Formatter.
func (i *initialSQLConnectionError) FormatError(p errors.Printer) error {
if p.Detail() {
p.Print("error while establishing the SQL session")
}
return i.err
}

// wrapConnError detects TCP EOF errors during the initial SQL handshake.
// These are translated to a message "perhaps this is not a CockroachDB node"
// at the top level.
@@ -35,9 +35,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors/markers"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/cockroachdb/errors"
opentracing "github.com/opentracing/opentracing-go"
)

// To allow queries to send out flow RPCs in parallel, we use a pool of workers
@@ -169,7 +168,7 @@ func (dsp *DistSQLPlanner) setupFlows(
// into the local flow.
}
if firstErr != nil {
if _, ok := markers.If(firstErr, func(err error) (v interface{}, ok bool) {
if _, ok := errors.If(firstErr, func(err error) (v interface{}, ok bool) {
v, ok = err.(*distsqlrun.VectorizedSetupError)
return v, ok
}); ok && evalCtx.SessionData.Vectorize == sessiondata.VectorizeOn {
@@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/errbase"
opentracing "github.com/opentracing/opentracing-go"
)

@@ -946,13 +945,24 @@ type VectorizedSetupError struct {
cause error
}

// Error is part of the error interface.
func (e *VectorizedSetupError) Error() string {
return e.cause.Error()
}
var _ error = (*VectorizedSetupError)(nil)
var _ fmt.Formatter = (*VectorizedSetupError)(nil)
var _ errors.Formatter = (*VectorizedSetupError)(nil)

// Error implemented the error interface.
func (e *VectorizedSetupError) Error() string { return e.cause.Error() }

// Cause implements the causer interface.
func (e *VectorizedSetupError) Cause() error { return e.cause }

// Unwrap is part of the Wrapper interface.
func (e *VectorizedSetupError) Unwrap() error {
// Format implements the fmt.Formatter interface.
func (e *VectorizedSetupError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) }

// FormatError implements the errors.Formatter interface.
func (e *VectorizedSetupError) FormatError(p errors.Printer) error {
if p.Detail() {
p.Print("error while setting up columnar execution")
}
return e.cause
}

@@ -963,7 +973,7 @@ func decodeVectorizedSetupError(
}

func init() {
errors.RegisterWrapperDecoder(errbase.GetTypeKey((*VectorizedSetupError)(nil)), decodeVectorizedSetupError)
errors.RegisterWrapperDecoder(errors.GetTypeKey((*VectorizedSetupError)(nil)), decodeVectorizedSetupError)
}

func (f *Flow) setupVectorized(ctx context.Context) error {
@@ -1086,6 +1096,9 @@ func (f *Flow) setupVectorized(ctx context.Context) error {
}
metadataSourcesQueue = metadataSourcesQueue[:0]
case distsqlpb.StreamEndpointSpec_SYNC_RESPONSE:
if f.syncFlowConsumer == nil {
return errors.New("syncFlowConsumer unset, unable to create materializer")
}
// Make the materializer, which will write to the given receiver.
columnTypes := f.syncFlowConsumer.Types()
outputToInputColIdx := make([]int, len(columnTypes))
@@ -23,7 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
"github.com/cockroachdb/errors"
)

// ScrubTypes is the schema for TableReaders that are doing a SCRUB
@@ -250,8 +250,8 @@ func (tr *scrubTableReader) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMeta
//
// NB: Cases 3 and 4 are handled further below, in the standard
// table scanning code path.
err = errors.Cause(err)
if v, ok := err.(*scrub.Error); ok {
var v *scrub.Error
if errors.As(err, &v) {
row, err = tr.generateScrubErrorRow(row, v)
} else if err == nil && row != nil {
continue
@@ -70,6 +70,11 @@ func (n *Nulls) SetNull(i uint16) {
n.SetNull64(uint64(i))
}

// UnsetNull unsets the ith value of the column.
func (n *Nulls) UnsetNull(i uint16) {
n.UnsetNull64(uint64(i))
}

// SetNullRange sets all the values in [start, end) to null.
func (n *Nulls) SetNullRange(start uint64, end uint64) {
if start >= end {
@@ -146,6 +151,11 @@ func (n *Nulls) SetNull64(i uint64) {
n.nulls[i/8] &= flippedBitMask[i%8]
}

// UnsetNull64 unsets the ith values of the column.
func (n *Nulls) UnsetNull64(i uint64) {
n.nulls[i/8] |= bitMask[i%8]
}

// Extend extends the nulls vector with the next toAppend values from src,
// starting at srcStartIdx.
func (n *Nulls) Extend(src *Nulls, destStartIdx uint64, srcStartIdx uint16, toAppend uint16) {
@@ -93,6 +93,18 @@ func TestSetAndUnsetNulls(t *testing.T) {
for i := uint16(0); i < BatchSize; i++ {
require.True(t, n.NullAt(i))
}

for i := uint16(0); i < BatchSize; i += 3 {
n.UnsetNull(i)
}
for i := uint16(0); i < BatchSize; i++ {
if i%3 == 0 {
require.False(t, n.NullAt(i))
} else {
require.True(t, n.NullAt(i))
}
}

n.UnsetNulls()
for i := uint16(0); i < BatchSize; i++ {
require.False(t, n.NullAt(i))

0 comments on commit 10a073c

Please sign in to comment.
You can’t perform that action at this time.