diff --git a/README.md b/README.md index 29c95cdab..a3fb9f8d3 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur - MongoDB - MySQL - PostgreSQL - + _If the database you are using is not on the list, feel free to file for a [feature request](https://github.com/artie-labs/transfer/issues/new)._ @@ -101,4 +101,4 @@ Artie Transfer is released through [GoReleaser](https://goreleaser.com/), and we ## License -Artie Transfer is licensed under ELv2. Please see the [LICENSE](https://github.com/artie-labs/transfer/blob/master/LICENSE.txt) file for additional information. If you have any licensing questions please email hi@artie.so. +Artie Transfer is licensed under ELv2. Please see the [LICENSE](https://github.com/artie-labs/transfer/blob/master/LICENSE.txt) file for additional information. If you have any licensing questions please email hi@artie.com. diff --git a/clients/bigquery/batch.go b/clients/bigquery/batch.go deleted file mode 100644 index 1f151b87d..000000000 --- a/clients/bigquery/batch.go +++ /dev/null @@ -1,34 +0,0 @@ -package bigquery - -type Batch struct { - rows []*Row - chunkSize int - iteratorIdx int -} - -func NewBatch(rows []*Row, chunkSize int) *Batch { - return &Batch{ - rows: rows, - chunkSize: chunkSize, - } -} - -func (b *Batch) HasNext() bool { - return len(b.rows) > b.iteratorIdx -} - -func (b *Batch) NextChunk() []*Row { - start := b.iteratorIdx - b.iteratorIdx += b.chunkSize - end := b.iteratorIdx - - if end > len(b.rows) { - end = len(b.rows) - } - - if start > end { - return nil - } - - return b.rows[start:end] -} diff --git a/clients/bigquery/batch_test.go b/clients/bigquery/batch_test.go deleted file mode 100644 index b56b08473..000000000 --- a/clients/bigquery/batch_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package bigquery - -import ( - "cloud.google.com/go/bigquery" - - "github.com/stretchr/testify/assert" -) - -func (b *BigQueryTestSuite) TestBatch_NextChunk() { - messages := []*Row{ - NewRow(map[string]bigquery.Value{"col1": "message1"}), - NewRow(map[string]bigquery.Value{"col1": "message2"}), - NewRow(map[string]bigquery.Value{"col1": "message3"}), - } - - batch := NewBatch(messages, 2) - // First call to NextChunk - chunk := batch.NextChunk() - assert.Equal(b.T(), 2, len(chunk), "Expected chunk size to be 2") - assert.Equal(b.T(), map[string]bigquery.Value{"col1": "message1"}, chunk[0].data, "Expected first message in chunk to be message1") - assert.Equal(b.T(), map[string]bigquery.Value{"col1": "message2"}, chunk[1].data, "Expected second message in chunk to be message2") - - // Second call to NextChunk - chunk = batch.NextChunk() - assert.Equal(b.T(), 1, len(chunk), "Expected chunk size to be 1 for the remaining messages") - assert.Equal(b.T(), map[string]bigquery.Value{"col1": "message3"}, chunk[0].data, "Expected the last message in chunk to be message3") - - // Third call to NextChunk should return an empty chunk as there are no more messages - chunk = batch.NextChunk() - assert.Empty(b.T(), chunk, "Expected an empty chunk when there are no more messages") -} diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index bce9ff214..1bc15630c 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -8,10 +8,14 @@ import ( "strings" "cloud.google.com/go/bigquery" + "cloud.google.com/go/bigquery/storage/managedwriter" + "cloud.google.com/go/bigquery/storage/managedwriter/adapt" _ "github.com/viant/bigquery" + 
"google.golang.org/protobuf/proto" "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" + "github.com/artie-labs/transfer/lib/batch" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/db" @@ -23,7 +27,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/stringutil" ) const ( @@ -31,18 +34,51 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" + // Storage Write API is limited to 10 MiB, subtract 50 KiB to account for request overhead. + maxRequestByteSize = (10 * 1024 * 1024) - (50 * 1024) ) type Store struct { configMap *types.DwhToTablesConfigMap - batchSize int config config.Config db.Store } -func (s *Store) Append(tableData *optimization.TableData) error { - return shared.Append(s, tableData, types.AdditionalSettings{}) +func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error { + if !useTempTable { + return shared.Append(s, tableData, types.AdditionalSettings{}) + } + + // We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data + // See: https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data + // For now, we'll need to append this to a temporary table and then append temporary table onto the target table + tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) + temporaryTableID := shared.TempTableID(tableID) + + defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }() + + err := shared.Append(s, tableData, types.AdditionalSettings{ + UseTempTable: true, + TempTableID: temporaryTableID, + }) + + if err != nil { + return fmt.Errorf("failed to append: %w", err) + } + + query := fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s`, + tableID.FullyQualifiedName(), + strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + temporaryTableID.FullyQualifiedName(), + ) + + if _, err = s.Exec(query); err != nil { + return fmt.Errorf("failed to insert data into target table: %w", err) + } + + return nil } func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { @@ -67,30 +103,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo return fmt.Errorf("unable to cast tempTableID to BigQuery TableIdentifier") } - // Load the data - return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) -} - -func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) { - // Cast the data into BigQuery values - var rows []*Row - columns := tableData.ReadOnlyInMemoryCols().ValidColumns() - for _, value := range tableData.Rows() { - data := make(map[string]bigquery.Value) - for _, col := range columns { - colVal, err := castColVal(value[col.Name()], col, additionalDateFmts) - if err != nil { - return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err) - } - - if colVal != nil { - data[col.Name()] = colVal - } - } - - rows = append(rows, 
NewRow(data)) - } - return rows, nil + return s.putTable(context.Background(), bqTempTableID, tableData) } func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { @@ -138,30 +151,69 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client { return client } -func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error { - rows, err := buildLegacyRows(tableData, s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats) +func (s *Store) putTable(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { + columns := tableData.ReadOnlyInMemoryCols().ValidColumns() + + messageDescriptor, err := columnsToMessageDescriptor(columns) + if err != nil { + return err + } + schemaDescriptor, err := adapt.NormalizeDescriptor(*messageDescriptor) if err != nil { return err } - client := s.GetClient(ctx) - defer client.Close() + managedWriterClient, err := managedwriter.NewClient(ctx, bqTableID.ProjectID()) + if err != nil { + return fmt.Errorf("failed to create managedwriter client: %w", err) + } + defer managedWriterClient.Close() + + managedStream, err := managedWriterClient.NewManagedStream(ctx, + managedwriter.WithDestinationTable( + managedwriter.TableParentFromParts(bqTableID.ProjectID(), bqTableID.Dataset(), bqTableID.Table()), + ), + managedwriter.WithType(managedwriter.DefaultStream), + managedwriter.WithSchemaDescriptor(schemaDescriptor), + managedwriter.EnableWriteRetries(true), + ) + if err != nil { + return fmt.Errorf("failed to create managed stream: %w", err) + } + defer managedStream.Close() - batch := NewBatch(rows, s.batchSize) - inserter := client.Dataset(tableID.Dataset()).Table(tableID.Table()).Inserter() - for batch.HasNext() { - if err := inserter.Put(ctx, batch.NextChunk()); err != nil { - return fmt.Errorf("failed to insert rows: %w", err) + encoder := func(row map[string]any) ([]byte, error) { + message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats()) + if err != nil { + return nil, fmt.Errorf("failed to convert row to message: %w", err) } + + bytes, err := proto.Marshal(message) + if err != nil { + return nil, fmt.Errorf("failed to marshal message: %w", err) + } + + return bytes, nil } - return nil + return batch.BySize(tableData.Rows(), maxRequestByteSize, encoder, func(chunk [][]byte) error { + result, err := managedStream.AppendRows(ctx, chunk) + if err != nil { + return fmt.Errorf("failed to append rows: %w", err) + } + + if resp, err := result.FullResponse(ctx); err != nil { + return fmt.Errorf("failed to get response (%s): %w", resp.GetError().String(), err) + } + + return nil + }) } -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { + stagingTableID := shared.TempTableID(tableID) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) defer func() { _ = ddl.DropTemporaryTable(s, stagingTableID, false) }() @@ -169,7 +221,6 @@ func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicC } func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, 
error) { - cfg.BigQuery.LoadDefaultValues() if _store != nil { // Used for tests. return &Store{ @@ -196,7 +247,6 @@ func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) { return &Store{ Store: store, configMap: &types.DwhToTablesConfigMap{}, - batchSize: cfg.BigQuery.BatchSize, config: cfg, }, nil } diff --git a/clients/bigquery/bigquery_dedupe_test.go b/clients/bigquery/bigquery_dedupe_test.go index e43018740..841244b24 100644 --- a/clients/bigquery/bigquery_dedupe_test.go +++ b/clients/bigquery/bigquery_dedupe_test.go @@ -2,7 +2,6 @@ package bigquery import ( "fmt" - "strings" "testing" "time" @@ -11,17 +10,15 @@ import ( "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/stringutil" ) func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + no `__artie_updated_at` flag. tableID := NewTableIdentifier("project12", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -37,9 +34,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + `__artie_updated_at` flag. tableID := NewTableIdentifier("project12", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) assert.Equal( t, @@ -55,9 +52,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + no `__artie_updated_at` flag. tableID := NewTableIdentifier("project123", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -73,9 +70,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + `__artie_updated_at` flag. 
tableID := NewTableIdentifier("project123", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) assert.Equal( t, diff --git a/clients/bigquery/bigquery_test.go b/clients/bigquery/bigquery_test.go index 035c7555c..627f0b4ef 100644 --- a/clients/bigquery/bigquery_test.go +++ b/clients/bigquery/bigquery_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -26,6 +26,6 @@ func TestTempTableName(t *testing.T) { store := &Store{config: config.Config{BigQuery: &config.BigQuery{ProjectID: "123454321"}}} tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, "`123454321`.`db`.`table___artie_sUfFiX`", trimTTL(tempTableName)) } diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go deleted file mode 100644 index 916d354f2..000000000 --- a/clients/bigquery/cast.go +++ /dev/null @@ -1,100 +0,0 @@ -package bigquery - -import ( - "encoding/json" - "fmt" - "log/slog" - "strings" - - "github.com/artie-labs/transfer/clients/bigquery/dialect" - "github.com/artie-labs/transfer/lib/array" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/decimal" - "github.com/artie-labs/transfer/lib/typing/ext" -) - -func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) (any, error) { - if colVal == nil { - return nil, nil - } - - switch colKind.KindDetails.Kind { - case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind, typing.String.Kind: - return colVal, nil - case typing.EDecimal.Kind: - val, isOk := colVal.(*decimal.Decimal) - if !isOk { - return nil, fmt.Errorf("colVal is not type *decimal.Decimal") - } - - return val.Value(), nil - case typing.ETime.Kind: - extTime, err := ext.ParseFromInterface(colVal, additionalDateFmts) - if err != nil { - return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err) - } - - if colKind.KindDetails.ExtendedTimeDetails == nil { - return nil, fmt.Errorf("column kind details for extended time details is null") - } - - // We should be using the colKind here since the data types coming from the source may be inconsistent. 
- switch colKind.KindDetails.ExtendedTimeDetails.Type { - // https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery#sending_datetime_data - case ext.DateTimeKindType: - if extTime.Year() == 0 { - return nil, nil - } - - return extTime.StringUTC(ext.BigQueryDateTimeFormat), nil - case ext.DateKindType: - if extTime.Year() == 0 { - return nil, nil - } - - return extTime.String(ext.PostgresDateFormat), nil - case ext.TimeKindType: - return extTime.String(dialect.BQStreamingTimeFormat), nil - } - case typing.Struct.Kind: - // TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164 - if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - - // Structs from relational and Mongo are different. - // MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` - // Relational will return a string representation of the struct such as `{"hello": "world"}` - if colValString, isOk := colVal.(string); isOk { - if colValString == "" { - return nil, nil - } - - return colValString, nil - } - - colValBytes, err := json.Marshal(colVal) - if err != nil { - return nil, fmt.Errorf("failed to marshal colVal: %w", err) - } - - return string(colValBytes), nil - case typing.Array.Kind: - arrayString, err := array.InterfaceToArrayString(colVal, true) - if err != nil { - return nil, err - } - - if len(arrayString) == 0 { - return nil, nil - } - - return arrayString, nil - } - - // TODO: Change this to return an error once we don't see Sentry - slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal)) - return fmt.Sprint(colVal), nil -} diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go deleted file mode 100644 index eed3f871a..000000000 --- a/clients/bigquery/cast_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package bigquery - -import ( - "fmt" - "math/big" - "time" - - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/decimal" - - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/stretchr/testify/assert" -) - -func (b *BigQueryTestSuite) TestCastColVal() { - { - // Strings - colVal, err := castColVal("hello", columns.Column{KindDetails: typing.String}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "hello", colVal) - } - { - // Integers - colVal, err := castColVal(5, columns.Column{KindDetails: typing.Integer}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), 5, colVal) - } - { - // Floats - colVal, err := castColVal(5.55, columns.Column{KindDetails: typing.Float}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), 5.55, colVal) - } - { - // Booleans - colVal, err := castColVal(true, columns.Column{KindDetails: typing.Boolean}, nil) - assert.NoError(b.T(), err) - assert.True(b.T(), colVal.(bool)) - } - { - // EDecimals - dec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45)) - colVal, err := castColVal(dec, columns.Column{KindDetails: typing.EDecimal}, nil) - assert.NoError(b.T(), err) - - // Native type is big.Float if precision doesn't exceed the DWH limit - assert.Equal(b.T(), big.NewFloat(123.45), colVal) - - // Precision has clearly exceeded the limit, so 
we should be returning a string - dec = decimal.NewDecimal(ptr.ToInt(50), 2, big.NewFloat(123.45)) - colVal, err = castColVal(dec, columns.Column{KindDetails: typing.EDecimal}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "123.45", colVal) - } - { - // ETime - birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC) - - tsKind := typing.ETime - tsKind.ExtendedTimeDetails = &ext.DateTime - - dateKind := typing.ETime - dateKind.ExtendedTimeDetails = &ext.Date - birthdayTSExt := ext.NewExtendedTime(birthday, tsKind.ExtendedTimeDetails.Type, "") - { - // Timestamp - colVal, err := castColVal(birthdayTSExt, columns.Column{KindDetails: tsKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06 03:19:24.942", colVal) - } - { - // Date - birthdayDateExt := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "") - colVal, err := castColVal(birthdayDateExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06", colVal) - } - - { - // Date (column is a date, but value is not) - colVal, err := castColVal(birthdayTSExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06", colVal) - } - { - // Time - timeKind := typing.ETime - timeKind.ExtendedTimeDetails = &ext.Time - birthdayTimeExt := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "") - colVal, err := castColVal(birthdayTimeExt, columns.Column{KindDetails: timeKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "03:19:24", colVal) - } - - invalidDate := time.Date(0, time.September, 6, 3, 19, 24, 942000000, time.UTC) - invalidDateTsExt := ext.NewExtendedTime(invalidDate, tsKind.ExtendedTimeDetails.Type, "") - { - // Date (column is a date, but value is invalid) - colVal, err := castColVal(invalidDateTsExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - } - { - // Datetime (column is datetime but value is invalid) - colVal, err := castColVal(invalidDateTsExt, columns.Column{KindDetails: tsKind}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - } - } - { - // Structs - colVal, err := castColVal(map[string]any{"hello": "world"}, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), `{"hello":"world"}`, colVal) - - // With string values - colVal, err = castColVal(`{"hello":"world"}`, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), `{"hello":"world"}`, colVal) - - // With empty string - colVal, err = castColVal("", columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - - // With array - colVal, err = castColVal([]any{map[string]any{}, map[string]any{"hello": "world"}}, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), `[{},{"hello":"world"}]`, colVal) - - // With TOAST values - colVal, err = castColVal(constants.ToastUnavailableValuePlaceholder, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), colVal) - } - { - // Arrays - colVal, err := castColVal([]any{1, 2, 3, 4, 5}, columns.Column{KindDetails: typing.Array}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), []string{"1", "2", "3", "4", "5"}, colVal) - - // Empty array - colVal, err = castColVal([]any{}, 
columns.Column{KindDetails: typing.Array}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - - // Null array - colVal, err = castColVal(nil, columns.Column{KindDetails: typing.Array}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - } -} diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index d4341ce7d..641819c97 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -15,8 +14,6 @@ import ( ) const ( - BQStreamingTimeFormat = "15:04:05" - bqLayout = "2006-01-02 15:04:05 MST" ) @@ -123,7 +120,7 @@ func (BigQueryDialect) IsColumnAlreadyExistsErr(err error) bool { return strings.Contains(err.Error(), "Column already exists") } -func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool { +func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool { return false } @@ -154,11 +151,23 @@ func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } -func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) + + // BigQuery does not like DISTINCT for JSON columns, so we wrote this instead. + // Error: Column foo of type JSON cannot be used in SELECT DISTINCT + return fmt.Sprintf(`(SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1)`, + tableID.FullyQualifiedName(), + strings.Join(primaryKeysEscaped, ", "), + strings.Join(primaryKeysEscaped, ", "), + ) +} + +func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, bd.QuoteIdentifier(constants.UpdateColumnMarker)) } @@ -243,11 +252,11 @@ MERGE INTO %s %s USING %s AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, // WHEN MATCHED %sTHEN UPDATE SET %s idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd), - // WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) - sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","), + // WHEN NOT MATCHED THEN INSERT (%s) + strings.Join(sql.QuoteColumns(cols, bd), ","), // VALUES (%s); strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, bd), ","), )}, nil diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 6abf88c42..6e669d735 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -266,7 +266,7 @@ func 
TestBigQueryDialect_BuildMergeQueries_SoftDelete(t *testing.T) { assert.Equal(t, []string{ "MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON tgt.`order_id` = stg.`order_id`", "WHEN MATCHED THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`", - "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, + "WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, strings.Split(strings.TrimSpace(statements[0]), "\n")) } @@ -295,7 +295,7 @@ func TestBigQueryDialect_BuildMergeQueries_IdempotentKey(t *testing.T) { assert.Equal(t, []string{ "MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON tgt.`order_id` = stg.`order_id`", "WHEN MATCHED AND stg.idempotent_key >= tgt.idempotent_key THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`", - "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, + "WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"}, strings.Split(strings.TrimSpace(statements[0]), "\n")) } diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index 213cddcbd..f334a3e06 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -4,8 +4,6 @@ import ( "fmt" "strings" - "cloud.google.com/go/bigquery" - "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" @@ -16,20 +14,6 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -type Row struct { - data map[string]bigquery.Value -} - -func NewRow(data map[string]bigquery.Value) *Row { - return &Row{ - data: data, - } -} - -func (r *Row) Save() (map[string]bigquery.Value, string, error) { - return r.data, bigquery.NoDedupeID, nil -} - func (s *Store) Merge(tableData *optimization.TableData) error { var additionalEqualityStrings []string if tableData.TopicConfig().BigQueryPartitionSettings != nil { @@ -51,6 +35,8 @@ func (s *Store) Merge(tableData *optimization.TableData) error { AdditionalEqualityStrings: additionalEqualityStrings, // BigQuery has DDL quotas. 
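// Column backfills issue DDL, so retry them when a quota error is hit rather than failing the run.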
RetryColBackfill: true, + // We are using BigQuery's streaming API which doesn't guarantee exactly-once semantics + SubQueryDedupe: true, }) } diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go new file mode 100644 index 000000000..53af61583 --- /dev/null +++ b/clients/bigquery/storagewrite.go @@ -0,0 +1,243 @@ +package bigquery + +import ( + "encoding/json" + "fmt" + "log/slog" + "strconv" + "strings" + "time" + + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "cloud.google.com/go/bigquery/storage/managedwriter/adapt" + "github.com/artie-labs/transfer/lib/array" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/typing/decimal" + "github.com/artie-labs/transfer/lib/typing/ext" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/dynamicpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// columnToTableFieldSchema returns a [*storagepb.TableFieldSchema] suitable for transferring data of the type that the column specifies. +// Note that the data type is not necessarily the data type that the table in the database is using. +func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchema, error) { + var fieldType storagepb.TableFieldSchema_Type + mode := storagepb.TableFieldSchema_NULLABLE + + switch column.KindDetails.Kind { + case typing.Boolean.Kind: + fieldType = storagepb.TableFieldSchema_BOOL + case typing.Integer.Kind: + fieldType = storagepb.TableFieldSchema_INT64 + case typing.Float.Kind: + fieldType = storagepb.TableFieldSchema_DOUBLE + case typing.EDecimal.Kind: + fieldType = storagepb.TableFieldSchema_STRING + case typing.String.Kind: + fieldType = storagepb.TableFieldSchema_STRING + case typing.ETime.Kind: + switch column.KindDetails.ExtendedTimeDetails.Type { + case ext.TimeKindType: + fieldType = storagepb.TableFieldSchema_TIME + case ext.DateKindType: + fieldType = storagepb.TableFieldSchema_DATE + case ext.DateTimeKindType: + fieldType = storagepb.TableFieldSchema_TIMESTAMP + default: + return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type) + } + case typing.Struct.Kind: + fieldType = storagepb.TableFieldSchema_STRING + case typing.Array.Kind: + fieldType = storagepb.TableFieldSchema_STRING + mode = storagepb.TableFieldSchema_REPEATED + default: + return nil, fmt.Errorf("unsupported column kind: %q", column.KindDetails.Kind) + } + + return &storagepb.TableFieldSchema{ + Name: column.Name(), + Type: fieldType, + Mode: mode, + }, nil +} + +func columnsToMessageDescriptor(cols []columns.Column) (*protoreflect.MessageDescriptor, error) { + fields := make([]*storagepb.TableFieldSchema, len(cols)) + for i, col := range cols { + field, err := columnToTableFieldSchema(col) + if err != nil { + return nil, err + } + fields[i] = field + } + tableSchema := storagepb.TableSchema{Fields: fields} + + descriptor, err := adapt.StorageSchemaToProto2Descriptor(&tableSchema, "root") + if err != nil { + return nil, fmt.Errorf("failed to build proto descriptor: %w", err) + } + messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) + if !ok { + return nil, fmt.Errorf("proto descriptor is not a message descriptor") + } + return &messageDescriptor, nil +} + +// This is a reimplementation of 
https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L143 +// See https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.CivilTimeEncoder +// And https://cloud.google.com/pubsub/docs/bigquery#date_time_int +func encodePacked64TimeMicros(value time.Time) int64 { + var result = int64(value.Nanosecond() / 1000) + result |= int64(value.Second()) << 20 + result |= int64(value.Minute()) << 26 + result |= int64(value.Hour()) << 32 + return result +} + +func rowToMessage(row map[string]any, columns []columns.Column, messageDescriptor protoreflect.MessageDescriptor, additionalDateFmts []string) (*dynamicpb.Message, error) { + message := dynamicpb.NewMessage(messageDescriptor) + + for _, column := range columns { + field := message.Descriptor().Fields().ByTextName(column.Name()) + if field == nil { + return nil, fmt.Errorf("failed to find a field named %q", column.Name()) + } + + value := row[column.Name()] + + if value == nil { + continue + } + + switch column.KindDetails.Kind { + case typing.Boolean.Kind: + if boolValue, ok := value.(bool); ok { + message.Set(field, protoreflect.ValueOfBool(boolValue)) + } else { + return nil, fmt.Errorf("expected bool received %T with value %v", value, value) + } + case typing.Integer.Kind: + switch value := value.(type) { + case int: + message.Set(field, protoreflect.ValueOfInt64(int64(value))) + case int32: + message.Set(field, protoreflect.ValueOfInt64(int64(value))) + case int64: + message.Set(field, protoreflect.ValueOfInt64(value)) + default: + return nil, fmt.Errorf("expected int/int32/int64 received %T with value %v", value, value) + } + case typing.Float.Kind: + switch value := value.(type) { + case float32: + message.Set(field, protoreflect.ValueOfFloat64(float64(value))) + case float64: + message.Set(field, protoreflect.ValueOfFloat64(value)) + case int32: + message.Set(field, protoreflect.ValueOfFloat64(float64(value))) + case int64: + message.Set(field, protoreflect.ValueOfFloat64(float64(value))) + case string: + floatValue, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse string to float64: %w", err) + } + message.Set(field, protoreflect.ValueOfFloat64(floatValue)) + default: + return nil, fmt.Errorf("expected float32/float64/int32/int64/string received %T with value %v", value, value) + } + case typing.EDecimal.Kind: + if decimalValue, ok := value.(*decimal.Decimal); ok { + message.Set(field, protoreflect.ValueOfString(decimalValue.String())) + } else { + return nil, fmt.Errorf("expected *decimal.Decimal received %T with value %v", value, value) + } + case typing.String.Kind: + var stringValue string + switch value := value.(type) { + case string: + stringValue = value + case *decimal.Decimal: + stringValue = value.String() + default: + return nil, fmt.Errorf("expected string/decimal.Decimal received %T with value %v", value, value) + } + message.Set(field, protoreflect.ValueOfString(stringValue)) + case typing.ETime.Kind: + extTime, err := ext.ParseFromInterface(value, additionalDateFmts) + if err != nil { + return nil, fmt.Errorf("failed to cast value as time.Time, value: %v, err: %w", value, err) + } + + if column.KindDetails.ExtendedTimeDetails == nil { + return nil, fmt.Errorf("extended time details for column kind details is nil") + } + + switch 
column.KindDetails.ExtendedTimeDetails.Type { + case ext.TimeKindType: + message.Set(field, protoreflect.ValueOfInt64(encodePacked64TimeMicros(extTime.Time))) + case ext.DateKindType: + daysSinceEpoch := extTime.Unix() / (60 * 60 * 24) + message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch))) + case ext.DateTimeKindType: + if err := timestamppb.New(extTime.Time).CheckValid(); err != nil { + return nil, err + } + message.Set(field, protoreflect.ValueOfInt64(extTime.UnixMicro())) + default: + return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type) + } + case typing.Struct.Kind: + stringValue, err := encodeStructToJSONString(value) + if err != nil { + return nil, err + } else if stringValue == "" { + continue + } else { + message.Set(field, protoreflect.ValueOfString(stringValue)) + } + case typing.Array.Kind: + values, err := array.InterfaceToArrayString(value, true) + if err != nil { + return nil, err + } + list := message.Mutable(field).List() + for _, value := range values { + list.Append(protoreflect.ValueOfString(value)) + } + default: + return nil, fmt.Errorf("unsupported column kind: %q", column.KindDetails.Kind) + } + } + return message, nil +} + +// encodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. +// Structs from relational and Mongo are different. +// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` +// Relational will return a string representation of the struct such as `{"hello": "world"}` +func encodeStructToJSONString(value any) (string, error) { + if stringValue, isOk := value.(string); isOk { + if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return stringValue, nil + } + + bytes, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("failed to marshal value: %w", err) + } + + stringValue := string(bytes) + if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { + // TODO: Remove this if we don't see it in the logs. 
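+ // The placeholder can also be nested inside a map value, which the string check above would miss, so re-check the marshalled JSON here.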
+ slog.Error("encoded JSON value contains the toast unavailable value placeholder") + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return stringValue, nil +} diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go new file mode 100644 index 000000000..7101e7718 --- /dev/null +++ b/clients/bigquery/storagewrite_test.go @@ -0,0 +1,200 @@ +package bigquery + +import ( + "encoding/json" + "math/big" + "testing" + "time" + + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/typing/decimal" + "github.com/artie-labs/transfer/lib/typing/ext" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/encoding/protojson" +) + +func TestColumnToTableFieldSchema(t *testing.T) { + { + // Boolean: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Boolean)) + assert.NoError(t, err) + assert.Equal(t, "foo", fieldSchema.Name) + assert.Equal(t, storagepb.TableFieldSchema_NULLABLE, fieldSchema.Mode) + assert.Equal(t, storagepb.TableFieldSchema_BOOL, fieldSchema.Type) + } + { + // Integer: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Integer)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_INT64, fieldSchema.Type) + } + { + // Float: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Float)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_DOUBLE, fieldSchema.Type) + } + { + // EDecimal: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.EDecimal)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_STRING, fieldSchema.Type) + } + { + // ETime - Time: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType))) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_TIME, fieldSchema.Type) + } + { + // ETime - Date: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType))) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_DATE, fieldSchema.Type) + } + { + // ETime - DateTime: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType))) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_TIMESTAMP, fieldSchema.Type) + } + { + // ETime - Invalid: + _, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.NewKindDetailsFromTemplate(typing.ETime, ""))) + assert.ErrorContains(t, err, "unsupported extended time details type:") + } + { + // Struct: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Struct)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_STRING, fieldSchema.Type) + } + { + // Array: + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Array)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_STRING, fieldSchema.Type) + assert.Equal(t, storagepb.TableFieldSchema_REPEATED, fieldSchema.Mode) + } + { + // Invalid: + _, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.KindDetails{})) + assert.ErrorContains(t, err, "unsupported column kind: ") + } +} + +func TestEncodePacked64TimeMicros(t 
*testing.T) { + epoch := time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC) + + assert.Equal(t, int64(0), encodePacked64TimeMicros(epoch)) + assert.Equal(t, int64(1), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Microsecond))) + assert.Equal(t, int64(1000), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Millisecond))) + assert.Equal(t, int64(1<<20), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Second))) + assert.Equal(t, int64(1<<26), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Minute))) + assert.Equal(t, int64(1<<32), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour))) + assert.Equal(t, int64(1<<32+1), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Microsecond))) + assert.Equal(t, int64(1<<32+1000), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Millisecond))) +} + +func TestRowToMessage(t *testing.T) { + columns := []columns.Column{ + columns.NewColumn("c_bool", typing.Boolean), + columns.NewColumn("c_int", typing.Integer), + columns.NewColumn("c_int32", typing.Integer), + columns.NewColumn("c_int64", typing.Integer), + columns.NewColumn("c_float32", typing.Float), + columns.NewColumn("c_float64", typing.Float), + columns.NewColumn("c_float_int32", typing.Float), + columns.NewColumn("c_float_int64", typing.Float), + columns.NewColumn("c_float_string", typing.Float), + columns.NewColumn("c_numeric", typing.EDecimal), + columns.NewColumn("c_string", typing.String), + columns.NewColumn("c_string_decimal", typing.String), + columns.NewColumn("c_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)), + columns.NewColumn("c_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)), + columns.NewColumn("c_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)), + columns.NewColumn("c_struct", typing.Struct), + columns.NewColumn("c_array", typing.Array), + } + + row := map[string]any{ + "c_bool": true, + "c_int": int(1234), + "c_int32": int32(1234), + "c_int64": int64(1234), + "c_float32": float32(1234.567), + "c_float64": float64(1234.567), + "c_float_int32": int32(1234), + "c_float_int64": int64(1234), + "c_float_string": "4444.55555", + "c_numeric": decimal.NewDecimal(nil, 5, big.NewFloat(3.1415926)), + "c_string": "foo bar", + "c_string_decimal": decimal.NewDecimal(nil, 5, big.NewFloat(1.618033)), + "c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""), + "c_date": ext.NewExtendedTime(time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), ext.DateKindType, ""), + "c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.DateTimeKindType, ""), + "c_struct": map[string]any{"baz": []string{"foo", "bar"}}, + "c_array": []string{"foo", "bar"}, + } + + desc, err := columnsToMessageDescriptor(columns) + assert.NoError(t, err) + + message, err := rowToMessage(row, columns, *desc, []string{}) + assert.NoError(t, err) + + bytes, err := protojson.Marshal(message) + assert.NoError(t, err) + + var result map[string]any + assert.NoError(t, json.Unmarshal(bytes, &result)) + + assert.Equal(t, map[string]any{ + "cBool": true, + "cFloat32": 1234.5670166015625, + "cFloat64": 1234.567, + "cFloatInt32": 1234.0, + "cFloatInt64": 1234.0, + "cFloatString": 4444.55555, + "cInt": "1234", + "cInt32": "1234", + "cInt64": "1234", + "cNumeric": "3.14159", + "cString": "foo bar", + "cStringDecimal": "1.61803", + "cTime": "17521704960", + "cDate": float64(11356), + "cDatetime": 
"981173106000000", + "cStruct": `{"baz":["foo","bar"]}`, + "cArray": []any{"foo", "bar"}, + }, result) +} + +func TestEncodeStructToJSONString(t *testing.T) { + { + // Empty string: + result, err := encodeStructToJSONString("") + assert.NoError(t, err) + assert.Equal(t, "", result) + } + { + // Toasted string: + result, err := encodeStructToJSONString("__debezium_unavailable_value") + assert.NoError(t, err) + assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) + } + { + // Map: + result, err := encodeStructToJSONString(map[string]any{"foo": "bar", "baz": 1234}) + assert.NoError(t, err) + assert.Equal(t, `{"baz":1234,"foo":"bar"}`, result) + } + { + // Toasted map (should not happen): + result, err := encodeStructToJSONString(map[string]any{"__debezium_unavailable_value": "bar", "baz": 1234}) + assert.NoError(t, err) + assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) + } +} diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index aa378cdfd..a34787bee 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -169,7 +168,11 @@ func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.Table return fmt.Sprintf("COALESCE(%s, '') != '%s'", colName, constants.ToastUnavailableValuePlaceholder) } -func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (MSSQLDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + panic("not implemented") +} + +func (MSSQLDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ bool) []string { panic("not implemented") // We don't currently support deduping for MS SQL. 
} @@ -199,11 +202,11 @@ USING %s AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, // WHEN MATCHED %sTHEN UPDATE SET %s idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md), - // WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) - sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","), + // WHEN NOT MATCHED THEN INSERT (%s) + strings.Join(sql.QuoteColumns(cols, md), ","), // VALUES (%s); strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, md), ","), )}, nil diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index bdc6e58a2..7bc7d65d2 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -247,6 +247,6 @@ WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 1) = 0 THEN INSERT ("id","ba MERGE INTO database.schema.table tgt USING {SUB_QUERY} AS stg ON tgt."id" = stg."id" WHEN MATCHED THEN UPDATE SET "id"=stg."id","bar"=stg."bar","updated_at"=stg."updated_at","start"=stg."start","__artie_delete"=stg."__artie_delete" -WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 0) = 0 THEN INSERT ("id","bar","updated_at","start","__artie_delete") VALUES (stg."id",stg."bar",stg."updated_at",stg."start",stg."__artie_delete");`, queries[0]) +WHEN NOT MATCHED THEN INSERT ("id","bar","updated_at","start","__artie_delete") VALUES (stg."id",stg."bar",stg."updated_at",stg."start",stg."__artie_delete");`, queries[0]) } } diff --git a/clients/mssql/store.go b/clients/mssql/store.go index a34a6e09c..b8346d566 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -43,7 +43,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { return shared.Merge(s, tableData, types.MergeOpts{}) } -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { return shared.Append(s, tableData, types.AdditionalSettings{}) } @@ -70,7 +70,7 @@ func (s *Store) Sweep() error { return shared.Sweep(s, tcs, queryFunc) } -func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ kafkalib.TopicConfig) error { +func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ bool) error { return nil // dedupe is not necessary for MS SQL } diff --git a/clients/mssql/store_test.go b/clients/mssql/store_test.go index 51a303ff0..f8614fa16 100644 --- a/clients/mssql/store_test.go +++ b/clients/mssql/store_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -28,14 +28,14 @@ func TestTempTableName(t *testing.T) { // Schema is "schema": tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `"schema"."table___artie_sUfFiX"`, trimTTL(tempTableName)) } { // Schema is "public" -> "dbo": 
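// MS SQL's default schema is "dbo", so the Postgres-style "public" schema gets remapped to it.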
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "public"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `"dbo"."table___artie_sUfFiX"`, trimTTL(tempTableName)) } } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 452cd566c..29ae4e00f 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -135,11 +134,15 @@ func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } -func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string { + return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName()) +} + +func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, rd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 4def473f3..89919c919 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -2,7 +2,6 @@ package redshift import ( "fmt" - "strings" _ "github.com/lib/pq" @@ -17,7 +16,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/stringutil" ) type Store struct { @@ -30,7 +28,7 @@ type Store struct { db.Store } -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { return shared.Append(s, tableData, types.AdditionalSettings{}) } @@ -110,9 +108,9 @@ WHERE return shared.Sweep(s, tcs, queryFunc) } -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { + stagingTableID := shared.TempTableID(tableID) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/redshift/redshift_dedupe_test.go b/clients/redshift/redshift_dedupe_test.go index 9e1f02568..0b86d6330 100644 --- a/clients/redshift/redshift_dedupe_test.go +++ 
b/clients/redshift/redshift_dedupe_test.go @@ -2,12 +2,9 @@ package redshift import ( "fmt" - "strings" "github.com/artie-labs/transfer/clients/redshift/dialect" "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -15,9 +12,9 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with one primary key + no `__artie_updated_at` flag. tableID := NewTableIdentifier("public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -30,9 +27,9 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with one primary key + `__artie_updated_at` flag. tableID := NewTableIdentifier("public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -45,9 +42,9 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with composite keys + no `__artie_updated_at` flag. tableID := NewTableIdentifier("public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -60,9 +57,9 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { { // Dedupe with composite keys + `__artie_updated_at` flag. 
tableID := NewTableIdentifier("public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), diff --git a/clients/redshift/redshift_test.go b/clients/redshift/redshift_test.go index 5e36fac84..76ea35d4a 100644 --- a/clients/redshift/redshift_test.go +++ b/clients/redshift/redshift_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -25,6 +25,6 @@ func TestTempTableName(t *testing.T) { tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := (&Store{}).IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `schema."table___artie_suffix"`, trimTTL(tempTableName)) } diff --git a/clients/s3/s3.go b/clients/s3/s3.go index 13ee53622..4c1ec9548 100644 --- a/clients/s3/s3.go +++ b/clients/s3/s3.go @@ -64,7 +64,7 @@ func (s *Store) ObjectPrefix(tableData *optimization.TableData) string { return strings.Join([]string{fqTableName, yyyyMMDDFormat}, "/") } -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { // There's no difference in appending or merging for S3. 
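+ // For that reason the new [useTempTable] argument is deliberately ignored (hence the blank identifier).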
return s.Merge(tableData) } diff --git a/clients/shared/append.go b/clients/shared/append.go index 1db07cf09..fe668962d 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -51,11 +51,16 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return fmt.Errorf("failed to merge columns from destination: %w", err) } + if opts.UseTempTable { + // Override tableID with tempTableID if we're using a temporary table + tableID = opts.TempTableID + } + return dwh.PrepareTemporaryTable( tableData, tableConfig, tableID, opts, - false, + opts.UseTempTable, ) } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 0b30a4e5a..7260366c9 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -69,7 +69,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt return fmt.Errorf("failed to merge columns from destination: %w", err) } - temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix()) + temporaryTableID := TempTableIDWithSuffix(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix()) if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil { return fmt.Errorf("failed to prepare temporary table: %w", err) } @@ -111,10 +111,9 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt } } - temporaryTableName := temporaryTableID.FullyQualifiedName() - subQuery := temporaryTableName + subQuery := temporaryTableID.FullyQualifiedName() if opts.SubQueryDedupe { - subQuery = fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, temporaryTableName) + subQuery = dwh.Dialect().BuildDedupeTableQuery(temporaryTableID, tableData.PrimaryKeys()) } if subQuery == "" { @@ -131,6 +130,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt } primaryKeys = append(primaryKeys, column) } + if len(primaryKeys) == 0 { return fmt.Errorf("primary keys cannot be empty") } diff --git a/clients/shared/temp_table.go b/clients/shared/temp_table.go index e86566c12..5a9784b84 100644 --- a/clients/shared/temp_table.go +++ b/clients/shared/temp_table.go @@ -2,13 +2,19 @@ package shared import ( "fmt" + "strings" "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/stringutil" ) -func TempTableID(tableID sql.TableIdentifier, suffix string) sql.TableIdentifier { +func TempTableID(tableID sql.TableIdentifier) sql.TableIdentifier { + return TempTableIDWithSuffix(tableID, strings.ToLower(stringutil.Random(5))) +} + +func TempTableIDWithSuffix(tableID sql.TableIdentifier, suffix string) sql.TableIdentifier { tempTable := fmt.Sprintf( "%s_%s_%s_%d", tableID.Table(), diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 756ebb907..4624d3354 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -153,11 +152,15 @@ func (sd SnowflakeDialect) BuildIsNotToastValueExpression(tableAlias constants.T return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } -func (sd 
SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + panic("not implemented") +} + +func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, sd.QuoteIdentifier(constants.UpdateColumnMarker)) } @@ -225,11 +228,11 @@ MERGE INTO %s %s USING ( %s ) AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd), // Insert - sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","), + strings.Join(sql.QuoteColumns(cols, sd), ","), strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, sd), ","), )}, nil } diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 4710db0b0..864b58e9c 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -301,7 +301,7 @@ func TestSnowflakeDialect_BuildMergeQueries_SoftDelete(t *testing.T) { assert.Equal(t, ` MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03T04:05:06Z', false),('2', 'bb', '2001-02-03T04:05:06Z', true),('3', 'dd', '2001-02-03T04:05:06Z', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" WHEN MATCHED THEN UPDATE SET "ID"=stg."ID","__ARTIE_DELETE"=stg."__ARTIE_DELETE" -WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) +WHEN NOT MATCHED THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) } { statements, err := SnowflakeDialect{}.BuildMergeQueries( @@ -319,7 +319,7 @@ WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("I assert.Equal(t, ` MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03T04:05:06Z', false),('2', 'bb', '2001-02-03T04:05:06Z', true),('3', 'dd', '2001-02-03T04:05:06Z', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" WHEN MATCHED AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID","__ARTIE_DELETE"=stg."__ARTIE_DELETE" -WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) +WHEN NOT MATCHED THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0]) } } diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index f8c6e8353..1e8ef2638 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -2,7 +2,6 @@ package snowflake import ( "fmt" - "strings" "github.com/snowflakedb/gosnowflake" @@ -17,7 +16,6 
@@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/stringutil" ) const maxRetries = 10 @@ -130,9 +128,9 @@ func (s *Store) reestablishConnection() error { // Dedupe takes a table and will remove duplicates based on the primary key(s). // These queries are inspired and modified from: https://stackoverflow.com/a/71515946 -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { + stagingTableID := shared.TempTableID(tableID) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/snowflake/snowflake_dedupe_test.go b/clients/snowflake/snowflake_dedupe_test.go index 88f4d5036..285567266 100644 --- a/clients/snowflake/snowflake_dedupe_test.go +++ b/clients/snowflake/snowflake_dedupe_test.go @@ -2,13 +2,10 @@ package snowflake import ( "fmt" - "strings" "testing" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -16,9 +13,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + no `__artie_updated_at` flag. tableID := NewTableIdentifier("db", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -31,9 +28,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with one primary key + `__artie_updated_at` flag. tableID := NewTableIdentifier("db", "public", "customers") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) assert.Equal( t, @@ -46,9 +43,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + no `__artie_updated_at` flag. 
tableID := NewTableIdentifier("db", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -61,9 +58,9 @@ func TestGenerateDedupeQueries(t *testing.T) { { // Dedupe with composite keys + `__artie_updated_at` flag. tableID := NewTableIdentifier("db", "public", "user_settings") - stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) + stagingTableID := shared.TempTableID(tableID) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) assert.Equal( t, diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 319233ffe..b3e17aa4d 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -316,7 +316,7 @@ func (s *SnowflakeTestSuite) TestStore_AdditionalEqualityStrings() { } } -func TestTempTableName(t *testing.T) { +func TestTempTableIDWithSuffix(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) @@ -328,6 +328,6 @@ func TestTempTableName(t *testing.T) { tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := (&Store{}).IdentifierFor(tableData.TopicConfig(), tableData.Name()) - tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() + tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName() assert.Equal(t, `db.schema."TABLE___ARTIE_SUFFIX"`, trimTTL(tempTableName)) } diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index 353cb94b3..7d9061535 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -13,7 +13,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) Append(tableData *optimization.TableData) error { +func (s *Store) Append(tableData *optimization.TableData, _ bool) error { var err error for i := 0; i < maxRetries; i++ { if i > 0 { diff --git a/go.mod b/go.mod index 16b172e5a..cc6f04226 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 go.mongodb.org/mongo-driver v1.15.0 google.golang.org/api v0.175.0 + google.golang.org/protobuf v1.34.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -140,6 +141,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/grpc v1.63.2 // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 3eef7ef73..1de7b64d5 100644 --- a/go.sum +++ b/go.sum @@ -810,8 +810,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/lib/artie/message.go b/lib/artie/message.go index ccecffb32..ef8f3716f 100644 --- a/lib/artie/message.go +++ b/lib/artie/message.go @@ -74,22 +74,22 @@ func (m *Message) EmitRowLag(metricsClient base.Client, mode config.Mode, groupI return } - metricsClient.GaugeWithSample("row.lag", float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), map[string]string{ - "mode": mode.String(), - "groupID": groupID, - "topic": m.Topic(), - "table": table, - "partition": m.Partition(), - }, 0.5) + metricsClient.GaugeWithSample( + "row.lag", + float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), + map[string]string{ + "mode": mode.String(), + "groupID": groupID, + "table": table, + }, + 0.5) } func (m *Message) EmitIngestionLag(metricsClient base.Client, mode config.Mode, groupID, table string) { metricsClient.Timing("ingestion.lag", time.Since(m.PublishTime()), map[string]string{ - "mode": mode.String(), - "groupID": groupID, - "topic": m.Topic(), - "table": table, - "partition": m.Partition(), + "mode": mode.String(), + "groupID": groupID, + "table": table, }) } diff --git a/lib/batch/batch.go b/lib/batch/batch.go new file mode 100644 index 000000000..3c8b6de82 --- /dev/null +++ b/lib/batch/batch.go @@ -0,0 +1,48 @@ +package batch + +import "fmt" + +// BySize takes a series of elements [in], encodes them using [encode], groups them into batches of bytes that sum to at +// most [maxSizeBytes], and then passes each batch to the [yield] function. 
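+// If a single encoded element is itself larger than [maxSizeBytes], BySize returns an error rather than emitting an oversized batch.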
+func BySize[T any](in []T, maxSizeBytes int, encode func(T) ([]byte, error), yield func([][]byte) error) error { + var buffer [][]byte + var currentSizeBytes int + + for i, item := range in { + bytes, err := encode(item) + if err != nil { + return fmt.Errorf("failed to encode item %d: %w", i, err) + } + + if len(bytes) > maxSizeBytes { + return fmt.Errorf("item %d is larger (%d bytes) than maxSizeBytes (%d bytes)", i, len(bytes), maxSizeBytes) + } + + currentSizeBytes += len(bytes) + + if currentSizeBytes < maxSizeBytes { + buffer = append(buffer, bytes) + } else if currentSizeBytes == maxSizeBytes { + buffer = append(buffer, bytes) + if err := yield(buffer); err != nil { + return err + } + buffer = [][]byte{} + currentSizeBytes = 0 + } else { + if err := yield(buffer); err != nil { + return err + } + buffer = [][]byte{bytes} + currentSizeBytes = len(bytes) + } + } + + if len(buffer) > 0 { + if err := yield(buffer); err != nil { + return err + } + } + + return nil +} diff --git a/lib/batch/batch_test.go b/lib/batch/batch_test.go new file mode 100644 index 000000000..078062521 --- /dev/null +++ b/lib/batch/batch_test.go @@ -0,0 +1,98 @@ +package batch + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBySize(t *testing.T) { + goodEncoder := func(value string) ([]byte, error) { + return []byte(value), nil + } + + panicEncoder := func(value string) ([]byte, error) { + panic("should not be called") + } + + badEncoder := func(value string) ([]byte, error) { + return nil, fmt.Errorf("failed to encode %q", value) + } + + testBySize := func(in []string, maxSizeBytes int, encoder func(value string) ([]byte, error)) ([][][]byte, error) { + batches := [][][]byte{} + err := BySize(in, maxSizeBytes, encoder, func(batch [][]byte) error { batches = append(batches, batch); return nil }) + return batches, err + } + + badYield := func(batch [][]byte) error { + out := make([]string, len(batch)) + for i, bytes := range batch { + out[i] = string(bytes) + } + return fmt.Errorf("yield failed for %v", out) + } + + { + // Empty slice: + batches, err := testBySize([]string{}, 10, panicEncoder) + assert.NoError(t, err) + assert.Empty(t, batches) + } + { + // Non-empty slice + bad encoder: + _, err := testBySize([]string{"foo", "bar"}, 10, badEncoder) + assert.ErrorContains(t, err, `failed to encode item 0: failed to encode "foo"`) + } + { + // Non-empty slice + two items that are < maxSizeBytes + yield returns error. + err := BySize([]string{"foo", "bar"}, 10, goodEncoder, badYield) + assert.ErrorContains(t, err, "yield failed for [foo bar]") + } + { + // Non-empty slice + two items that are = maxSizeBytes + yield returns error. + err := BySize([]string{"foo", "bar"}, 6, goodEncoder, badYield) + assert.ErrorContains(t, err, "yield failed for [foo bar]") + } + { + // Non-empty slice + two items that are > maxSizeBytes + yield returns error. 
+ err := BySize([]string{"foo", "bar-baz"}, 8, goodEncoder, badYield) + assert.ErrorContains(t, err, "yield failed for [foo]") + } + { + // Non-empty slice + item is larger than maxSizeBytes: + _, err := testBySize([]string{"foo", "i-am-23-characters-long", "bar"}, 20, goodEncoder) + assert.ErrorContains(t, err, "item 1 is larger (23 bytes) than maxSizeBytes (20 bytes)") + } + { + // Non-empty slice + item equal to maxSizeBytes: + batches, err := testBySize([]string{"foo", "i-am-23-characters-long", "bar"}, 23, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 3) + assert.Equal(t, [][]byte{[]byte("foo")}, batches[0]) + assert.Equal(t, [][]byte{[]byte("i-am-23-characters-long")}, batches[1]) + assert.Equal(t, [][]byte{[]byte("bar")}, batches[2]) + } + { + // Non-empty slice + one item: + batches, err := testBySize([]string{"foo"}, 100, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 1) + assert.Equal(t, [][]byte{[]byte("foo")}, batches[0]) + } + { + // Non-empty slice + all items exactly fit into one batch: + batches, err := testBySize([]string{"foo", "bar", "baz", "qux"}, 12, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 1) + assert.Equal(t, [][]byte{[]byte("foo"), []byte("bar"), []byte("baz"), []byte("qux")}, batches[0]) + } + { + // Non-empty slice + all items exactly fit into just under one batch: + batches, err := testBySize([]string{"foo", "bar", "baz", "qux"}, 13, goodEncoder) + assert.NoError(t, err) + assert.Len(t, batches, 1) + assert.Equal(t, [][]byte{[]byte("foo"), []byte("bar"), []byte("baz"), []byte("qux")}, batches[0]) + } +} diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index 6e8250daa..5226fa95f 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -9,13 +9,6 @@ type BigQuery struct { DefaultDataset string `yaml:"defaultDataset"` ProjectID string `yaml:"projectID"` Location string `yaml:"location"` - BatchSize int `yaml:"batchSize"` -} - -func (b *BigQuery) LoadDefaultValues() { - if b.BatchSize == 0 { - b.BatchSize = 1000 - } } // DSN - returns the notation for BigQuery following this format: bigquery://projectID/[location/]datasetID?queryString diff --git a/lib/debezium/decimal.go b/lib/debezium/decimal.go index 793c4b814..32ef35969 100644 --- a/lib/debezium/decimal.go +++ b/lib/debezium/decimal.go @@ -9,15 +9,22 @@ import ( // EncodeDecimal is used to encode a string representation of a number to `org.apache.kafka.connect.data.Decimal`. func EncodeDecimal(value string, scale int) ([]byte, error) { - scaledValue := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) + // TODO: Refactor scale to be uint16 + bigFloatValue := new(big.Float) if _, success := bigFloatValue.SetString(value); !success { - return nil, fmt.Errorf("unable to use '%s' as a floating-point number", value) + return nil, fmt.Errorf("unable to use %q as a floating-point number", value) } + + scaledValue := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) bigFloatValue.Mul(bigFloatValue, new(big.Float).SetInt(scaledValue)) // Extract the scaled integer value. 
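+ // Converting through the decimal string rounds to the nearest integer, whereas big.Float.Int truncates; e.g. 1.05 at scale 2 could otherwise surface as 104 instead of 105 (see the new "total" test case).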
- bigIntValue, _ := bigFloatValue.Int(nil) + bigIntValue := new(big.Int) + if _, success := bigIntValue.SetString(bigFloatValue.String(), 10); !success { + return nil, fmt.Errorf("unable to use %q as a floating-point number", value) + } + data := bigIntValue.Bytes() if bigIntValue.Sign() < 0 { // Convert to two's complement if the number is negative diff --git a/lib/debezium/decimal_test.go b/lib/debezium/decimal_test.go index 395415281..4f8b6e7e1 100644 --- a/lib/debezium/decimal_test.go +++ b/lib/debezium/decimal_test.go @@ -66,15 +66,20 @@ func TestEncodeDecimal(t *testing.T) { value: "6408.355", scale: 3, }, + { + name: "total", + value: "1.05", + scale: 2, + }, { name: "malformed - empty string", value: "", - expectedErr: "unable to use '' as a floating-point number", + expectedErr: `unable to use "" as a floating-point number`, }, { name: "malformed - not a floating-point", value: "abcdefg", - expectedErr: "unable to use 'abcdefg' as a floating-point number", + expectedErr: `unable to use "abcdefg" as a floating-point number`, }, } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 96ffbc64b..5bcb48c5e 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -19,7 +19,7 @@ import ( // It has a safety check to make sure the tableName contains the `constants.ArtiePrefix` key. // Temporary tables look like this: database.schema.tableName__artie__RANDOM_STRING(5)_expiryUnixTs func DropTemporaryTable(dwh destination.DataWarehouse, tableIdentifier sql.TableIdentifier, shouldReturnError bool) error { - if strings.Contains(tableIdentifier.Table(), constants.ArtiePrefix) { + if strings.Contains(strings.ToLower(tableIdentifier.Table()), constants.ArtiePrefix) { sqlCommand := fmt.Sprintf("DROP TABLE IF EXISTS %s", tableIdentifier.FullyQualifiedName()) if _, err := dwh.Exec(sqlCommand); err != nil { slog.Warn("Failed to drop temporary table, it will get garbage collected by the TTL...", diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 2cf5d43d3..b64091922 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -14,8 +14,8 @@ import ( type DataWarehouse interface { Dialect() sqllib.Dialect Merge(tableData *optimization.TableData) error - Append(tableData *optimization.TableData) error - Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error + Append(tableData *optimization.TableData, useTempTable bool) error + Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) Begin() (*sql.Tx, error) @@ -30,7 +30,7 @@ type DataWarehouse interface { type Baseline interface { Merge(tableData *optimization.TableData) error - Append(tableData *optimization.TableData) error + Append(tableData *optimization.TableData, useTempTable bool) error IsRetryableError(err error) bool IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sqllib.TableIdentifier } diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index 90f9ee7a1..12c268b46 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -42,4 +42,8 @@ type MergeOpts struct { type AdditionalSettings struct { AdditionalCopyClause string + + // These settings are used for the `Append` method. 
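+ // TempTableID is only read when UseTempTable is true (see clients/shared/append.go).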
+ UseTempTable bool + TempTableID sql.TableIdentifier } diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index 7690ee505..bef2ce857 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -2,7 +2,6 @@ package sql import ( "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -24,7 +23,8 @@ type Dialect interface { BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string - BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string + BuildDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string + BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string BuildMergeQueries( tableID TableIdentifier, subQuery string, diff --git a/lib/typing/ext/variables.go b/lib/typing/ext/variables.go index 2f44d353f..56b8d81ab 100644 --- a/lib/typing/ext/variables.go +++ b/lib/typing/ext/variables.go @@ -3,7 +3,6 @@ package ext import "time" const ( - BigQueryDateTimeFormat = "2006-01-02 15:04:05.999999" ISO8601 = "2006-01-02T15:04:05.999-07:00" PostgresDateFormat = "2006-01-02" PostgresTimeFormat = "15:04:05.999999-07" // microsecond precision diff --git a/processes/consumer/flush.go b/processes/consumer/flush.go index 4f8b2a921..b8e0afacb 100644 --- a/processes/consumer/flush.go +++ b/processes/consumer/flush.go @@ -61,7 +61,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B } if args.CoolDown != nil && _tableData.ShouldSkipFlush(*args.CoolDown) { - slog.With(logFields...).Info("Skipping flush because we are currently in a flush cooldown") + slog.With(logFields...).Debug("Skipping flush because we are currently in a flush cooldown") return } @@ -89,7 +89,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B action := "merge" // Merge or Append depending on the mode. if _tableData.Mode() == config.History { - err = dest.Append(_tableData.TableData) + err = dest.Append(_tableData.TableData, false) action = "append" } else { err = dest.Merge(_tableData.TableData) diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 8e30a303b..2e37945d6 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -27,7 +27,6 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo tags := map[string]string{ "mode": cfg.Mode.String(), "groupID": p.GroupID, - "topic": p.Msg.Topic(), "what": "success", }
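A minimal usage sketch of the new lib/batch helper, assuming only the BySize signature added in lib/batch/batch.go above; the JSON encoder and the print-only flush callback are illustrative stand-ins, not code from this patch:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/artie-labs/transfer/lib/batch"
)

func main() {
	rows := []map[string]any{
		{"id": 1, "name": "alice"},
		{"id": 2, "name": "bob"},
		{"id": 3, "name": "carol"},
	}

	// Encode each row to JSON and flush batches whose encoded sizes sum to at most 64 bytes.
	err := batch.BySize(rows, 64,
		func(row map[string]any) ([]byte, error) { return json.Marshal(row) },
		func(chunk [][]byte) error {
			fmt.Printf("flushing %d encoded rows\n", len(chunk))
			return nil
		},
	)
	if err != nil {
		fmt.Println("batching failed:", err)
	}
}

Yielding each batch through a callback lets callers stream requests as batches fill up, rather than materializing every chunk in memory first.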