Skip to content

Commit

Permalink
Extended use of public error types
Browse files Browse the repository at this point in the history
- updated rows package to use the new public error types
- refactored arrow/column based rows to put errors into a separate file
Signed-off-by: Raymond Cypher <raymond.cypher@databricks.com>
  • Loading branch information
rcypher-databricks committed Apr 4, 2023
1 parent aff1bc8 commit e447bf0
Show file tree
Hide file tree
Showing 14 changed files with 773 additions and 669 deletions.
3 changes: 2 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
statusResp, err = c.client.GetOperationStatus(newCtx, &cli_service.TGetOperationStatusReq{
OperationHandle: opHandle,
})

if statusResp != nil && statusResp.OperationState != nil {
log.Debug().Msgf("databricks: status %s", statusResp.GetOperationState().String())
}
Expand Down Expand Up @@ -366,7 +367,7 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
}
_, resp, err := pollSentinel.Watch(ctx, c.cfg.PollInterval, 0)
if err != nil {
return nil, dbsqlerr.WrapErr(err, "failed to poll query state") // TODO: figure out what error type this should be
return nil, dbsqlerr.NewSystemFault(ctx, "failed to poll query state", err)
}
statusResp, ok := resp.(*cli_service.TGetOperationStatusResp)
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions driverctx/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func NewContextWithCorrelationId(ctx context.Context, correlationId string) cont

// CorrelationIdFromContext retrieves the correlationId stored in context.
func CorrelationIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}

corrId, ok := ctx.Value(CorrelationIdContextKey).(string)
if !ok {
return ""
Expand All @@ -35,6 +39,10 @@ func NewContextWithConnId(ctx context.Context, connId string) context.Context {

// ConnIdFromContext retrieves the connectionId stored in context.
func ConnIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}

connId, ok := ctx.Value(ConnIdContextKey).(string)
if !ok {
return ""
Expand All @@ -49,6 +57,10 @@ func NewContextWithQueryId(ctx context.Context, queryId string) context.Context

// QueryIdFromContext retrieves the queryId stored in context.
func QueryIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}

queryId, ok := ctx.Value(QueryIdContextKey).(string)
if !ok {
return ""
Expand Down
18 changes: 9 additions & 9 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (tsc *ThriftServiceClient) OpenSession(ctx context.Context, req *cli_servic
msg, start := logger.Track("OpenSession")
resp, err := tsc.TCLIServiceClient.OpenSession(ctx, req)
if err != nil {
return nil, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return nil, dbsqlerr.NewRequestError(ctx, "open session request error", err)
}
log := logger.WithContext(SprintGuid(resp.SessionHandle.SessionId.GUID), driverctx.CorrelationIdFromContext(ctx), "")
defer log.Duration(msg, start)
Expand All @@ -58,7 +58,7 @@ func (tsc *ThriftServiceClient) CloseSession(ctx context.Context, req *cli_servi
defer log.Duration(logger.Track("CloseSession"))
resp, err := tsc.TCLIServiceClient.CloseSession(ctx, req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(ctx, "close session request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
Expand All @@ -75,7 +75,7 @@ func (tsc *ThriftServiceClient) FetchResults(ctx context.Context, req *cli_servi
defer log.Duration(logger.Track("FetchResults"))
resp, err := tsc.TCLIServiceClient.FetchResults(ctx, req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(ctx, "fetch results request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
Expand All @@ -92,11 +92,11 @@ func (tsc *ThriftServiceClient) GetResultSetMetadata(ctx context.Context, req *c
defer log.Duration(logger.Track("GetResultSetMetadata"))
resp, err := tsc.TCLIServiceClient.GetResultSetMetadata(ctx, req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(ctx, "get result set metadata request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("ExecuteStatement%d.json", resultIndex), j, 0600)
_ = os.WriteFile(fmt.Sprintf("GetResultSetMetadata%d.json", resultIndex), j, 0600)
resultIndex++
}
return resp, CheckStatus(resp)
Expand All @@ -108,7 +108,7 @@ func (tsc *ThriftServiceClient) ExecuteStatement(ctx context.Context, req *cli_s
msg, start := logger.Track("ExecuteStatement")
resp, err := tsc.TCLIServiceClient.ExecuteStatement(context.Background(), req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(ctx, "execute statement request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
Expand All @@ -132,7 +132,7 @@ func (tsc *ThriftServiceClient) GetOperationStatus(ctx context.Context, req *cli
defer log.Duration(logger.Track("GetOperationStatus"))
resp, err := tsc.TCLIServiceClient.GetOperationStatus(ctx, req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(driverctx.NewContextWithQueryId(ctx, SprintGuid(req.OperationHandle.OperationId.GUID)), "databricks: get operation status request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
Expand All @@ -149,7 +149,7 @@ func (tsc *ThriftServiceClient) CloseOperation(ctx context.Context, req *cli_ser
defer log.Duration(logger.Track("CloseOperation"))
resp, err := tsc.TCLIServiceClient.CloseOperation(ctx, req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(ctx, "close operation request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
Expand All @@ -166,7 +166,7 @@ func (tsc *ThriftServiceClient) CancelOperation(ctx context.Context, req *cli_se
defer log.Duration(logger.Track("CancelOperation"))
resp, err := tsc.TCLIServiceClient.CancelOperation(ctx, req)
if err != nil {
return resp, dbsqlerr.WrapErr(err, "open session request error") // TODO: figure out what error type this should be
return resp, dbsqlerr.NewRequestError(ctx, "cancel operation request error", err)
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
Expand Down
3 changes: 3 additions & 0 deletions internal/errors/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ type stackTracer interface {
}

func newDatabricksError(ctx context.Context, msg string, err error) databricksError {
// create an error with the new message
if err == nil {
err = errors.New(msg)
} else {
err = errors.WithMessage(err, msg)
}

// if the source error does not have a stack trace in its
// error chain add a stack trace
var st stackTracer
if ok := errors.As(err, &st); !ok {
err = errors.WithStack(err)
Expand Down

0 comments on commit e447bf0

Please sign in to comment.