Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions internal/rows/arrowbased/arrowRows.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add tests for this. For example, to reproduce the issue locally I used:

  // TestGetArrowIPCStreams_SchemaBytesWithoutDirectResults checks that the
  // Arrow IPC stream iterator still yields non-empty schema bytes when the
  // fetch response metadata carries no ArrowSchema.
  func TestGetArrowIPCStreams_SchemaBytesWithoutDirectResults(t *testing.T) {
      fetchResp := cli_service.TFetchResultsResp{}
      loadTestData(t, "multipleFetch/FetchResults1.json", &fetchResp)
      // Simulate cold-start: server returns no ArrowSchema
      fetchResp.ResultSetMetadata.ArrowSchema = nil

      ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
      ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
      client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp})
      cfg := config.WithDefaults()

      // Do not discard the constructor error: a failed NewRows would otherwise
      // surface later as a confusing panic on the type assertion below.
      // A dedicated variable is used because its type may differ from the
      // plain `error` values returned further down.
      rows, newRowsErr := NewRows(ctx, nil, client, cfg, nil)
      if newRowsErr != nil {
          t.Fatalf("NewRows returned an error: %v", newRowsErr)
      }

      // Checked assertion so a mismatch fails the test instead of panicking.
      rows2, ok := rows.(dbsqlrows.Rows)
      if !ok {
          t.Fatalf("NewRows returned %T, which does not implement dbsqlrows.Rows", rows)
      }

      // Stop immediately on error: ipcIter is not usable when err is non-nil.
      ipcIter, err := rows2.GetArrowIPCStreams(context.Background())
      if err != nil {
          t.Fatalf("GetArrowIPCStreams returned an error: %v", err)
      }

      schemaBytes, err := ipcIter.SchemaBytes()
      if err != nil {
          t.Fatalf("SchemaBytes returned an error: %v", err)
      }
      assert.NotEmpty(t, schemaBytes)
  }

Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,37 @@ func getColumnInfo(arrowSchema *arrow.Schema, schema *cli_service.TTableSchema)
return colInfos
}

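// GetArrowSchemaBytes returns the Arrow schema bytes for the given result set metadata.
// If resultSetMetadata.ArrowSchema is set, it is returned as-is; otherwise the bytes are
// generated from the TTableSchema in resultSetMetadata.Schema.
// It returns nil bytes and a nil error when resultSetMetadata or its Schema is nil.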
func GetArrowSchemaBytes(resultSetMetadata *cli_service.TGetResultSetMetadataResp, cfg *config.Config, ctx context.Context) ([]byte, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Go convention is to have the context as the first parameter.

if resultSetMetadata == nil {
return nil, nil
}

// If ArrowSchema is directly available, return it
if resultSetMetadata.ArrowSchema != nil {
return resultSetMetadata.ArrowSchema, nil
}

// Otherwise, generate from TTableSchema
if resultSetMetadata.Schema == nil {
return nil, nil
}

arrowConfig := cfg.ArrowConfig
arrowSchema, err := tTableSchemaToArrowSchema(resultSetMetadata.Schema, &arrowConfig)
if err != nil {
return nil, err
}

schemaBytes, err := getArrowSchemaBytes(arrowSchema, ctx)
if err != nil {
return nil, err
}

return schemaBytes, nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems to duplicate a lot of logic from tGetResultSetMetadataRespToArrowSchema. Could we make it a simpler wrapper that calls tGetResultSetMetadataRespToArrowSchema instead?

}

// Derive an arrow.Schema object and the corresponding serialized bytes from TGetResultSetMetadataResp
func tGetResultSetMetadataRespToArrowSchema(resultSetMetadata *cli_service.TGetResultSetMetadataResp, arrowConfig config.ArrowConfig, ctx context.Context, logger *dbsqllog.DBSQLLogger) ([]byte, *arrow.Schema, dbsqlerr.DBError) {

Expand Down
8 changes: 7 additions & 1 deletion internal/rows/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,5 +548,11 @@ func (r *rows) GetArrowIPCStreams(ctx context.Context) (dbsqlrows.ArrowIPCStream
return r.RowScanner.GetArrowIPCStreams(ctx, *r.config, r.ResultPageIterator)
}

return arrowbased.NewArrowIPCStreamIterator(ctx, r.ResultPageIterator, nil, nil, *r.config), nil
// Get arrow schema bytes from metadata (generates from TTableSchema if ArrowSchema not available)
arrowSchemaBytes, err := arrowbased.GetArrowSchemaBytes(r.resultSetMetadata, r.config, ctx)
if err != nil {
return nil, dbsqlerr_int.NewDriverError(ctx, "failed to get arrow schema bytes", err)
}

return arrowbased.NewArrowIPCStreamIterator(ctx, r.ResultPageIterator, nil, arrowSchemaBytes, *r.config), nil
}
Loading