Skip to content

Commit

Permalink
internal/ctlog: upload immutable objects with Cache-Control and If-Match
Browse files Browse the repository at this point in the history
  • Loading branch information
FiloSottile committed Mar 4, 2024
1 parent 932c15c commit 1046953
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
12 changes: 8 additions & 4 deletions internal/ctlog/ctlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,13 @@ type UploadOptions struct {
// Compress is true if the data is compressible and should be compressed
// before uploading if possible.
Compress bool

// Immutable is true if the data is never updated after being uploaded.
Immutable bool
}

var optsCompress = &UploadOptions{Compress: true}
var optsHashTile = &UploadOptions{Immutable: true}
var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
var optsText = &UploadOptions{ContentType: "text/plain; charset=utf-8"}

// A LockBackend is a database that supports compare-and-swap operations.
Expand Down Expand Up @@ -730,7 +734,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
data := dataTile // data is captured by the g.Go function.
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsCompress) })
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsDataTile) })
dataTile = nil
}
}
Expand All @@ -744,7 +748,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), dataTile, optsCompress) })
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), dataTile, optsDataTile) })
}

// Produce and upload new tree tiles.
Expand All @@ -763,7 +767,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize,
"tree_size", n, "tile", tile, "size", len(data))
tileCount++
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, nil) })
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsHashTile) })
}

if err := g.Wait(); err != nil {
Expand Down
46 changes: 30 additions & 16 deletions internal/ctlog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
awshttp "github.com/aws/smithy-go/transport/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -131,6 +132,32 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U
data = b.Bytes()
contentEncoding = aws.String("gzip")
}
var cacheControl *string
if opts != nil && opts.Immutable {
cacheControl = aws.String("public, max-age=604800, immutable")
}
putObject := func() (*s3.PutObjectOutput, error) {
return s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.keyPrefix + key),
Body: bytes.NewReader(data),
ContentLength: aws.Int64(int64(len(data))),
ContentEncoding: contentEncoding,
ContentType: contentType,
CacheControl: cacheControl,
}, func(options *s3.Options) {
// As an extra safety measure against concurrent sequencers (which are
// especially likely on Fly), use Tigris conditional requests to only
// create immutable objects if they don't exist yet. The LockBackend
// protects against signing a split tree, but there is a risk that the
// losing sequencer will overwrite the data tiles of the winning one.
// Without S3 Versioning, that's potentially irrecoverable.
if opts.Immutable && options.BaseEndpoint != nil &&
*options.BaseEndpoint == "https://fly.storage.tigris.dev" {
options.APIOptions = append(options.APIOptions, awshttp.AddHeaderValue("If-Match", ""))
}
})
}
ctx, cancel := context.WithCancelCause(ctx)
hedgeErr := make(chan error, 1)
go func() {
Expand All @@ -140,27 +167,13 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U
case <-ctx.Done():
case <-timer.C:
s.hedgeRequests.Inc()
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.keyPrefix + key),
Body: bytes.NewReader(data),
ContentLength: aws.Int64(int64(len(data))),
ContentEncoding: contentEncoding,
ContentType: contentType,
})
_, err := putObject()
s.log.DebugContext(ctx, "S3 PUT hedge", "key", key, "err", err)
hedgeErr <- err
cancel(errors.New("competing request succeeded"))
}
}()
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.keyPrefix + key),
Body: bytes.NewReader(data),
ContentLength: aws.Int64(int64(len(data))),
ContentEncoding: contentEncoding,
ContentType: contentType,
})
_, err := putObject()
select {
case err = <-hedgeErr:
s.hedgeWins.Inc()
Expand All @@ -169,6 +182,7 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U
}
s.log.DebugContext(ctx, "S3 PUT", "key", key, "size", len(data),
"compress", contentEncoding != nil, "type", *contentType,
"immutable", cacheControl != nil,
"elapsed", time.Since(start), "err", err)
s.uploadSize.Observe(float64(len(data)))
if err != nil {
Expand Down

0 comments on commit 1046953

Please sign in to comment.