diff --git a/parquet/spec.go b/parquet/spec.go
index 2ede418f..df0041b6 100644
--- a/parquet/spec.go
+++ b/parquet/spec.go
@@ -9,13 +9,18 @@ import (
 	"github.com/invopop/jsonschema"
 )
 
+// defaultMaxRowGroupLength is the max row group length applied when the spec
+// does not set max_row_group_length.
+const defaultMaxRowGroupLength = 128 * 1024 * 1024
+
 var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"}
 var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"}
 
 // nolint:revive
 type ParquetSpec struct {
-	Version        string `json:"version,omitempty"`
-	RootRepetition string `json:"root_repetition,omitempty"`
+	Version           string `json:"version,omitempty"`
+	RootRepetition    string `json:"root_repetition,omitempty"`
+	MaxRowGroupLength *int64 `json:"max_row_group_length,omitempty"`
 }
 
 func (s *ParquetSpec) GetVersion() parquet.Version {
@@ -46,6 +51,15 @@ func (s *ParquetSpec) GetRootRepetition() parquet.Repetition {
 	return parquet.Repetitions.Repeated
 }
 
+// GetMaxRowGroupLength returns the configured max row group length, or
+// defaultMaxRowGroupLength when the option is not set.
+func (s *ParquetSpec) GetMaxRowGroupLength() int64 {
+	if s.MaxRowGroupLength == nil {
+		return defaultMaxRowGroupLength
+	}
+	return *s.MaxRowGroupLength
+}
+
 func (ParquetSpec) JSONSchema() *jsonschema.Schema {
 	properties := jsonschema.NewProperties()
 	allowedVersionsAsAny := make([]any, len(allowedVersions))
@@ -70,6 +84,14 @@ func (ParquetSpec) JSONSchema() *jsonschema.Schema {
 		Default:     "repeated",
 	})
 
+	// A row group has to be able to hold at least one row, so zero is rejected.
+	properties.Set("max_row_group_length", &jsonschema.Schema{
+		Type:        "integer",
+		Description: "Max row group length",
+		Default:     defaultMaxRowGroupLength,
+		Minimum:     "1",
+	})
+
 	return &jsonschema.Schema{
 		Description:          "CloudQuery Parquet file output spec.",
 		Properties:           properties,
@@ -85,6 +107,10 @@ func (s *ParquetSpec) SetDefaults() {
 	if s.RootRepetition == "" {
 		s.RootRepetition = "repeated"
 	}
+	if s.MaxRowGroupLength == nil {
+		i := int64(defaultMaxRowGroupLength)
+		s.MaxRowGroupLength = &i
+	}
 }
 
 func (s *ParquetSpec) Validate() error {
@@ -94,5 +120,8 @@ func (s *ParquetSpec) Validate() error {
 	if !slices.Contains(allowedRootRepetitions, s.RootRepetition) {
 		return fmt.Errorf("invalid rootRepetition: %s. Allowed values are %s", s.RootRepetition, strings.Join(allowedRootRepetitions, ", "))
 	}
+	if s.MaxRowGroupLength != nil && *s.MaxRowGroupLength < 1 {
+		return fmt.Errorf("invalid maxRowGroupLength: %v. Must be positive", *s.MaxRowGroupLength)
+	}
 	return nil
 }
diff --git a/parquet/spec_test.go b/parquet/spec_test.go
index dd3b1dce..2600056a 100644
--- a/parquet/spec_test.go
+++ b/parquet/spec_test.go
@@ -39,5 +39,19 @@ func TestSpec_JSONSchema(t *testing.T) {
 			ErrorMessage: "at '/root_repetition': value must be one of 'undefined', 'required', 'optional', 'repeated'",
 			Spec:         `{"root_repetition":"invalid"}`,
 		},
+		{
+			Name: "valid max_row_group_length",
+			Spec: `{"max_row_group_length":256}`,
+		},
+		{
+			Name:         "invalid max_row_group_length",
+			ErrorMessage: "at '/max_row_group_length': minimum: got",
+			Spec:         `{"max_row_group_length":-4}`,
+		},
+		{
+			Name:         "zero max_row_group_length",
+			ErrorMessage: "at '/max_row_group_length': minimum: got",
+			Spec:         `{"max_row_group_length":0}`,
+		},
 	})
 }
diff --git a/parquet/write.go b/parquet/write.go
index f80bcc0b..2a23dd54 100644
--- a/parquet/write.go
+++ b/parquet/write.go
@@ -23,7 +23,7 @@ var _ ftypes.Handle = (*Handle)(nil)
 
 func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
 	props := parquet.NewWriterProperties(
-		parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
+		parquet.WithMaxRowGroupLength(c.spec.GetMaxRowGroupLength()),
 		parquet.WithCompression(compress.Codecs.Snappy),
 		parquet.WithVersion(c.spec.GetVersion()),
 		parquet.WithRootRepetition(c.spec.GetRootRepetition()),
diff --git a/schema.json b/schema.json
index 8c38b33f..e246bebb 100644
--- a/schema.json
+++ b/schema.json
@@ -151,6 +151,12 @@
           ],
           "description": "Root repetition",
           "default": "repeated"
+        },
+        "max_row_group_length": {
+          "type": "integer",
+          "minimum": 1,
+          "description": "Max row group length",
+          "default": 134217728
         }
       },
       "additionalProperties": false,