Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 18, 2024
1 parent b75b4ac commit 626c5f5
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 9 deletions.
19 changes: 17 additions & 2 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,23 @@ type Store struct {
db.Store
}

func (s *Store) Append(tableData *optimization.TableData) error {
return shared.Append(s, tableData, types.AdditionalSettings{})
func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())

// Redshift is slightly different, we'll load and create the temporary table via shared.Append
// Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs.
temporaryTableID := shared.TempTableID(tableID)
if err := shared.Append(s, tableData, types.AdditionalSettings{
UseTempTable: true,
TempTableID: temporaryTableID,
}); err != nil {
return err
}

_, err := s.Exec(
fmt.Sprintf(`ALTER TABLE %s APPEND FROM %s;`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()),
)
return err
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
return shared.Merge(s, tableData, types.MergeOpts{})
}

func (s *Store) Append(tableData *optimization.TableData) error {
func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
return shared.Append(s, tableData, types.AdditionalSettings{})
}

Expand Down
2 changes: 1 addition & 1 deletion clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Store struct {
db.Store
}

func (s *Store) Append(tableData *optimization.TableData) error {
func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
return shared.Append(s, tableData, types.AdditionalSettings{})
}

Expand Down
2 changes: 1 addition & 1 deletion clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
tableConfig,
tableID,
opts,
false,
opts.UseTempTable,
)
}
2 changes: 1 addition & 1 deletion clients/snowflake/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) Append(tableData *optimization.TableData) error {
func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
var err error
for i := 0; i < maxRetries; i++ {
if i > 0 {
Expand Down
4 changes: 2 additions & 2 deletions lib/destination/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type DataWarehouse interface {
Dialect() sqllib.Dialect
Merge(tableData *optimization.TableData) error
Append(tableData *optimization.TableData) error
Append(tableData *optimization.TableData, useTempTable bool) error
Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error
Exec(query string, args ...any) (sql.Result, error)
Query(query string, args ...any) (*sql.Rows, error)
Expand All @@ -30,7 +30,7 @@ type DataWarehouse interface {

type Baseline interface {
Merge(tableData *optimization.TableData) error
Append(tableData *optimization.TableData) error
Append(tableData *optimization.TableData, useTempTable bool) error
IsRetryableError(err error) bool
IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sqllib.TableIdentifier
}
Expand Down
3 changes: 3 additions & 0 deletions lib/destination/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ type MergeOpts struct {

type AdditionalSettings struct {
AdditionalCopyClause string

UseTempTable bool
TempTableID sql.TableIdentifier
}
2 changes: 1 addition & 1 deletion processes/consumer/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B
action := "merge"
// Merge or Append depending on the mode.
if _tableData.Mode() == config.History {
err = dest.Append(_tableData.TableData)
err = dest.Append(_tableData.TableData, false)
action = "append"
} else {
err = dest.Merge(_tableData.TableData)
Expand Down

0 comments on commit 626c5f5

Please sign in to comment.