diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index bdd515a96..b67060de9 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -91,11 +91,11 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID writer.Comma = '\t' additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats + columns := tableData.ReadOnlyInMemoryCols().ValidColumns() for _, value := range tableData.Rows() { var row []string - for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() { - colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col) - castedValue, castErr := s.CastColValStaging(value[col], colKind, additionalDateFmts) + for _, col := range columns { + castedValue, castErr := s.CastColValStaging(value[col.Name()], col, additionalDateFmts) if castErr != nil { return "", castErr } diff --git a/clients/s3/s3.go b/clients/s3/s3.go index eb2667974..4c1ec9548 100644 --- a/clients/s3/s3.go +++ b/clients/s3/s3.go @@ -106,20 +106,16 @@ func (s *Store) Merge(tableData *optimization.TableData) error { additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats pw.CompressionType = parquet.CompressionCodec_GZIP + columns := tableData.ReadOnlyInMemoryCols().ValidColumns() for _, val := range tableData.Rows() { row := make(map[string]any) - for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() { - colKind, isOk := tableData.ReadOnlyInMemoryCols().GetColumn(col) - if !isOk { - return fmt.Errorf("expected column: %v to exist in readOnlyInMemoryCols(...) but it does not", col) - } - - value, err := parquetutil.ParseValue(val[col], colKind, additionalDateFmts) + for _, col := range columns { + value, err := parquetutil.ParseValue(val[col.Name()], col, additionalDateFmts) if err != nil { - return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %v", err, val[col], col) + return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %q", err, val[col.Name()], col.Name()) } - row[col] = value + row[col.Name()] = value } rowBytes, err := json.Marshal(row) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index fdddc2810..8936dd824 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -121,11 +121,11 @@ func (s *Store) writeTemporaryTableFile(tableData *optimization.TableData, newTa writer.Comma = '\t' additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats + columns := tableData.ReadOnlyInMemoryCols().ValidColumns() for _, value := range tableData.Rows() { var row []string - for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() { - column, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col) - castedValue, castErr := castColValStaging(value[col], column, additionalDateFmts) + for _, col := range columns { + castedValue, castErr := castColValStaging(value[col.Name()], col, additionalDateFmts) if castErr != nil { return "", castErr }