5 changes: 3 additions & 2 deletions README.md
@@ -53,9 +53,10 @@ operating a Sunlight instance:
[#11](https://github.com/FiloSottile/sunlight/issues/11)) or overwriting
protection (automatically enabled client-side on Tigris).

Eventual consistency is acceptable for this backend.
Eventual consistency is acceptable for this backend, but it must provide
strong durability guarantees.

Currently, S3 and S3-compatible APIs are supported.
Currently, S3-compatible APIs and local POSIX filesystems are supported.

* A per-log deduplication cache, to return existing SCTs for previously
submitted (pre-)certificates.
25 changes: 22 additions & 3 deletions cmd/sunlight/main.go
@@ -200,6 +200,12 @@ type LogConfig struct {
// going to be treated like a directory in many tools using S3.
S3KeyPrefix string

// LocalDirectory is the path to a local directory where the log will store
// its data. It must be dedicated to this specific log instance.
//
// Only one of S3Bucket or LocalDirectory can be set at the same time.
LocalDirectory string

// NotAfterStart is the start of the validity range for certificates
// accepted by this log instance, as an RFC 3339 date.
NotAfterStart string
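
As a sketch of the new option (field names are from the struct above; the short name and path are hypothetical), a log using the local backend sets LocalDirectory and leaves S3Bucket empty:

// Hypothetical configuration sketch: LocalDirectory and S3Bucket are
// mutually exclusive, so only one of them is populated.
lc := LogConfig{
	ShortName:      "example2025h1",               // hypothetical log name
	LocalDirectory: "/srv/sunlight/example2025h1", // dedicated to this log
	// S3Bucket must stay empty when LocalDirectory is set.
}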
@@ -347,9 +353,22 @@ func main() {
slog.String("log", lc.ShortName),
}))

b, err := ctlog.NewS3Backend(ctx, lc.S3Region, lc.S3Bucket, lc.S3Endpoint, lc.S3KeyPrefix, logger)
if err != nil {
fatalError(logger, "failed to create backend", "err", err)
var b ctlog.Backend
switch {
case lc.S3Bucket != "" && lc.LocalDirectory != "":
fatalError(logger, "only one of S3Bucket or LocalDirectory can be set at the same time")
case lc.S3Bucket != "":
b, err = ctlog.NewS3Backend(ctx, lc.S3Region, lc.S3Bucket, lc.S3Endpoint, lc.S3KeyPrefix, logger)
if err != nil {
fatalError(logger, "failed to create backend", "err", err)
}
case lc.LocalDirectory != "":
b, err = ctlog.NewLocalBackend(ctx, lc.LocalDirectory, logger)
if err != nil {
fatalError(logger, "failed to create backend", "err", err)
}
default:
fatalError(logger, "neither S3Bucket nor LocalDirectory are set, one must be used")
}

r := x509util.NewPEMCertPool()
92 changes: 74 additions & 18 deletions internal/ctlog/ctlog.go
@@ -3,6 +3,7 @@ package ctlog
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"crypto"
"crypto/ecdsa"
@@ -207,7 +208,7 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
// Apply the staged tiles before continuing.
config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint",
"old_size", c1.N, "size", c.N)
stagedUploads, err := config.Backend.Fetch(ctx, stagingPath(c.Tree))
stagedUploads, err := fetchAndDecompress(ctx, config.Backend, stagingPath(c.Tree))
if err != nil {
return nil, fmt.Errorf("couldn't fetch staged uploads: %w", err)
}
@@ -244,7 +245,7 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
// Fetch the right-most data tile.
dataTile := edgeTiles[0]
dataTile.L = -1
dataTile.B, err = config.Backend.Fetch(ctx, dataTile.Path())
dataTile.B, err = fetchAndDecompress(ctx, config.Backend, dataTile.Path())
if err != nil {
return nil, fmt.Errorf("couldn't fetch right edge data tile: %w", err)
}
@@ -363,10 +364,12 @@ type Backend interface {
// returns, the object must be fully persisted. opts may be nil.
Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error

// Fetch returns the value for a key. It's expected to decompress any data
// uploaded with UploadOptions.Compress true.
// Fetch returns the value for a key.
Fetch(ctx context.Context, key string) ([]byte, error)

// Discard suggests to the backend that the key can be deleted.
Discard(ctx context.Context, key string) error

// Metrics returns the metrics to register for this log. The metrics should
// not be shared by any other logs.
Metrics() []prometheus.Collector
@@ -379,18 +382,18 @@ type UploadOptions struct {
// "application/octet-stream".
ContentType string

// Compress is true if the data is compressible and should be compressed
// before uploading if possible.
Compress bool
// Compressed is true if the data is compressed with gzip.
Compressed bool

// Immutable is true if the data is never changed after being uploaded.
// Note that the same value may still be re-uploaded, and must succeed.
// [Backend.Discard] can still be used on immutable entries.
Immutable bool
}

var optsHashTile = &UploadOptions{Immutable: true}
var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
var optsStaging = &UploadOptions{Compress: true, Immutable: true}
var optsDataTile = &UploadOptions{Compressed: true, Immutable: true}
var optsStaging = &UploadOptions{Compressed: true, Immutable: true}
var optsIssuer = &UploadOptions{ContentType: "application/pkix-cert", Immutable: true}
var optsCheckpoint = &UploadOptions{ContentType: "text/plain; charset=utf-8"}

@@ -763,11 +766,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1))
tile.L = -1
edgeTiles[-1] = tileWithBytes{tile, dataTile}
l.c.Log.DebugContext(ctx, "staging full data tile",
"tree_size", n, "tile", tile, "size", len(dataTile))

gzipData, err := compress(dataTile)
if err != nil {
return fmtErrorf("couldn't compress data tile: %w", err)
}
l.c.Log.DebugContext(ctx, "staging full data tile", "tree_size", n,
"tile", tile, "size", len(dataTile), "gzip_size", len(gzipData))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
l.m.SeqDataTileGzipSize.Observe(float64(len(gzipData)))

tileUploads = append(tileUploads, &uploadAction{
sunlight.TilePath(tile), dataTile, optsDataTile})
sunlight.TilePath(tile), gzipData, optsDataTile})
dataTile = nil
}
}
@@ -777,11 +787,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1))
tile.L = -1
edgeTiles[-1] = tileWithBytes{tile, dataTile}
l.c.Log.DebugContext(ctx, "staging partial data tile",
"tree_size", n, "tile", tile, "size", len(dataTile))
gzipData, err := compress(dataTile)
if err != nil {
return fmtErrorf("couldn't compress data tile: %w", err)
}
l.c.Log.DebugContext(ctx, "staging partial data tile", "tree_size", n,
"tile", tile, "size", len(dataTile), "gzip_size", len(gzipData))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
l.m.SeqDataTileGzipSize.Observe(float64(len(gzipData)))
tileUploads = append(tileUploads, &uploadAction{
sunlight.TilePath(tile), dataTile, optsDataTile})
sunlight.TilePath(tile), gzipData, optsDataTile})
}

// Produce and stage new tree tiles.
@@ -818,16 +833,20 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
if err != nil {
return fmtErrorf("couldn't marshal staged uploads: %w", err)
}
stagingPath := stagingPath(tree.Tree)
// Don't upload an empty staging bundle, as it would have the same path as
// the previous one, but different (empty) content. Note that LoadLog might
// end up re-uploading the tiles from the previous one, which is harmless.
if len(tileUploads) > 0 {
stagingPath := stagingPath(tree.Tree)
gzipData, err := compress(stagedUploads)
if err != nil {
return fmtErrorf("couldn't compress staged uploads: %w", err)
}
l.c.Log.DebugContext(ctx, "uploading staged tiles", "old_tree_size", oldSize,
"tree_size", tree.N, "path", stagingPath, "size", len(stagedUploads))
"tree_size", tree.N, "path", stagingPath, "size", len(stagedUploads), "gzip_size", len(gzipData))
ctxStrict, cancel := context.WithTimeout(ctx, strictTimeout)
defer cancel()
if err := l.c.Backend.Upload(ctxStrict, stagingPath, stagedUploads, optsStaging); err != nil {
if err := l.c.Backend.Upload(ctxStrict, stagingPath, gzipData, optsStaging); err != nil {
return fmtErrorf("couldn't upload staged tiles: %w", err)
}
}
@@ -872,6 +891,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
}

// Once the checkpoint is uploaded to object storage, we can safely delete the
// staging bundle, if any. No need to fail the sequencing if this fails.
if len(tileUploads) > 0 {
if err := l.c.Backend.Discard(ctx, stagingPath); err != nil {
l.c.Log.ErrorContext(ctx, "staging bundle discard failed",
"tree_size", tree.N, "err", err)
l.m.StagingDiscardErrors.Inc()
}
}

// At this point if the cache put fails, there's no reason to return errors
// to users. The only consequence of cache false negatives are duplicated
// leaves anyway. In fact, an error might cause the clients to resubmit,
@@ -1126,3 +1155,30 @@ func (l *Log) hashReader(overlay map[int64]tlog.Hash) tlog.HashReaderFunc {
return list, nil
}
}

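// compress gzip-compresses data in memory and returns the compressed bytes.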
func compress(data []byte) ([]byte, error) {
b := &bytes.Buffer{}
w := gzip.NewWriter(b)
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return b.Bytes(), nil
}

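// maxCompressRatio bounds how much a fetched object is allowed to expand
// when decompressed, as a guard against decompression bombs.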
const maxCompressRatio = 100

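// fetchAndDecompress fetches key from the backend and decompresses it with
// gzip, reading at most maxCompressRatio times the compressed size.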
func fetchAndDecompress(ctx context.Context, backend Backend, key string) ([]byte, error) {
data, err := backend.Fetch(ctx, key)
if err != nil {
return nil, err
}
r, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
maxSize := int64(len(data)) * maxCompressRatio
return io.ReadAll(io.LimitReader(r, maxSize))
}
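
For illustration, a round trip through these helpers (a sketch assuming a Backend value b, a context ctx, and a hypothetical key and tileBytes value) looks like:

// Hypothetical round trip: compress before Upload, decompress on Fetch.
gz, err := compress(tileBytes)
if err != nil {
	return err
}
if err := b.Upload(ctx, "staging/example", gz, optsStaging); err != nil {
	return err
}
got, err := fetchAndDecompress(ctx, b, "staging/example")
if err != nil {
	return err
}
// got now holds the original tileBytes.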
18 changes: 18 additions & 0 deletions internal/ctlog/ctlog_test.go
@@ -15,6 +15,7 @@ import (
"reflect"
"slices"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
@@ -204,6 +205,23 @@ func TestSequenceUploadPaths(t *testing.T) {
if !reflect.DeepEqual(keys, expected) {
t.Errorf("got %#v, expected %#v", keys, expected)
}

for _, key := range keys {
expectedImmutable := false
expectedDeleted := false
if key != "checkpoint" {
expectedImmutable = true
}
if strings.HasPrefix(key, "staging/") {
expectedDeleted = true
}
if tl.Config.Backend.(*MemoryBackend).del[key] != expectedDeleted {
t.Errorf("got deleted %v, expected %v for key %q", tl.Config.Backend.(*MemoryBackend).del[key], expectedDeleted, key)
}
if tl.Config.Backend.(*MemoryBackend).imm[key] != expectedImmutable {
t.Errorf("got immutable %v, expected %v for key %q", tl.Config.Backend.(*MemoryBackend).imm[key], expectedImmutable, key)
}
}
}

func TestDuplicates(t *testing.T) {
125 changes: 125 additions & 0 deletions internal/ctlog/local.go
@@ -0,0 +1,125 @@
package ctlog

import (
"bytes"
"context"
"errors"
"io"
"log/slog"
"os"
"path/filepath"
"time"

"filippo.io/sunlight/internal/durable"
"github.com/prometheus/client_golang/prometheus"
)

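// LocalBackend stores log data as files under a dedicated local directory,
// implementing Backend on top of a POSIX filesystem.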
type LocalBackend struct {
dir string
metrics []prometheus.Collector
duration prometheus.SummaryVec
log *slog.Logger
}

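// NewLocalBackend returns a LocalBackend rooted at dir, which must already
// exist and be a directory dedicated to this log.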
func NewLocalBackend(ctx context.Context, dir string, l *slog.Logger) (*LocalBackend, error) {
duration := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "fs_op_duration_seconds",
Help: "Overall local backend operation latency.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.025, 0.9: 0.01, 0.99: 0.001},
MaxAge: 1 * time.Minute,
AgeBuckets: 6,
},
[]string{"method"},
)
if fi, err := os.Stat(dir); err != nil {
return nil, fmtErrorf("failed to stat local backend directory %q: %w", dir, err)
} else if !fi.IsDir() {
return nil, fmtErrorf("local backend path %q is not a directory", dir)
}
return &LocalBackend{
dir: dir,
metrics: []prometheus.Collector{duration},
duration: *duration,
log: l,
}, nil
}

var _ Backend = &LocalBackend{}

func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error {
defer prometheus.NewTimer(s.duration.WithLabelValues("upload")).ObserveDuration()
name, err := filepath.Localize(key)
if err != nil {
return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err)
}
path := filepath.Join(s.dir, name)
if err := durable.MkdirAll(filepath.Dir(path), 0755); err != nil {
return fmtErrorf("failed to create directory %q: %w", filepath.Dir(path), err)
}
var perms os.FileMode = 0644
if opts != nil && opts.Immutable {
perms = 0444
if f, err := os.Open(path); err == nil {
defer f.Close()
if err := compareFile(f, data); err != nil {
return fmtErrorf("immutable file %q already exists and does not match: %w", path, err)
}
s.log.WarnContext(ctx, "local file already exists", "key", key, "path", path)
return nil
}
// As a best effort, try to set the immutable flag if supported by the
// OS, and the process has the appropriate capabilities.
defer setImmutable(path)
}
s.log.DebugContext(ctx, "local file write", "key", key,
"size", len(data), "path", path, "perms", perms)
return durable.WriteFile(path, data, perms)
}

func (s *LocalBackend) Fetch(ctx context.Context, key string) ([]byte, error) {
defer prometheus.NewTimer(s.duration.WithLabelValues("fetch")).ObserveDuration()
name, err := filepath.Localize(key)
if err != nil {
return nil, fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err)
}
path := filepath.Join(s.dir, name)
s.log.DebugContext(ctx, "local file read", "key", key, "path", path)
return os.ReadFile(path)
}

func (s *LocalBackend) Discard(ctx context.Context, key string) error {
defer prometheus.NewTimer(s.duration.WithLabelValues("discard")).ObserveDuration()
name, err := filepath.Localize(key)
if err != nil {
return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err)
}
path := filepath.Join(s.dir, name)
s.log.DebugContext(ctx, "local file delete", "key", key, "path", path)
unsetImmutable(path)
return os.Remove(path)
}

func (s *LocalBackend) Metrics() []prometheus.Collector {
return s.metrics
}

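// compareFile returns nil if the contents of f are exactly equal to data,
// reading in bounded chunks; any difference in content or length is an error.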
func compareFile(f *os.File, data []byte) error {
b := make([]byte, min(len(data), 16384))
for {
n, err := f.Read(b)
if err != nil && err != io.EOF {
return err
}
if n > len(data) || !bytes.Equal(b[:n], data[:n]) {
return errors.New("file contents do not match")
}
data = data[n:]
if err == io.EOF {
if len(data) == 0 {
return nil
}
return errors.New("file contents do not match")
}
}
}
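
As a usage sketch from outside the package (mirroring the call in cmd/sunlight/main.go; the directory, object key, and der value are hypothetical):

// Hypothetical caller: the directory must already exist and be dedicated to
// this log, since NewLocalBackend only stats it and does not create it.
b, err := ctlog.NewLocalBackend(ctx, "/srv/sunlight/example2025h1", slog.Default())
if err != nil {
	return err
}
// Immutable uploads are written with 0444 permissions and, best effort,
// marked immutable at the filesystem level.
err = b.Upload(ctx, "issuer/example", der, &ctlog.UploadOptions{
	ContentType: "application/pkix-cert",
	Immutable:   true,
})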
6 changes: 6 additions & 0 deletions internal/ctlog/local_compat.go
@@ -0,0 +1,6 @@
//go:build !linux || (!amd64 && !arm64)

package ctlog

func setImmutable(name string) {}
func unsetImmutable(name string) {}
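
The Linux build of these hooks is not part of this excerpt. As a rough sketch of what a counterpart could look like (assuming golang.org/x/sys/unix, and swallowing errors to match the best-effort contract; this is not the PR's actual implementation):

//go:build linux && (amd64 || arm64)

package ctlog

import (
	"os"

	"golang.org/x/sys/unix"
)

// Hypothetical sketch: toggle the kernel immutable attribute (as chattr +i/-i
// does) via the FS_IOC_GETFLAGS/FS_IOC_SETFLAGS ioctls, ignoring failures.
func setImmutable(name string)   { toggleImmutable(name, true) }
func unsetImmutable(name string) { toggleImmutable(name, false) }

func toggleImmutable(name string, set bool) {
	f, err := os.Open(name)
	if err != nil {
		return
	}
	defer f.Close()
	flags, err := unix.IoctlGetInt(int(f.Fd()), unix.FS_IOC_GETFLAGS)
	if err != nil {
		return
	}
	if set {
		flags |= unix.FS_IMMUTABLE_FL
	} else {
		flags &^= unix.FS_IMMUTABLE_FL
	}
	_ = unix.IoctlSetPointerInt(int(f.Fd()), unix.FS_IOC_SETFLAGS, flags)
}

Changing the immutable flag requires the appropriate capability (CAP_LINUX_IMMUTABLE on Linux), which is why the Upload path treats it as best effort.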