Skip to content

Commit

Permalink
[bigquery] Move encodeStructToJSONString function
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 18, 2024
1 parent 39f9479 commit fae4346
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 73 deletions.
4 changes: 2 additions & 2 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ const (
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeCommentCol = "description"
// Storage Write API is limited to 10 MiB, subtract 50 KiB to account for request overhead.
maxRequestByteSize = (10 * 1024 * 1024) - (50 * 1024)
// Storage Write API is limited to 10 MB, let's start out conservative and use 80% of that.
maxRequestByteSize = 10_000_000 * .8
)

type Store struct {
Expand Down
36 changes: 0 additions & 36 deletions clients/bigquery/cast.go

This file was deleted.

34 changes: 0 additions & 34 deletions clients/bigquery/cast_test.go

This file was deleted.

32 changes: 31 additions & 1 deletion clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package bigquery

import (
"encoding/json"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
Expand Down Expand Up @@ -188,7 +192,7 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
case typing.Struct.Kind:
stringValue, err := EncodeStructToJSONString(value)
stringValue, err := encodeStructToJSONString(value)
if err != nil {
return nil, err
} else if stringValue == "" {
Expand All @@ -211,3 +215,29 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
}
return message, nil
}

// encodeStructToJSONString converts a struct value into its JSON string form.
// The value may arrive in one of two shapes depending on the source:
//   - Relational sources hand over an already-serialized string such as `{"hello": "world"}`,
//     which is passed through unchanged.
//   - MongoDB hands over native Go objects such as `map[string]any{"hello": "world"}`,
//     which are serialized with json.Marshal.
//
// In either shape, if the resulting text contains the toast unavailable value placeholder,
// the whole value is replaced by a small JSON object of the form `{"key":"<placeholder>"}`.
// An error is returned only when marshalling a non-string value fails.
func encodeStructToJSONString(value any) (string, error) {
	placeholder := constants.ToastUnavailableValuePlaceholder

	switch typed := value.(type) {
	case string:
		// Pre-serialized input: hand it back untouched unless it carries the placeholder.
		if !strings.Contains(typed, placeholder) {
			return typed, nil
		}
		return fmt.Sprintf(`{"key":"%s"}`, placeholder), nil
	}

	// Anything that is not a plain string is treated as a native object.
	encoded, err := json.Marshal(value)
	if err != nil {
		return "", fmt.Errorf("failed to marshal value: %w", err)
	}

	serialized := string(encoded)
	if !strings.Contains(serialized, placeholder) {
		return serialized, nil
	}

	// TODO: Remove this if we don't see it in the logs.
	slog.Error("encoded JSON value contains the toast unavailable value placeholder")
	return fmt.Sprintf(`{"key":"%s"}`, placeholder), nil
}
27 changes: 27 additions & 0 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,30 @@ func TestRowToMessage(t *testing.T) {
"cArray": []any{"foo", "bar"},
}, result)
}

// TestEncodeStructToJSONString checks encodeStructToJSONString against string inputs
// (returned as-is or swapped for the toast JSON object) and map inputs (marshalled to
// JSON, or swapped for the toast JSON object when the placeholder shows up).
func TestEncodeStructToJSONString(t *testing.T) {
	scenarios := []struct {
		input    any
		expected string
	}{
		// Empty string:
		{input: "", expected: ""},
		// Toasted string:
		{input: "__debezium_unavailable_value", expected: "{\"key\":\"__debezium_unavailable_value\"}"},
		// Map:
		{input: map[string]any{"foo": "bar", "baz": 1234}, expected: "{\"baz\":1234,\"foo\":\"bar\"}"},
		// Toasted map (should not happen):
		{input: map[string]any{"__debezium_unavailable_value": "bar", "baz": 1234}, expected: "{\"key\":\"__debezium_unavailable_value\"}"},
	}

	for _, scenario := range scenarios {
		actual, err := encodeStructToJSONString(scenario.input)
		assert.NoError(t, err)
		assert.Equal(t, scenario.expected, actual)
	}
}

0 comments on commit fae4346

Please sign in to comment.