Skip to content

Commit

Permalink
Removing deadcode.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 17, 2024
1 parent a320314 commit 8af8338
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 356 deletions.
49 changes: 1 addition & 48 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeCommentCol = "description"
useStorageWriteAPI = true
)

type Store struct {
Expand Down Expand Up @@ -72,33 +71,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}

// Load the data
if useStorageWriteAPI {
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
} else {
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
}
}

func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) {
// Cast the data into BigQuery values
var rows []*Row
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
for _, value := range tableData.Rows() {
data := make(map[string]bigquery.Value)
for _, col := range columns {
colVal, err := castColVal(value[col.Name()], col, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err)
}

if colVal != nil {
data[col.Name()] = colVal
}
}

rows = append(rows, NewRow(data))
}
return rows, nil
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
}

func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
Expand Down Expand Up @@ -146,26 +119,6 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
return client
}

func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error {
rows, err := buildLegacyRows(tableData, s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats)
if err != nil {
return err
}

client := s.GetClient(ctx)
defer client.Close()

batch := NewBatch(rows, s.batchSize)
inserter := client.Dataset(tableID.Dataset()).Table(tableID.Table()).Inserter()
for batch.HasNext() {
if err := inserter.Put(ctx, batch.NextChunk()); err != nil {
return fmt.Errorf("failed to insert rows: %w", err)
}
}

return nil
}

func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error {
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()

Expand Down
118 changes: 0 additions & 118 deletions clients/bigquery/cast.go

This file was deleted.

190 changes: 0 additions & 190 deletions clients/bigquery/cast_test.go

This file was deleted.

0 comments on commit 8af8338

Please sign in to comment.