diff --git a/parquet/spec.go b/parquet/spec.go
index 4cdc4c63..2ede418f 100644
--- a/parquet/spec.go
+++ b/parquet/spec.go
@@ -1,21 +1,109 @@
 package parquet
 
-import "github.com/invopop/jsonschema"
+import (
+	"fmt"
+	"slices"
+	"strings"
+
+	"github.com/apache/arrow/go/v17/parquet"
+	"github.com/invopop/jsonschema"
+)
+
+// allowedVersions lists the accepted values of ParquetSpec.Version.
+var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"}
+// allowedRootRepetitions lists the accepted values of ParquetSpec.RootRepetition.
+var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"}
 
 // nolint:revive
-type ParquetSpec struct{}
+type ParquetSpec struct {
+	Version        string `json:"version,omitempty"`
+	RootRepetition string `json:"root_repetition,omitempty"`
+}
+
+// GetVersion maps the Version string onto the corresponding parquet.Version.
+// Any value not listed in the switch, including "", yields parquet.V2_LATEST.
+func (s *ParquetSpec) GetVersion() parquet.Version {
+	switch s.Version {
+	case "v1.0":
+		return parquet.V1_0
+	case "v2.4":
+		return parquet.V2_4
+	case "v2.6":
+		return parquet.V2_6
+	case "v2Latest":
+		return parquet.V2_LATEST
+	}
+	return parquet.V2_LATEST
+}
+
+// GetRootRepetition maps the RootRepetition string onto the corresponding parquet.Repetition.
+// Any value not listed in the switch, including "", yields parquet.Repetitions.Repeated.
+func (s *ParquetSpec) GetRootRepetition() parquet.Repetition {
+	switch s.RootRepetition {
+	case "undefined":
+		return parquet.Repetitions.Undefined
+	case "required":
+		return parquet.Repetitions.Required
+	case "optional":
+		return parquet.Repetitions.Optional
+	case "repeated":
+		return parquet.Repetitions.Repeated
+	}
+	return parquet.Repetitions.Repeated
+}
 
+// JSONSchema describes the spec as a closed object whose "version" and
+// "root_repetition" properties are string enums with documented defaults.
 func (ParquetSpec) JSONSchema() *jsonschema.Schema {
+	properties := jsonschema.NewProperties()
+	allowedVersionsAsAny := make([]any, len(allowedVersions))
+	for i, v := range allowedVersions {
+		allowedVersionsAsAny[i] = v
+	}
+	properties.Set("version", &jsonschema.Schema{
+		Type:        "string",
+		Description: "Parquet format version",
+		Enum:        allowedVersionsAsAny,
+		Default:     "v2Latest",
+	})
+
+	allowedRootRepetitionsAsAny := make([]any, len(allowedRootRepetitions))
+	for i, v := range allowedRootRepetitions {
+		allowedRootRepetitionsAsAny[i] = v
+	}
+	properties.Set("root_repetition", &jsonschema.Schema{
+		Type:        "string",
+		Description: "Root repetition",
+		Enum:        allowedRootRepetitionsAsAny,
+		Default:     "repeated",
+	})
+
 	return &jsonschema.Schema{
 		Description:          "CloudQuery Parquet file output spec.",
+		Properties:           properties,
 		Type:                 "object",
 		AdditionalProperties: jsonschema.FalseSchema, // "additionalProperties": false
 	}
 }
 
-func (*ParquetSpec) SetDefaults() {
+// SetDefaults fills empty fields with "v2Latest" and "repeated" respectively.
+func (s *ParquetSpec) SetDefaults() {
+	if s.Version == "" {
+		s.Version = "v2Latest"
+	}
+	if s.RootRepetition == "" {
+		s.RootRepetition = "repeated"
+	}
 }
 
-func (*ParquetSpec) Validate() error {
+// Validate reports an error when Version or RootRepetition is not one of the
+// allowed values; call SetDefaults first so that empty fields pass.
+func (s *ParquetSpec) Validate() error {
+	if !slices.Contains(allowedVersions, s.Version) {
+		return fmt.Errorf("invalid version: %s. Allowed values are %s", s.Version, strings.Join(allowedVersions, ", "))
+	}
+	if !slices.Contains(allowedRootRepetitions, s.RootRepetition) {
+		return fmt.Errorf("invalid root_repetition: %s. Allowed values are %s", s.RootRepetition, strings.Join(allowedRootRepetitions, ", "))
+	}
 	return nil
 }
diff --git a/parquet/spec_test.go b/parquet/spec_test.go
index 9b619a5f..dd3b1dce 100644
--- a/parquet/spec_test.go
+++ b/parquet/spec_test.go
@@ -21,5 +21,23 @@ func TestSpec_JSONSchema(t *testing.T) {
 			Err:  true,
 			Spec: `{"extra":true}`,
 		},
+		{
+			Name:         "invalid version",
+			ErrorMessage: "at '/version': value must be one of 'v1.0', 'v2.4', 'v2.6', 'v2Latest'",
+			Spec:         `{"version":"invalid"}`,
+		},
+		{
+			Name: "valid version",
+			Spec: `{"version":"v1.0"}`,
+		},
+		{
+			Name: "valid root_repetition",
+			Spec: `{"root_repetition":"undefined"}`,
+		},
+		{
+			Name:         "invalid root_repetition",
+			ErrorMessage: "at '/root_repetition': value must be one of 'undefined', 'required', 'optional', 'repeated'",
+			Spec:         `{"root_repetition":"invalid"}`,
+		},
 	})
 }
diff --git a/parquet/write.go b/parquet/write.go
index 8360eb29..f80bcc0b 100644
--- a/parquet/write.go
+++ b/parquet/write.go
@@ -21,10 +21,12 @@ type Handle struct {
 
 var _ ftypes.Handle = (*Handle)(nil)
 
-func (*Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
+func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
 	props := parquet.NewWriterProperties(
 		parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
 		parquet.WithCompression(compress.Codecs.Snappy),
+		parquet.WithVersion(c.spec.GetVersion()),
+		parquet.WithRootRepetition(c.spec.GetRootRepetition()),
 	)
 	arrprops := pqarrow.DefaultWriterProps()
 	newSchema := convertSchema(t.ToArrowSchema())
diff --git a/schema.json b/schema.json
index 1554e8e3..8c38b33f 100644
--- a/schema.json
+++ b/schema.json
@@ -129,6 +129,30 @@
       "description": "CloudQuery JSON file output spec."
     },
     "ParquetSpec": {
+      "properties": {
+        "version": {
+          "type": "string",
+          "enum": [
+            "v1.0",
+            "v2.4",
+            "v2.6",
+            "v2Latest"
+          ],
+          "description": "Parquet format version",
+          "default": "v2Latest"
+        },
+        "root_repetition": {
+          "type": "string",
+          "enum": [
+            "undefined",
+            "required",
+            "optional",
+            "repeated"
+          ],
+          "description": "Root repetition",
+          "default": "repeated"
+        }
+      },
       "additionalProperties": false,
       "type": "object",
       "description": "CloudQuery Parquet file output spec."