diff --git a/README.md b/README.md index ca4e94d..394b149 100644 --- a/README.md +++ b/README.md @@ -53,9 +53,10 @@ operating a Sunlight instance: [#11](https://github.com/FiloSottile/sunlight/issues/11)) or overwriting protection (automatically enabled client-side on Tigris). - Eventual consistency is acceptable for this backend. + Eventual consistency is acceptable for this backend, but it must provide + strong durability guarantees. - Currently, S3 and S3-compatible APIs are supported. + Currently, S3-compatible APIs and local POSIX filesystems are supported. * A per-log deduplication cache, to return existing SCTs for previously submitted (pre-)certificates. diff --git a/cmd/sunlight/main.go b/cmd/sunlight/main.go index afea74b..53339ca 100644 --- a/cmd/sunlight/main.go +++ b/cmd/sunlight/main.go @@ -200,6 +200,12 @@ type LogConfig struct { // going to be treated like a directory in many tools using S3. S3KeyPrefix string + // LocalDirectory is the path to a local directory where the log will store + // its data. It must be dedicated to this specific log instance. + // + // Only one of S3Bucket or LocalDirectory can be set at the same time. + LocalDirectory string + // NotAfterStart is the start of the validity range for certificates // accepted by this log instance, as and RFC 3339 date. 
NotAfterStart string @@ -347,9 +353,22 @@ func main() { slog.String("log", lc.ShortName), })) - b, err := ctlog.NewS3Backend(ctx, lc.S3Region, lc.S3Bucket, lc.S3Endpoint, lc.S3KeyPrefix, logger) - if err != nil { - fatalError(logger, "failed to create backend", "err", err) + var b ctlog.Backend + switch { + case lc.S3Bucket != "" && lc.LocalDirectory != "": + fatalError(logger, "only one of S3Bucket or LocalDirectory can be set at the same time") + case lc.S3Bucket != "": + b, err = ctlog.NewS3Backend(ctx, lc.S3Region, lc.S3Bucket, lc.S3Endpoint, lc.S3KeyPrefix, logger) + if err != nil { + fatalError(logger, "failed to create backend", "err", err) + } + case lc.LocalDirectory != "": + b, err = ctlog.NewLocalBackend(ctx, lc.LocalDirectory, logger) + if err != nil { + fatalError(logger, "failed to create backend", "err", err) + } + default: + fatalError(logger, "neither S3Bucket nor LocalDirectory are set, one must be used") } r := x509util.NewPEMCertPool() diff --git a/internal/ctlog/ctlog.go b/internal/ctlog/ctlog.go index 8f96a20..9e21a72 100644 --- a/internal/ctlog/ctlog.go +++ b/internal/ctlog/ctlog.go @@ -3,6 +3,7 @@ package ctlog import ( "archive/tar" "bytes" + "compress/gzip" "context" "crypto" "crypto/ecdsa" @@ -207,7 +208,7 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) { // Apply the staged tiles before continuing. config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint", "old_size", c1.N, "size", c.N) - stagedUploads, err := config.Backend.Fetch(ctx, stagingPath(c.Tree)) + stagedUploads, err := fetchAndDecompress(ctx, config.Backend, stagingPath(c.Tree)) if err != nil { return nil, fmt.Errorf("couldn't fetch staged uploads: %w", err) } @@ -244,7 +245,7 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) { // Fetch the right-most data tile. 
dataTile := edgeTiles[0] dataTile.L = -1 - dataTile.B, err = config.Backend.Fetch(ctx, dataTile.Path()) + dataTile.B, err = fetchAndDecompress(ctx, config.Backend, dataTile.Path()) if err != nil { return nil, fmt.Errorf("couldn't fetch right edge data tile: %w", err) } @@ -363,10 +364,12 @@ type Backend interface { // returns, the object must be fully persisted. opts may be nil. Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error - // Fetch returns the value for a key. It's expected to decompress any data - // uploaded with UploadOptions.Compress true. + // Fetch returns the value for a key. Fetch(ctx context.Context, key string) ([]byte, error) + // Discard suggests to the backend that the key can be deleted. + Discard(ctx context.Context, key string) error + // Metrics returns the metrics to register for this log. The metrics should // not be shared by any other logs. Metrics() []prometheus.Collector @@ -379,18 +382,18 @@ type UploadOptions struct { // "application/octet-stream". ContentType string - // Compress is true if the data is compressible and should be compressed - // before uploading if possible. - Compress bool + // Compressed is true if the data is compressed with gzip. + Compressed bool // Immutable is true if the data is never changed after being uploaded. // Note that the same value may still be re-uploaded, and must succeed. + // [Backend.Discard] can still be used on immutable entries. 
Immutable bool } var optsHashTile = &UploadOptions{Immutable: true} -var optsDataTile = &UploadOptions{Compress: true, Immutable: true} -var optsStaging = &UploadOptions{Compress: true, Immutable: true} +var optsDataTile = &UploadOptions{Compressed: true, Immutable: true} +var optsStaging = &UploadOptions{Compressed: true, Immutable: true} var optsIssuer = &UploadOptions{ContentType: "application/pkix-cert", Immutable: true} var optsCheckpoint = &UploadOptions{ContentType: "text/plain; charset=utf-8"} @@ -763,11 +766,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1)) tile.L = -1 edgeTiles[-1] = tileWithBytes{tile, dataTile} - l.c.Log.DebugContext(ctx, "staging full data tile", - "tree_size", n, "tile", tile, "size", len(dataTile)) + + gzipData, err := compress(dataTile) + if err != nil { + return fmtErrorf("couldn't compress data tile: %w", err) + } + l.c.Log.DebugContext(ctx, "staging full data tile", "tree_size", n, + "tile", tile, "size", len(dataTile), "gzip_size", len(gzipData)) l.m.SeqDataTileSize.Observe(float64(len(dataTile))) + l.m.SeqDataTileGzipSize.Observe(float64(len(gzipData))) + tileUploads = append(tileUploads, &uploadAction{ - sunlight.TilePath(tile), dataTile, optsDataTile}) + sunlight.TilePath(tile), gzipData, optsDataTile}) dataTile = nil } } @@ -777,11 +787,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1)) tile.L = -1 edgeTiles[-1] = tileWithBytes{tile, dataTile} - l.c.Log.DebugContext(ctx, "staging partial data tile", - "tree_size", n, "tile", tile, "size", len(dataTile)) + gzipData, err := compress(dataTile) + if err != nil { + return fmtErrorf("couldn't compress data tile: %w", err) + } + l.c.Log.DebugContext(ctx, "staging partial data tile", "tree_size", n, + "tile", tile, "size", len(dataTile), "gzip_size", len(gzipData)) 
l.m.SeqDataTileSize.Observe(float64(len(dataTile))) + l.m.SeqDataTileGzipSize.Observe(float64(len(gzipData))) tileUploads = append(tileUploads, &uploadAction{ - sunlight.TilePath(tile), dataTile, optsDataTile}) + sunlight.TilePath(tile), gzipData, optsDataTile}) } // Produce and stage new tree tiles. @@ -818,16 +833,20 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { if err != nil { return fmtErrorf("couldn't marshal staged uploads: %w", err) } + stagingPath := stagingPath(tree.Tree) // Don't upload an empty staging bundle, as it would have the same path of // the previous one, but different (empty) content. Note that LoadLog might // end up re-uploading the tiles from the previous one, which is harmless. if len(tileUploads) > 0 { - stagingPath := stagingPath(tree.Tree) + gzipData, err := compress(stagedUploads) + if err != nil { + return fmtErrorf("couldn't compress staged uploads: %w", err) + } l.c.Log.DebugContext(ctx, "uploading staged tiles", "old_tree_size", oldSize, - "tree_size", tree.N, "path", stagingPath, "size", len(stagedUploads)) + "tree_size", tree.N, "path", stagingPath, "size", len(stagedUploads), "gzip_size", len(gzipData)) ctxStrict, cancel := context.WithTimeout(ctx, strictTimeout) defer cancel() - if err := l.c.Backend.Upload(ctxStrict, stagingPath, stagedUploads, optsStaging); err != nil { + if err := l.c.Backend.Upload(ctxStrict, stagingPath, gzipData, optsStaging); err != nil { return fmtErrorf("couldn't upload staged tiles: %w", err) } } @@ -872,6 +891,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { return fmtErrorf("couldn't upload checkpoint to object storage: %w", err) } + // Once the checkpoint is uploaded to object storage, we can safely delete the + // staging bundle, if any. No need to fail the sequencing if this fails. 
+ if len(tileUploads) > 0 { + if err := l.c.Backend.Discard(ctx, stagingPath); err != nil { + l.c.Log.ErrorContext(ctx, "staging bundle discard failed", + "tree_size", tree.N, "err", err) + l.m.StagingDiscardErrors.Inc() + } + } + // At this point if the cache put fails, there's no reason to return errors // to users. The only consequence of cache false negatives are duplicated // leaves anyway. In fact, an error might cause the clients to resumbit, @@ -1126,3 +1155,30 @@ func (l *Log) hashReader(overlay map[int64]tlog.Hash) tlog.HashReaderFunc { return list, nil } } + +func compress(data []byte) ([]byte, error) { + b := &bytes.Buffer{} + w := gzip.NewWriter(b) + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +const maxCompressRatio = 100 + +func fetchAndDecompress(ctx context.Context, backend Backend, key string) ([]byte, error) { + data, err := backend.Fetch(ctx, key) + if err != nil { + return nil, err + } + r, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + maxSize := int64(len(data)) * maxCompressRatio + return io.ReadAll(io.LimitReader(r, maxSize)) +} diff --git a/internal/ctlog/ctlog_test.go b/internal/ctlog/ctlog_test.go index f39e9a9..04a720d 100644 --- a/internal/ctlog/ctlog_test.go +++ b/internal/ctlog/ctlog_test.go @@ -15,6 +15,7 @@ import ( "reflect" "slices" "strconv" + "strings" "sync/atomic" "testing" "time" @@ -204,6 +205,23 @@ func TestSequenceUploadPaths(t *testing.T) { if !reflect.DeepEqual(keys, expected) { t.Errorf("got %#v, expected %#v", keys, expected) } + + for _, key := range keys { + expectedImmutable := false + expectedDeleted := false + if key != "checkpoint" { + expectedImmutable = true + } + if strings.HasPrefix(key, "staging/") { + expectedDeleted = true + } + if tl.Config.Backend.(*MemoryBackend).del[key] != expectedDeleted { + t.Errorf("got deleted %v, expected %v for key %q", 
tl.Config.Backend.(*MemoryBackend).del[key], expectedDeleted, key) + } + if tl.Config.Backend.(*MemoryBackend).imm[key] != expectedImmutable { + t.Errorf("got immutable %v, expected %v for key %q", tl.Config.Backend.(*MemoryBackend).imm[key], expectedImmutable, key) + } + } } func TestDuplicates(t *testing.T) { diff --git a/internal/ctlog/local.go b/internal/ctlog/local.go new file mode 100644 index 0000000..f5e9998 --- /dev/null +++ b/internal/ctlog/local.go @@ -0,0 +1,125 @@ +package ctlog + +import ( + "bytes" + "context" + "errors" + "io" + "log/slog" + "os" + "path/filepath" + "time" + + "filippo.io/sunlight/internal/durable" + "github.com/prometheus/client_golang/prometheus" +) + +type LocalBackend struct { + dir string + metrics []prometheus.Collector + duration prometheus.SummaryVec + log *slog.Logger +} + +func NewLocalBackend(ctx context.Context, dir string, l *slog.Logger) (*LocalBackend, error) { + duration := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "fs_op_duration_seconds", + Help: "Overall local backend operation latency.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.025, 0.9: 0.01, 0.99: 0.001}, + MaxAge: 1 * time.Minute, + AgeBuckets: 6, + }, + []string{"method"}, + ) + if fi, err := os.Stat(dir); err != nil { + return nil, fmtErrorf("failed to stat local backend directory %q: %w", dir, err) + } else if !fi.IsDir() { + return nil, fmtErrorf("local backend path %q is not a directory", dir) + } + return &LocalBackend{ + dir: dir, + metrics: []prometheus.Collector{duration}, + duration: *duration, + log: l, + }, nil +} + +var _ Backend = &LocalBackend{} + +func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error { + defer prometheus.NewTimer(s.duration.WithLabelValues("upload")).ObserveDuration() + name, err := filepath.Localize(key) + if err != nil { + return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) + } + path := filepath.Join(s.dir, name) + if 
err := durable.MkdirAll(filepath.Dir(path), 0755); err != nil { + return fmtErrorf("failed to create directory %q: %w", filepath.Dir(path), err) + } + var perms os.FileMode = 0644 + if opts != nil && opts.Immutable { + perms = 0444 + if f, err := os.Open(path); err == nil { + defer f.Close() + if err := compareFile(f, data); err != nil { + return fmtErrorf("immutable file %q already exists and does not match: %w", path, err) + } + s.log.WarnContext(ctx, "local file already exists", "key", key, "path", path) + return nil + } + // As a best effort, try to set the immutable flag if supported by the + // OS, and the process has the appropriate capabilities. + defer setImmutable(path) + } + s.log.DebugContext(ctx, "local file write", "key", key, + "size", len(data), "path", path, "perms", perms) + return durable.WriteFile(path, data, perms) +} + +func (s *LocalBackend) Fetch(ctx context.Context, key string) ([]byte, error) { + defer prometheus.NewTimer(s.duration.WithLabelValues("fetch")).ObserveDuration() + name, err := filepath.Localize(key) + if err != nil { + return nil, fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) + } + path := filepath.Join(s.dir, name) + s.log.DebugContext(ctx, "local file read", "key", key, "path", path) + return os.ReadFile(path) +} + +func (s *LocalBackend) Discard(ctx context.Context, key string) error { + defer prometheus.NewTimer(s.duration.WithLabelValues("discard")).ObserveDuration() + name, err := filepath.Localize(key) + if err != nil { + return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) + } + path := filepath.Join(s.dir, name) + s.log.DebugContext(ctx, "local file delete", "key", key, "path", path) + unsetImmutable(path) + return os.Remove(path) +} + +func (s *LocalBackend) Metrics() []prometheus.Collector { + return s.metrics +} + +func compareFile(f *os.File, data []byte) error { + b := make([]byte, max(1, min(len(data), 16384))) + for { + n, err := f.Read(b) + if err != nil && err 
!= io.EOF { + return err + } + if n > len(data) || !bytes.Equal(b[:n], data[:n]) { + return errors.New("file contents do not match") + } + data = data[n:] + if err == io.EOF { + if len(data) == 0 { + return nil + } + return errors.New("file contents do not match") + } + } +} diff --git a/internal/ctlog/local_compat.go b/internal/ctlog/local_compat.go new file mode 100644 index 0000000..07d213a --- /dev/null +++ b/internal/ctlog/local_compat.go @@ -0,0 +1,6 @@ +//go:build !linux || (!amd64 && !arm64) + +package ctlog + +func setImmutable(name string) {} +func unsetImmutable(name string) {} diff --git a/internal/ctlog/local_linux.go b/internal/ctlog/local_linux.go new file mode 100644 index 0000000..e17b71c --- /dev/null +++ b/internal/ctlog/local_linux.go @@ -0,0 +1,34 @@ +//go:build amd64 || arm64 + +package ctlog + +import ( + "os" + "syscall" + "unsafe" +) + +const _FS_IOC_SETFLAGS = uintptr(0x40086602) +const _FS_IMMUTABLE_FL = 0x00000010 + +func setImmutable(name string) { + f, err := os.Open(name) + if err != nil { + return + } + defer f.Close() + setFlags(f, _FS_IMMUTABLE_FL) +} + +func unsetImmutable(name string) { + f, err := os.Open(name) + if err != nil { + return + } + defer f.Close() + setFlags(f, 0) +} + +func setFlags(f *os.File, flags int32) { + syscall.Syscall(syscall.SYS_IOCTL, f.Fd(), _FS_IOC_SETFLAGS, uintptr(unsafe.Pointer(&flags))) +} diff --git a/internal/ctlog/metrics.go b/internal/ctlog/metrics.go index 787bfc8..f4726d0 100644 --- a/internal/ctlog/metrics.go +++ b/internal/ctlog/metrics.go @@ -15,12 +15,13 @@ type metrics struct { ReqInFlight *prometheus.GaugeVec ReqDuration *prometheus.SummaryVec - SeqCount *prometheus.CounterVec - SeqPoolSize prometheus.Summary - SeqDuration prometheus.Summary - SeqLeafSize prometheus.Summary - SeqTiles prometheus.Counter - SeqDataTileSize prometheus.Summary + SeqCount *prometheus.CounterVec + SeqPoolSize prometheus.Summary + SeqDuration prometheus.Summary + SeqLeafSize prometheus.Summary + SeqTiles 
prometheus.Counter + SeqDataTileSize prometheus.Summary + SeqDataTileGzipSize prometheus.Summary TreeTime prometheus.Gauge TreeSize prometheus.Gauge @@ -37,6 +38,8 @@ type metrics struct { CacheGetDuration prometheus.Summary CachePutDuration prometheus.Summary CachePutErrors prometheus.Counter + + StagingDiscardErrors prometheus.Counter } func initMetrics() metrics { @@ -115,6 +118,15 @@ func initMetrics() metrics { AgeBuckets: 6, }, ), + SeqDataTileGzipSize: prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "sequencing_data_tiles_gzip_bytes", + Help: "Compressed size of uploaded data tiles, including partials.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + MaxAge: 1 * time.Minute, + AgeBuckets: 6, + }, + ), TreeTime: prometheus.NewGauge( prometheus.GaugeOpts{ @@ -196,6 +208,12 @@ func initMetrics() metrics { Help: "Number of failed deduplication cache inserts.", }, ), + StagingDiscardErrors: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "staging_discard_errors_total", + Help: "Number of errors discarding staging entries.", + }, + ), } } diff --git a/internal/ctlog/s3.go b/internal/ctlog/s3.go index 88944e0..a225950 100644 --- a/internal/ctlog/s3.go +++ b/internal/ctlog/s3.go @@ -2,7 +2,6 @@ package ctlog import ( "bytes" - "compress/gzip" "context" "errors" "fmt" @@ -26,20 +25,12 @@ type S3Backend struct { keyPrefix string metrics []prometheus.Collector uploadSize prometheus.Summary - compressRatio prometheus.Summary hedgeRequests prometheus.Counter hedgeWins prometheus.Counter log *slog.Logger } func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix string, l *slog.Logger) (*S3Backend, error) { - counter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "s3_requests_total", - Help: "S3 HTTP requests performed, by method and response code.", - }, - []string{"method", "code"}, - ) duration := prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: "s3_request_duration_seconds", @@ -60,20 
+51,12 @@ func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix strin uploadSize := prometheus.NewSummary( prometheus.SummaryOpts{ Name: "s3_upload_size_bytes", - Help: "S3 (compressed) body size in bytes for object puts.", + Help: "S3 body size in bytes for object puts.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, MaxAge: 1 * time.Minute, AgeBuckets: 6, }, ) - compressRatio := prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "s3_compress_ratio", - Help: "Ratio of compressed to uncompressed body size for compressible object puts.", - MaxAge: 1 * time.Minute, - AgeBuckets: 6, - }, - ) hedgeRequests := prometheus.NewCounter( prometheus.CounterOpts{ Name: "s3_hedges_total", @@ -88,7 +71,6 @@ func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix strin ) transport := http.RoundTripper(http.DefaultTransport.(*http.Transport).Clone()) - transport = promhttp.InstrumentRoundTripperCounter(counter, transport) transport = promhttp.InstrumentRoundTripperDuration(duration, transport) cfg, err := config.LoadDefaultConfig(ctx) @@ -111,12 +93,10 @@ func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix strin o.Logger = awsLogger{log: l} o.ClientLogMode = aws.LogRequest | aws.LogResponse | aws.LogRetries }), - bucket: bucket, - keyPrefix: keyPrefix, - metrics: []prometheus.Collector{counter, duration, - uploadSize, compressRatio, hedgeRequests, hedgeWins}, + bucket: bucket, + keyPrefix: keyPrefix, + metrics: []prometheus.Collector{duration, uploadSize, hedgeRequests, hedgeWins}, uploadSize: uploadSize, - compressRatio: compressRatio, hedgeRequests: hedgeRequests, hedgeWins: hedgeWins, log: l, @@ -159,17 +139,7 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U contentType = aws.String(opts.ContentType) } var contentEncoding *string - if opts != nil && opts.Compress { - b := &bytes.Buffer{} - w := gzip.NewWriter(b) - if _, err := w.Write(data); err != nil { 
- return fmtErrorf("failed to compress %q: %w", key, err) - } - if err := w.Close(); err != nil { - return fmtErrorf("failed to compress %q: %w", key, err) - } - s.compressRatio.Observe(float64(b.Len()) / float64(len(data))) - data = b.Bytes() + if opts != nil && opts.Compressed { contentEncoding = aws.String("gzip") } var cacheControl *string @@ -210,7 +180,7 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U cancel(errors.New("competing request succeeded")) } s.log.DebugContext(ctx, "S3 PUT", "key", key, "size", len(data), - "compress", contentEncoding != nil, "type", *contentType, + "compressed", contentEncoding != nil, "type", *contentType, "immutable", cacheControl != nil, "elapsed", time.Since(start), "err", err) s.uploadSize.Observe(float64(len(data))) @@ -232,20 +202,19 @@ func (s *S3Backend) Fetch(ctx context.Context, key string) ([]byte, error) { defer out.Body.Close() s.log.DebugContext(ctx, "S3 GET", "key", key, "size", out.ContentLength, "encoding", out.ContentEncoding) - body := out.Body - if out.ContentEncoding != nil && *out.ContentEncoding == "gzip" { - body, err = gzip.NewReader(out.Body) - if err != nil { - return nil, fmtErrorf("failed to decompress %q from S3: %w", key, err) - } - } - data, err := io.ReadAll(body) + data, err := io.ReadAll(out.Body) if err != nil { return nil, fmtErrorf("failed to read %q from S3: %w", key, err) } return data, nil } +func (s *S3Backend) Discard(ctx context.Context, key string) error { + // For remote backends, DELETE requests are not cost-effective and a waste + // of latency. Lifecycle rules can be used to delete old staging bundles. 
+ return nil +} + func (s *S3Backend) Metrics() []prometheus.Collector { return s.metrics } diff --git a/internal/ctlog/testlog_test.go b/internal/ctlog/testlog_test.go index 3f14706..bf3711a 100644 --- a/internal/ctlog/testlog_test.go +++ b/internal/ctlog/testlog_test.go @@ -2,6 +2,7 @@ package ctlog_test import ( "bytes" + "compress/gzip" "context" "crypto/ecdsa" "crypto/ed25519" @@ -215,6 +216,10 @@ func (tl *TestLog) CheckLog(size int64) (sthTimestamp int64) { } b, err := tl.Config.Backend.Fetch(context.Background(), sunlight.TilePath(tile)) fatalIfErr(t, err) + r, err := gzip.NewReader(bytes.NewReader(b)) + fatalIfErr(t, err) + b, err = io.ReadAll(r) + fatalIfErr(t, err) for i := 0; i < tile.W; i++ { e, rest, err := sunlight.ReadTileLeaf(b) if err != nil { @@ -444,6 +449,7 @@ type MemoryBackend struct { mu sync.Mutex m map[string][]byte imm map[string]bool + del map[string]bool uploads uint64 @@ -452,7 +458,7 @@ type MemoryBackend struct { func NewMemoryBackend(t testing.TB) *MemoryBackend { return &MemoryBackend{ - t: t, m: make(map[string][]byte), imm: make(map[string]bool), + t: t, m: make(map[string][]byte), imm: make(map[string]bool), del: make(map[string]bool), } } @@ -478,6 +484,9 @@ func (b *MemoryBackend) Upload(ctx context.Context, key string, data []byte, opt if b.imm[key] && !bytes.Equal(b.m[key], data) { b.t.Errorf("immutable key %q was modified", key) } + if b.del[key] { + b.t.Errorf("deleted key %q was re-uploaded", key) + } b.m[key] = data b.imm[key] = opts.Immutable return finalErr @@ -549,9 +558,31 @@ func (b *MemoryBackend) Fetch(ctx context.Context, key string) ([]byte, error) { if !ok { return nil, fmt.Errorf("key %q not found", key) } + if b.del[key] { + b.t.Errorf("deleted key %q was fetched", key) + return nil, fmt.Errorf("key %q not found", key) + } return data, nil } +func (b *MemoryBackend) Discard(ctx context.Context, key string) error { + if err := ctx.Err(); err != nil { + return err + } + b.mu.Lock() + defer b.mu.Unlock() + if _, 
ok := b.m[key]; !ok { + b.t.Errorf("deleted missing key %q", key) + return fmt.Errorf("key %q not found", key) + } + if b.del[key] { + b.t.Errorf("deleted key %q was deleted again", key) + return fmt.Errorf("key %q not found", key) + } + b.del[key] = true + return nil +} + func (b *MemoryBackend) Metrics() []prometheus.Collector { return nil } type MemoryLockBackend struct { diff --git a/internal/durable/path.go b/internal/durable/path.go new file mode 100644 index 0000000..d504291 --- /dev/null +++ b/internal/durable/path.go @@ -0,0 +1,134 @@ +// Package durable provides equivalent functionality to the os package, but with +// additional guarantees for durability based on [os.File.Sync]. +// +// Note that after an fsync error, files and caches may be in a number of +// unreliable states, such that the only potentially safe course of action is +// starting over without reading back the failed files. +// +// See also https://www.usenix.org/conference/atc20/presentation/rebello, +// https://wiki.postgresql.org/wiki/Fsync_Errors, and +// https://danluu.com/deconstruct-files/. +package durable + +import ( + "os" + "path/filepath" + "syscall" +) + +// The sequence of syscalls for a happy path call to WriteFile looks like this: +// +// openat(AT_FDCWD, "dir", O_RDONLY|O_CLOEXEC|O_DIRECTORY) = 3 +// openat(AT_FDCWD, "dir/.file4057926228", O_RDWR|O_CREAT|O_EXCL|O_CLOEXEC, 0600) = 6 +// fchmod(6, 0644) = 0 +// write(6, "hello world", 11) = 11 +// fsync(6) = 0 +// close(6) = 0 +// renameat(AT_FDCWD, "dir/.file4057926228", AT_FDCWD, "dir/file") = 0 +// fsync(3) = 0 +// close(3) = 0 +// + +// WriteFile behaves somewhat like [os.WriteFile], but it also syncs the file +// contents and the directory entry to disk before returning, and prevents +// partial writes from being visible. +// +// If the file already exists, it takes the specified permissions. 
+func WriteFile(name string, data []byte, perm os.FileMode) (err error) { + // After the file, sync the directory in which the entry resides. Otherwise, + // after a power failure the file may not exist or the rename may rollback. + // + // Keep the parent open during the operation, to prevent it from being + // evicted from the inode cache, which can discard a write-through error. + parent, err := os.OpenFile(filepath.Dir(name), os.O_RDONLY|syscall.O_DIRECTORY, 0) + if err != nil { + return &os.PathError{Op: "durablewrite", Path: name, Err: err} + } + defer fsyncAndClose(parent, &err) + + f, err := os.CreateTemp(filepath.Dir(name), "."+filepath.Base(name)) + if err != nil { + return &os.PathError{Op: "durablewrite", Path: name, Err: err} + } + defer func(tmpname string) { + if err == nil { + err = os.Rename(tmpname, name) + } + if err != nil { + os.Remove(tmpname) + } + }(f.Name()) + if err := f.Chmod(perm); err != nil { + f.Close() + return err + } + defer fsyncAndClose(f, &err) + + _, err = f.Write(data) + return err +} + +// MkdirAll behaves like [os.MkdirAll], but it also syncs each affected +// directory to disk. +func MkdirAll(path string, perm os.FileMode) (err error) { + if dir, err := os.Stat(path); err == nil { + if dir.IsDir() { + return nil + } + return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR} + } + + path = filepath.Clean(path) + if parent := filepath.Dir(path); parent != path && parent != filepath.VolumeName(path) { + if err := MkdirAll(parent, perm); err != nil { + return err + } + } + + // This is a little inefficient because if we are creating two levels of + // directories, we will sync the intermediate directory twice. That's going + // to be rare in our use case, so we don't optimize for it. 
+ return Mkdir(path, perm) +} + +// The sequence of syscalls for a happy path call to Mkdir looks like this: +// +// openat(AT_FDCWD, "dir1", O_RDONLY|O_CLOEXEC|O_DIRECTORY) = 3 +// mkdirat(AT_FDCWD, "dir1/dir2", 0755) = 0 +// openat(AT_FDCWD, "dir1/dir2", O_RDONLY|O_CLOEXEC) = 6 +// fsync(6) = 0 +// close(6) = 0 +// fsync(3) = 0 +// close(3) = 0 +// + +// Mkdir behaves like [os.Mkdir], but it also syncs the directory containing +// the created directory to disk. +func Mkdir(path string, perm os.FileMode) (err error) { + parent, err := os.OpenFile(filepath.Dir(path), os.O_RDONLY|syscall.O_DIRECTORY, 0) + if err != nil { + return &os.PathError{Op: "mkdir", Path: path, Err: err} + } + defer fsyncAndClose(parent, &err) + + if err := os.Mkdir(path, perm); err != nil { + return err + } + + f, err := os.Open(path) + if err != nil { + return &os.PathError{Op: "mkdir", Path: path, Err: err} + } + defer fsyncAndClose(f, &err) + + return nil +} + +func fsyncAndClose(f *os.File, err *error) { + if *err == nil { + *err = f.Sync() + } + if err1 := f.Close(); err1 != nil && *err == nil { + *err = err1 + } +}