Skip to content

Commit fa77efa

Browse files
authored
feat(bigquery): expose hive partitioning options (#3240)
* feat(bigquery): add HivePartitioningOptions. This resuscitates https://code-review.googlesource.com/c/gocloud/+/43270 and updates it now that the service properly advertises fields such as RequirePartitionFilter.
1 parent ea3cde5 commit fa77efa

File tree

3 files changed

+133
-0
lines changed

3 files changed

+133
-0
lines changed

Diff for: bigquery/external.go

+72
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ type ExternalDataConfig struct {
9292

9393
// Additional options for CSV, GoogleSheets and Bigtable formats.
9494
Options ExternalDataConfigOptions
95+
96+
// HivePartitioningOptions allows use of Hive partitioning based on the
97+
// layout of objects in Google Cloud Storage.
98+
HivePartitioningOptions *HivePartitioningOptions
9599
}
96100

97101
func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
@@ -106,6 +110,9 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
106110
if e.Schema != nil {
107111
q.Schema = e.Schema.toBQ()
108112
}
113+
if e.HivePartitioningOptions != nil {
114+
q.HivePartitioningOptions = e.HivePartitioningOptions.toBQ()
115+
}
109116
if e.Options != nil {
110117
e.Options.populateExternalDataConfig(&q)
111118
}
@@ -134,6 +141,9 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
134141
return nil, err
135142
}
136143
}
144+
if q.HivePartitioningOptions != nil {
145+
e.HivePartitioningOptions = bqToHivePartitioningOptions(q.HivePartitioningOptions)
146+
}
137147
return e, nil
138148
}
139149

@@ -409,3 +419,65 @@ func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
409419
}
410420
return b, nil
411421
}
422+
423+
// HivePartitioningMode selects the detection mode stored in the Mode field
// of HivePartitioningOptions. Values are converted to and from plain strings
// when exchanged with the bq package.
424+
type HivePartitioningMode string
425+
426+
const (
427+
// AutoHivePartitioningMode automatically infers partitioning keys and their types.
428+
AutoHivePartitioningMode HivePartitioningMode = "AUTO"
429+
// StringHivePartitioningMode automatically infers partitioning keys and treats all values as strings.
430+
StringHivePartitioningMode HivePartitioningMode = "STRINGS"
431+
// CustomHivePartitioningMode allows custom definition of the external partitioning.
432+
CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
433+
)
434+
435+
// HivePartitioningOptions defines the behavior of Hive partitioning
436+
// when working with external data.
437+
type HivePartitioningOptions struct {
438+
439+
// Mode defines which hive partitioning mode to use when reading data.
440+
Mode HivePartitioningMode
441+
442+
// SourceURIPrefix is the common prefix for all source URIs; it should be
443+
// supplied when hive partition detection is requested. The prefix must
444+
// end immediately before the partition key encoding begins.
445+
//
446+
// For example, consider files following this data layout:
447+
//   gs://bucket/path_to_table/dt=2019-01-01/country=BR/id=7/file.avro
448+
//   gs://bucket/path_to_table/dt=2018-12-31/country=CA/id=3/file.avro
449+
//
450+
// When hive partitioning is requested with either AUTO or STRINGS
451+
// detection, the common prefix can be either of
452+
// gs://bucket/path_to_table or gs://bucket/path_to_table/ (trailing
453+
// slash does not matter).
454+
SourceURIPrefix string
455+
456+
// RequirePartitionFilter, if set to true, makes queries against this
457+
// external table require a partition filter that can perform partition
458+
// elimination. Hive-partitioned load jobs with this field
459+
// set to true will fail.
460+
RequirePartitionFilter bool
461+
}
462+
463+
// toBQ converts o into its bq.HivePartitioningOptions counterpart, copying
// Mode (as a plain string), SourceURIPrefix and RequirePartitionFilter.
// It returns nil when o is nil.
func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
464+
// A nil receiver yields a nil result, so the method is safe to call on an unset option.
if o == nil {
465+
return nil
466+
}
467+
return &bq.HivePartitioningOptions{
468+
Mode: string(o.Mode),
469+
SourceUriPrefix: o.SourceURIPrefix,
470+
RequirePartitionFilter: o.RequirePartitionFilter,
471+
}
472+
}
473+
474+
// bqToHivePartitioningOptions converts a bq.HivePartitioningOptions into the
// package's own HivePartitioningOptions, copying Mode (as a
// HivePartitioningMode), SourceUriPrefix and RequirePartitionFilter.
// It returns nil when q is nil.
func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
475+
if q == nil {
476+
return nil
477+
}
478+
// Mode is converted without validation; strings other than the declared
// HivePartitioningMode constants are passed through unchanged.
return &HivePartitioningOptions{
479+
Mode: HivePartitioningMode(q.Mode),
480+
SourceURIPrefix: q.SourceUriPrefix,
481+
RequirePartitionFilter: q.RequirePartitionFilter,
482+
}
483+
}

Diff for: bigquery/external_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ func TestExternalDataConfig(t *testing.T) {
4848
Range: "sheet1!A1:Z10",
4949
},
5050
},
51+
{
52+
SourceFormat: Avro,
53+
HivePartitioningOptions: &HivePartitioningOptions{
54+
Mode: AutoHivePartitioningMode,
55+
SourceURIPrefix: "gs://somebucket/a/b/c",
56+
RequirePartitionFilter: true,
57+
},
58+
},
5159
{
5260
SourceFormat: Bigtable,
5361
Options: &BigtableOptions{

Diff for: bigquery/integration_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,59 @@ func TestIntegration_LegacyQuery(t *testing.T) {
18591859
}
18601860
}
18611861

1862+
// TestIntegration_QueryExternalHivePartitioning creates two external Parquet
// tables that use hive partitioning, one in AUTO mode and one in CUSTOM mode,
// then runs a query against the custom table filtered on its partition key.
// The test is skipped when no integration client is configured.
func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
1863+
if client == nil {
1864+
t.Skip("Integration tests skipped")
1865+
}
1866+
ctx := context.Background()
1867+
1868+
autoTable := dataset.Table(tableIDs.New())
1869+
customTable := dataset.Table(tableIDs.New())
1870+
1871+
// AUTO mode: the prefix is the plain directory that the source URI wildcard expands under.
err := autoTable.Create(ctx, &TableMetadata{
1872+
ExternalDataConfig: &ExternalDataConfig{
1873+
SourceFormat: Parquet,
1874+
SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
1875+
AutoDetect: true,
1876+
HivePartitioningOptions: &HivePartitioningOptions{
1877+
Mode: AutoHivePartitioningMode,
1878+
SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
1879+
RequirePartitionFilter: true,
1880+
},
1881+
},
1882+
})
1883+
if err != nil {
1884+
t.Fatalf("table.Create(auto): %v", err)
1885+
}
1886+
// Cleanup only; whatever Delete returns is discarded.
defer autoTable.Delete(ctx)
1887+
1888+
// CUSTOM mode: the prefix carries a {pkey:STRING} suffix, presumably
// declaring the partition key name and type (confirm against the service docs).
err = customTable.Create(ctx, &TableMetadata{
1889+
ExternalDataConfig: &ExternalDataConfig{
1890+
SourceFormat: Parquet,
1891+
SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
1892+
AutoDetect: true,
1893+
HivePartitioningOptions: &HivePartitioningOptions{
1894+
Mode: CustomHivePartitioningMode,
1895+
SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
1896+
RequirePartitionFilter: true,
1897+
},
1898+
},
1899+
})
1900+
if err != nil {
1901+
t.Fatalf("table.Create(custom): %v", err)
1902+
}
1903+
// Cleanup only; whatever Delete returns is discarded.
defer customTable.Delete(ctx)
1904+
1905+
// NOTE(review): autoTable is never queried; for AUTO mode only successful creation is checked.
// Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected.
sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTable.ProjectID, customTable.DatasetID, customTable.TableID)
1907+
q := client.Query(sql)
1908+
it, err := q.Read(ctx)
1909+
if err != nil {
1910+
t.Fatalf("Error querying: %v", err)
1911+
}
1912+
// The expected result is a single row holding the count 50; the comparison
// itself is delegated to checkReadAndTotalRows (defined elsewhere).
checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
1913+
}
1914+
18621915
func TestIntegration_QueryParameters(t *testing.T) {
18631916
if client == nil {
18641917
t.Skip("Integration tests skipped")

0 commit comments

Comments
 (0)