From e3bf9bf2d3addb226ddeb6a9a2887823b1ff7e29 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Wed, 31 Jul 2024 18:05:52 +0100 Subject: [PATCH 1/3] feat: Support Parquet version spec option --- parquet/spec.go | 52 +++++++++++++++++++++++++++++++++++++++----- parquet/spec_test.go | 9 ++++++++ parquet/write.go | 3 ++- 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/parquet/spec.go b/parquet/spec.go index 4cdc4c63..a8948268 100644 --- a/parquet/spec.go +++ b/parquet/spec.go @@ -1,21 +1,63 @@ package parquet -import "github.com/invopop/jsonschema" +import ( + "fmt" + "slices" + "strings" -// nolint:revive -type ParquetSpec struct{} + "github.com/apache/arrow/go/v17/parquet" + "github.com/invopop/jsonschema" +) + +var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"} + +type ParquetSpec struct { + Version string `json:"version,omitempty"` +} + +func (s *ParquetSpec) GetVersion() parquet.Version { + switch s.Version { + case "v1.0": + return parquet.V1_0 + case "v2.4": + return parquet.V2_4 + case "v2.6": + return parquet.V2_6 + case "v2Latest": + return parquet.V2_LATEST + } + return parquet.V2_LATEST +} func (ParquetSpec) JSONSchema() *jsonschema.Schema { + properties := jsonschema.NewProperties() + allowedVersionsAsAny := make([]interface{}, len(allowedVersions)) + for i, v := range allowedVersions { + allowedVersionsAsAny[i] = v + } + properties.Set("version", &jsonschema.Schema{ + Type: "string", + Description: "Parquet format version", + Enum: allowedVersionsAsAny, + Default: "v2Latest", + }) return &jsonschema.Schema{ Description: "CloudQuery Parquet file output spec.", + Properties: properties, Type: "object", AdditionalProperties: jsonschema.FalseSchema, // "additionalProperties": false } } -func (*ParquetSpec) SetDefaults() { +func (s *ParquetSpec) SetDefaults() { + if s.Version == "" { + s.Version = "v2Latest" + } } -func (*ParquetSpec) Validate() error { +func (s *ParquetSpec) Validate() error { + if !slices.Contains(allowedVersions, s.Version) { + return fmt.Errorf("invalid version: %s. Allowed values are %s", s.Version, strings.Join(allowedVersions, ", ")) + } return nil } diff --git a/parquet/spec_test.go b/parquet/spec_test.go index 9b619a5f..5dc861cf 100644 --- a/parquet/spec_test.go +++ b/parquet/spec_test.go @@ -21,5 +21,14 @@ func TestSpec_JSONSchema(t *testing.T) { Err: true, Spec: `{"extra":true}`, }, + { + Name: "invalid version", + ErrorMessage: "at '/version': value must be one of 'v1.0', 'v2.4', 'v2.6', 'v2Latest'", + Spec: `{"version":"invalid"}`, + }, + { + Name: "valid version", + Spec: `{"version":"v1.0"}`, + }, }) } diff --git a/parquet/write.go b/parquet/write.go index 8360eb29..6f907459 100644 --- a/parquet/write.go +++ b/parquet/write.go @@ -21,10 +21,11 @@ type Handle struct { var _ ftypes.Handle = (*Handle)(nil) -func (*Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) { +func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) { props := parquet.NewWriterProperties( parquet.WithMaxRowGroupLength(128*1024*1024), // 128M parquet.WithCompression(compress.Codecs.Snappy), + parquet.WithVersion(c.spec.GetVersion()), ) arrprops := pqarrow.DefaultWriterProps() newSchema := convertSchema(t.ToArrowSchema()) From e3ddab5b07b3c0aff97dda7cb700c0fc84cbcf9d Mon Sep 17 00:00:00 2001 From: erezrokah Date: Fri, 9 Aug 2024 19:09:46 +0100 Subject: [PATCH 2/3] feat: Expose Parquet version and root node repetition spec options --- parquet/spec.go | 24 +++++++++++++++++++++++- parquet/write.go | 1 + schema.json | 13 +++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/parquet/spec.go b/parquet/spec.go index a8948268..291cf25e 100644 --- a/parquet/spec.go +++ b/parquet/spec.go @@ -10,9 +10,11 @@ import ( ) var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"} +var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"} type ParquetSpec struct { - Version string `json:"version,omitempty"` + Version string `json:"version,omitempty"` + RootRepetition string `json:"root_repetition,omitempty"` } func (s *ParquetSpec) GetVersion() parquet.Version { @@ -29,6 +31,20 @@ func (s *ParquetSpec) GetVersion() parquet.Version { return parquet.V2_LATEST } +func (s *ParquetSpec) GetRootRepetition() parquet.Repetition { + switch s.RootRepetition { + case "undefined": + return parquet.Repetitions.Undefined + case "required": + return parquet.Repetitions.Required + case "optional": + return parquet.Repetitions.Optional + case "repeated": + return parquet.Repetitions.Repeated + } + return parquet.Repetitions.Repeated +} + func (ParquetSpec) JSONSchema() *jsonschema.Schema { properties := jsonschema.NewProperties() allowedVersionsAsAny := make([]interface{}, len(allowedVersions)) @@ -53,11 +69,17 @@ func (s *ParquetSpec) SetDefaults() { if s.Version == "" { s.Version = "v2Latest" } + if s.RootRepetition == "" { + s.RootRepetition = "repeated" + } } func (s *ParquetSpec) Validate() error { if !slices.Contains(allowedVersions, s.Version) { return fmt.Errorf("invalid version: %s. Allowed values are %s", s.Version, strings.Join(allowedVersions, ", ")) } + if !slices.Contains(allowedRootRepetitions, s.RootRepetition) { + return fmt.Errorf("invalid rootRepetition: %s. Allowed values are %s", s.RootRepetition, strings.Join(allowedRootRepetitions, ", ")) + } return nil } diff --git a/parquet/write.go b/parquet/write.go index 6f907459..f80bcc0b 100644 --- a/parquet/write.go +++ b/parquet/write.go @@ -26,6 +26,7 @@ func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error parquet.WithMaxRowGroupLength(128*1024*1024), // 128M parquet.WithCompression(compress.Codecs.Snappy), parquet.WithVersion(c.spec.GetVersion()), + parquet.WithRootRepetition(c.spec.GetRootRepetition()), ) arrprops := pqarrow.DefaultWriterProps() newSchema := convertSchema(t.ToArrowSchema()) diff --git a/schema.json b/schema.json index 1554e8e3..9d83263b 100644 --- a/schema.json +++ b/schema.json @@ -129,6 +129,19 @@ "description": "CloudQuery JSON file output spec." }, "ParquetSpec": { + "properties": { + "version": { + "type": "string", + "enum": [ + "v1.0", + "v2.4", + "v2.6", + "v2Latest" + ], + "description": "Parquet format version", + "default": "v2Latest" + } + }, "additionalProperties": false, "type": "object", "description": "CloudQuery Parquet file output spec." From a2bab83380e08e680dd0f9f6caac56b2715da2c4 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Fri, 9 Aug 2024 19:18:32 +0100 Subject: [PATCH 3/3] fix: Lint --- parquet/spec.go | 15 ++++++++++++++- parquet/spec_test.go | 9 +++++++++ schema.json | 11 +++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/parquet/spec.go b/parquet/spec.go index 291cf25e..2ede418f 100644 --- a/parquet/spec.go +++ b/parquet/spec.go @@ -12,6 +12,7 @@ import ( var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"} var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"} +// nolint:revive type ParquetSpec struct { Version string `json:"version,omitempty"` RootRepetition string `json:"root_repetition,omitempty"` @@ -47,7 +48,7 @@ func (s *ParquetSpec) GetRootRepetition() parquet.Repetition { func (ParquetSpec) JSONSchema() *jsonschema.Schema { properties := jsonschema.NewProperties() - allowedVersionsAsAny := make([]interface{}, len(allowedVersions)) + allowedVersionsAsAny := make([]any, len(allowedVersions)) for i, v := range allowedVersions { allowedVersionsAsAny[i] = v } @@ -57,6 +58,18 @@ func (ParquetSpec) JSONSchema() *jsonschema.Schema { Enum: allowedVersionsAsAny, Default: "v2Latest", }) + + allowedRootRepetitionsAsAny := make([]any, len(allowedRootRepetitions)) + for i, v := range allowedRootRepetitions { + allowedRootRepetitionsAsAny[i] = v + } + properties.Set("root_repetition", &jsonschema.Schema{ + Type: "string", + Description: "Root repetition", + Enum: allowedRootRepetitionsAsAny, + Default: "repeated", + }) + return &jsonschema.Schema{ Description: "CloudQuery Parquet file output spec.", Properties: properties, diff --git a/parquet/spec_test.go b/parquet/spec_test.go index 5dc861cf..dd3b1dce 100644 --- a/parquet/spec_test.go +++ b/parquet/spec_test.go @@ -30,5 +30,14 @@ func TestSpec_JSONSchema(t *testing.T) { Name: "valid version", Spec: `{"version":"v1.0"}`, }, + { + Name: "valid root_repetition", + Spec: `{"root_repetition":"undefined"}`, + }, + { + Name: "invalid root_repetition", + ErrorMessage: "at '/root_repetition': value must be one of 'undefined', 'required', 'optional', 'repeated'", + Spec: `{"root_repetition":"invalid"}`, + }, }) } diff --git a/schema.json b/schema.json index 9d83263b..8c38b33f 100644 --- a/schema.json +++ b/schema.json @@ -140,6 +140,17 @@ ], "description": "Parquet format version", "default": "v2Latest" + }, + "root_repetition": { + "type": "string", + "enum": [ + "undefined", + "required", + "optional", + "repeated" + ], + "description": "Root repetition", + "default": "repeated" } }, "additionalProperties": false,