[BigQuery] Refactoring Append to allow useTempTable (#738)
Tang8330 committed Jun 18, 2024
1 parent 5442913 commit 0d649bd
Showing 9 changed files with 46 additions and 10 deletions.
31 changes: 29 additions & 2 deletions clients/bigquery/bigquery.go
@@ -45,8 +45,35 @@ type Store struct {
 	db.Store
 }
 
-func (s *Store) Append(tableData *optimization.TableData) error {
-	return shared.Append(s, tableData, types.AdditionalSettings{})
+func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error {
+	if !useTempTable {
+		return shared.Append(s, tableData, types.AdditionalSettings{})
+	}
+
+	// We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data.
+	// See: https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data
+	// For now, we'll need to append this to a temporary table and then append the temporary table onto the target table.
+	tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
+	temporaryTableID := shared.TempTableID(tableID)
+
+	defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }()
+
+	err := shared.Append(s, tableData, types.AdditionalSettings{
+		UseTempTable: true,
+		TempTableID:  temporaryTableID,
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to append: %w", err)
+	}
+
+	if _, err = s.Exec(
+		fmt.Sprintf(`INSERT INTO %s SELECT * FROM %s`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()),
+	); err != nil {
+		return fmt.Errorf("failed to insert data into target table: %w", err)
+	}
+
+	return nil
 }
 
 func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
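As the comment in the new Append notes, DML against recently streamed rows is still restricted on BigQuery, so the batch is landed in a temporary table first and then promoted into the target table with a single INSERT INTO ... SELECT. A minimal, self-contained sketch of that staging pattern over database/sql follows; the package name, table names, the load callback, and stagedAppend itself are illustrative stand-ins, not part of this commit.

package sketch

import (
	"database/sql"
	"fmt"
)

// stagedAppend mirrors the temp-table hop above: the batch is loaded into a
// staging table (via the caller-supplied load function), promoted into the
// target table with a single INSERT ... SELECT, and the staging table is
// dropped on the way out. All names here are hypothetical.
func stagedAppend(db *sql.DB, targetTable, stagingTable string, load func(stagingTable string) error) error {
	// Best-effort cleanup, analogous to the deferred DropTemporaryTable above.
	defer func() { _, _ = db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", stagingTable)) }()

	// Land the batch in the staging table first; the target table then only
	// receives rows via the regular DML statement below.
	if err := load(stagingTable); err != nil {
		return fmt.Errorf("failed to load staging table: %w", err)
	}

	// Promote the staged rows into the target table in one statement.
	if _, err := db.Exec(fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", targetTable, stagingTable)); err != nil {
		return fmt.Errorf("failed to insert data into target table: %w", err)
	}
	return nil
}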
2 changes: 1 addition & 1 deletion clients/mssql/store.go
@@ -43,7 +43,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
 	return shared.Merge(s, tableData, types.MergeOpts{})
 }
 
-func (s *Store) Append(tableData *optimization.TableData) error {
+func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
 	return shared.Append(s, tableData, types.AdditionalSettings{})
 }
2 changes: 1 addition & 1 deletion clients/redshift/redshift.go
@@ -28,7 +28,7 @@ type Store struct {
 	db.Store
 }
 
-func (s *Store) Append(tableData *optimization.TableData) error {
+func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
 	return shared.Append(s, tableData, types.AdditionalSettings{})
 }
2 changes: 1 addition & 1 deletion clients/s3/s3.go
@@ -64,7 +64,7 @@ func (s *Store) ObjectPrefix(tableData *optimization.TableData) string {
 	return strings.Join([]string{fqTableName, yyyyMMDDFormat}, "/")
 }
 
-func (s *Store) Append(tableData *optimization.TableData) error {
+func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
 	// There's no difference in appending or merging for S3.
 	return s.Merge(tableData)
 }
7 changes: 6 additions & 1 deletion clients/shared/append.go
@@ -51,11 +51,16 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
 		return fmt.Errorf("failed to merge columns from destination: %w", err)
 	}
 
+	if opts.UseTempTable {
+		// Override tableID with tempTableID if we're using a temporary table
+		tableID = opts.TempTableID
+	}
+
 	return dwh.PrepareTemporaryTable(
 		tableData,
 		tableConfig,
 		tableID,
 		opts,
-		false,
+		opts.UseTempTable,
 	)
 }
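The effect of this change in shared.Append is a two-way split: by default the write still goes straight into the resolved target table with no temporary table created, while the new path swaps in opts.TempTableID and asks the destination to create it first. A condensed sketch of that routing, using hypothetical stand-ins rather than the repository's real destination and sql types:

package sketch

// TableID and TableWriter are hypothetical stand-ins for the repository's
// sql.TableIdentifier and destination types.
type TableID interface{ FullyQualifiedName() string }

type TableWriter interface {
	// createTempTable mirrors the last argument of PrepareTemporaryTable above.
	PrepareTemporaryTable(target TableID, createTempTable bool) error
}

// routeAppend writes to the target table by default; when useTempTable is set
// it redirects the write to tempID and asks the destination to create it.
func routeAppend(w TableWriter, targetID, tempID TableID, useTempTable bool) error {
	tableID := targetID
	if useTempTable {
		tableID = tempID
	}
	return w.PrepareTemporaryTable(tableID, useTempTable)
}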
2 changes: 1 addition & 1 deletion clients/snowflake/writes.go
@@ -13,7 +13,7 @@ import (
 	"github.com/artie-labs/transfer/lib/typing/columns"
 )
 
-func (s *Store) Append(tableData *optimization.TableData) error {
+func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
 	var err error
 	for i := 0; i < maxRetries; i++ {
 		if i > 0 {
4 changes: 2 additions & 2 deletions lib/destination/dwh.go
@@ -14,7 +14,7 @@ import (
 type DataWarehouse interface {
 	Dialect() sqllib.Dialect
 	Merge(tableData *optimization.TableData) error
-	Append(tableData *optimization.TableData) error
+	Append(tableData *optimization.TableData, useTempTable bool) error
 	Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error
 	Exec(query string, args ...any) (sql.Result, error)
 	Query(query string, args ...any) (*sql.Rows, error)
@@ -30,7 +30,7 @@ type DataWarehouse interface {
 
 type Baseline interface {
 	Merge(tableData *optimization.TableData) error
-	Append(tableData *optimization.TableData) error
+	Append(tableData *optimization.TableData, useTempTable bool) error
 	IsRetryableError(err error) bool
 	IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sqllib.TableIdentifier
 }
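Since both interfaces change, every destination has to adopt the new Append signature for the build to pass. A compile-time assertion per store is a cheap way to make a missed signature update fail at the declaration rather than at a call site; this is a common Go idiom sketched here, not part of this commit, and it assumes the import path shown in the diff.

package bigquery

import "github.com/artie-labs/transfer/lib/destination"

// Compile-time checks: if Store's Append (or any other method) drifts from
// the interfaces, the build breaks here.
var (
	_ destination.DataWarehouse = (*Store)(nil)
	_ destination.Baseline      = (*Store)(nil)
)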
4 changes: 4 additions & 0 deletions lib/destination/types/types.go
@@ -42,4 +42,8 @@ type MergeOpts struct {
 
 type AdditionalSettings struct {
 	AdditionalCopyClause string
+
+	// These settings are used for the `Append` method.
+	UseTempTable bool
+	TempTableID  sql.TableIdentifier
 }
2 changes: 1 addition & 1 deletion processes/consumer/flush.go
@@ -89,7 +89,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B
 	action := "merge"
 	// Merge or Append depending on the mode.
 	if _tableData.Mode() == config.History {
-		err = dest.Append(_tableData.TableData)
+		err = dest.Append(_tableData.TableData, false)
 		action = "append"
 	} else {
 		err = dest.Merge(_tableData.TableData)
