From 1abfe9bf6f8a6214a85fce56e53e65f81ec4401e Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 12 Jun 2024 10:57:56 -0700 Subject: [PATCH 01/24] [BigQuery] Edge case with our decimal types (#717) --- clients/bigquery/cast.go | 8 +++++++- clients/bigquery/cast_test.go | 6 ++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index 916d354f2..7714f0876 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -21,7 +21,13 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) } switch colKind.KindDetails.Kind { - case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind, typing.String.Kind: + case typing.String.Kind: + if val, isOk := colVal.(*decimal.Decimal); isOk { + return val.String(), nil + } + + return colVal, nil + case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind: return colVal, nil case typing.EDecimal.Kind: val, isOk := colVal.(*decimal.Decimal) diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go index eed3f871a..def9f639f 100644 --- a/clients/bigquery/cast_test.go +++ b/clients/bigquery/cast_test.go @@ -22,6 +22,12 @@ func (b *BigQueryTestSuite) TestCastColVal() { colVal, err := castColVal("hello", columns.Column{KindDetails: typing.String}, nil) assert.NoError(b.T(), err) assert.Equal(b.T(), "hello", colVal) + + // Decimal + dec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45)) + colVal, err = castColVal(dec, columns.Column{KindDetails: typing.String}, nil) + assert.NoError(b.T(), err) + assert.Equal(b.T(), "123.45", colVal) } { // Integers From c53649767b4140c399f39701f51ed3bf329a1453 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Wed, 12 Jun 2024 15:41:49 -0700 Subject: [PATCH 02/24] Update email to `hi@artie.com` (#719) --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 29c95cdab..a3fb9f8d3 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur - MongoDB - MySQL - PostgreSQL - + _If the database you are using is not on the list, feel free to file for a [feature request](https://github.com/artie-labs/transfer/issues/new)._ @@ -101,4 +101,4 @@ Artie Transfer is released through [GoReleaser](https://goreleaser.com/), and we ## License -Artie Transfer is licensed under ELv2. Please see the [LICENSE](https://github.com/artie-labs/transfer/blob/master/LICENSE.txt) file for additional information. If you have any licensing questions please email hi@artie.so. +Artie Transfer is licensed under ELv2. Please see the [LICENSE](https://github.com/artie-labs/transfer/blob/master/LICENSE.txt) file for additional information. If you have any licensing questions please email hi@artie.com. 
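The decimal edge case patched above is easiest to see in isolation: for a string-typed column, castColVal previously returned a *decimal.Decimal value as-is instead of stringifying it. Below is a minimal, self-contained Go sketch of the corrected dispatch. decimalStandIn is a hypothetical stand-in for the library's *decimal.Decimal (only the String method matters here), so names and signatures are illustrative, not the repository's actual API.

package main

import (
	"fmt"
	"math/big"
)

// decimalStandIn is a hypothetical stand-in for *decimal.Decimal from
// lib/typing/decimal; only the String method matters for this sketch.
type decimalStandIn struct {
	scale int
	value *big.Float
}

// String renders the value at its declared scale, e.g. scale 2 -> "123.45".
func (d *decimalStandIn) String() string {
	return d.value.Text('f', d.scale)
}

// castStringCol mirrors the new typing.String.Kind branch: decimals are
// stringified, every other value passes through unchanged.
func castStringCol(colVal any) any {
	if val, isOk := colVal.(*decimalStandIn); isOk {
		return val.String()
	}
	return colVal
}

func main() {
	fmt.Println(castStringCol("hello"))                                               // hello
	fmt.Println(castStringCol(&decimalStandIn{scale: 2, value: big.NewFloat(123.45)})) // 123.45
}

The same pattern reappears later in the Storage Write API path, where string-kind fields also accept decimals and stringify them before encoding.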
From 2539f8591e109bb4f29f756b8f261f783a4d4b39 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 12 Jun 2024 16:33:49 -0700 Subject: [PATCH 03/24] Removing certain Datadog tags (#718) --- lib/artie/message.go | 24 ++++++++++++------------ processes/consumer/process.go | 1 - 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/artie/message.go b/lib/artie/message.go index ccecffb32..ef8f3716f 100644 --- a/lib/artie/message.go +++ b/lib/artie/message.go @@ -74,22 +74,22 @@ func (m *Message) EmitRowLag(metricsClient base.Client, mode config.Mode, groupI return } - metricsClient.GaugeWithSample("row.lag", float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), map[string]string{ - "mode": mode.String(), - "groupID": groupID, - "topic": m.Topic(), - "table": table, - "partition": m.Partition(), - }, 0.5) + metricsClient.GaugeWithSample( + "row.lag", + float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), + map[string]string{ + "mode": mode.String(), + "groupID": groupID, + "table": table, + }, + 0.5) } func (m *Message) EmitIngestionLag(metricsClient base.Client, mode config.Mode, groupID, table string) { metricsClient.Timing("ingestion.lag", time.Since(m.PublishTime()), map[string]string{ - "mode": mode.String(), - "groupID": groupID, - "topic": m.Topic(), - "table": table, - "partition": m.Partition(), + "mode": mode.String(), + "groupID": groupID, + "table": table, }) } diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 8e30a303b..2e37945d6 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -27,7 +27,6 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo tags := map[string]string{ "mode": cfg.Mode.String(), "groupID": p.GroupID, - "topic": p.Msg.Topic(), "what": "success", } From 35e78694f9cc40acc636050925e64dc88a61a3f9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 13 Jun 2024 10:48:12 -0700 Subject: [PATCH 04/24] Log this as debug. 
(#721) --- processes/consumer/flush.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/consumer/flush.go b/processes/consumer/flush.go index 4f8b2a921..8597d3fe3 100644 --- a/processes/consumer/flush.go +++ b/processes/consumer/flush.go @@ -61,7 +61,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B } if args.CoolDown != nil && _tableData.ShouldSkipFlush(*args.CoolDown) { - slog.With(logFields...).Info("Skipping flush because we are currently in a flush cooldown") + slog.With(logFields...).Debug("Skipping flush because we are currently in a flush cooldown") return } From 585c9d6f6bef9e8c3c24f10822d8b67455187353 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 13 Jun 2024 15:11:49 -0700 Subject: [PATCH 05/24] [bigquery] Add scaffolding for the Storage Write API (#720) --- clients/bigquery/bigquery.go | 10 ++++++- clients/bigquery/cast.go | 52 +++++++++++++++++++++-------------- clients/bigquery/cast_test.go | 28 +++++++++++++++++++ go.mod | 2 +- go.sum | 4 +-- lib/config/bigquery.go | 11 ++++---- 6 files changed, 78 insertions(+), 29 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index bce9ff214..207d74719 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -68,7 +68,11 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) + if s.config.BigQuery.UseStorageWriteAPI { + return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) + } else { + return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) + } } func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) { @@ -158,6 +162,10 @@ func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifie return nil } +func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { + panic("not implemented") +} + func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index 7714f0876..b81ee1990 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -65,28 +65,14 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) return extTime.String(dialect.BQStreamingTimeFormat), nil } case typing.Struct.Kind: - // TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164 - if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - - // Structs from relational and Mongo are different. 
- // MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` - // Relational will return a string representation of the struct such as `{"hello": "world"}` - if colValString, isOk := colVal.(string); isOk { - if colValString == "" { - return nil, nil - } - - return colValString, nil - } - - colValBytes, err := json.Marshal(colVal) + stringValue, err := EncodeStructToJSONString(colVal) if err != nil { - return nil, fmt.Errorf("failed to marshal colVal: %w", err) + return nil, err + } else if stringValue == "" { + return nil, nil + } else { + return stringValue, nil } - - return string(colValBytes), nil case typing.Array.Kind: arrayString, err := array.InterfaceToArrayString(colVal, true) if err != nil { @@ -104,3 +90,29 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal)) return fmt.Sprint(colVal), nil } + +// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. +// Structs from relational and Mongo are different. +// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` +// Relational will return a string representation of the struct such as `{"hello": "world"}` +func EncodeStructToJSONString(value any) (string, error) { + if stringValue, isOk := value.(string); isOk { + if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return stringValue, nil + } + + bytes, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("failed to marshal value: %w", err) + } + + stringValue := string(bytes) + if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { + // TODO: Remove this if we don't see it in the logs. 
+ slog.Error("encoded JSON value contains the toast unavailable value placeholder") + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return stringValue, nil +} diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go index def9f639f..68d6222f2 100644 --- a/clients/bigquery/cast_test.go +++ b/clients/bigquery/cast_test.go @@ -3,6 +3,7 @@ package bigquery import ( "fmt" "math/big" + "testing" "time" "github.com/artie-labs/transfer/lib/ptr" @@ -160,3 +161,30 @@ func (b *BigQueryTestSuite) TestCastColVal() { assert.Nil(b.T(), colVal) } } + +func TestEncodeStructToJSONString(t *testing.T) { + { + // Empty string: + result, err := EncodeStructToJSONString("") + assert.NoError(t, err) + assert.Equal(t, "", result) + } + { + // Toasted string: + result, err := EncodeStructToJSONString("__debezium_unavailable_value") + assert.NoError(t, err) + assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) + } + { + // Map: + result, err := EncodeStructToJSONString(map[string]any{"foo": "bar", "baz": 1234}) + assert.NoError(t, err) + assert.Equal(t, `{"baz":1234,"foo":"bar"}`, result) + } + { + // Toasted map (should not happen): + result, err := EncodeStructToJSONString(map[string]any{"__debezium_unavailable_value": "bar", "baz": 1234}) + assert.NoError(t, err) + assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) + } +} diff --git a/go.mod b/go.mod index 16b172e5a..54da614d0 100644 --- a/go.mod +++ b/go.mod @@ -140,6 +140,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/grpc v1.63.2 // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 3eef7ef73..1de7b64d5 100644 --- a/go.sum +++ b/go.sum @@ -810,8 +810,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index 6e8250daa..c1d5c123f 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -5,11 +5,12 @@ import "fmt" type BigQuery struct { // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var // Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC - PathToCredentials string `yaml:"pathToCredentials"` - DefaultDataset string 
`yaml:"defaultDataset"` - ProjectID string `yaml:"projectID"` - Location string `yaml:"location"` - BatchSize int `yaml:"batchSize"` + PathToCredentials string `yaml:"pathToCredentials"` + DefaultDataset string `yaml:"defaultDataset"` + ProjectID string `yaml:"projectID"` + Location string `yaml:"location"` + BatchSize int `yaml:"batchSize"` + UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet. } func (b *BigQuery) LoadDefaultValues() { From 25f185fd7338cc35ed6ac1a1d6dbd2ad338dfcbc Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 13 Jun 2024 18:18:37 -0700 Subject: [PATCH 06/24] [bigquery] Make `bigquery/batch` generic (#722) --- clients/bigquery/batch.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/bigquery/batch.go b/clients/bigquery/batch.go index 1f151b87d..14967167e 100644 --- a/clients/bigquery/batch.go +++ b/clients/bigquery/batch.go @@ -1,23 +1,23 @@ package bigquery -type Batch struct { - rows []*Row +type Batch[T any] struct { + rows []T chunkSize int iteratorIdx int } -func NewBatch(rows []*Row, chunkSize int) *Batch { - return &Batch{ +func NewBatch[T any](rows []T, chunkSize int) *Batch[T] { + return &Batch[T]{ rows: rows, chunkSize: chunkSize, } } -func (b *Batch) HasNext() bool { +func (b *Batch[T]) HasNext() bool { return len(b.rows) > b.iteratorIdx } -func (b *Batch) NextChunk() []*Row { +func (b *Batch[T]) NextChunk() []T { start := b.iteratorIdx b.iteratorIdx += b.chunkSize end := b.iteratorIdx From 668c5d61ccb7465c056c53821ab814bd5c0dc166 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 13 Jun 2024 18:45:55 -0700 Subject: [PATCH 07/24] [bigquery] Implement Storage Write API (#715) --- clients/bigquery/bigquery.go | 62 +++++++- clients/bigquery/storagewrite.go | 202 ++++++++++++++++++++++++++ clients/bigquery/storagewrite_test.go | 164 +++++++++++++++++++++ go.mod | 2 +- 4 files changed, 428 insertions(+), 2 deletions(-) create mode 100644 clients/bigquery/storagewrite.go create mode 100644 clients/bigquery/storagewrite_test.go diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 207d74719..c2863e374 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -8,7 +8,10 @@ import ( "strings" "cloud.google.com/go/bigquery" + "cloud.google.com/go/bigquery/storage/managedwriter" + "cloud.google.com/go/bigquery/storage/managedwriter/adapt" _ "github.com/viant/bigquery" + "google.golang.org/protobuf/proto" "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" @@ -163,7 +166,64 @@ func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifie } func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { - panic("not implemented") + columns := tableData.ReadOnlyInMemoryCols().ValidColumns() + + messageDescriptor, err := columnsToMessageDescriptor(columns) + if err != nil { + return err + } + schemaDescriptor, err := adapt.NormalizeDescriptor(*messageDescriptor) + if err != nil { + return err + } + + managedWriterClient, err := managedwriter.NewClient(ctx, bqTableID.ProjectID()) + if err != nil { + return fmt.Errorf("failed to create managedwriter client: %w", err) + } + defer managedWriterClient.Close() + + managedStream, err := managedWriterClient.NewManagedStream(ctx, + managedwriter.WithDestinationTable( + 
managedwriter.TableParentFromParts(bqTableID.ProjectID(), bqTableID.Dataset(), bqTableID.Table()),
+ ),
+ managedwriter.WithType(managedwriter.DefaultStream),
+ managedwriter.WithSchemaDescriptor(schemaDescriptor),
+ managedwriter.EnableWriteRetries(true),
+ )
+ if err != nil {
+ return fmt.Errorf("failed to create managed stream: %w", err)
+ }
+ defer managedStream.Close()
+
+ batch := NewBatch(tableData.Rows(), s.batchSize)
+ for batch.HasNext() {
+ chunk := batch.NextChunk()
+ encoded := make([][]byte, len(chunk))
+ for i, row := range chunk {
+ message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats())
+ if err != nil {
+ return fmt.Errorf("failed to convert row to message: %w", err)
+ }
+
+ bytes, err := proto.Marshal(message)
+ if err != nil {
+ return fmt.Errorf("failed to marshal message: %w", err)
+ }
+ encoded[i] = bytes
+ }
+
+ result, err := managedStream.AppendRows(ctx, encoded)
+ if err != nil {
+ return fmt.Errorf("failed to append rows: %w", err)
+ }
+
+ if resp, err := result.FullResponse(ctx); err != nil {
+ return fmt.Errorf("failed to get response (%s): %w", resp.GetError().String(), err)
+ }
+ }
+
+ return nil
+}
+
 func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go
new file mode 100644
index 000000000..2aafa9adb
--- /dev/null
+++ b/clients/bigquery/storagewrite.go
@@ -0,0 +1,202 @@
+package bigquery
+
+import (
+ "fmt"
+ "time"
+
+ "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
+ "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
+ "github.com/artie-labs/transfer/lib/array"
+ "github.com/artie-labs/transfer/lib/typing"
+ "github.com/artie-labs/transfer/lib/typing/columns"
+ "github.com/artie-labs/transfer/lib/typing/decimal"
+ "github.com/artie-labs/transfer/lib/typing/ext"
+ "google.golang.org/protobuf/reflect/protoreflect"
+ "google.golang.org/protobuf/types/dynamicpb"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+// columnToTableFieldSchema returns a [*storagepb.TableFieldSchema] suitable for transferring data of the type that the column specifies.
+// Note that the data type is not necessarily the data type that the table in the database is using.
+func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchema, error) { + var fieldType storagepb.TableFieldSchema_Type + mode := storagepb.TableFieldSchema_NULLABLE + + switch column.KindDetails.Kind { + case typing.Boolean.Kind: + fieldType = storagepb.TableFieldSchema_BOOL + case typing.Integer.Kind: + fieldType = storagepb.TableFieldSchema_INT64 + case typing.Float.Kind: + fieldType = storagepb.TableFieldSchema_DOUBLE + case typing.EDecimal.Kind: + fieldType = storagepb.TableFieldSchema_STRING + case typing.String.Kind: + fieldType = storagepb.TableFieldSchema_STRING + case typing.ETime.Kind: + switch column.KindDetails.ExtendedTimeDetails.Type { + case ext.TimeKindType: + fieldType = storagepb.TableFieldSchema_TIME + case ext.DateKindType: + fieldType = storagepb.TableFieldSchema_DATE + case ext.DateTimeKindType: + fieldType = storagepb.TableFieldSchema_TIMESTAMP + default: + return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type) + } + case typing.Struct.Kind: + fieldType = storagepb.TableFieldSchema_STRING + case typing.Array.Kind: + fieldType = storagepb.TableFieldSchema_STRING + mode = storagepb.TableFieldSchema_REPEATED + default: + return nil, fmt.Errorf("unsupported column kind: %q", column.KindDetails.Kind) + } + + return &storagepb.TableFieldSchema{ + Name: column.Name(), + Type: fieldType, + Mode: mode, + }, nil +} + +func columnsToMessageDescriptor(cols []columns.Column) (*protoreflect.MessageDescriptor, error) { + fields := make([]*storagepb.TableFieldSchema, len(cols)) + for i, col := range cols { + field, err := columnToTableFieldSchema(col) + if err != nil { + return nil, err + } + fields[i] = field + } + tableSchema := storagepb.TableSchema{Fields: fields} + + descriptor, err := adapt.StorageSchemaToProto2Descriptor(&tableSchema, "root") + if err != nil { + return nil, fmt.Errorf("failed to build proto descriptor: %w", err) + } + messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) + if !ok { + return nil, fmt.Errorf("proto descriptor is not a message descriptor") + } + return &messageDescriptor, nil +} + +// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L143 +// See https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.CivilTimeEncoder +// And https://cloud.google.com/pubsub/docs/bigquery#date_time_int +func encodePacked64TimeMicros(value time.Time) int64 { + var result = int64(value.Nanosecond() / 1000) + result |= int64(value.Second()) << 20 + result |= int64(value.Minute()) << 26 + result |= int64(value.Hour()) << 32 + return result +} + +func rowToMessage(row map[string]any, columns []columns.Column, messageDescriptor protoreflect.MessageDescriptor, additionalDateFmts []string) (*dynamicpb.Message, error) { + message := dynamicpb.NewMessage(messageDescriptor) + + for _, column := range columns { + field := message.Descriptor().Fields().ByTextName(column.Name()) + if field == nil { + return nil, fmt.Errorf("failed to find a field named %q", column.Name()) + } + + value := row[column.Name()] + + if value == nil { + continue + } + + switch column.KindDetails.Kind { + case typing.Boolean.Kind: + if boolValue, ok := value.(bool); ok { + message.Set(field, protoreflect.ValueOfBool(boolValue)) + } else { + return nil, 
fmt.Errorf("expected bool received %T with value %v", value, value) + } + case typing.Integer.Kind: + switch value := value.(type) { + case int: + message.Set(field, protoreflect.ValueOfInt64(int64(value))) + case int32: + message.Set(field, protoreflect.ValueOfInt64(int64(value))) + case int64: + message.Set(field, protoreflect.ValueOfInt64(value)) + default: + return nil, fmt.Errorf("expected int/int32/int64 received %T with value %v", value, value) + } + case typing.Float.Kind: + switch value := value.(type) { + case float32: + message.Set(field, protoreflect.ValueOfFloat64(float64(value))) + case float64: + message.Set(field, protoreflect.ValueOfFloat64(value)) + default: + return nil, fmt.Errorf("expected float32/float64 recieved %T with value %v", value, value) + } + case typing.EDecimal.Kind: + if decimalValue, ok := value.(*decimal.Decimal); ok { + message.Set(field, protoreflect.ValueOfString(decimalValue.String())) + } else { + return nil, fmt.Errorf("expected *decimal.Decimal received %T with value %v", decimalValue, decimalValue) + } + case typing.String.Kind: + var stringValue string + switch value := value.(type) { + case string: + stringValue = value + case *decimal.Decimal: + stringValue = value.String() + default: + return nil, fmt.Errorf("expected string/decimal.Decimal received %T with value %v", value, value) + } + message.Set(field, protoreflect.ValueOfString(stringValue)) + case typing.ETime.Kind: + extTime, err := ext.ParseFromInterface(value, additionalDateFmts) + if err != nil { + return nil, fmt.Errorf("failed to cast value as time.Time, value: %v, err: %w", value, err) + } + + if column.KindDetails.ExtendedTimeDetails == nil { + return nil, fmt.Errorf("extended time details for column kind details is nil") + } + + switch column.KindDetails.ExtendedTimeDetails.Type { + case ext.TimeKindType: + message.Set(field, protoreflect.ValueOfInt64(encodePacked64TimeMicros(extTime.Time))) + case ext.DateKindType: + daysSinceEpoch := extTime.Unix() / (60 * 60 * 24) + message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch))) + case ext.DateTimeKindType: + if err := timestamppb.New(extTime.Time).CheckValid(); err != nil { + return nil, err + } + message.Set(field, protoreflect.ValueOfInt64(extTime.UnixMicro())) + default: + return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type) + } + case typing.Struct.Kind: + stringValue, err := EncodeStructToJSONString(value) + if err != nil { + return nil, err + } else if stringValue == "" { + continue + } else { + message.Set(field, protoreflect.ValueOfString(stringValue)) + } + case typing.Array.Kind: + values, err := array.InterfaceToArrayString(value, true) + if err != nil { + return nil, err + } + list := message.Mutable(field).List() + for _, value := range values { + list.Append(protoreflect.ValueOfString(value)) + } + default: + return nil, fmt.Errorf("unsupported column kind: %q", column.KindDetails.Kind) + } + } + return message, nil +} diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go new file mode 100644 index 000000000..53c83a268 --- /dev/null +++ b/clients/bigquery/storagewrite_test.go @@ -0,0 +1,164 @@ +package bigquery + +import ( + "encoding/json" + "math/big" + "testing" + "time" + + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/typing/decimal" + 
"github.com/artie-labs/transfer/lib/typing/ext" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/encoding/protojson" +) + +func TestColumnToTableFieldSchema(t *testing.T) { + { + // Boolean: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Boolean)) + assert.NoError(t, err) + assert.Equal(t, "foo", fieldSchema.Name) + assert.Equal(t, storagepb.TableFieldSchema_NULLABLE, fieldSchema.Mode) + assert.Equal(t, storagepb.TableFieldSchema_BOOL, fieldSchema.Type) + } + { + // Integer: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Integer)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_INT64, fieldSchema.Type) + } + { + // Float: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Float)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_DOUBLE, fieldSchema.Type) + } + { + // EDecimal: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.EDecimal)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_STRING, fieldSchema.Type) + } + { + // ETime - Time: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType))) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_TIME, fieldSchema.Type) + } + { + // ETime - Date: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType))) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_DATE, fieldSchema.Type) + } + { + // ETime - DateTime: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType))) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_TIMESTAMP, fieldSchema.Type) + } + { + // ETime - Invalid: + _, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ""))) + assert.ErrorContains(t, err, "unsupported extended time details type:") + } + { + // Struct: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Struct)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_STRING, fieldSchema.Type) + } + { + // Array: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Array)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_STRING, fieldSchema.Type) + assert.Equal(t, storagepb.TableFieldSchema_REPEATED, fieldSchema.Mode) + } + { + // Invalid: + _, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.KindDetails{})) + assert.ErrorContains(t, err, "unsupported column kind: ") + } +} + +func TestEncodePacked64TimeMicros(t *testing.T) { + epoch := time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC) + + assert.Equal(t, int64(0), encodePacked64TimeMicros(epoch)) + assert.Equal(t, int64(1), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Microsecond))) + assert.Equal(t, int64(1000), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Millisecond))) + assert.Equal(t, int64(1<<20), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Second))) + assert.Equal(t, int64(1<<26), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Minute))) + assert.Equal(t, int64(1<<32), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour))) + assert.Equal(t, int64(1<<32+1), 
encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Microsecond))) + assert.Equal(t, int64(1<<32+1000), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Millisecond))) +} + +func TestRowToMessage(t *testing.T) { + columns := []columns.Column{ + columns.NewColumn("c_bool", typing.Boolean), + columns.NewColumn("c_int", typing.Integer), + columns.NewColumn("c_int32", typing.Integer), + columns.NewColumn("c_int64", typing.Integer), + columns.NewColumn("c_float32", typing.Float), + columns.NewColumn("c_float64", typing.Float), + columns.NewColumn("c_numeric", typing.EDecimal), + columns.NewColumn("c_string", typing.String), + columns.NewColumn("c_string_decimal", typing.String), + columns.NewColumn("c_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)), + columns.NewColumn("c_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)), + columns.NewColumn("c_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)), + columns.NewColumn("c_struct", typing.Struct), + columns.NewColumn("c_array", typing.Array), + } + + row := map[string]any{ + "c_bool": true, + "c_int": int(1234), + "c_int32": int32(1234), + "c_int64": int64(1234), + "c_float32": float32(1234.567), + "c_float64": float64(1234.567), + "c_numeric": decimal.NewDecimal(nil, 5, big.NewFloat(3.1415926)), + "c_string": "foo bar", + "c_string_decimal": decimal.NewDecimal(nil, 5, big.NewFloat(1.618033)), + "c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""), + "c_date": ext.NewExtendedTime(time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), ext.DateKindType, ""), + "c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.DateTimeKindType, ""), + "c_struct": map[string]any{"baz": []string{"foo", "bar"}}, + "c_array": []string{"foo", "bar"}, + } + + desc, err := columnsToMessageDescriptor(columns) + assert.NoError(t, err) + + message, err := rowToMessage(row, columns, *desc, []string{}) + assert.NoError(t, err) + + bytes, err := protojson.Marshal(message) + assert.NoError(t, err) + + var result map[string]any + assert.NoError(t, json.Unmarshal(bytes, &result)) + + assert.Equal(t, map[string]any{ + "cBool": true, + "cFloat32": 1234.5670166015625, + "cFloat64": 1234.567, + "cInt": "1234", + "cInt32": "1234", + "cInt64": "1234", + "cNumeric": "3.14159", + "cString": "foo bar", + "cStringDecimal": "1.61803", + "cTime": "17521704960", + "cDate": float64(11356), + "cDatetime": "981173106000000", + "cStruct": `{"baz":["foo","bar"]}`, + "cArray": []any{"foo", "bar"}, + }, result) +} diff --git a/go.mod b/go.mod index 54da614d0..cc6f04226 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 go.mongodb.org/mongo-driver v1.15.0 google.golang.org/api v0.175.0 + google.golang.org/protobuf v1.34.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -140,6 +141,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/grpc v1.63.2 // indirect - google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) From 355beb56f91b2c6bd8cc49d9fce1826e85f72144 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 14 Jun 2024 12:29:41 -0700 Subject: [PATCH 08/24] [bigquery] Support additional types for float 
columns (#723)

---
 clients/bigquery/storagewrite.go | 13 ++++++++++++-
 clients/bigquery/storagewrite_test.go | 9 +++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go
index 2aafa9adb..3cd550bde 100644
--- a/clients/bigquery/storagewrite.go
+++ b/clients/bigquery/storagewrite.go
@@ -2,6 +2,7 @@ package bigquery
 
 import (
 "fmt"
+ "strconv"
 "time"
 
 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
@@ -132,8 +133,18 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
 message.Set(field, protoreflect.ValueOfFloat64(float64(value)))
 case float64:
 message.Set(field, protoreflect.ValueOfFloat64(value))
+ case int32:
+ message.Set(field, protoreflect.ValueOfFloat64(float64(value)))
+ case int64:
+ message.Set(field, protoreflect.ValueOfFloat64(float64(value)))
+ case string:
+ floatValue, err := strconv.ParseFloat(value, 64)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse string to float64: %w", err)
+ }
+ message.Set(field, protoreflect.ValueOfFloat64(floatValue))
 default:
- return nil, fmt.Errorf("expected float32/float64 received %T with value %v", value, value)
+ return nil, fmt.Errorf("expected float32/float64/int32/int64/string received %T with value %v", value, value)
 }
 case typing.EDecimal.Kind:
 if decimalValue, ok := value.(*decimal.Decimal); ok {
diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go
index 53c83a268..ceaf26168 100644
--- a/clients/bigquery/storagewrite_test.go
+++ b/clients/bigquery/storagewrite_test.go
@@ -106,6 +106,9 @@ func TestRowToMessage(t *testing.T) {
 columns.NewColumn("c_int64", typing.Integer),
 columns.NewColumn("c_float32", typing.Float),
 columns.NewColumn("c_float64", typing.Float),
+ columns.NewColumn("c_float_int32", typing.Float),
+ columns.NewColumn("c_float_int64", typing.Float),
+ columns.NewColumn("c_float_string", typing.Float),
 columns.NewColumn("c_numeric", typing.EDecimal),
 columns.NewColumn("c_string", typing.String),
 columns.NewColumn("c_string_decimal", typing.String),
@@ -123,6 +126,9 @@ func TestRowToMessage(t *testing.T) {
 "c_int64": int64(1234),
 "c_float32": float32(1234.567),
 "c_float64": float64(1234.567),
+ "c_float_int32": int32(1234),
+ "c_float_int64": int64(1234),
+ "c_float_string": "4444.55555",
 "c_numeric": decimal.NewDecimal(nil, 5, big.NewFloat(3.1415926)),
 "c_string": "foo bar",
 "c_string_decimal": decimal.NewDecimal(nil, 5, big.NewFloat(1.618033)),
@@ -149,6 +155,9 @@ func TestRowToMessage(t *testing.T) {
 "cBool": true,
 "cFloat32": 1234.5670166015625,
 "cFloat64": 1234.567,
+ "cFloatInt32": 1234.0,
+ "cFloatInt64": 1234.0,
+ "cFloatString": 4444.55555,
 "cInt": "1234",
 "cInt32": "1234",
 "cInt64": "1234",

From a320314ed3da86b37e4d98a3f0505fb22993a884 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Mon, 17 Jun 2024 10:53:08 -0700
Subject: [PATCH 09/24] [BigQuery] Use StorageWrite API + Dedupe (#726)

Co-authored-by: Nathan <148575555+nathan-artie@users.noreply.github.com>
---
 clients/bigquery/bigquery.go | 3 ++-
 clients/bigquery/dialect/dialect.go | 11 ++++++++++-
 clients/bigquery/merge.go | 2 ++
 clients/mssql/dialect/dialect.go | 4 ++++
 clients/redshift/dialect/dialect.go | 4 ++++
 clients/shared/merge.go | 5 ++---
 clients/snowflake/dialect/dialect.go | 4 ++++
 lib/config/bigquery.go | 11 +++++------
 lib/sql/dialect.go | 1 +
 9 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go
index 
c2863e374..78ea841e6 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -34,6 +34,7 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" + useStorageWriteAPI = true ) type Store struct { @@ -71,7 +72,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - if s.config.BigQuery.UseStorageWriteAPI { + if useStorageWriteAPI { return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) } else { return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index d4341ce7d..bc4c93d3c 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -123,7 +123,7 @@ func (BigQueryDialect) IsColumnAlreadyExistsErr(err error) bool { return strings.Contains(err.Error(), "Column already exists") } -func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool { +func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool { return false } @@ -154,6 +154,15 @@ func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } +func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) + return fmt.Sprintf(`(SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1)`, + tableID.FullyQualifiedName(), + strings.Join(primaryKeysEscaped, ", "), + strings.Join(primaryKeysEscaped, ", "), + ) +} + func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index 213cddcbd..bf807e87b 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -51,6 +51,8 @@ func (s *Store) Merge(tableData *optimization.TableData) error { AdditionalEqualityStrings: additionalEqualityStrings, // BigQuery has DDL quotas. RetryColBackfill: true, + // We are using BigQuery's streaming API which doesn't guarantee exactly once semantics + SubQueryDedupe: true, }) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index aa378cdfd..083880539 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -169,6 +169,10 @@ func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.Table return fmt.Sprintf("COALESCE(%s, '') != '%s'", colName, constants.ToastUnavailableValuePlaceholder) } +func (MSSQLDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + panic("not implemented") +} + func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { panic("not implemented") // We don't currently support deduping for MS SQL. 
} diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 452cd566c..757b90410 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -135,6 +135,10 @@ func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } +func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string { + return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName()) +} + func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 0b30a4e5a..05909463d 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -111,10 +111,9 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt } } - temporaryTableName := temporaryTableID.FullyQualifiedName() - subQuery := temporaryTableName + subQuery := temporaryTableID.FullyQualifiedName() if opts.SubQueryDedupe { - subQuery = fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, temporaryTableName) + subQuery = dwh.Dialect().BuildDedupeTableQuery(temporaryTableID, tableData.PrimaryKeys()) } if subQuery == "" { diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 756ebb907..57148a944 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -153,6 +153,10 @@ func (sd SnowflakeDialect) BuildIsNotToastValueExpression(tableAlias constants.T return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } +func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + panic("not implemented") +} + func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd) diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index c1d5c123f..6e8250daa 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -5,12 +5,11 @@ import "fmt" type BigQuery struct { // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var // Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC - PathToCredentials string `yaml:"pathToCredentials"` - DefaultDataset string `yaml:"defaultDataset"` - ProjectID string `yaml:"projectID"` - Location string `yaml:"location"` - BatchSize int `yaml:"batchSize"` - UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet. 
+ PathToCredentials string `yaml:"pathToCredentials"` + DefaultDataset string `yaml:"defaultDataset"` + ProjectID string `yaml:"projectID"` + Location string `yaml:"location"` + BatchSize int `yaml:"batchSize"` } func (b *BigQuery) LoadDefaultValues() { diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index 7690ee505..4c5d257d6 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -24,6 +24,7 @@ type Dialect interface { BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string + BuildDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string BuildMergeQueries( tableID TableIdentifier, From df9dd9bca879ce36444408ceb12790c07f5196b6 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 11:10:52 -0700 Subject: [PATCH 10/24] [BigQuery] Removing Legacy API (#729) --- clients/bigquery/bigquery.go | 49 +---------- clients/bigquery/cast.go | 82 ------------------ clients/bigquery/cast_test.go | 156 ---------------------------------- 3 files changed, 1 insertion(+), 286 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 78ea841e6..80bda585e 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -34,7 +34,6 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" - useStorageWriteAPI = true ) type Store struct { @@ -72,33 +71,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - if useStorageWriteAPI { - return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) - } else { - return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) - } -} - -func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) { - // Cast the data into BigQuery values - var rows []*Row - columns := tableData.ReadOnlyInMemoryCols().ValidColumns() - for _, value := range tableData.Rows() { - data := make(map[string]bigquery.Value) - for _, col := range columns { - colVal, err := castColVal(value[col.Name()], col, additionalDateFmts) - if err != nil { - return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err) - } - - if colVal != nil { - data[col.Name()] = colVal - } - } - - rows = append(rows, NewRow(data)) - } - return rows, nil + return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) } func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { @@ -146,26 +119,6 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client { return client } -func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error { - rows, err := buildLegacyRows(tableData, s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats) - if err != nil { - return err - } - - client := s.GetClient(ctx) - defer client.Close() - - batch := NewBatch(rows, s.batchSize) - inserter := client.Dataset(tableID.Dataset()).Table(tableID.Table()).Inserter() - for batch.HasNext() { - if err := inserter.Put(ctx, batch.NextChunk()); err != nil { - return fmt.Errorf("failed to insert 
rows: %w", err) - } - } - - return nil -} - func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { columns := tableData.ReadOnlyInMemoryCols().ValidColumns() diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index b81ee1990..ccc0cf236 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -6,91 +6,9 @@ import ( "log/slog" "strings" - "github.com/artie-labs/transfer/clients/bigquery/dialect" - "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/decimal" - "github.com/artie-labs/transfer/lib/typing/ext" ) -func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) (any, error) { - if colVal == nil { - return nil, nil - } - - switch colKind.KindDetails.Kind { - case typing.String.Kind: - if val, isOk := colVal.(*decimal.Decimal); isOk { - return val.String(), nil - } - - return colVal, nil - case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind: - return colVal, nil - case typing.EDecimal.Kind: - val, isOk := colVal.(*decimal.Decimal) - if !isOk { - return nil, fmt.Errorf("colVal is not type *decimal.Decimal") - } - - return val.Value(), nil - case typing.ETime.Kind: - extTime, err := ext.ParseFromInterface(colVal, additionalDateFmts) - if err != nil { - return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err) - } - - if colKind.KindDetails.ExtendedTimeDetails == nil { - return nil, fmt.Errorf("column kind details for extended time details is null") - } - - // We should be using the colKind here since the data types coming from the source may be inconsistent. - switch colKind.KindDetails.ExtendedTimeDetails.Type { - // https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery#sending_datetime_data - case ext.DateTimeKindType: - if extTime.Year() == 0 { - return nil, nil - } - - return extTime.StringUTC(ext.BigQueryDateTimeFormat), nil - case ext.DateKindType: - if extTime.Year() == 0 { - return nil, nil - } - - return extTime.String(ext.PostgresDateFormat), nil - case ext.TimeKindType: - return extTime.String(dialect.BQStreamingTimeFormat), nil - } - case typing.Struct.Kind: - stringValue, err := EncodeStructToJSONString(colVal) - if err != nil { - return nil, err - } else if stringValue == "" { - return nil, nil - } else { - return stringValue, nil - } - case typing.Array.Kind: - arrayString, err := array.InterfaceToArrayString(colVal, true) - if err != nil { - return nil, err - } - - if len(arrayString) == 0 { - return nil, nil - } - - return arrayString, nil - } - - // TODO: Change this to return an error once we don't see Sentry - slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal)) - return fmt.Sprint(colVal), nil -} - // EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. // Structs from relational and Mongo are different. 
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go index 68d6222f2..ffd2fcbb5 100644 --- a/clients/bigquery/cast_test.go +++ b/clients/bigquery/cast_test.go @@ -1,167 +1,11 @@ package bigquery import ( - "fmt" - "math/big" "testing" - "time" - - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/decimal" - - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" ) -func (b *BigQueryTestSuite) TestCastColVal() { - { - // Strings - colVal, err := castColVal("hello", columns.Column{KindDetails: typing.String}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "hello", colVal) - - // Decimal - dec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45)) - colVal, err = castColVal(dec, columns.Column{KindDetails: typing.String}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "123.45", colVal) - } - { - // Integers - colVal, err := castColVal(5, columns.Column{KindDetails: typing.Integer}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), 5, colVal) - } - { - // Floats - colVal, err := castColVal(5.55, columns.Column{KindDetails: typing.Float}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), 5.55, colVal) - } - { - // Booleans - colVal, err := castColVal(true, columns.Column{KindDetails: typing.Boolean}, nil) - assert.NoError(b.T(), err) - assert.True(b.T(), colVal.(bool)) - } - { - // EDecimals - dec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45)) - colVal, err := castColVal(dec, columns.Column{KindDetails: typing.EDecimal}, nil) - assert.NoError(b.T(), err) - - // Native type is big.Float if precision doesn't exceed the DWH limit - assert.Equal(b.T(), big.NewFloat(123.45), colVal) - - // Precision has clearly exceeded the limit, so we should be returning a string - dec = decimal.NewDecimal(ptr.ToInt(50), 2, big.NewFloat(123.45)) - colVal, err = castColVal(dec, columns.Column{KindDetails: typing.EDecimal}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "123.45", colVal) - } - { - // ETime - birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC) - - tsKind := typing.ETime - tsKind.ExtendedTimeDetails = &ext.DateTime - - dateKind := typing.ETime - dateKind.ExtendedTimeDetails = &ext.Date - birthdayTSExt := ext.NewExtendedTime(birthday, tsKind.ExtendedTimeDetails.Type, "") - { - // Timestamp - colVal, err := castColVal(birthdayTSExt, columns.Column{KindDetails: tsKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06 03:19:24.942", colVal) - } - { - // Date - birthdayDateExt := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "") - colVal, err := castColVal(birthdayDateExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06", colVal) - } - - { - // Date (column is a date, but value is not) - colVal, err := castColVal(birthdayTSExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06", colVal) - } - { - // Time - timeKind := typing.ETime - timeKind.ExtendedTimeDetails = &ext.Time - birthdayTimeExt := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "") - colVal, err := castColVal(birthdayTimeExt, columns.Column{KindDetails: 
timeKind}, nil)
- assert.NoError(b.T(), err)
- assert.Equal(b.T(), "03:19:24", colVal)
- }
-
- invalidDate := time.Date(0, time.September, 6, 3, 19, 24, 942000000, time.UTC)
- invalidDateTsExt := ext.NewExtendedTime(invalidDate, tsKind.ExtendedTimeDetails.Type, "")
- {
- // Date (column is a date, but value is invalid)
- colVal, err := castColVal(invalidDateTsExt, columns.Column{KindDetails: dateKind}, nil)
- assert.NoError(b.T(), err)
- assert.Nil(b.T(), colVal)
- }
- {
- // Datetime (column is datetime but value is invalid)
- colVal, err := castColVal(invalidDateTsExt, columns.Column{KindDetails: tsKind}, nil)
- assert.NoError(b.T(), err)
- assert.Nil(b.T(), colVal)
- }
- }
- {
- // Structs
- colVal, err := castColVal(map[string]any{"hello": "world"}, columns.Column{KindDetails: typing.Struct}, nil)
- assert.NoError(b.T(), err)
- assert.Equal(b.T(), `{"hello":"world"}`, colVal)
-
- // With string values
- colVal, err = castColVal(`{"hello":"world"}`, columns.Column{KindDetails: typing.Struct}, nil)
- assert.NoError(b.T(), err)
- assert.Equal(b.T(), `{"hello":"world"}`, colVal)
-
- // With empty string
- colVal, err = castColVal("", columns.Column{KindDetails: typing.Struct}, nil)
- assert.NoError(b.T(), err)
- assert.Nil(b.T(), colVal)
-
- // With array
- colVal, err = castColVal([]any{map[string]any{}, map[string]any{"hello": "world"}}, columns.Column{KindDetails: typing.Struct}, nil)
- assert.NoError(b.T(), err)
- assert.Equal(b.T(), `[{},{"hello":"world"}]`, colVal)
-
- // With TOAST values
- colVal, err = castColVal(constants.ToastUnavailableValuePlaceholder, columns.Column{KindDetails: typing.Struct}, nil)
- assert.NoError(b.T(), err)
- assert.Equal(b.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), colVal)
- }
- {
- // Arrays
- colVal, err := castColVal([]any{1, 2, 3, 4, 5}, columns.Column{KindDetails: typing.Array}, nil)
- assert.NoError(b.T(), err)
- assert.Equal(b.T(), []string{"1", "2", "3", "4", "5"}, colVal)
-
- // Empty array
- colVal, err = castColVal([]any{}, columns.Column{KindDetails: typing.Array}, nil)
- assert.NoError(b.T(), err)
- assert.Nil(b.T(), colVal)
-
- // Null array
- colVal, err = castColVal(nil, columns.Column{KindDetails: typing.Array}, nil)
- assert.NoError(b.T(), err)
- assert.Nil(b.T(), colVal)
- }
-}
-
 func TestEncodeStructToJSONString(t *testing.T) {
 {
 // Empty string:

From 50ca572d5e685301efc54a5cf667c744b5d74f25 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Mon, 17 Jun 2024 11:19:09 -0700
Subject: [PATCH 11/24] [BigQuery] Add a comment to dedupe (#731)

---
 clients/bigquery/dialect/dialect.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go
index bc4c93d3c..33804b1c3 100644
--- a/clients/bigquery/dialect/dialect.go
+++ b/clients/bigquery/dialect/dialect.go
@@ -156,6 +156,9 @@ func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta
 func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
 primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)
+
+ // BigQuery does not like DISTINCT for JSON columns, so we wrote this instead. 
+ // Error: Column foo of type JSON cannot be used in SELECT DISTINCT return fmt.Sprintf(`(SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1)`, tableID.FullyQualifiedName(), strings.Join(primaryKeysEscaped, ", "), From 61b279cd191fd4e832b91f2f637ba0e4f46d7123 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 11:35:57 -0700 Subject: [PATCH 12/24] Refactor dedupe signature (#730) --- clients/bigquery/bigquery.go | 4 ++-- clients/bigquery/bigquery_dedupe_test.go | 9 ++++----- clients/bigquery/dialect/dialect.go | 5 ++--- clients/mssql/dialect/dialect.go | 3 +-- clients/mssql/store.go | 2 +- clients/redshift/dialect/dialect.go | 5 ++--- clients/redshift/redshift.go | 4 ++-- clients/redshift/redshift_dedupe_test.go | 9 ++++----- clients/snowflake/dialect/dialect.go | 5 ++--- clients/snowflake/snowflake.go | 4 ++-- clients/snowflake/snowflake_dedupe_test.go | 9 ++++----- lib/destination/dwh.go | 2 +- lib/sql/dialect.go | 3 +-- 13 files changed, 28 insertions(+), 36 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 80bda585e..b16472452 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -180,10 +180,10 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI return nil } -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) defer func() { _ = ddl.DropTemporaryTable(s, stagingTableID, false) }() diff --git a/clients/bigquery/bigquery_dedupe_test.go b/clients/bigquery/bigquery_dedupe_test.go index e43018740..b247eda67 100644 --- a/clients/bigquery/bigquery_dedupe_test.go +++ b/clients/bigquery/bigquery_dedupe_test.go @@ -11,7 +11,6 @@ import ( "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" ) @@ -21,7 +20,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project12", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -39,7 +38,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project12", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) assert.Equal( t, @@ -57,7 +56,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project123", "public", "user_settings") 
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -75,7 +74,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project123", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) assert.Equal( t, diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 33804b1c3..3739a6a84 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -166,11 +165,11 @@ func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, pri ) } -func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, bd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 083880539..adfbff906 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -173,7 +172,7 @@ func (MSSQLDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKe panic("not implemented") } -func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (MSSQLDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ bool) []string { panic("not implemented") // We don't currently support deduping for MS SQL. 
} diff --git a/clients/mssql/store.go b/clients/mssql/store.go index a34a6e09c..db4254cfe 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -70,7 +70,7 @@ func (s *Store) Sweep() error { return shared.Sweep(s, tcs, queryFunc) } -func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ kafkalib.TopicConfig) error { +func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ bool) error { return nil // dedupe is not necessary for MS SQL } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 757b90410..29ae4e00f 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -139,11 +138,11 @@ func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ [ return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName()) } -func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, rd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 4def473f3..9c840e6e4 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -110,9 +110,9 @@ WHERE return shared.Sweep(s, tcs, queryFunc) } -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/redshift/redshift_dedupe_test.go b/clients/redshift/redshift_dedupe_test.go index 9e1f02568..b0cfde510 100644 --- a/clients/redshift/redshift_dedupe_test.go +++ b/clients/redshift/redshift_dedupe_test.go @@ -6,7 +6,6 @@ import ( "github.com/artie-labs/transfer/clients/redshift/dialect" "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -17,7 +16,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -32,7 +31,7 @@ func (r 
*RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -47,7 +46,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -62,7 +61,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 57148a944..5edee8950 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -157,11 +156,11 @@ func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, prima panic("not implemented") } -func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, sd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index f8c6e8353..fb88d427d 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -130,9 +130,9 @@ func (s *Store) reestablishConnection() error { // Dedupe takes a table and will remove duplicates based on the primary key(s). 
// These queries are inspired and modified from: https://stackoverflow.com/a/71515946 -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/snowflake/snowflake_dedupe_test.go b/clients/snowflake/snowflake_dedupe_test.go index 88f4d5036..2f37e71fa 100644 --- a/clients/snowflake/snowflake_dedupe_test.go +++ b/clients/snowflake/snowflake_dedupe_test.go @@ -7,7 +7,6 @@ import ( "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -18,7 +17,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -33,7 +32,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) assert.Equal( t, @@ -48,7 +47,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -63,7 +62,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) assert.Equal( t, diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 2cf5d43d3..2416761f9 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -15,7 +15,7 @@ type DataWarehouse interface { Dialect() sqllib.Dialect Merge(tableData *optimization.TableData) error Append(tableData *optimization.TableData) error - Dedupe(tableID 
sqllib.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error + Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) Begin() (*sql.Tx, error) diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index 4c5d257d6..bef2ce857 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -2,7 +2,6 @@ package sql import ( "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -25,7 +24,7 @@ type Dialect interface { BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string BuildDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string - BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string + BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string BuildMergeQueries( tableID TableIdentifier, subQuery string, From deb0ce0eb6a249fd5328cecd4ac5364c413c13da Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 17 Jun 2024 12:02:07 -0700 Subject: [PATCH 13/24] Add `TempTableIDWithSuffix` function (#732) --- clients/bigquery/bigquery.go | 4 +--- clients/bigquery/bigquery_dedupe_test.go | 10 ++++------ clients/bigquery/bigquery_test.go | 4 ++-- clients/mssql/store_test.go | 6 +++--- clients/redshift/redshift.go | 4 +--- clients/redshift/redshift_dedupe_test.go | 10 ++++------ clients/redshift/redshift_test.go | 4 ++-- clients/shared/merge.go | 2 +- clients/shared/temp_table.go | 8 +++++++- clients/snowflake/snowflake.go | 4 +--- clients/snowflake/snowflake_dedupe_test.go | 10 ++++------ clients/snowflake/snowflake_test.go | 4 ++-- 12 files changed, 32 insertions(+), 38 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index b16472452..29f706896 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -5,7 +5,6 @@ import ( "fmt" "log/slog" "os" - "strings" "cloud.google.com/go/bigquery" "cloud.google.com/go/bigquery/storage/managedwriter" @@ -26,7 +25,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/stringutil" ) const ( @@ -181,7 +179,7 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI } func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) diff --git a/clients/bigquery/bigquery_dedupe_test.go b/clients/bigquery/bigquery_dedupe_test.go index b247eda67..841244b24 100644 --- a/clients/bigquery/bigquery_dedupe_test.go +++ b/clients/bigquery/bigquery_dedupe_test.go @@ -2,7 +2,6 @@ package bigquery import ( "fmt" - "strings" "testing" "time" @@ -11,14 +10,13 @@ import ( "github.com/artie-labs/transfer/clients/bigquery/dialect" 
"github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/stringutil" ) func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + no `__artie_updated_at` flag. tableID := NewTableIdentifier("project12", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) @@ -36,7 +34,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + `__artie_updated_at` flag. tableID := NewTableIdentifier("project12", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) @@ -54,7 +52,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + no `__artie_updated_at` flag. tableID := NewTableIdentifier("project123", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) @@ -72,7 +70,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + `__artie_updated_at` flag. tableID := NewTableIdentifier("project123", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) diff --git a/clients/bigquery/bigquery_test.go b/clients/bigquery/bigquery_test.go index 035c7555c..627f0b4ef 100644 --- a/clients/bigquery/bigquery_test.go +++ b/clients/bigquery/bigquery_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -26,6 +26,6 @@ func TestTempTableName(t *testing.T) { store := &Store{config: config.Config{BigQuery: &config.BigQuery{ProjectID: "123454321"}}} tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, "`123454321`.`db`.`table___artie_sUfFiX`", trimTTL(tempTableName)) } diff --git a/clients/mssql/store_test.go b/clients/mssql/store_test.go index 51a303ff0..f8614fa16 100644 --- a/clients/mssql/store_test.go +++ b/clients/mssql/store_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, 
lastUnderscore, 0) @@ -28,14 +28,14 @@ func TestTempTableName(t *testing.T) { // Schema is "schema": tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `"schema"."table___artie_sUfFiX"`, trimTTL(tempTableName)) } { // Schema is "public" -> "dbo": tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "public"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `"dbo"."table___artie_sUfFiX"`, trimTTL(tempTableName)) } } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 9c840e6e4..6a0910efb 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -2,7 +2,6 @@ package redshift import ( "fmt" - "strings" _ "github.com/lib/pq" @@ -17,7 +16,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/stringutil" ) type Store struct { @@ -111,7 +109,7 @@ WHERE } func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/redshift/redshift_dedupe_test.go b/clients/redshift/redshift_dedupe_test.go index b0cfde510..0b86d6330 100644 --- a/clients/redshift/redshift_dedupe_test.go +++ b/clients/redshift/redshift_dedupe_test.go @@ -2,11 +2,9 @@ package redshift import ( "fmt" - "strings" "github.com/artie-labs/transfer/clients/redshift/dialect" "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -14,7 +12,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with one primary key + no `__artie_updated_at` flag. tableID := NewTableIdentifier("public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(r.T(), parts, 3) @@ -29,7 +27,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with one primary key + `__artie_updated_at` flag. tableID := NewTableIdentifier("public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(r.T(), parts, 3) @@ -44,7 +42,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with composite keys + no `__artie_updated_at` flag. 
tableID := NewTableIdentifier("public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(r.T(), parts, 3) @@ -59,7 +57,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with composite keys + `__artie_updated_at` flag. tableID := NewTableIdentifier("public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(r.T(), parts, 3) diff --git a/clients/redshift/redshift_test.go b/clients/redshift/redshift_test.go index 5e36fac84..76ea35d4a 100644 --- a/clients/redshift/redshift_test.go +++ b/clients/redshift/redshift_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -25,6 +25,6 @@ func TestTempTableName(t *testing.T) { tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := (&Store{}).IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `schema."table___artie_suffix"`, trimTTL(tempTableName)) } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 05909463d..6c7d8a5da 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -69,7 +69,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt return fmt.Errorf("failed to merge columns from destination: %w", err) } - temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix()) + temporaryTableID := TempTableIDWithSuffix(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix()) if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil { return fmt.Errorf("failed to prepare temporary table: %w", err) } diff --git a/clients/shared/temp_table.go b/clients/shared/temp_table.go index e86566c12..5a9784b84 100644 --- a/clients/shared/temp_table.go +++ b/clients/shared/temp_table.go @@ -2,13 +2,19 @@ package shared import ( "fmt" + "strings" "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/stringutil" ) -func TempTableID(tableID sql.TableIdentifier, suffix string) sql.TableIdentifier { +func TempTableID(tableID sql.TableIdentifier) sql.TableIdentifier { + return TempTableIDWithSuffix(tableID, strings.ToLower(stringutil.Random(5))) +} + +func TempTableIDWithSuffix(tableID sql.TableIdentifier, suffix string) sql.TableIdentifier { tempTable := fmt.Sprintf( "%s_%s_%s_%d", tableID.Table(), diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index fb88d427d..1e8ef2638 100644 --- a/clients/snowflake/snowflake.go +++ 
b/clients/snowflake/snowflake.go @@ -2,7 +2,6 @@ package snowflake import ( "fmt" - "strings" "github.com/snowflakedb/gosnowflake" @@ -17,7 +16,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/stringutil" ) const maxRetries = 10 @@ -131,7 +129,7 @@ func (s *Store) reestablishConnection() error { // Dedupe takes a table and will remove duplicates based on the primary key(s). // These queries are inspired and modified from: https://stackoverflow.com/a/71515946 func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/snowflake/snowflake_dedupe_test.go b/clients/snowflake/snowflake_dedupe_test.go index 2f37e71fa..285567266 100644 --- a/clients/snowflake/snowflake_dedupe_test.go +++ b/clients/snowflake/snowflake_dedupe_test.go @@ -2,12 +2,10 @@ package snowflake import ( "fmt" - "strings" "testing" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -15,7 +13,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + no `__artie_updated_at` flag. tableID := NewTableIdentifier("db", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) @@ -30,7 +28,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + `__artie_updated_at` flag. tableID := NewTableIdentifier("db", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) @@ -45,7 +43,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + no `__artie_updated_at` flag. tableID := NewTableIdentifier("db", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) @@ -60,7 +58,7 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + `__artie_updated_at` flag. 
tableID := NewTableIdentifier("db", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 319233ffe..b3e17aa4d 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -316,7 +316,7 @@ func (s *SnowflakeTestSuite) TestStore_AdditionalEqualityStrings() { } } -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -328,6 +328,6 @@ func TestTempTableName(t *testing.T) { tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := (&Store{}).IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `db.schema."TABLE___ARTIE_SUFFIX"`, trimTTL(tempTableName)) } From 7f46499ee9e42432ed92819290a819f50544cbeb Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 12:07:57 -0700 Subject: [PATCH 14/24] [ddl] Drop Temp Table fix. (#733) --- lib/destination/ddl/ddl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 96ffbc64b..5bcb48c5e 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -19,7 +19,7 @@ import ( // It has a safety check to make sure the tableName contains the `constants.ArtiePrefix` key. // Temporary tables look like this: database.schema.tableName__artie__RANDOM_STRING(5)_expiryUnixTs func DropTemporaryTable(dwh destination.DataWarehouse, tableIdentifier sql.TableIdentifier, shouldReturnError bool) error { - if strings.Contains(tableIdentifier.Table(), constants.ArtiePrefix) { + if strings.Contains(strings.ToLower(tableIdentifier.Table()), constants.ArtiePrefix) { sqlCommand := fmt.Sprintf("DROP TABLE IF EXISTS %s", tableIdentifier.FullyQualifiedName()) if _, err := dwh.Exec(sqlCommand); err != nil { slog.Warn("Failed to drop temporary table, it will get garbage collected by the TTL...", From 9f1c5e2d198db87b93cf3777199f2a1875ec319c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 13:30:00 -0700 Subject: [PATCH 15/24] [Decimal] Fix Encoding Problem (#734) --- lib/debezium/decimal.go | 13 ++++++++++--- lib/debezium/decimal_test.go | 9 +++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/lib/debezium/decimal.go b/lib/debezium/decimal.go index 793c4b814..32ef35969 100644 --- a/lib/debezium/decimal.go +++ b/lib/debezium/decimal.go @@ -9,15 +9,22 @@ import ( // EncodeDecimal is used to encode a string representation of a number to `org.apache.kafka.connect.data.Decimal`. 
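+// For example (illustrative): "1.05" with scale 2 scales to the integer 105,
+// which encodes as the single big-endian two's-complement byte 0x69.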
func EncodeDecimal(value string, scale int) ([]byte, error) { - scaledValue := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) + // TODO: Refactor scale to be uint16 + bigFloatValue := new(big.Float) if _, success := bigFloatValue.SetString(value); !success { - return nil, fmt.Errorf("unable to use '%s' as a floating-point number", value) + return nil, fmt.Errorf("unable to use %q as a floating-point number", value) } + + scaledValue := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) bigFloatValue.Mul(bigFloatValue, new(big.Float).SetInt(scaledValue)) // Extract the scaled integer value. - bigIntValue, _ := bigFloatValue.Int(nil) + bigIntValue := new(big.Int) + if _, success := bigIntValue.SetString(bigFloatValue.String(), 10); !success { + return nil, fmt.Errorf("unable to use %q as a floating-point number", value) + } + data := bigIntValue.Bytes() if bigIntValue.Sign() < 0 { // Convert to two's complement if the number is negative diff --git a/lib/debezium/decimal_test.go b/lib/debezium/decimal_test.go index 395415281..4f8b6e7e1 100644 --- a/lib/debezium/decimal_test.go +++ b/lib/debezium/decimal_test.go @@ -66,15 +66,20 @@ func TestEncodeDecimal(t *testing.T) { value: "6408.355", scale: 3, }, + { + name: "total", + value: "1.05", + scale: 2, + }, { name: "malformed - empty string", value: "", - expectedErr: "unable to use '' as a floating-point number", + expectedErr: `unable to use "" as a floating-point number`, }, { name: "malformed - not a floating-point", value: "abcdefg", - expectedErr: "unable to use 'abcdefg' as a floating-point number", + expectedErr: `unable to use "abcdefg" as a floating-point number`, }, } From b75b4acd9c600abcd97b1c925b218f58a8b645eb Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 17 Jun 2024 16:36:11 -0700 Subject: [PATCH 16/24] Add `batch.BySize` function (#735) --- lib/batch/batch.go | 48 ++++++++++++++++++++ lib/batch/batch_test.go | 98 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 lib/batch/batch.go create mode 100644 lib/batch/batch_test.go diff --git a/lib/batch/batch.go b/lib/batch/batch.go new file mode 100644 index 000000000..3c8b6de82 --- /dev/null +++ b/lib/batch/batch.go @@ -0,0 +1,48 @@ +package batch + +import "fmt" + +// BySize takes a series of elements [in], encodes them using [encode], groups them into batches of bytes that sum to at +// most [maxSizeBytes], and then passes each batch to the [yield] function. 
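+//
+// A minimal usage sketch (hypothetical caller; assumes rows is a []string):
+//
+//	encode := func(s string) ([]byte, error) { return []byte(s), nil }
+//	err := BySize(rows, 1024, encode, func(batch [][]byte) error {
+//		return nil // a real caller would flush each size-bounded batch here
+//	})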
+func BySize[T any](in []T, maxSizeBytes int, encode func(T) ([]byte, error), yield func([][]byte) error) error { + var buffer [][]byte + var currentSizeBytes int + + for i, item := range in { + bytes, err := encode(item) + if err != nil { + return fmt.Errorf("failed to encode item %d: %w", i, err) + } + + if len(bytes) > maxSizeBytes { + return fmt.Errorf("item %d is larger (%d bytes) than maxSizeBytes (%d bytes)", i, len(bytes), maxSizeBytes) + } + + currentSizeBytes += len(bytes) + + if currentSizeBytes < maxSizeBytes { + buffer = append(buffer, bytes) + } else if currentSizeBytes == maxSizeBytes { + buffer = append(buffer, bytes) + if err := yield(buffer); err != nil { + return err + } + buffer = [][]byte{} + currentSizeBytes = 0 + } else { + if err := yield(buffer); err != nil { + return err + } + buffer = [][]byte{bytes} + currentSizeBytes = len(bytes) + } + } + + if len(buffer) > 0 { + if err := yield(buffer); err != nil { + return err + } + } + + return nil +} diff --git a/lib/batch/batch_test.go b/lib/batch/batch_test.go new file mode 100644 index 000000000..078062521 --- /dev/null +++ b/lib/batch/batch_test.go @@ -0,0 +1,98 @@ +package batch + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBySize(t *testing.T) { + goodEncoder := func(value string) ([]byte, error) { + return []byte(value), nil + } + + panicEncoder := func(value string) ([]byte, error) { + panic("should not be called") + } + + badEncoder := func(value string) ([]byte, error) { + return nil, fmt.Errorf("failed to encode %q", value) + } + + testBySize := func(in []string, maxSizeBytes int, encoder func(value string) ([]byte, error)) ([][][]byte, error) { + batches := [][][]byte{} + err := BySize(in, maxSizeBytes, encoder, func(batch [][]byte) error { batches = append(batches, batch); return nil }) + return batches, err + } + + badYield := func(batch [][]byte) error { + out := make([]string, len(batch)) + for i, bytes := range batch { + out[i] = string(bytes) + } + return fmt.Errorf("yield failed for %v", out) + } + + { + // Empty slice: + batches, err := testBySize([]string{}, 10, panicEncoder) + assert.NoError(t, err) + assert.Empty(t, batches) + } + { + // Non-empty slice + bad encoder: + _, err := testBySize([]string{"foo", "bar"}, 10, badEncoder) + assert.ErrorContains(t, err, `failed to encode item 0: failed to encode "foo"`) + } + { + // Non-empty slice + two items that are < maxSizeBytes + yield returns error. + err := BySize([]string{"foo", "bar"}, 10, goodEncoder, badYield) + assert.ErrorContains(t, err, "yield failed for [foo bar]") + } + { + // Non-empty slice + two items that are = maxSizeBytes + yield returns error. + err := BySize([]string{"foo", "bar"}, 6, goodEncoder, badYield) + assert.ErrorContains(t, err, "yield failed for [foo bar]") + } + { + // Non-empty slice + two items that are > maxSizeBytes + yield returns error. 
+ err := BySize([]string{"foo", "bar-baz"}, 8, goodEncoder, badYield) + assert.ErrorContains(t, err, "yield failed for [foo]") + } + { + // Non-empty slice + item is larger than maxSizeBytes: + _, err := testBySize([]string{"foo", "i-am-23-characters-long", "bar"}, 20, goodEncoder) + assert.ErrorContains(t, err, "item 1 is larger (23 bytes) than maxSizeBytes (20 bytes)") + } + { + // Non-empty slice + item equal to maxSizeBytes: + batches, err := testBySize([]string{"foo", "i-am-23-characters-long", "bar"}, 23, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 3) + assert.Equal(t, [][]byte{[]byte("foo")}, batches[0]) + assert.Equal(t, [][]byte{[]byte("i-am-23-characters-long")}, batches[1]) + assert.Equal(t, [][]byte{[]byte("bar")}, batches[2]) + } + { + // Non-empty slice + one item: + batches, err := testBySize([]string{"foo"}, 100, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 1) + assert.Equal(t, [][]byte{[]byte("foo")}, batches[0]) + } + { + // Non-empty slice + all items exactly fit into one batch: + batches, err := testBySize([]string{"foo", "bar", "baz", "qux"}, 12, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 1) + assert.Equal(t, [][]byte{[]byte("foo"), []byte("bar"), []byte("baz"), []byte("qux")}, batches[0]) + } + { + // Non-empty slice + all items exactly fit into just under one batch: + batches, err := testBySize([]string{"foo", "bar", "baz", "qux"}, 13, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 1) + assert.Equal(t, [][]byte{[]byte("foo"), []byte("bar"), []byte("baz"), []byte("qux")}, batches[0]) + } +} From 54429138f79e3b4b883d05e2a2b3f3de4520f700 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 17 Jun 2024 18:12:23 -0700 Subject: [PATCH 17/24] [bigquery] Use `batch.BySize` (#736) --- clients/bigquery/batch.go | 34 ------------------------------- clients/bigquery/batch_test.go | 31 ---------------------------- clients/bigquery/bigquery.go | 37 +++++++++++++++++----------------- 3 files changed, 19 insertions(+), 83 deletions(-) delete mode 100644 clients/bigquery/batch.go delete mode 100644 clients/bigquery/batch_test.go diff --git a/clients/bigquery/batch.go b/clients/bigquery/batch.go deleted file mode 100644 index 14967167e..000000000 --- a/clients/bigquery/batch.go +++ /dev/null @@ -1,34 +0,0 @@ -package bigquery - -type Batch[T any] struct { - rows []T - chunkSize int - iteratorIdx int -} - -func NewBatch[T any](rows []T, chunkSize int) *Batch[T] { - return &Batch[T]{ - rows: rows, - chunkSize: chunkSize, - } -} - -func (b *Batch[T]) HasNext() bool { - return len(b.rows) > b.iteratorIdx -} - -func (b *Batch[T]) NextChunk() []T { - start := b.iteratorIdx - b.iteratorIdx += b.chunkSize - end := b.iteratorIdx - - if end > len(b.rows) { - end = len(b.rows) - } - - if start > end { - return nil - } - - return b.rows[start:end] -} diff --git a/clients/bigquery/batch_test.go b/clients/bigquery/batch_test.go deleted file mode 100644 index b56b08473..000000000 --- a/clients/bigquery/batch_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package bigquery - -import ( - "cloud.google.com/go/bigquery" - - "github.com/stretchr/testify/assert" -) - -func (b *BigQueryTestSuite) TestBatch_NextChunk() { - messages := []*Row{ - NewRow(map[string]bigquery.Value{"col1": "message1"}), - NewRow(map[string]bigquery.Value{"col1": "message2"}), - NewRow(map[string]bigquery.Value{"col1": "message3"}), - } - - batch := NewBatch(messages, 2) - // First call to NextChunk - chunk := 
batch.NextChunk() - assert.Equal(b.T(), 2, len(chunk), "Expected chunk size to be 2") - assert.Equal(b.T(), map[string]bigquery.Value{"col1": "message1"}, chunk[0].data, "Expected first message in chunk to be message1") - assert.Equal(b.T(), map[string]bigquery.Value{"col1": "message2"}, chunk[1].data, "Expected second message in chunk to be message2") - - // Second call to NextChunk - chunk = batch.NextChunk() - assert.Equal(b.T(), 1, len(chunk), "Expected chunk size to be 1 for the remaining messages") - assert.Equal(b.T(), map[string]bigquery.Value{"col1": "message3"}, chunk[0].data, "Expected the last message in chunk to be message3") - - // Third call to NextChunk should return an empty chunk as there are no more messages - chunk = batch.NextChunk() - assert.Empty(b.T(), chunk, "Expected an empty chunk when there are no more messages") -} diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 29f706896..45a20ce2f 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -14,6 +14,7 @@ import ( "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" + "github.com/artie-labs/transfer/lib/batch" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/db" @@ -32,6 +33,8 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" + // Storage Write API is limited to 10 MB, let's start out conservative and use 80% of that. + maxRequestByteSize = 10_000_000 * .8 ) type Store struct { @@ -148,24 +151,22 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI } defer managedStream.Close() - batch := NewBatch(tableData.Rows(), s.batchSize) - for batch.HasNext() { - chunk := batch.NextChunk() - encoded := make([][]byte, len(chunk)) - for i, row := range chunk { - message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats()) - if err != nil { - return fmt.Errorf("failed to convert row to message: %w", err) - } - - bytes, err := proto.Marshal(message) - if err != nil { - return fmt.Errorf("failed to marshal message: %w", err) - } - encoded[i] = bytes + encoder := func(row map[string]any) ([]byte, error) { + message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats()) + if err != nil { + return nil, fmt.Errorf("failed to convert row to message: %w", err) + } + + bytes, err := proto.Marshal(message) + if err != nil { + return nil, fmt.Errorf("failed to marshal message: %w", err) } - result, err := managedStream.AppendRows(ctx, encoded) + return bytes, nil + } + + return batch.BySize(tableData.Rows(), maxRequestByteSize, encoder, func(chunk [][]byte) error { + result, err := managedStream.AppendRows(ctx, chunk) if err != nil { return fmt.Errorf("failed to append rows: %w", err) } @@ -173,9 +174,9 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI if resp, err := result.FullResponse(ctx); err != nil { return fmt.Errorf("failed to get response (%s): %w", resp.GetError().String(), err) } - } - return nil + return nil + }) } func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { From 0d649bdcc076c365fec56a6fe790c100f5f24375 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 18:25:52 -0700 Subject: [PATCH 18/24] [BigQuery] Refactoring Append to allow `useTempTable` (#738) --- clients/bigquery/bigquery.go 
| 31 +++++++++++++++++++++++++++++-- clients/mssql/store.go | 2 +- clients/redshift/redshift.go | 2 +- clients/s3/s3.go | 2 +- clients/shared/append.go | 7 ++++++- clients/snowflake/writes.go | 2 +- lib/destination/dwh.go | 4 ++-- lib/destination/types/types.go | 4 ++++ processes/consumer/flush.go | 2 +- 9 files changed, 46 insertions(+), 10 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 45a20ce2f..132f01769 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -45,8 +45,35 @@ type Store struct { db.Store } -func (s *Store) Append(tableData *optimization.TableData) error { - return shared.Append(s, tableData, types.AdditionalSettings{}) +func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error { + if !useTempTable { + return shared.Append(s, tableData, types.AdditionalSettings{}) + } + + // We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data + // See: https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data + // For now, we'll need to append this to a temporary table and then append temporary table onto the target table + tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) + temporaryTableID := shared.TempTableID(tableID) + + defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }() + + err := shared.Append(s, tableData, types.AdditionalSettings{ + UseTempTable: true, + TempTableID: temporaryTableID, + }) + + if err != nil { + return fmt.Errorf("failed to append: %w", err) + } + + if _, err = s.Exec( + fmt.Sprintf(`INSERT INTO %s SELECT * FROM %s`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()), + ); err != nil { + return fmt.Errorf("failed to insert data into target table: %w", err) + } + + return nil } func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { diff --git a/clients/mssql/store.go b/clients/mssql/store.go index db4254cfe..b8346d566 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -43,7 +43,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { return shared.Merge(s, tableData, types.MergeOpts{}) } -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { return shared.Append(s, tableData, types.AdditionalSettings{}) } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 6a0910efb..89919c919 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -28,7 +28,7 @@ type Store struct { db.Store } -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { return shared.Append(s, tableData, types.AdditionalSettings{}) } diff --git a/clients/s3/s3.go b/clients/s3/s3.go index 451ab9f96..eb2667974 100644 --- a/clients/s3/s3.go +++ b/clients/s3/s3.go @@ -64,7 +64,7 @@ func (s *Store) ObjectPrefix(tableData *optimization.TableData) string { return strings.Join([]string{fqTableName, yyyyMMDDFormat}, "/") } -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { // There's no difference in appending or merging for S3. 
return s.Merge(tableData) } diff --git a/clients/shared/append.go b/clients/shared/append.go index 1db07cf09..fe668962d 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -51,11 +51,16 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return fmt.Errorf("failed to merge columns from destination: %w", err) } + if opts.UseTempTable { + // Override tableID with tempTableID if we're using a temporary table + tableID = opts.TempTableID + } + return dwh.PrepareTemporaryTable( tableData, tableConfig, tableID, opts, - false, + opts.UseTempTable, ) } diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index 353cb94b3..7d9061535 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -13,7 +13,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { var err error for i := 0; i < maxRetries; i++ { if i > 0 { diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 2416761f9..b64091922 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -14,7 +14,7 @@ import ( type DataWarehouse interface { Dialect() sqllib.Dialect Merge(tableData *optimization.TableData) error - Append(tableData *optimization.TableData) error + Append(tableData *optimization.TableData, useTempTable bool) error Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) @@ -30,7 +30,7 @@ type DataWarehouse interface { type Baseline interface { Merge(tableData *optimization.TableData) error - Append(tableData *optimization.TableData) error + Append(tableData *optimization.TableData, useTempTable bool) error IsRetryableError(err error) bool IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sqllib.TableIdentifier } diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index 90f9ee7a1..12c268b46 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -42,4 +42,8 @@ type MergeOpts struct { type AdditionalSettings struct { AdditionalCopyClause string + + // These settings are used for the `Append` method. + UseTempTable bool + TempTableID sql.TableIdentifier } diff --git a/processes/consumer/flush.go b/processes/consumer/flush.go index 8597d3fe3..b8e0afacb 100644 --- a/processes/consumer/flush.go +++ b/processes/consumer/flush.go @@ -89,7 +89,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B action := "merge" // Merge or Append depending on the mode. 
if _tableData.Mode() == config.History { - err = dest.Append(_tableData.TableData) + err = dest.Append(_tableData.TableData, false) action = "append" } else { err = dest.Merge(_tableData.TableData) From 9eef6e58b6b24897da5e830700d61e9b64533ab1 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 19:15:02 -0700 Subject: [PATCH 19/24] [BigQuery] Specify column names for Append (#739) --- clients/bigquery/bigquery.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 132f01769..658c343e7 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "os" + "strings" "cloud.google.com/go/bigquery" "cloud.google.com/go/bigquery/storage/managedwriter" @@ -67,9 +68,14 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err return fmt.Errorf("failed to append: %w", err) } - if _, err = s.Exec( - fmt.Sprintf(`INSERT INTO %s SELECT * FROM %s`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()), - ); err != nil { + query := fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s`, + tableID.FullyQualifiedName(), + strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + temporaryTableID.FullyQualifiedName(), + ) + + if _, err = s.Exec(query); err != nil { return fmt.Errorf("failed to insert data into target table: %w", err) } From 6b979d7624c45c8954140cc3eb38a2ab5dd333cd Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 20:24:27 -0700 Subject: [PATCH 20/24] Soft Delete bug (#740) --- clients/bigquery/dialect/dialect.go | 6 +++--- clients/bigquery/dialect/dialect_test.go | 4 ++-- clients/mssql/dialect/dialect.go | 6 +++--- clients/mssql/dialect/dialect_test.go | 2 +- clients/shared/merge.go | 1 + clients/snowflake/dialect/dialect.go | 4 ++-- clients/snowflake/dialect/dialect_test.go | 4 ++-- 7 files changed, 14 insertions(+), 13 deletions(-) diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 3739a6a84..e8d050e57 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -254,11 +254,11 @@ MERGE INTO %s %s USING %s AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, // WHEN MATCHED %sTHEN UPDATE SET %s idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd), - // WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) - sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","), + // WHEN NOT MATCHED THEN INSERT (%s) + strings.Join(sql.QuoteColumns(cols, bd), ","), // VALUES (%s); strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, bd), ","), )}, nil diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 6abf88c42..6e669d735 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -266,7 +266,7 @@ func TestBigQueryDialect_BuildMergeQueries_SoftDelete(t *testing.T) { assert.Equal(t, []string{ "MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON 
tgt.`order_id` = stg.`order_id`", "WHEN MATCHED THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`", - "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, + "WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, strings.Split(strings.TrimSpace(statements[0]), "\n")) } @@ -295,7 +295,7 @@ func TestBigQueryDialect_BuildMergeQueries_IdempotentKey(t *testing.T) { assert.Equal(t, []string{ "MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON tgt.`order_id` = stg.`order_id`", "WHEN MATCHED AND stg.idempotent_key >= tgt.idempotent_key THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`", - "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, + "WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, strings.Split(strings.TrimSpace(statements[0]), "\n")) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index adfbff906..a34787bee 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -202,11 +202,11 @@ USING %s AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, // WHEN MATCHED %sTHEN UPDATE SET %s idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md), - // WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) - sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","), + // WHEN NOT MATCHED THEN INSERT (%s) + strings.Join(sql.QuoteColumns(cols, md), ","), // VALUES (%s); strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, md), ","), )}, nil diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index bdc6e58a2..7bc7d65d2 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -247,6 +247,6 @@ WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 1) = 0 THEN INSERT ("id","ba MERGE INTO database.schema.table tgt USING {SUB_QUERY} AS stg ON tgt."id" = stg."id" WHEN MATCHED THEN UPDATE SET "id"=stg."id","bar"=stg."bar","updated_at"=stg."updated_at","start"=stg."start","__artie_delete"=stg."__artie_delete" -WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 0) = 0 THEN INSERT ("id","bar","updated_at","start","__artie_delete") VALUES (stg."id",stg."bar",stg."updated_at",stg."start",stg."__artie_delete");`, queries[0]) +WHEN NOT MATCHED THEN INSERT ("id","bar","updated_at","start","__artie_delete") VALUES (stg."id",stg."bar",stg."updated_at",stg."start",stg."__artie_delete");`, queries[0]) } } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 6c7d8a5da..7260366c9 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -130,6 +130,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt } primaryKeys = append(primaryKeys, column) } + if len(primaryKeys) == 0 { return fmt.Errorf("primary keys cannot be empty") } diff --git 
a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 5edee8950..4624d3354 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -228,11 +228,11 @@ MERGE INTO %s %s USING ( %s ) AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd), // Insert - sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","), + strings.Join(sql.QuoteColumns(cols, sd), ","), strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, sd), ","), )}, nil } diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 4710db0b0..864b58e9c 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -301,7 +301,7 @@ func TestSnowflakeDialect_BuildMergeQueries_SoftDelete(t *testing.T) { assert.Equal(t, ` MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03T04:05:06Z', false),('2', 'bb', '2001-02-03T04:05:06Z', true),('3', 'dd', '2001-02-03T04:05:06Z', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" WHEN MATCHED THEN UPDATE SET "ID"=stg."ID","__ARTIE_DELETE"=stg."__ARTIE_DELETE" -WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) +WHEN NOT MATCHED THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) } { statements, err := SnowflakeDialect{}.BuildMergeQueries( @@ -319,7 +319,7 @@ WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("I assert.Equal(t, ` MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03T04:05:06Z', false),('2', 'bb', '2001-02-03T04:05:06Z', true),('3', 'dd', '2001-02-03T04:05:06Z', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" WHEN MATCHED AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID","__ARTIE_DELETE"=stg."__ARTIE_DELETE" -WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) +WHEN NOT MATCHED THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) } } From 39f9479b979b0520eb105d7e772a9dcfaf9ab4db Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 17 Jun 2024 22:16:04 -0700 Subject: [PATCH 21/24] [bigquery] Increase Storage Write API max request size (#741) --- clients/bigquery/bigquery.go | 7 ++----- lib/config/bigquery.go | 7 ------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 658c343e7..c6d992ab3 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -34,13 +34,12 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" - // Storage Write API is limited to 10 MB, let's start out conservative and use 80% of that. 
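// A worked check of the arithmetic (an aside, not part of the patch): the old constant
// budgeted 80% of an assumed 10 MB limit, i.e. 10_000_000 * .8 = 8,000,000 bytes
// (~7.6 MiB). The new constant treats the Storage Write API cap as 10 MiB and reserves
// a fixed 50 KiB for per-request overhead:
//   (10 * 1024 * 1024) - (50 * 1024) = 10,485,760 - 51,200 = 10,434,560 bytes,
// reclaiming roughly 2.4 MB of usable payload per AppendRows request.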
- maxRequestByteSize = 10_000_000 * .8 + // Storage Write API is limited to 10 MiB, subtract 50 KiB to account for request overhead. + maxRequestByteSize = (10 * 1024 * 1024) - (50 * 1024) ) type Store struct { configMap *types.DwhToTablesConfigMap - batchSize int config config.Config db.Store @@ -223,7 +222,6 @@ func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includ } func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) { - cfg.BigQuery.LoadDefaultValues() if _store != nil { // Used for tests. return &Store{ @@ -250,7 +248,6 @@ func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) { return &Store{ Store: store, configMap: &types.DwhToTablesConfigMap{}, - batchSize: cfg.BigQuery.BatchSize, config: cfg, }, nil } diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index 6e8250daa..5226fa95f 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -9,13 +9,6 @@ type BigQuery struct { DefaultDataset string `yaml:"defaultDataset"` ProjectID string `yaml:"projectID"` Location string `yaml:"location"` - BatchSize int `yaml:"batchSize"` -} - -func (b *BigQuery) LoadDefaultValues() { - if b.BatchSize == 0 { - b.BatchSize = 1000 - } } // DSN - returns the notation for BigQuery following this format: bigquery://projectID/[location/]datasetID?queryString From 0d9b9088b0bbef56d0d023a8c6ea51e5fdc0a84d Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Tue, 18 Jun 2024 11:39:21 -0700 Subject: [PATCH 22/24] [bigquery] Move `encodeStructToJSONString` function (#742) --- clients/bigquery/cast.go | 36 --------------------------- clients/bigquery/cast_test.go | 34 ------------------------- clients/bigquery/dialect/dialect.go | 2 -- clients/bigquery/storagewrite.go | 32 +++++++++++++++++++++++- clients/bigquery/storagewrite_test.go | 27 ++++++++++++++++++++ lib/typing/ext/variables.go | 1 - 6 files changed, 58 insertions(+), 74 deletions(-) delete mode 100644 clients/bigquery/cast.go delete mode 100644 clients/bigquery/cast_test.go diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go deleted file mode 100644 index ccc0cf236..000000000 --- a/clients/bigquery/cast.go +++ /dev/null @@ -1,36 +0,0 @@ -package bigquery - -import ( - "encoding/json" - "fmt" - "log/slog" - "strings" - - "github.com/artie-labs/transfer/lib/config/constants" -) - -// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. -// Structs from relational and Mongo are different. -// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` -// Relational will return a string representation of the struct such as `{"hello": "world"}` -func EncodeStructToJSONString(value any) (string, error) { - if stringValue, isOk := value.(string); isOk { - if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - return stringValue, nil - } - - bytes, err := json.Marshal(value) - if err != nil { - return "", fmt.Errorf("failed to marshal value: %w", err) - } - - stringValue := string(bytes) - if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { - // TODO: Remove this if we don't see it in the logs. 
- slog.Error("encoded JSON value contains the toast unavailable value placeholder") - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - return stringValue, nil -} diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go deleted file mode 100644 index ffd2fcbb5..000000000 --- a/clients/bigquery/cast_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package bigquery - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEncodeStructToJSONString(t *testing.T) { - { - // Empty string: - result, err := EncodeStructToJSONString("") - assert.NoError(t, err) - assert.Equal(t, "", result) - } - { - // Toasted string: - result, err := EncodeStructToJSONString("__debezium_unavailable_value") - assert.NoError(t, err) - assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) - } - { - // Map: - result, err := EncodeStructToJSONString(map[string]any{"foo": "bar", "baz": 1234}) - assert.NoError(t, err) - assert.Equal(t, `{"baz":1234,"foo":"bar"}`, result) - } - { - // Toasted map (should not happen): - result, err := EncodeStructToJSONString(map[string]any{"__debezium_unavailable_value": "bar", "baz": 1234}) - assert.NoError(t, err) - assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) - } -} diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index e8d050e57..641819c97 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -14,8 +14,6 @@ import ( ) const ( - BQStreamingTimeFormat = "15:04:05" - bqLayout = "2006-01-02 15:04:05 MST" ) diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index 3cd550bde..53af61583 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -1,13 +1,17 @@ package bigquery import ( + "encoding/json" "fmt" + "log/slog" "strconv" + "strings" "time" "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "cloud.google.com/go/bigquery/storage/managedwriter/adapt" "github.com/artie-labs/transfer/lib/array" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/decimal" @@ -188,7 +192,7 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type) } case typing.Struct.Kind: - stringValue, err := EncodeStructToJSONString(value) + stringValue, err := encodeStructToJSONString(value) if err != nil { return nil, err } else if stringValue == "" { @@ -211,3 +215,29 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto } return message, nil } + +// encodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. +// Structs from relational and Mongo are different. 
+// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` +// Relational will return a string representation of the struct such as `{"hello": "world"}` +func encodeStructToJSONString(value any) (string, error) { + if stringValue, isOk := value.(string); isOk { + if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return stringValue, nil + } + + bytes, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("failed to marshal value: %w", err) + } + + stringValue := string(bytes) + if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { + // TODO: Remove this if we don't see it in the logs. + slog.Error("encoded JSON value contains the toast unavailable value placeholder") + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return stringValue, nil +} diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go index ceaf26168..7101e7718 100644 --- a/clients/bigquery/storagewrite_test.go +++ b/clients/bigquery/storagewrite_test.go @@ -171,3 +171,30 @@ func TestRowToMessage(t *testing.T) { "cArray": []any{"foo", "bar"}, }, result) } + +func TestEncodeStructToJSONString(t *testing.T) { + { + // Empty string: + result, err := encodeStructToJSONString("") + assert.NoError(t, err) + assert.Equal(t, "", result) + } + { + // Toasted string: + result, err := encodeStructToJSONString("__debezium_unavailable_value") + assert.NoError(t, err) + assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) + } + { + // Map: + result, err := encodeStructToJSONString(map[string]any{"foo": "bar", "baz": 1234}) + assert.NoError(t, err) + assert.Equal(t, `{"baz":1234,"foo":"bar"}`, result) + } + { + // Toasted map (should not happen): + result, err := encodeStructToJSONString(map[string]any{"__debezium_unavailable_value": "bar", "baz": 1234}) + assert.NoError(t, err) + assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) + } +} diff --git a/lib/typing/ext/variables.go b/lib/typing/ext/variables.go index 2f44d353f..56b8d81ab 100644 --- a/lib/typing/ext/variables.go +++ b/lib/typing/ext/variables.go @@ -3,7 +3,6 @@ package ext import "time" const ( - BigQueryDateTimeFormat = "2006-01-02 15:04:05.999999" ISO8601 = "2006-01-02T15:04:05.999-07:00" PostgresDateFormat = "2006-01-02" PostgresTimeFormat = "15:04:05.999999-07" // microsecond precision From 2853e305aaefe184b21a1d6ef32ca4e3eb64056d Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Tue, 18 Jun 2024 11:50:04 -0700 Subject: [PATCH 23/24] [bigquery] Remove dead code (#743) --- clients/bigquery/merge.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index bf807e87b..f334a3e06 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -4,8 +4,6 @@ import ( "fmt" "strings" - "cloud.google.com/go/bigquery" - "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" @@ -16,20 +14,6 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -type Row struct { - data map[string]bigquery.Value -} - -func NewRow(data map[string]bigquery.Value) *Row { - return &Row{ - data: data, - } -} - -func (r *Row) Save() (map[string]bigquery.Value, string, error) { - return 
r.data, bigquery.NoDedupeID, nil -} - func (s *Store) Merge(tableData *optimization.TableData) error { var additionalEqualityStrings []string if tableData.TopicConfig().BigQueryPartitionSettings != nil { From ef85a8436e04384656cc334f2c170a239db1df84 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Tue, 18 Jun 2024 13:40:46 -0700 Subject: [PATCH 24/24] [bigquery] Rename `putTableViaStorageWriteAPI` method (#744) --- clients/bigquery/bigquery.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index c6d992ab3..1bc15630c 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -103,8 +103,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo return fmt.Errorf("unable to cast tempTableID to BigQuery TableIdentifier") } - // Load the data - return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) + return s.putTable(context.Background(), bqTempTableID, tableData) } func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { @@ -152,7 +151,7 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client { return client } -func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { +func (s *Store) putTable(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { columns := tableData.ReadOnlyInMemoryCols().ValidColumns() messageDescriptor, err := columnsToMessageDescriptor(columns)
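// A minimal sketch (hypothetical helper; batchRowsBySize is an illustrative name, not an
// identifier from this repo) of how putTable-style code can honor maxRequestByteSize now
// that the row-count batchSize config is gone: greedily pack proto-encoded rows into
// batches whose summed size stays under the byte cap, sending each batch as its own
// AppendRows request. Note that a single row larger than the cap still becomes its own
// batch here and would be rejected server-side.
func batchRowsBySize(encodedRows [][]byte, maxBytes int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	currentSize := 0
	for _, row := range encodedRows {
		// Flush the current batch before this row would push it past the cap.
		if len(current) > 0 && currentSize+len(row) > maxBytes {
			batches = append(batches, current)
			current, currentSize = nil, 0
		}
		current = append(current, row)
		currentSize += len(row)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}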