Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): expose hive partitioning options #3240

Merged
merged 6 commits into from Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 73 additions & 0 deletions bigquery/external.go
Expand Up @@ -92,6 +92,10 @@ type ExternalDataConfig struct {

// Additional options for CSV, GoogleSheets and Bigtable formats.
Options ExternalDataConfigOptions

// HivePartitioningOptions allows use of Hive partitioning based on the
// layout of objects in Google Cloud Storage.
HivePartitioningOptions *HivePartitioningOptions
}

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
Expand All @@ -106,6 +110,9 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
}
if e.HivePartitioningOptions != nil {
q.HivePartitioningOptions = e.HivePartitioningOptions.toBQ()
}
if e.Options != nil {
e.Options.populateExternalDataConfig(&q)
}
Expand Down Expand Up @@ -134,6 +141,9 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
return nil, err
}
}
if q.HivePartitioningOptions != nil {
e.HivePartitioningOptions = bqToHivePartitioningOptions(q.HivePartitioningOptions)
}
return e, nil
}

Expand Down Expand Up @@ -409,3 +419,66 @@ func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
}
return b, nil
}

// HivePartitioningMode is used in conjunction with HivePartitioningOptions
// to control how hive partitioning keys are inferred from the data layout.
type HivePartitioningMode string

var (
	// AutoHivePartitioningMode automatically infers partitioning key and type information.
	AutoHivePartitioningMode HivePartitioningMode = "AUTO"
	// StringHivePartitioningMode automatically infers partitioning keys and treats values as strings.
	StringHivePartitioningMode HivePartitioningMode = "STRINGS"
	// CustomHivePartitioningMode allows custom definition of the external partitioning.
	CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
)

// HivePartitioningOptions defines the behavior of Hive partitioning
// when working with external data.
type HivePartitioningOptions struct {

// Mode defines which hive partitioning mode to use when reading data.
Mode HivePartitioningMode

// When hive partition detection is
shollyman marked this conversation as resolved.
Show resolved Hide resolved
// requested, a common prefix for all source uris should be supplied.
// The prefix must end immediately before the partition key encoding
// begins.
//
// For example, consider files following this data layout.
// gs://bucket/path_to_table/dt=2019-01-01/country=BR/id=7/file.avro
// gs://bucket/path_to_table/dt=2018-12-31/country=CA/id=3/file.avro
//
// When hive partitioning is requested with either AUTO or STRINGS
// detection, the common prefix can be either of
// gs://bucket/path_to_table or gs://bucket/path_to_table/ (trailing
// slash does not matter).
SourceURIPrefix string

// If set to true, queries against this external table require
// a partition filter to be present that can perform partition
// elimination. Hive-partitioned load jobs with this field
// set to true will fail.
RequirePartitionFilter bool
}

// toBQ converts the receiver into its BigQuery discovery-API representation.
// A nil receiver yields a nil result.
func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
	if o == nil {
		return nil
	}
	opts := &bq.HivePartitioningOptions{}
	opts.Mode = string(o.Mode)
	opts.SourceUriPrefix = o.SourceURIPrefix
	opts.RequirePartitionFilter = o.RequirePartitionFilter
	return opts
}

// bqToHivePartitioningOptions converts a discovery-API representation back
// into the package's HivePartitioningOptions. A nil input yields nil.
func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
	if q == nil {
		return nil
	}
	opts := &HivePartitioningOptions{}
	opts.Mode = HivePartitioningMode(q.Mode)
	opts.SourceURIPrefix = q.SourceUriPrefix
	opts.RequirePartitionFilter = q.RequirePartitionFilter
	return opts
}
8 changes: 8 additions & 0 deletions bigquery/external_test.go
Expand Up @@ -48,6 +48,14 @@ func TestExternalDataConfig(t *testing.T) {
Range: "sheet1!A1:Z10",
},
},
{
SourceFormat: Avro,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: AutoHivePartitioningMode,
SourceURIPrefix: "gs://somebucket/a/b/c",
RequirePartitionFilter: true,
},
},
{
SourceFormat: Bigtable,
Options: &BigtableOptions{
Expand Down
53 changes: 53 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -1859,6 +1859,59 @@ func TestIntegration_LegacyQuery(t *testing.T) {
}
}

func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

autoTable := dataset.Table(tableIDs.New())
customTable := dataset.Table(tableIDs.New())

err := autoTable.Create(ctx, &TableMetadata{
ExternalDataConfig: &ExternalDataConfig{
SourceFormat: Parquet,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
AutoDetect: true,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: AutoHivePartitioningMode,
SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
RequirePartitionFilter: true,
},
},
})
if err != nil {
t.Fatalf("table.Create(auto): %v", err)
}
defer autoTable.Delete(ctx)

err = customTable.Create(ctx, &TableMetadata{
ExternalDataConfig: &ExternalDataConfig{
SourceFormat: Parquet,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
AutoDetect: true,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: CustomHivePartitioningMode,
SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
RequirePartitionFilter: true,
},
},
})
if err != nil {
t.Fatalf("table.Create(custom): %v", err)
}
defer customTable.Delete(ctx)

// Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected.
sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTable.ProjectID, customTable.DatasetID, customTable.TableID)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
t.Fatalf("Error querying: %v", err)
}
checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
}

func TestIntegration_QueryParameters(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down