Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 18, 2024
1 parent d6a2593 commit edf6033
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
22 changes: 14 additions & 8 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,26 @@ type Store struct {
}

// Append appends tableData to the table identified by
// s.IdentifierFor(tableData.TopicConfig(), tableData.Name()).
//
// When useTempTable is false, the call is delegated straight to shared.Append
// with zero-valued types.AdditionalSettings. Otherwise the rows are first
// loaded through shared.Append into a temporary table (ID obtained from
// shared.TempTableID), and a follow-up SQL statement copies that temporary
// table into the target table. The error from either step is returned.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so lines
// from both sides of the change appear interleaved (two `tableID :=`
// declarations, two shared.Append call openings, two s.Exec statements). It is
// not compilable as shown. Presumably the `ALTER TABLE ... APPEND FROM`
// variant and the Redshift comment are the removed side and the
// `INSERT INTO ... SELECT` variant is the added side; confirm against the
// repository before relying on either.
func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
// Fast path: no staging table requested, append with default settings.
if !useTempTable {
return shared.Append(s, tableData, types.AdditionalSettings{})
}

// Redshift is slightly different, we'll load and create the temporary table via shared.Append
// Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs.
// We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data
// See: https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data
// For now, we'll need to append this to a temporary table and then append temporary table onto the target table
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
temporaryTableID := shared.TempTableID(tableID)
// Stage the rows: shared.Append is told to use the temporary table ID.
if err := shared.Append(s, tableData, types.AdditionalSettings{
err := shared.Append(s, tableData, types.AdditionalSettings{
UseTempTable: true,
TempTableID: temporaryTableID,
}); err != nil {
return err
})

// Wrap the staging failure with context while keeping the cause unwrappable via %w.
if err != nil {
return fmt.Errorf("failed to append: %w", err)
}

// Copy everything from the temporary table into the target table.
_, err := s.Exec(
fmt.Sprintf(`ALTER TABLE %s APPEND FROM %s;`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()),
_, err = s.Exec(
fmt.Sprintf(`INSERT INTO %s SELECT * FROM %s`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()),
)
// The Exec error (nil on success) is returned unwrapped.
return err
}
Expand Down
5 changes: 5 additions & 0 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

if opts.UseTempTable {
// Override tableID with tempTableID if we're using a temporary table
tableID = opts.TempTableID
}

return dwh.PrepareTemporaryTable(
tableData,
tableConfig,
Expand Down

0 comments on commit edf6033

Please sign in to comment.