Skip to content

Commit

Permalink
fix(go/adbc/driver/snowflake): fix potential deadlock and error handl…
Browse files Browse the repository at this point in the history
…ing (#828)

Found these when trying to do some performance testing.
  • Loading branch information
zeroshade committed Jun 21, 2023
1 parent 17a13f9 commit f35485a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 30 deletions.
4 changes: 4 additions & 0 deletions go/adbc/driver/flightsql/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql.
schema = rdr.Schema()
group.Go(func() error {
defer rdr.Release()
if numEndpoints > 1 {
defer close(ch)
}

for rdr.Next() && ctx.Err() == nil {
rec := rdr.Record()
rec.Retain()
Expand Down
5 changes: 2 additions & 3 deletions go/adbc/driver/snowflake/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,8 @@ func errToAdbcErr(code adbc.Status, err error) error {
var sferr *gosnowflake.SnowflakeError
if errors.As(err, &sferr) {
var sqlstate [5]byte
if len(sferr.SQLState) > 0 {
copy(sqlstate[:], sferr.SQLState[:5])
}
copy(sqlstate[:], []byte(sferr.SQLState))

return adbc.Error{
Code: code,
Msg: sferr.Error(),
Expand Down
59 changes: 32 additions & 27 deletions go/adbc/driver/snowflake/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
group.Go(func() error {
defer rr.Release()
defer r.Close()
if len(batches) > 1 {
defer close(ch)
}

for rr.Next() && ctx.Err() == nil {
rec := rr.Record()
Expand All @@ -297,39 +300,41 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
}

lastChannelIndex := len(chs) - 1
for i, b := range batches[1:] {
batch, batchIdx := b, i+1
chs[batchIdx] = make(chan arrow.Record, bufferSize)
group.Go(func() error {
// close channels (except the last) so that Next can move on to the next channel properly
if batchIdx != lastChannelIndex {
defer close(chs[batchIdx])
}

rdr, err := batch.GetStream(ctx)
if err != nil {
return err
}
defer rdr.Close()
go func() {
for i, b := range batches[1:] {
batch, batchIdx := b, i+1
chs[batchIdx] = make(chan arrow.Record, bufferSize)
group.Go(func() error {
// close channels (except the last) so that Next can move on to the next channel properly
if batchIdx != lastChannelIndex {
defer close(chs[batchIdx])
}

rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
if err != nil {
return err
}
defer rr.Release()
rdr, err := batch.GetStream(ctx)
if err != nil {
return err
}
defer rdr.Close()

for rr.Next() && ctx.Err() == nil {
rec := rr.Record()
rec, err = recTransform(ctx, rec)
rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
if err != nil {
return err
}
chs[batchIdx] <- rec
}
defer rr.Release()

return rr.Err()
})
}
for rr.Next() && ctx.Err() == nil {
rec := rr.Record()
rec, err = recTransform(ctx, rec)
if err != nil {
return err
}
chs[batchIdx] <- rec
}

return rr.Err()
})
}
}()

go func() {
rdr.err = group.Wait()
Expand Down

0 comments on commit f35485a

Please sign in to comment.