Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing error types #117

Merged
merged 8 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 19 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/client"
"github.com/databricks/databricks-sql-go/internal/config"
dbsqlerr "github.com/databricks/databricks-sql-go/internal/err"
dbsqlerr "github.com/databricks/databricks-sql-go/internal/errors"
"github.com/databricks/databricks-sql-go/internal/rows"
"github.com/databricks/databricks-sql-go/internal/sentinel"
"github.com/databricks/databricks-sql-go/logger"
Expand Down Expand Up @@ -46,19 +46,19 @@ func (c *conn) Close() error {

if err != nil {
log.Err(err).Msg("databricks: failed to close connection")
return dbsqlerr.WrapErr(err, "failed to close connection")
return dbsqlerr.NewRequestError(ctx, dbsqlerr.ErrCloseConnection, err)
}
return nil
}

// Not supported in Databricks.
func (c *conn) Begin() (driver.Tx, error) {
return nil, errors.New(dbsqlerr.ErrTransactionsNotSupported)
return nil, dbsqlerr.NewSystemFault(context.TODO(), dbsqlerr.ErrNotImplemented, nil)
}

// Not supported in Databricks.
func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
return nil, errors.New(dbsqlerr.ErrTransactionsNotSupported)
return nil, dbsqlerr.NewSystemFault(context.TODO(), dbsqlerr.ErrNotImplemented, nil)
}

// Ping attempts to verify that the server is accessible.
Expand Down Expand Up @@ -100,7 +100,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name

ctx = driverctx.NewContextWithConnId(ctx, c.id)
if len(args) > 0 {
return nil, errors.New(dbsqlerr.ErrParametersNotSupported)
return nil, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrParametersNotSupported, nil)
}
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)

Expand All @@ -122,7 +122,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
}
if err != nil {
log.Err(err).Msgf("databricks: failed to execute query: query %s", query)
return nil, dbsqlerr.WrapErrf(err, "failed to execute query")
return nil, dbsqlerr.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

res := result{AffectedRows: opStatusResp.GetNumModifiedRows()}
Expand All @@ -142,11 +142,11 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam

ctx = driverctx.NewContextWithConnId(ctx, c.id)
if len(args) > 0 {
return nil, errors.New(dbsqlerr.ErrParametersNotSupported)
return nil, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrParametersNotSupported, nil)
}
// first we try to get the results synchronously.
// at any point in time that the context is done we must cancel and return
exStmtResp, _, err := c.runQuery(ctx, query, args)
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)

if exStmtResp != nil && exStmtResp.OperationHandle != nil {
log = logger.WithContext(c.id, driverctx.CorrelationIdFromContext(ctx), client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID))
Expand All @@ -155,7 +155,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam

if err != nil {
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
return nil, dbsqlerr.WrapErrf(err, "failed to run query")
return nil, dbsqlerr.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}
// hold on to the operation handle
opHandle := exStmtResp.OperationHandle
Expand All @@ -177,9 +177,10 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
}
opHandle := exStmtResp.OperationHandle
if opHandle != nil && opHandle.OperationId != nil {
ctx = driverctx.NewContextWithQueryId(ctx, client.SprintGuid(opHandle.OperationId.GUID))
log = logger.WithContext(
c.id,
driverctx.CorrelationIdFromContext(ctx), client.SprintGuid(opHandle.OperationId.GUID),
driverctx.CorrelationIdFromContext(ctx), driverctx.QueryIdFromContext(ctx),
)
}

Expand Down Expand Up @@ -217,16 +218,16 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
cli_service.TOperationState_ERROR_STATE,
cli_service.TOperationState_TIMEDOUT_STATE:
logBadQueryState(log, statusResp)
return exStmtResp, statusResp, errors.New(statusResp.GetDisplayMessage())
return exStmtResp, statusResp, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrInvalidOperationState, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request error?

// live states
default:
logBadQueryState(log, statusResp)
return exStmtResp, statusResp, errors.New("invalid operation state. This should not have happened")
return exStmtResp, statusResp, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrInvalidOperationState, nil)
}
// weird states
default:
logBadQueryState(log, opStatus)
return exStmtResp, opStatus, errors.New("invalid operation state. This should not have happened")
return exStmtResp, opStatus, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrInvalidOperationState, nil)
}

} else {
Expand All @@ -245,11 +246,11 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
cli_service.TOperationState_ERROR_STATE,
cli_service.TOperationState_TIMEDOUT_STATE:
logBadQueryState(log, statusResp)
return exStmtResp, statusResp, errors.New(statusResp.GetDisplayMessage())
return exStmtResp, statusResp, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrInvalidOperationState, nil)
// live states
default:
logBadQueryState(log, statusResp)
return exStmtResp, statusResp, errors.New("invalid operation state. This should not have happened")
return exStmtResp, statusResp, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrInvalidOperationState, nil)
}
}
}
Expand Down Expand Up @@ -337,6 +338,7 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
statusResp, err = c.client.GetOperationStatus(newCtx, &cli_service.TGetOperationStatusReq{
OperationHandle: opHandle,
})

if statusResp != nil && statusResp.OperationState != nil {
log.Debug().Msgf("databricks: status %s", statusResp.GetOperationState().String())
}
Expand Down Expand Up @@ -365,11 +367,11 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
}
_, resp, err := pollSentinel.Watch(ctx, c.cfg.PollInterval, 0)
if err != nil {
return nil, dbsqlerr.WrapErr(err, "failed to poll query state")
return nil, dbsqlerr.NewSystemFault(ctx, "failed to poll query state", err)
}
statusResp, ok := resp.(*cli_service.TGetOperationStatusResp)
if !ok {
return nil, errors.New("could not read query status")
return nil, dbsqlerr.NewSystemFault(ctx, dbsqlerr.ErrReadQueryStatus, nil)
}
return statusResp, nil
}
Expand Down
3 changes: 2 additions & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestConn_executeStatement(t *testing.T) {
if opTest.err == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, opTest.err)
assert.EqualError(t, err, "databricks: execution error: failed to execute query: "+opTest.err)
}
assert.Equal(t, 1, executeStatementCount)
assert.Equal(t, opTest.closeOperationCount, closeOperationCount)
Expand Down Expand Up @@ -539,6 +539,7 @@ func TestConn_pollOperation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
res, err := testConn.pollOperation(ctx, &cli_service.TOperationHandle{

OperationId: &cli_service.THandleIdentifier{
GUID: []byte{1, 2, 3, 4, 2, 23, 4, 2, 3, 1, 2, 4, 4, 223, 34, 54},
Secret: []byte("b"),
Expand Down
8 changes: 4 additions & 4 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/client"
"github.com/databricks/databricks-sql-go/internal/config"
dbsqlerr "github.com/databricks/databricks-sql-go/internal/err"
dbsqlerr "github.com/databricks/databricks-sql-go/internal/errors"
"github.com/databricks/databricks-sql-go/logger"
)

Expand All @@ -35,7 +35,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {

tclient, err := client.InitThriftClient(c.cfg, c.client)
if err != nil {
return nil, dbsqlerr.WrapErr(err, "error initializing thrift client")
return nil, dbsqlerr.NewRequestError(ctx, dbsqlerr.ErrThriftClient, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

driver error

}
protocolVersion := int64(c.cfg.ThriftProtocolVersion)
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
Expand All @@ -49,7 +49,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
})

if err != nil {
return nil, dbsqlerr.WrapErrf(err, "error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath)
return nil, dbsqlerr.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
}

conn := &conn{
Expand All @@ -66,7 +66,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
setStmt := fmt.Sprintf("SET `%s` = `%s`;", k, v)
_, err := conn.ExecContext(ctx, setStmt, []driver.NamedValue{})
if err != nil {
return nil, err
return nil, dbsqlerr.NewExecutionError(ctx, fmt.Sprintf("error setting session param: %s", setStmt), err, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be just returning the error, no?

}
log.Info().Msgf("set session parameter: param=%s value=%s", k, v)
}
Expand Down
46 changes: 46 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,52 @@ The result log may look like this:

{"level":"debug","connId":"01ed6545-5669-1ec7-8c7e-6d8a1ea0ab16","corrId":"workflow-example","queryId":"01ed6545-57cc-188a-bfc5-d9c0eaf8e189","time":1668558402,"message":"Run Main elapsed time: 1.298712292s"}

# Errors

There are three error types exposed via dbsql/errors

DBSystemFault - A fault caused by Databricks services

DBRequestError - An error that is caused by an invalid request. Example: permission denied, or the user tries to access a warehouse that doesn’t exist
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some more examples would really help


DBExecutionError - Any error that occurs after the SQL statement has been accepted (e.g. SQL syntax error)

Each type has a corresponding sentinel value which can be used with errors.Is() to determine if one of the types is present in an error chain.

SystemFault
RequestError
ExecutionError

Example usage:

import (
fmt
errors
dbsqlerr "github.com/databricks/databricks-sql-go/errors"
)

func main() {
...
_, err := db.ExecContext(ogCtx, `Select id from range(100)`)
if err != nil {
if errors.Is(err, dbsqlerr.ExecutionError) {
var execErr dbsqlerr.DBExecutionError
if ok := errors.As(err, &execError); ok {
fmt.Printf("%s, corrId: %s, connId: %s, queryId: %s, sqlState: %s",
execErr.Error(),
execErr.CorrelationId(),
execErr.ConnectionId(),
execErr.QueryId(),
execErr.SqlState())
}
}
...
}
...
}

See the documentation for dbsql/errors for more information.

# Supported Data Types

==================================
Expand Down
10 changes: 7 additions & 3 deletions driver_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/databricks/databricks-sql-go/driverctx"
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/client"
dbsqlerr "github.com/databricks/databricks-sql-go/internal/err"
"github.com/databricks/databricks-sql-go/logger"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -281,11 +281,15 @@ func TestContextTimeoutExample(t *testing.T) {
ctx1, cancel := context.WithTimeout(ogCtx, 5*time.Second)
defer cancel()
rows, err := db.QueryContext(ctx1, `SELECT id FROM RANGE(100000000) ORDER BY RANDOM() + 2 asc`)
if err, ok := err.(interface{ StackTrace() errors.StackTrace }); ok {
fmt.Printf("Stack trace: %v", err.StackTrace())
}
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
require.Nil(t, rows)
_, ok := err.(dbsqlerr.Causer)

_, ok := err.(interface{ Cause() error })
assert.True(t, ok)
_, ok = err.(dbsqlerr.StackTracer)
_, ok = err.(interface{ StackTrace() errors.StackTrace })
assert.True(t, ok)
assert.Equal(t, 1, state.executeStatementCalls)
assert.GreaterOrEqual(t, state.getOperationStatusCalls, 1)
Expand Down
27 changes: 27 additions & 0 deletions driverctx/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type contextKey int
const (
CorrelationIdContextKey contextKey = iota
ConnIdContextKey
QueryIdContextKey
)

// NewContextWithCorrelationId creates a new context with correlationId value. Used by Logger to populate field corrId.
Expand All @@ -20,6 +21,10 @@ func NewContextWithCorrelationId(ctx context.Context, correlationId string) cont

// CorrelationIdFromContext retrieves the correlationId stored in context.
func CorrelationIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}

corrId, ok := ctx.Value(CorrelationIdContextKey).(string)
if !ok {
return ""
Expand All @@ -34,9 +39,31 @@ func NewContextWithConnId(ctx context.Context, connId string) context.Context {

// ConnIdFromContext retrieves the connectionId stored in context.
func ConnIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}

connId, ok := ctx.Value(ConnIdContextKey).(string)
if !ok {
return ""
}
return connId
}

// NewContextWithQueryId creates a new context with queryId value.
func NewContextWithQueryId(ctx context.Context, queryId string) context.Context {
return context.WithValue(ctx, QueryIdContextKey, queryId)
}

// QueryIdFromContext retrieves the queryId stored in context.
func QueryIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}

queryId, ok := ctx.Value(QueryIdContextKey).(string)
if !ok {
return ""
}
return queryId
}
60 changes: 60 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package errors

import "github.com/pkg/errors"

// value to be used with errors.Is() to determine if an error chain contains a request error
var RequestError error = errors.New("Request Error")

// value to be used with errors.Is() to determine if an error chain contains a system fault
var SystemFault error = errors.New("System Fault")

// value to be used with errors.Is() to determine if an error chain contains an execution error
var ExecutionError error = errors.New("Execution Error")

// Base interface for driver errors
type DatabricksError interface {
// Descriptive message describing the error
Error() string

// ErrorType() DBsqlErrorType

// User specified id to track what happens under a request. Useful to track multiple connections in the same request.
// Appears in log messages as field corrId. See driverctx.NewContextWithCorrelationId()
CorrelationId() string

// Internal id to track what happens under a connection. Connections can be reused so this would track across queries.
// Appears in log messages as field connId.
ConnectionId() string

// Stack trace associated with the error. May be nil.
StackTrace() errors.StackTrace

// Underlying causative error. May be nil.
Cause() error
}

// An error that is caused by an invalid request.
// Example: permission denied, or the user tries to access a warehouse that doesn’t exist
type DBRequestError interface {
DatabricksError
}

// A fault that is caused by Databricks services
type DBSystemFault interface {
DatabricksError

IsRetryable() bool
}

// Any error that occurs after the SQL statement has been accepted (e.g. SQL syntax error).
type DBExecutionError interface {
DatabricksError

// Internal id to track what happens under a query.
// Appears in log messages as field queryId.
QueryId() string

// Optional portable error identifier across SQL engines.
// See https://github.com/apache/spark/tree/master/core/src/main/resources/error#ansiiso-standard
SqlState() string
}