Skip to content

Commit

Permalink
feat: S3 initialize empty objects when using Athena and Parquet (#17793)
Browse files Browse the repository at this point in the history
#### Summary
Expose a new parameter that lets users choose whether empty objects should be generated for tables that produced no data.
  • Loading branch information
bbernays committed Apr 29, 2024
1 parent 0fef5b0 commit f3cd612
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 27 deletions.
8 changes: 5 additions & 3 deletions plugins/destination/s3/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

type Client struct {
plugin.UnimplementedSource
streamingbatchwriter.IgnoreMigrateTable
streamingbatchwriter.UnimplementedDeleteStale
streamingbatchwriter.UnimplementedDeleteRecords
syncID string
Expand All @@ -36,12 +35,15 @@ type Client struct {
s3Client *s3.Client
uploader *manager.Uploader
downloader *manager.Downloader

initializedTables map[string]string
}

func New(ctx context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClientOptions) (plugin.Client, error) {
c := &Client{
logger: logger.With().Str("module", "s3").Logger(),
syncID: opts.InvocationID,
logger: logger.With().Str("module", "s3").Logger(),
syncID: opts.InvocationID,
initializedTables: make(map[string]string),
}
if opts.NoConnection {
return c, nil
Expand Down
42 changes: 41 additions & 1 deletion plugins/destination/s3/client/spec/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,47 @@ func (s Spec) JSONSchemaExtend(sc *jsonschema.Schema) {
},
}

sc.AllOf = append(sc.AllOf, cleanPath, noRotateNoUUID, noRotateNoBatch, uuidWhenBatching)
forceParquet := func(sc *jsonschema.Schema, field string) *jsonschema.Schema {
val := *sc.Properties.Value(field)
val.Enum = nil
val.Const = "parquet"
return &val
}

// write_empty_objects_for_empty_tables enabled -> require parquet format
parquetEmptyObjects := &jsonschema.Schema{
Title: "write_empty_objects_for_empty_tables requires parquet format",
If: &jsonschema.Schema{
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
noRotate := *sc.Properties.Value("write_empty_objects_for_empty_tables")
noRotate.Default = nil
noRotate.Const = true
noRotate.Description = ""
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("write_empty_objects_for_empty_tables", &noRotate)
return properties
}(),
Required: []string{"write_empty_objects_for_empty_tables"},
},
Then: &jsonschema.Schema{
// require properties not to be empty or null
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
properties := jsonschema.NewProperties()
properties.Set("format", forceParquet(sc, "format"))
return properties
}(),
Required: []string{"format"},
},
Extras: map[string]any{
"errorMessage": map[string]any{
"properties": map[string]any{
"format": "when using `write_empty_objects_for_empty_tables` format must be set to `parquet`",
},
},
},
}

sc.AllOf = append(sc.AllOf, cleanPath, noRotateNoUUID, noRotateNoBatch, uuidWhenBatching, parquetEmptyObjects)
}

//go:embed schema.json
Expand Down
36 changes: 36 additions & 0 deletions plugins/destination/s3/client/spec/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions plugins/destination/s3/client/spec/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,5 +303,22 @@ func TestSpecJSONSchema(t *testing.T) {
Name: "server side encryption (success)",
Spec: `{"format": "csv", "path": "{{UUID}}", "bucket": "b", "region": "r", "server_side_encryption_configuration": {"server_side_encryption":"AES256", "sse_kms_key_id":"1234-5678"}}`,
},
{
Name: "Empty objects false format parquet",
Spec: `{"format": "parquet", "path": "{{UUID}}", "bucket": "b", "region": "r","write_empty_objects_for_empty_tables":false}`,
},
{
Name: "Empty objects false format csv",
Spec: `{"format": "csv", "path": "{{UUID}}", "bucket": "b", "region": "r","write_empty_objects_for_empty_tables":false}`,
},
{
Name: "Empty objects true",
Spec: `{"format": "parquet", "path": "{{UUID}}", "bucket": "b", "region": "r","write_empty_objects_for_empty_tables":true}`,
},
{
Name: "Empty objects true format must be parquet",
Spec: `{"format": "csv", "path": "{{UUID}}", "bucket": "b", "region": "r","write_empty_objects_for_empty_tables":true}`,
Err: true,
},
})
}
7 changes: 7 additions & 0 deletions plugins/destination/s3/client/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Spec struct {
// This option is intended to be used when using a custom endpoint using the `endpoint` option.
EndpointSkipTLSVerify bool `json:"endpoint_skip_tls_verify,omitempty" jsonschema:"default=false"`

// If set to `true`, the plugin will create empty parquet files with the table headers and data types for those tables that have no data.
GenerateEmptyObjects bool `json:"write_empty_objects_for_empty_tables,omitempty" jsonschema:"default=false"`

// Maximum number of items that may be grouped together to be written in a single write.
//
// Defaults to `10000` unless `no_rotate` is `true` (will be `0` then).
Expand Down Expand Up @@ -165,6 +168,10 @@ func (s *Spec) Validate() error {
return fmt.Errorf("`path` should not contain relative paths or duplicate slashes")
}

if s.GenerateEmptyObjects && s.Format != filetypes.FormatTypeParquet {
return fmt.Errorf("`write_empty_objects_for_empty_tables` can only be used with `parquet` format")
}

if s.NoRotate {
if strings.Contains(s.Path, varUUID) {
return fmt.Errorf("`path` should not contain %s when `no_rotate` = true", varUUID)
Expand Down
13 changes: 8 additions & 5 deletions plugins/destination/s3/client/spec/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ func TestSpec_Validate(t *testing.T) {
{Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: false},
{Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, Region: "region", BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // no bucket
{Give: Spec{Path: "test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: false, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: false},
{Give: Spec{Path: "test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // can't have no_rotate and {{UUID}}
{Give: Spec{Path: "test/path/{{TABLE}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: false, Bucket: "mybucket", Region: region, BatchSize: &one, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // can't have nonzero batch size and no {{UUID}}
{Give: Spec{Path: "/test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // begins with a slash
{Give: Spec{Path: "//test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // duplicate slashes
{Give: Spec{Path: "test//path", FileSpec: filetypes.FileSpec{Format: "json"}, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // duplicate slashes
{Give: Spec{Path: "test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "parquet"}, NoRotate: false, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0, GenerateEmptyObjects: true}, WantErr: false},
{Give: Spec{Path: "test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "parquet"}, NoRotate: false, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0, GenerateEmptyObjects: false}, WantErr: false},
{Give: Spec{Path: "test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: false, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0, GenerateEmptyObjects: true}, WantErr: true}, // when empty_objects is enabled, format must be parquet
{Give: Spec{Path: "test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // can't have no_rotate and {{UUID}}
{Give: Spec{Path: "test/path/{{TABLE}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: false, Bucket: "mybucket", Region: region, BatchSize: &one, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // can't have nonzero batch size and no {{UUID}}
{Give: Spec{Path: "/test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // begins with a slash
{Give: Spec{Path: "//test/path/{{TABLE}}.{{UUID}}", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // duplicate slashes
{Give: Spec{Path: "test//path", FileSpec: filetypes.FileSpec{Format: "json"}, Bucket: "mybucket", Region: region, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, // duplicate slashes
}
for i, tc := range cases {
tc := tc
Expand Down
68 changes: 50 additions & 18 deletions plugins/destination/s3/client/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,50 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cloudquery/filetypes/v4"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/types"
"github.com/google/uuid"
)

var reInvalidJSONKey = regexp.MustCompile(`\W`)

// createObject starts a new file-type stream for table and, when the stream
// is finished, uploads the written object to S3 under objKey. Server-side
// encryption settings from the spec are applied to the upload when present.
func (c *Client) createObject(ctx context.Context, table *schema.Table, objKey string) (*filetypes.Stream, error) {
	upload := func(body io.Reader) error {
		input := s3.PutObjectInput{
			Bucket:      aws.String(c.spec.Bucket),
			Key:         aws.String(objKey),
			Body:        body,
			ContentType: aws.String(c.spec.GetContentType()),
		}
		if sse := c.spec.ServerSideEncryptionConfiguration; sse != nil {
			input.SSEKMSKeyId = &sse.SSEKMSKeyId
			input.ServerSideEncryption = sse.ServerSideEncryption
		}
		_, uploadErr := c.uploader.Upload(ctx, &input)
		return uploadErr
	}
	return c.Client.StartStream(table, upload)
}

func (c *Client) WriteTable(ctx context.Context, msgs <-chan *message.WriteInsert) error {
var s *filetypes.Stream

for msg := range msgs {
if s == nil {
table := msg.GetTable()

objKey := c.spec.ReplacePathVariables(table.Name, uuid.NewString(), time.Now().UTC(), c.syncID)
// if object was already initialized, use the same key
// We don't need any locking here because all messages for the same table are processed sequentially
if val, ok := c.initializedTables[table.Name]; ok {
objKey = val
delete(c.initializedTables, table.Name)
}

var err error
s, err = c.Client.StartStream(table, func(r io.Reader) error {
params := &s3.PutObjectInput{
Bucket: aws.String(c.spec.Bucket),
Key: aws.String(objKey),
Body: r,
ContentType: aws.String(c.spec.GetContentType()),
}

sseConfiguration := c.spec.ServerSideEncryptionConfiguration
if sseConfiguration != nil {
params.SSEKMSKeyId = &sseConfiguration.SSEKMSKeyId
params.ServerSideEncryption = sseConfiguration.ServerSideEncryption
}

_, err := c.uploader.Upload(ctx, params)
return err
})
s, err = c.createObject(ctx, table, objKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -74,6 +85,27 @@ func (c *Client) Write(ctx context.Context, msgs <-chan message.WriteMessage) er
return c.writer.Write(ctx, msgs)
}

// MigrateTable consumes migrate-table messages. When the
// write_empty_objects_for_empty_tables option is enabled, it eagerly creates
// an object for every migrated table (recording its key in initializedTables
// so WriteTable can reuse it); otherwise the messages are simply drained.
func (c *Client) MigrateTable(ctx context.Context, ch <-chan *message.WriteMigrateTable) error {
	if !c.spec.GenerateEmptyObjects {
		// The channel must still be consumed until it is closed.
		for range ch {
		}
		return nil
	}
	for msg := range ch {
		table := msg.GetTable()
		objKey := c.spec.ReplacePathVariables(table.Name, uuid.NewString(), time.Now().UTC(), c.syncID)
		// No locking needed: all messages for the same table are processed sequentially.
		c.initializedTables[table.Name] = objKey
		stream, err := c.createObject(ctx, table, objKey)
		if err != nil {
			return err
		}
		if err := stream.Finish(); err != nil {
			return err
		}
	}
	return nil
}

// sanitizeRecordJSONKeys replaces all invalid characters in JSON keys with underscores. This is required
// for compatibility with Athena.
func sanitizeRecordJSONKeys(record arrow.Record) (arrow.Record, error) {
Expand Down
1 change: 1 addition & 0 deletions plugins/destination/s3/docs/_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ spec:
# compression: "" # options: gzip
# no_rotate: false
# athena: false # <- set this to true for Athena compatibility
# write_empty_objects_for_empty_tables: false # <- set this to true if using with the CloudQuery Compliance policies
# test_write: true # tests the ability to write to the bucket before processing the data
# endpoint: "" # Endpoint to use for S3 API calls.
# endpoint_skip_tls_verify # Disable TLS verification if using an untrusted certificate
Expand Down
4 changes: 4 additions & 0 deletions plugins/destination/s3/docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ This is the (nested) spec used by the CSV destination Plugin.
When `athena` is set to `true`, the S3 plugin will sanitize keys in JSON columns to be compatible with the Hive Metastore / Athena.
This allows tables to be created with a Glue Crawler and then queried via Athena, without changes to the table schema.

- `write_empty_objects_for_empty_tables` (`boolean`) (optional) (default: `false`)

By default, only tables with resources are persisted to objects during the sync. If you'd like to persist empty objects for empty tables, enable this option. This is useful when using CloudQuery Compliance policies, as it ensures all tables have their schema populated by a query engine like Athena.

- `test_write` (`boolean`) (optional) (default: `true`)

Ensure write access to the given bucket and path by writing a test object on each sync.
Expand Down

0 comments on commit f3cd612

Please sign in to comment.