
Merge branch 'master' into nv/optimize-reading-columns
nathan-artie committed Jun 18, 2024
2 parents a66acf8 + ef85a84 commit b520224
Showing 46 changed files with 836 additions and 509 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -74,7 +74,7 @@ Transfer is aiming to provide coverage across all OLTP and OLAP databases. Cur
- MongoDB
- MySQL
- PostgreSQL


_If the database you are using is not on the list, feel free to file a [feature request](https://github.com/artie-labs/transfer/issues/new)._

@@ -101,4 +101,4 @@ Artie Transfer is released through [GoReleaser](https://goreleaser.com/), and we

## License

Artie Transfer is licensed under ELv2. Please see the [LICENSE](https://github.com/artie-labs/transfer/blob/master/LICENSE.txt) file for additional information. If you have any licensing questions please email hi@artie.so.
Artie Transfer is licensed under ELv2. Please see the [LICENSE](https://github.com/artie-labs/transfer/blob/master/LICENSE.txt) file for additional information. If you have any licensing questions please email hi@artie.com.
34 changes: 0 additions & 34 deletions clients/bigquery/batch.go

This file was deleted.

31 changes: 0 additions & 31 deletions clients/bigquery/batch_test.go

This file was deleted.

136 changes: 93 additions & 43 deletions clients/bigquery/bigquery.go
@@ -8,10 +8,14 @@ import (
"strings"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
_ "github.com/viant/bigquery"
"google.golang.org/protobuf/proto"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/batch"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/db"
@@ -23,26 +27,58 @@ import (
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

const (
GooglePathToCredentialsEnvKey = "GOOGLE_APPLICATION_CREDENTIALS"
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeCommentCol = "description"
// The Storage Write API is limited to 10 MiB per request; subtract 50 KiB to account for request overhead.
maxRequestByteSize = (10 * 1024 * 1024) - (50 * 1024)
)

type Store struct {
configMap *types.DwhToTablesConfigMap
batchSize int
config config.Config

db.Store
}

func (s *Store) Append(tableData *optimization.TableData) error {
return shared.Append(s, tableData, types.AdditionalSettings{})
func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error {
if !useTempTable {
return shared.Append(s, tableData, types.AdditionalSettings{})
}

// We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data
// See: https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data
// For now, we'll need to append these rows to a temporary table and then append the temporary table onto the target table
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
temporaryTableID := shared.TempTableID(tableID)

defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }()

err := shared.Append(s, tableData, types.AdditionalSettings{
UseTempTable: true,
TempTableID: temporaryTableID,
})

if err != nil {
return fmt.Errorf("failed to append: %w", err)
}

query := fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s`,
tableID.FullyQualifiedName(),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
temporaryTableID.FullyQualifiedName(),
)

if _, err = s.Exec(query); err != nil {
return fmt.Errorf("failed to insert data into target table: %w", err)
}

return nil
}
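
For illustration — assuming a hypothetical target table `orders` with columns `id` and `name`, and a generated temp-table suffix `abc123` — the statement assembled above would render roughly as follows (identifier quoting and the `___artie_` suffix follow the pattern asserted in TestTempTableIDWithSuffix below):

    INSERT INTO `proj`.`dataset`.`orders` (`id`,`name`)
    SELECT `id`,`name` FROM `proj`.`dataset`.`orders___artie_abc123`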

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
@@ -67,30 +103,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
return fmt.Errorf("unable to cast tempTableID to BigQuery TableIdentifier")
}

// Load the data
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
}

func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) {
// Cast the data into BigQuery values
var rows []*Row
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
for _, value := range tableData.Rows() {
data := make(map[string]bigquery.Value)
for _, col := range columns {
colVal, err := castColVal(value[col.Name()], col, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err)
}

if colVal != nil {
data[col.Name()] = colVal
}
}

rows = append(rows, NewRow(data))
}
return rows, nil
return s.putTable(context.Background(), bqTempTableID, tableData)
}

func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
@@ -138,38 +151,76 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
return client
}

func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error {
rows, err := buildLegacyRows(tableData, s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats)
func (s *Store) putTable(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error {
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()

messageDescriptor, err := columnsToMessageDescriptor(columns)
if err != nil {
return err
}
schemaDescriptor, err := adapt.NormalizeDescriptor(*messageDescriptor)
if err != nil {
return err
}

client := s.GetClient(ctx)
defer client.Close()
managedWriterClient, err := managedwriter.NewClient(ctx, bqTableID.ProjectID())
if err != nil {
return fmt.Errorf("failed to create managedwriter client: %w", err)
}
defer managedWriterClient.Close()

managedStream, err := managedWriterClient.NewManagedStream(ctx,
managedwriter.WithDestinationTable(
managedwriter.TableParentFromParts(bqTableID.ProjectID(), bqTableID.Dataset(), bqTableID.Table()),
),
managedwriter.WithType(managedwriter.DefaultStream),
managedwriter.WithSchemaDescriptor(schemaDescriptor),
managedwriter.EnableWriteRetries(true),
)
if err != nil {
return fmt.Errorf("failed to create managed stream: %w", err)
}
defer managedStream.Close()

batch := NewBatch(rows, s.batchSize)
inserter := client.Dataset(tableID.Dataset()).Table(tableID.Table()).Inserter()
for batch.HasNext() {
if err := inserter.Put(ctx, batch.NextChunk()); err != nil {
return fmt.Errorf("failed to insert rows: %w", err)
encoder := func(row map[string]any) ([]byte, error) {
message, err := rowToMessage(row, columns, *messageDescriptor, s.AdditionalDateFormats())
if err != nil {
return nil, fmt.Errorf("failed to convert row to message: %w", err)
}

bytes, err := proto.Marshal(message)
if err != nil {
return nil, fmt.Errorf("failed to marshal message: %w", err)
}

return bytes, nil
}

return nil
return batch.BySize(tableData.Rows(), maxRequestByteSize, encoder, func(chunk [][]byte) error {
result, err := managedStream.AppendRows(ctx, chunk)
if err != nil {
return fmt.Errorf("failed to append rows: %w", err)
}

if resp, err := result.FullResponse(ctx); err != nil {
return fmt.Errorf("failed to get response (%s): %w", resp.GetError().String(), err)
}

return nil
})
}
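
The `batch.BySize` call above chunks encoded rows so that each AppendRows request stays under `maxRequestByteSize`. A minimal sketch of such a size-based batcher, assuming a generic signature — the real helper in `lib/batch` may differ in its details:

    package batch

    import "fmt"

    // BySize encodes items one at a time and flushes the accumulated chunk
    // whenever adding the next encoded item would exceed maxBytes.
    func BySize[T any](items []T, maxBytes int, encode func(T) ([]byte, error), yield func([][]byte) error) error {
        var chunk [][]byte
        var chunkSize int
        for _, item := range items {
            encoded, err := encode(item)
            if err != nil {
                return err
            }
            if len(encoded) > maxBytes {
                return fmt.Errorf("item size %d exceeds max chunk size %d", len(encoded), maxBytes)
            }
            if chunkSize+len(encoded) > maxBytes {
                if err := yield(chunk); err != nil {
                    return err
                }
                chunk, chunkSize = nil, 0
            }
            chunk = append(chunk, encoded)
            chunkSize += len(encoded)
        }
        if len(chunk) > 0 {
            return yield(chunk)
        }
        return nil
    }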

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID)

dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig)
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)

defer func() { _ = ddl.DropTemporaryTable(s, stagingTableID, false) }()

return destination.ExecStatements(s, dedupeQueries)
}
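
A hypothetical invocation of the reworked Dedupe, with the `__artie_updated_at` toggle now passed as a plain bool rather than read from `kafkalib.TopicConfig`:

    // store and tableID are assumed to exist in the caller; "id" is a hypothetical primary key.
    if err := store.Dedupe(tableID, []string{"id"}, true); err != nil {
        return fmt.Errorf("dedupe failed: %w", err)
    }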

func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) {
cfg.BigQuery.LoadDefaultValues()
if _store != nil {
// Used for tests.
return &Store{
@@ -196,7 +247,6 @@ func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) {
return &Store{
Store: store,
configMap: &types.DwhToTablesConfigMap{},
batchSize: cfg.BigQuery.BatchSize,
config: cfg,
}, nil
}
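
A hedged sketch of a call site for the reworked Append — streaming into a temporary table first, then moving rows into the target with a single INSERT INTO ... SELECT. Config loading and tableData construction are assumed to come from the surrounding codebase:

    // appendViaTempTable is hypothetical glue code, not part of this commit.
    func appendViaTempTable(cfg config.Config, tableData *optimization.TableData) error {
        store, err := bigquery.LoadBigQuery(cfg, nil)
        if err != nil {
            return fmt.Errorf("failed to load BigQuery store: %w", err)
        }
        // useTempTable=true routes rows through a temp table before the final
        // INSERT INTO ... SELECT against the target table.
        return store.Append(tableData, true)
    }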
19 changes: 8 additions & 11 deletions clients/bigquery/bigquery_dedupe_test.go
@@ -2,7 +2,6 @@ package bigquery

import (
"fmt"
"strings"
"testing"
"time"

@@ -11,17 +10,15 @@ import (
"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/stringutil"
)

func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with one primary key + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -37,9 +34,9 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with one primary key + `__artie_updated_at` flag.
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -55,9 +52,9 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with composite keys + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -73,9 +70,9 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with composite keys + `__artie_updated_at` flag.
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(t, parts, 3)
assert.Equal(
t,
4 changes: 2 additions & 2 deletions clients/bigquery/bigquery_test.go
@@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestTempTableName(t *testing.T) {
func TestTempTableIDWithSuffix(t *testing.T) {
trimTTL := func(tableName string) string {
lastUnderscore := strings.LastIndex(tableName, "_")
assert.GreaterOrEqual(t, lastUnderscore, 0)
@@ -26,6 +26,6 @@ func TestTempTableName(t *testing.T) {
store := &Store{config: config.Config{BigQuery: &config.BigQuery{ProjectID: "123454321"}}}
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table")
tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName()
tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName()
assert.Equal(t, "`123454321`.`db`.`table___artie_sUfFiX`", trimTTL(tempTableName))
}
