[exporter/awss3] Add compression option (27872) (open-telemetry#31622)
**Description:**
Add a `compression` option to compress files using the `compress/gzip` library
before uploading them to S3.
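
For reference, the new option sits under `s3uploader` in the exporter configuration; a minimal sketch of enabling it, mirroring the test fixture added in this change:

```yaml
exporters:
  awss3:
    s3uploader:
      s3_bucket: "foo"   # bucket name taken from the test fixture
      compression: "gzip"
    marshaler: otlp_json
```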

**Link to tracking Issue:**
Fixes
open-telemetry#27872

**Testing:** 

Sent a number of traces through the S3 exporter using k6 and compared the
resulting sizes. Used MinIO as the S3 backend.
| Marshaler | Compression | k6 Requests | k6 Data Sent | S3 Objects | S3 Total Size |
| --- | --- | --- | --- | --- | --- |
| otlp_json | No | 101 | 118 KB | 101 | 36 KB | 
| otlp_proto | No | 101 | 118 KB | 101 | 11 KB | 
| otlp_json | Yes | 101 | 118 KB | 101 | 21 KB | 
| otlp_proto | Yes | 101 | 118 KB | 101 | 9.9 KB | 

Additionally, added a new unit test that checks the generated file name.

**Documentation:**
- Updated README.md file
pepperkick authored and DougManton committed Mar 13, 2024
1 parent 2b19afd commit 73ee3ba
Showing 9 changed files with 185 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-compression-option.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "add `compression` option to enable file compression on S3"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [ 27872 ]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Add `compression` option to compress files using `compress/gzip` library before uploading to S3.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [ user ]
29 changes: 17 additions & 12 deletions exporter/awss3exporter/README.md
@@ -22,18 +22,19 @@ This exporter targets to support proto/json format.

The following exporter configuration parameters are supported.

| Name | Description | Default |
|:----------------------|:---------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| `region` | AWS region. | "us-east-1" |
| `s3_bucket` | S3 bucket | |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
| `role_arn` | the Role ARN to be assumed | |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data | `otlp_json` |
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
| Name | Description | Default |
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| `region` | AWS region. | "us-east-1" |
| `s3_bucket` | S3 bucket | |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
| `role_arn` | the Role ARN to be assumed | |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data | `otlp_json` |
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
| `compression` | should the file be compressed | none |

### Marshaler

@@ -46,6 +47,10 @@ Marshaler determines the format of data sent to AWS S3. Currently, the following
- `body`: export the log body as string.
**This format is supported only for logs.**

### Compression
- `none` (default): No compression will be applied
- `gzip`: Files will be compressed with gzip. **This does not support the `sumo_ic` marshaler.**

# Example Configuration

The following example configuration stores output in the 'eu-central' region in a bucket named 'databucket'.
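
A sketch of such a configuration with the new `compression` option included; the full region string `eu-central-1` is an assumption, and `compression` defaults to `none` when omitted:

```yaml
exporters:
  awss3:
    s3uploader:
      region: "eu-central-1"   # assumed full name for the 'eu-central' region mentioned above
      s3_bucket: "databucket"
      compression: "gzip"      # optional; defaults to "none"
    marshaler: otlp_json
```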
30 changes: 21 additions & 9 deletions exporter/awss3exporter/config.go
@@ -6,21 +6,23 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect
import (
"errors"

"go.opentelemetry.io/collector/config/configcompression"
"go.uber.org/multierr"
)

// S3UploaderConfig contains aws s3 uploader related config to control things
// like bucket, prefix, batching, connections, retries, etc.
type S3UploaderConfig struct {
Region string `mapstructure:"region"`
S3Bucket string `mapstructure:"s3_bucket"`
S3Prefix string `mapstructure:"s3_prefix"`
S3Partition string `mapstructure:"s3_partition"`
FilePrefix string `mapstructure:"file_prefix"`
Endpoint string `mapstructure:"endpoint"`
RoleArn string `mapstructure:"role_arn"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
DisableSSL bool `mapstructure:"disable_ssl"`
Region string `mapstructure:"region"`
S3Bucket string `mapstructure:"s3_bucket"`
S3Prefix string `mapstructure:"s3_prefix"`
S3Partition string `mapstructure:"s3_partition"`
FilePrefix string `mapstructure:"file_prefix"`
Endpoint string `mapstructure:"endpoint"`
RoleArn string `mapstructure:"role_arn"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
DisableSSL bool `mapstructure:"disable_ssl"`
Compression configcompression.Type `mapstructure:"compression"`
}

type MarshalerType string
@@ -48,5 +50,15 @@ func (c *Config) Validate() error {
if c.S3Uploader.S3Bucket == "" {
errs = multierr.Append(errs, errors.New("bucket is required"))
}
compression := c.S3Uploader.Compression
if compression.IsCompressed() {
if compression != configcompression.TypeGzip {
errs = multierr.Append(errs, errors.New("unknown compression type"))
}

if c.MarshalerName == SumoIC {
errs = multierr.Append(errs, errors.New("marshaler does not support compression"))
}
}
return errs
}
42 changes: 42 additions & 0 deletions exporter/awss3exporter/config_test.go
@@ -196,3 +196,45 @@ func TestMarshallerName(t *testing.T) {
)

}

func TestCompressionName(t *testing.T) {
factories, err := otelcoltest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Exporters[factory.Type()] = factory
cfg, err := otelcoltest.LoadConfigAndValidate(
filepath.Join("testdata", "compression.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)

assert.Equal(t, e,
&Config{
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
Compression: "gzip",
},
MarshalerName: "otlp_json",
},
)

e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config)

assert.Equal(t, e,
&Config{
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "bar",
S3Partition: "minute",
Compression: "none",
},
MarshalerName: "otlp_proto",
},
)

}
1 change: 1 addition & 0 deletions exporter/awss3exporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.50.27
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/config/configcompression v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/confmap v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6
2 changes: 2 additions & 0 deletions exporter/awss3exporter/go.sum


42 changes: 34 additions & 8 deletions exporter/awss3exporter/s3_writer.go
@@ -5,6 +5,7 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"math/rand"
@@ -15,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"go.opentelemetry.io/collector/config/configcompression"
)

type s3Writer struct {
@@ -38,12 +40,17 @@ func randomInRange(low, hi int) int {
return low + rand.Intn(hi-low)
}

func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileformat string) string {
func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileformat string, compression configcompression.Type) string {
timeKey := getTimeKey(time, partition)
randomID := randomInRange(100000000, 999999999)

s3Key := keyPrefix + "/" + timeKey + "/" + filePrefix + metadata + "_" + strconv.Itoa(randomID) + "." + fileformat

// add ".gz" extension to files if compression is enabled
if compression == configcompression.TypeGzip {
s3Key += ".gz"
}

return s3Key
}

@@ -77,10 +84,28 @@ func (s3writer *s3Writer) writeBuffer(_ context.Context, buf []byte, config *Con
now := time.Now()
key := getS3Key(now,
config.S3Uploader.S3Prefix, config.S3Uploader.S3Partition,
config.S3Uploader.FilePrefix, metadata, format)

// create a reader from data data in memory
reader := bytes.NewReader(buf)
config.S3Uploader.FilePrefix, metadata, format, config.S3Uploader.Compression)

encoding := ""
var reader *bytes.Reader
if config.S3Uploader.Compression == configcompression.TypeGzip {
// set s3 uploader content encoding to "gzip"
encoding = "gzip"
var gzipContents bytes.Buffer

// create a gzip from data
gzipWriter := gzip.NewWriter(&gzipContents)
_, err := gzipWriter.Write(buf)
if err != nil {
return err
}
// closing flushes any buffered compressed data; surface the error rather than uploading a truncated object
if err := gzipWriter.Close(); err != nil {
return err
}

reader = bytes.NewReader(gzipContents.Bytes())
} else {
// create a reader from data in memory
reader = bytes.NewReader(buf)
}

sessionConfig := getSessionConfig(config)
sess, err := getSession(config, sessionConfig)
@@ -92,9 +117,10 @@
uploader := s3manager.NewUploader(sess)

_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(config.S3Uploader.S3Bucket),
Key: aws.String(key),
Body: reader,
Bucket: aws.String(config.S3Uploader.S3Bucket),
Key: aws.String(key),
Body: reader,
ContentEncoding: &encoding,
})
if err != nil {
return err
16 changes: 15 additions & 1 deletion exporter/awss3exporter/s3_writer_test.go
@@ -36,7 +36,21 @@ func TestS3Key(t *testing.T) {
require.NotNil(t, tm)

re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json`)
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json")
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "")
matched := re.MatchString(s3Key)
assert.Equal(t, true, matched)
}

func TestS3KeyOfCompressedFile(t *testing.T) {
const layout = "2006-01-02"

tm, err := time.Parse(layout, "2022-06-05")

assert.NoError(t, err)
require.NotNil(t, tm)

re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json.gz`)
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "gzip")
matched := re.MatchString(s3Key)
assert.Equal(t, true, matched)
}
26 changes: 26 additions & 0 deletions exporter/awss3exporter/testdata/compression.yaml
@@ -0,0 +1,26 @@
receivers:
  nop:

exporters:
  awss3:
    s3uploader:
      s3_bucket: "foo"
      compression: "gzip"
    marshaler: otlp_json

  awss3/proto:
    s3uploader:
      s3_bucket: "bar"
      compression: "none"
    marshaler: otlp_proto


processors:
  nop:

service:
  pipelines:
    traces:
      receivers: [nop]
      processors: [nop]
      exporters: [awss3, awss3/proto]
