Skip to content

Commit

Permalink
Merge branch 'master' into nv/optimize-reading-columns
Browse files — browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 8, 2024
2 parents 99cbf9d + 7caa287 commit a66acf8
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 35 deletions.
27 changes: 17 additions & 10 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,25 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}
}

bqTempTableID, ok := tempTableID.(TableIdentifier)
if !ok {
return fmt.Errorf("unable to cast tempTableID to BigQuery TableIdentifier")
}

// Load the data
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
}

func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) {
// Cast the data into BigQuery values
var rows []*Row
additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
for _, value := range tableData.Rows() {
data := make(map[string]bigquery.Value)
for _, col := range columns {
colVal, err := castColVal(value[col.Name()], col, additionalDateFmts)
if err != nil {
return fmt.Errorf("failed to cast col %q: %w", col.Name(), err)
return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err)
}

if colVal != nil {
Expand All @@ -81,9 +90,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo

rows = append(rows, NewRow(data))
}

// Load the data
return s.putTable(context.Background(), tempTableID, rows)
return rows, nil
}

func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
Expand Down Expand Up @@ -131,17 +138,17 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
return client
}

func (s *Store) putTable(ctx context.Context, tableID sql.TableIdentifier, rows []*Row) error {
bqTableID, ok := tableID.(TableIdentifier)
if !ok {
return fmt.Errorf("unable to cast tableID to BigQuery TableIdentifier")
func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error {
rows, err := buildLegacyRows(tableData, s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats)
if err != nil {
return err
}

client := s.GetClient(ctx)
defer client.Close()

batch := NewBatch(rows, s.batchSize)
inserter := client.Dataset(bqTableID.Dataset()).Table(bqTableID.Table()).Inserter()
inserter := client.Dataset(tableID.Dataset()).Table(tableID.Table()).Inserter()
for batch.HasNext() {
if err := inserter.Put(ctx, batch.NextChunk()); err != nil {
return fmt.Errorf("failed to insert rows: %w", err)
Expand Down
6 changes: 1 addition & 5 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,5 @@ func (s *Store) CastColValStaging(colVal any, colKind columns.Column, additional
}

// Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done.
if s.skipLgCols {
colValString = replaceExceededValues(colValString, colKind)
}

return colValString, nil
return replaceExceededValues(colValString, colKind), nil
}
4 changes: 1 addition & 3 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ func (r *RedshiftTestSuite) TestCastColValStaging_ExceededValues() {
}

cfg := config.Config{
Redshift: &config.Redshift{
SkipLgCols: true,
},
Redshift: &config.Redshift{},
}

store := db.Store(r.fakeStore)
Expand Down
7 changes: 2 additions & 5 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Store struct {
bucket string
optionalS3Prefix string
configMap *types.DwhToTablesConfigMap
skipLgCols bool
config config.Config

db.Store
Expand Down Expand Up @@ -121,9 +120,8 @@ func LoadRedshift(cfg config.Config, _store *db.Store) (*Store, error) {
if _store != nil {
// Used for tests.
return &Store{
configMap: &types.DwhToTablesConfigMap{},
skipLgCols: cfg.Redshift.SkipLgCols,
config: cfg,
configMap: &types.DwhToTablesConfigMap{},
config: cfg,

Store: *_store,
}, nil
Expand All @@ -142,7 +140,6 @@ func LoadRedshift(cfg config.Config, _store *db.Store) (*Store, error) {
credentialsClause: cfg.Redshift.CredentialsClause,
bucket: cfg.Redshift.Bucket,
optionalS3Prefix: cfg.Redshift.OptionalS3Prefix,
skipLgCols: cfg.Redshift.SkipLgCols,
configMap: &types.DwhToTablesConfigMap{},
config: cfg,

Expand Down
1 change: 0 additions & 1 deletion clients/redshift/redshift_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func (r *RedshiftTestSuite) SetupTest() {
var err error
r.store, err = LoadRedshift(cfg, &store)
assert.NoError(r.T(), err)
r.store.skipLgCols = true
}

func TestRedshiftTestSuite(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type Redshift struct {
OptionalS3Prefix string `yaml:"optionalS3Prefix"`
// https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
CredentialsClause string `yaml:"credentialsClause"`
SkipLgCols bool `yaml:"skipLgCols"`
}

type SharedTransferConfig struct {
Expand Down
11 changes: 2 additions & 9 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package debezium
import (
"encoding/base64"
"fmt"
"log/slog"
"strings"
"time"

Expand Down Expand Up @@ -51,10 +50,9 @@ const (
KafkaDecimalPrecisionKey = "connect.decimal.precision"
)

// toBytes attempts to convert a value of unknown type to a slice of bytes.
// toBytes attempts to convert a value (type []byte, or string) to a slice of bytes.
// - If value is already a slice of bytes it will be directly returned.
// - If value is a string we will attempt to base64 decode it.
// - If value is any other type we will convert it to a string and then attempt to base64 decode it.
func toBytes(value any) ([]byte, error) {
var stringVal string

Expand All @@ -64,12 +62,7 @@ func toBytes(value any) ([]byte, error) {
case string:
stringVal = typedValue
default:
// TODO: Make this a hard error if we don't observe this happening.
slog.Error("Expected string/[]byte, falling back to fmt.Sprint(value)",
slog.String("type", fmt.Sprintf("%T", value)),
slog.Any("value", value),
)
stringVal = fmt.Sprint(value)
return nil, fmt.Errorf("failed to cast value '%v' with type '%T' to []byte", value, value)
}

data, err := base64.StdEncoding.DecodeString(stringVal)
Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestToBytes(t *testing.T) {
{
name: "type that isn't a string or []byte",
value: map[string]any{},
expectedErr: "failed to base64 decode",
expectedErr: "failed to cast value 'map[]' with type 'map[string]interface {}",
},
}

Expand Down

0 comments on commit a66acf8

Please sign in to comment.