Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions parquet/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"github.com/invopop/jsonschema"
)

// defaultMaxRowGroupLength is the max row group length used when the spec
// leaves max_row_group_length unset (128 * 1024 * 1024 = 134217728). It is
// returned by GetMaxRowGroupLength, applied by SetDefaults, and advertised as
// the default in the JSON schema.
// NOTE(review): the value is interpreted as a row count, not a byte size —
// confirm that a default this large is intended.
const defaultMaxRowGroupLength = 128 * 1024 * 1024
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason this was set to 128 Mi (128 * 1024 * 1024), which looks like a byte size, but the value is actually a row count.


// allowedVersions lists the version strings offered for the "version" option;
// JSONSchema converts this list for use in the generated schema.
var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"}

// allowedRootRepetitions lists the values Validate accepts for
// ParquetSpec.RootRepetition; any other value is reported as an error.
var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"}

// ParquetSpec is the user-facing configuration for Parquet file output. All
// fields are optional in JSON; SetDefaults fills in the ones left empty and
// Validate rejects unsupported values.
//
// nolint:revive
type ParquetSpec struct {
	// Version selects the Parquet format version and is read by GetVersion.
	// NOTE(review): expected to be one of allowedVersions — confirm against Validate.
	Version string `json:"version,omitempty"`

	// RootRepetition selects the repetition of the root schema node. It must
	// be one of allowedRootRepetitions; SetDefaults uses "repeated" when empty.
	RootRepetition string `json:"root_repetition,omitempty"`

	// MaxRowGroupLength is the max row group length handed to the Parquet
	// writer. A nil pointer means "not configured", in which case
	// defaultMaxRowGroupLength is used. Negative values are rejected by
	// Validate.
	MaxRowGroupLength *int64 `json:"max_row_group_length,omitempty"`
}

func (s *ParquetSpec) GetVersion() parquet.Version {
Expand Down Expand Up @@ -46,6 +49,13 @@ func (s *ParquetSpec) GetRootRepetition() parquet.Repetition {
return parquet.Repetitions.Repeated
}

// GetMaxRowGroupLength reports the configured max row group length. When
// MaxRowGroupLength has not been set (nil), defaultMaxRowGroupLength is
// returned instead.
func (s *ParquetSpec) GetMaxRowGroupLength() int64 {
	if length := s.MaxRowGroupLength; length != nil {
		return *length
	}
	return defaultMaxRowGroupLength
}

func (ParquetSpec) JSONSchema() *jsonschema.Schema {
properties := jsonschema.NewProperties()
allowedVersionsAsAny := make([]any, len(allowedVersions))
Expand All @@ -70,6 +80,13 @@ func (ParquetSpec) JSONSchema() *jsonschema.Schema {
Default: "repeated",
})

properties.Set("max_row_group_length", &jsonschema.Schema{
Type: "integer",
Description: "Max row group length",
Default: defaultMaxRowGroupLength,
Minimum: "0",
})

return &jsonschema.Schema{
Description: "CloudQuery Parquet file output spec.",
Properties: properties,
Expand All @@ -85,6 +102,10 @@ func (s *ParquetSpec) SetDefaults() {
if s.RootRepetition == "" {
s.RootRepetition = "repeated"
}
if s.MaxRowGroupLength == nil {
i := int64(defaultMaxRowGroupLength)
s.MaxRowGroupLength = &i
}
}

func (s *ParquetSpec) Validate() error {
Expand All @@ -94,5 +115,8 @@ func (s *ParquetSpec) Validate() error {
if !slices.Contains(allowedRootRepetitions, s.RootRepetition) {
return fmt.Errorf("invalid rootRepetition: %s. Allowed values are %s", s.RootRepetition, strings.Join(allowedRootRepetitions, ", "))
}
if s.MaxRowGroupLength != nil && *s.MaxRowGroupLength < 0 {
return fmt.Errorf("invalid: maxRowGroupLength: %v. Must be zero or positive", *s.MaxRowGroupLength)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is zero or a small number ok?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero falls back to the default of the Arrow Parquet writer, so it is allowed. Updated this check to be `< 0`.

}
return nil
}
9 changes: 9 additions & 0 deletions parquet/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,14 @@ func TestSpec_JSONSchema(t *testing.T) {
ErrorMessage: "at '/root_repetition': value must be one of 'undefined', 'required', 'optional', 'repeated'",
Spec: `{"root_repetition":"invalid"}`,
},
{
Name: "valid max_row_group_length",
Spec: `{"max_row_group_length":256}`,
},
{
Name: "invalid max_row_group_length",
ErrorMessage: "at '/max_row_group_length': minimum: got",
Spec: `{"max_row_group_length":-4}`,
},
})
}
2 changes: 1 addition & 1 deletion parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ ftypes.Handle = (*Handle)(nil)

func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
props := parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
parquet.WithMaxRowGroupLength(c.spec.GetMaxRowGroupLength()),
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithVersion(c.spec.GetVersion()),
parquet.WithRootRepetition(c.spec.GetRootRepetition()),
Expand Down
6 changes: 6 additions & 0 deletions schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@
],
"description": "Root repetition",
"default": "repeated"
},
"max_row_group_length": {
"type": "integer",
"minimum": 0,
"description": "Max row group length",
"default": 134217728
}
},
"additionalProperties": false,
Expand Down
Loading