Skip to content

Commit

Permalink
[bigquery] Use batch.BySize
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 17, 2024
1 parent e5019fa commit 3e3fac6
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/batch"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/db"
Expand All @@ -32,6 +33,7 @@ const (
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeCommentCol = "description"
maxRequestByteSize = 10_000_000 * .9 // Storage Write API is limited to 10 MB
)

type Store struct {
Expand Down Expand Up @@ -148,34 +150,32 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI
}
defer managedStream.Close()

batch := NewBatch(tableData.Rows(), s.batchSize)
for batch.HasNext() {
chunk := batch.NextChunk()
encoded := make([][]byte, len(chunk))
for i, row := range chunk {
message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats())
if err != nil {
return fmt.Errorf("failed to convert row to message: %w", err)
}

bytes, err := proto.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
encoded[i] = bytes
encoder := func(row map[string]any) ([]byte, error) {
message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats())
if err != nil {
return nil, fmt.Errorf("failed to convert row to message: %w", err)
}

bytes, err := proto.Marshal(message)
if err != nil {
return nil, fmt.Errorf("failed to marshal message: %w", err)
}

result, err := managedStream.AppendRows(ctx, encoded)
return bytes, nil
}

return batch.BySize(tableData.Rows(), maxRequestByteSize, encoder, func(chunk [][]byte) error {
result, err := managedStream.AppendRows(ctx, chunk)
if err != nil {
return fmt.Errorf("failed to append rows: %w", err)
}

if resp, err := result.FullResponse(ctx); err != nil {
return fmt.Errorf("failed to get response (%s): %w", resp.GetError().String(), err)
}
}

return nil
return nil
})
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
Expand Down

0 comments on commit 3e3fac6

Please sign in to comment.