Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions mc2mc/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
if err != nil {
return errors.WithStack(err)
}
// construct query with ordered columns
queryRaw = constructQueryWithOrderedColumns(queryRaw, columnNames)
// construct query with ordered columns and BQ pseudo columns for ingestion time
queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime(queryRaw, columnNames)
}

if c.enablePartitionValue && !c.enableAutoPartition {
Expand Down Expand Up @@ -111,6 +111,23 @@ func addPartitionValueColumn(rawQuery []byte) []byte {
return []byte(fmt.Sprintf("%s SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", header, qr))
}

// constructQueryWithOrderedColumnsWithBQIngestionTime constructs query with ordered columns and BQ pseudo columns for ingestion time
// ref: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table
func constructQueryWithOrderedColumnsWithBQIngestionTime(query []byte, orderedColumns []string) []byte {
var orderedColumnsWithBQIngestionTime []string
for _, col := range orderedColumns {
val := col
switch col {
case "_partitiontime":
val = "CURRENT_TIMESTAMP() as _partitiontime"
case "_partitiondate":
val = "CURRENT_DATE() as _partitiondate"
}
orderedColumnsWithBQIngestionTime = append(orderedColumnsWithBQIngestionTime, val)
}
return constructQueryWithOrderedColumns(query, orderedColumnsWithBQIngestionTime)
}

func constructQueryWithOrderedColumns(query []byte, orderedColumns []string) []byte {
header, qr := loader.SeparateHeadersAndQuery(string(query))
return []byte(fmt.Sprintf("%s %s", header, loader.ConstructQueryWithOrderedColumns(qr, orderedColumns)))
Expand Down
Loading