Skip to content

Commit

Permalink
Kill GetColumnsToUpdate (#746)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 18, 2024
1 parent e28d9f2 commit a7ae7b5
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 66 deletions.
4 changes: 2 additions & 2 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err

query := fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s`,
tableID.FullyQualifiedName(),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","),
strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","),
temporaryTableID.FullyQualifiedName(),
)

Expand Down
4 changes: 3 additions & 1 deletion clients/mssql/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
Expand Down Expand Up @@ -41,7 +42,8 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}
}()

columns := tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate()
// TODO: Refactor the loop below to use the columns from [ValidColumns]
columns := columns.ColumnNames(tableData.ReadOnlyInMemoryCols().ValidColumns())
stmt, err := tx.Prepare(mssql.CopyIn(tempTableID.FullyQualifiedName(), mssql.BulkOptions{}, columns...))
if err != nil {
return fmt.Errorf("failed to prepare bulk insert: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
copyStmt := fmt.Sprintf(
`COPY %s (%s) FROM '%s' DELIMITER '\t' NULL AS '\\N' GZIP FORMAT CSV %s dateformat 'auto' timeformat 'auto';`,
tempTableID.FullyQualifiedName(),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","),
s3Uri,
s.credentialsClause,
)
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
// COPY the CSV file (in Snowflake) into a table
copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)",
tempTableID.FullyQualifiedName(),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","),
escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%"))

if additionalSettings.AdditionalCopyClause != "" {
Expand Down
7 changes: 0 additions & 7 deletions lib/typing/columns/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,6 @@ func (c *Columns) GetColumn(name string) (Column, bool) {
return Column{}, false
}

// GetColumnsToUpdate will filter all the `Invalid` columns so that we do not update it.
// This is used mostly for the SQL MERGE queries.
// TODO: Replace all uses of [GetColumnsToUpdate] with [ValidColumns]
func (c *Columns) GetColumnsToUpdate() []string {
return ColumnNames(c.ValidColumns())
}

// ValidColumns will filter all the `Invalid` columns so that we do not update them.
// This is used mostly for the SQL MERGE queries.
func (c *Columns) ValidColumns() []Column {
Expand Down
54 changes: 0 additions & 54 deletions lib/typing/columns/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,60 +132,6 @@ func TestColumn_ShouldBackfill(t *testing.T) {
}
}

func TestColumns_GetColumnsToUpdate(t *testing.T) {
type _testCase struct {
name string
cols []Column
expectedCols []string
}

var (
happyPathCols = []Column{
{
name: "hi",
KindDetails: typing.String,
},
{
name: "bye",
KindDetails: typing.String,
},
{
name: "start",
KindDetails: typing.String,
},
}
)

extraCols := happyPathCols
for i := 0; i < 100; i++ {
extraCols = append(extraCols, Column{
name: fmt.Sprintf("hello_%v", i),
KindDetails: typing.Invalid,
})
}

testCases := []_testCase{
{
name: "happy path",
cols: happyPathCols,
expectedCols: []string{"hi", "bye", "start"},
},
{
name: "happy path + extra col",
cols: extraCols,
expectedCols: []string{"hi", "bye", "start"},
},
}

for _, testCase := range testCases {
cols := &Columns{
columns: testCase.cols,
}

assert.Equal(t, testCase.expectedCols, cols.GetColumnsToUpdate(), testCase.name)
}
}

func TestColumns_ValidColumns(t *testing.T) {
var happyPathCols = []Column{
{
Expand Down

0 comments on commit a7ae7b5

Please sign in to comment.