diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 1bc15630..585ab4e5 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -69,8 +69,8 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err query := fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s`, tableID.FullyQualifiedName(), - strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), - strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","), + strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","), temporaryTableID.FullyQualifiedName(), ) diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index e01c884d..a9b39715 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -10,6 +10,7 @@ import ( "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/typing/columns" ) func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { @@ -41,7 +42,8 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } }() - columns := tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() + // TODO: Refactor the loop below to use the columns from [ValidColumns] + columns := columns.ColumnNames(tableData.ReadOnlyInMemoryCols().ValidColumns()) stmt, err := tx.Prepare(mssql.CopyIn(tempTableID.FullyQualifiedName(), mssql.BulkOptions{}, columns...)) if err != nil { return fmt.Errorf("failed to prepare bulk insert: %w", err) diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index b67060de..e7bb1999 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -63,7 +63,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo copyStmt := fmt.Sprintf( `COPY %s (%s) FROM '%s' DELIMITER '\t' NULL AS '\\N' GZIP FORMAT CSV %s dateformat 'auto' timeformat 'auto';`, tempTableID.FullyQualifiedName(), - strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","), s3Uri, s.credentialsClause, ) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 8936dd82..c6ad7d80 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -95,7 +95,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo // COPY the CSV file (in Snowflake) into a table copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)", tempTableID.FullyQualifiedName(), - strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), + strings.Join(sql.QuoteColumns(tableData.ReadOnlyInMemoryCols().ValidColumns(), s.Dialect()), ","), escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%")) if additionalSettings.AdditionalCopyClause != "" { diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index dddc3e4e..f341befc 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -177,13 +177,6 @@ func (c *Columns) GetColumn(name string) (Column, bool) { return Column{}, false } -// GetColumnsToUpdate will filter all the `Invalid` columns so that we do not update it. -// This is used mostly for the SQL MERGE queries. -// TODO: Replace all uses of [GetColumnsToUpdate] with [ValidColumns] -func (c *Columns) GetColumnsToUpdate() []string { - return ColumnNames(c.ValidColumns()) -} - // ValidColumns will filter all the `Invalid` columns so that we do not update them. // This is used mostly for the SQL MERGE queries. func (c *Columns) ValidColumns() []Column { diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 1ba0491f..f9e27943 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -132,60 +132,6 @@ func TestColumn_ShouldBackfill(t *testing.T) { } } -func TestColumns_GetColumnsToUpdate(t *testing.T) { - type _testCase struct { - name string - cols []Column - expectedCols []string - } - - var ( - happyPathCols = []Column{ - { - name: "hi", - KindDetails: typing.String, - }, - { - name: "bye", - KindDetails: typing.String, - }, - { - name: "start", - KindDetails: typing.String, - }, - } - ) - - extraCols := happyPathCols - for i := 0; i < 100; i++ { - extraCols = append(extraCols, Column{ - name: fmt.Sprintf("hello_%v", i), - KindDetails: typing.Invalid, - }) - } - - testCases := []_testCase{ - { - name: "happy path", - cols: happyPathCols, - expectedCols: []string{"hi", "bye", "start"}, - }, - { - name: "happy path + extra col", - cols: extraCols, - expectedCols: []string{"hi", "bye", "start"}, - }, - } - - for _, testCase := range testCases { - cols := &Columns{ - columns: testCase.cols, - } - - assert.Equal(t, testCase.expectedCols, cols.GetColumnsToUpdate(), testCase.name) - } -} - func TestColumns_ValidColumns(t *testing.T) { var happyPathCols = []Column{ {