From 9b1b34a29065ce1f681f9e775bb5cb931952cbf0 Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Tue, 22 Apr 2025 12:39:39 +0200 Subject: [PATCH 01/10] internal/ctlog: move compression out of the Backend This avoids replicating the logic in each Backend implementation, and also frees it from keeping track of what files are compressed (so they can be decompressed by Fetch), which could be difficult for future backends with less access to metadata, like the POSIX filesystem one. --- internal/ctlog/ctlog.go | 76 ++++++++++++++++++++++++++-------- internal/ctlog/metrics.go | 22 +++++++--- internal/ctlog/s3.go | 51 ++++------------------- internal/ctlog/testlog_test.go | 5 +++ 4 files changed, 87 insertions(+), 67 deletions(-) diff --git a/internal/ctlog/ctlog.go b/internal/ctlog/ctlog.go index 8f96a20..915338b 100644 --- a/internal/ctlog/ctlog.go +++ b/internal/ctlog/ctlog.go @@ -3,6 +3,7 @@ package ctlog import ( "archive/tar" "bytes" + "compress/gzip" "context" "crypto" "crypto/ecdsa" @@ -207,7 +208,7 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) { // Apply the staged tiles before continuing. config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint", "old_size", c1.N, "size", c.N) - stagedUploads, err := config.Backend.Fetch(ctx, stagingPath(c.Tree)) + stagedUploads, err := fetchAndDecompress(ctx, config.Backend, stagingPath(c.Tree)) if err != nil { return nil, fmt.Errorf("couldn't fetch staged uploads: %w", err) } @@ -244,7 +245,7 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) { // Fetch the right-most data tile. dataTile := edgeTiles[0] dataTile.L = -1 - dataTile.B, err = config.Backend.Fetch(ctx, dataTile.Path()) + dataTile.B, err = fetchAndDecompress(ctx, config.Backend, dataTile.Path()) if err != nil { return nil, fmt.Errorf("couldn't fetch right edge data tile: %w", err) } @@ -363,8 +364,7 @@ type Backend interface { // returns, the object must be fully persisted. opts may be nil. Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error - // Fetch returns the value for a key. It's expected to decompress any data - // uploaded with UploadOptions.Compress true. + // Fetch returns the value for a key. Fetch(ctx context.Context, key string) ([]byte, error) // Metrics returns the metrics to register for this log. The metrics should @@ -379,9 +379,8 @@ type UploadOptions struct { // "application/octet-stream". ContentType string - // Compress is true if the data is compressible and should be compressed - // before uploading if possible. - Compress bool + // Compressed is true if the data is compressed with gzip. + Compressed bool // Immutable is true if the data is never changed after being uploaded. // Note that the same value may still be re-uploaded, and must succeed. 
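As an aside, the new contract is easy to state in code. The sketch below mirrors the compress and fetchAndDecompress helpers added later in this patch; memBackend is a hypothetical stand-in for any Backend implementation and is not part of the change. The point is that the backend now stores and returns opaque bytes, while the caller gzips on the write path and gunzips (with a bounded expansion ratio) on the read path.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// memBackend is a toy Backend: it stores exactly the bytes it is given,
// with no knowledge of whether they are compressed.
type memBackend map[string][]byte

func (m memBackend) Upload(key string, data []byte) { m[key] = data }
func (m memBackend) Fetch(key string) []byte        { return m[key] }

func main() {
	b := memBackend{}

	// Write path: the caller compresses before Upload, as sequencePool now
	// does for data tiles and staging bundles (UploadOptions.Compressed).
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write([]byte("tile bytes")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	b.Upload("tile/data/000", buf.Bytes())

	// Read path: the caller decompresses after Fetch, capping the output at
	// 100x the compressed size, as fetchAndDecompress does below.
	data := b.Fetch("tile/data/000")
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(io.LimitReader(r, int64(len(data))*100))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", out) // prints "tile bytes"
}

A nice property of this split is that a backend like the POSIX filesystem one never has to record which objects are gzipped, since the caller that writes a compressed object is also the one that reads it back.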
@@ -389,8 +388,8 @@ type UploadOptions struct { } var optsHashTile = &UploadOptions{Immutable: true} -var optsDataTile = &UploadOptions{Compress: true, Immutable: true} -var optsStaging = &UploadOptions{Compress: true, Immutable: true} +var optsDataTile = &UploadOptions{Compressed: true, Immutable: true} +var optsStaging = &UploadOptions{Compressed: true, Immutable: true} var optsIssuer = &UploadOptions{ContentType: "application/pkix-cert", Immutable: true} var optsCheckpoint = &UploadOptions{ContentType: "text/plain; charset=utf-8"} @@ -763,11 +762,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1)) tile.L = -1 edgeTiles[-1] = tileWithBytes{tile, dataTile} - l.c.Log.DebugContext(ctx, "staging full data tile", - "tree_size", n, "tile", tile, "size", len(dataTile)) + + gzipData, err := compress(dataTile) + if err != nil { + return fmtErrorf("couldn't compress data tile: %w", err) + } + l.c.Log.DebugContext(ctx, "staging full data tile", "tree_size", n, + "tile", tile, "size", len(dataTile), "gzip_size", len(gzipData)) l.m.SeqDataTileSize.Observe(float64(len(dataTile))) + l.m.SeqDataTileGzipSize.Observe(float64(len(gzipData))) + tileUploads = append(tileUploads, &uploadAction{ - sunlight.TilePath(tile), dataTile, optsDataTile}) + sunlight.TilePath(tile), gzipData, optsDataTile}) dataTile = nil } } @@ -777,11 +783,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1)) tile.L = -1 edgeTiles[-1] = tileWithBytes{tile, dataTile} - l.c.Log.DebugContext(ctx, "staging partial data tile", - "tree_size", n, "tile", tile, "size", len(dataTile)) + gzipData, err := compress(dataTile) + if err != nil { + return fmtErrorf("couldn't compress data tile: %w", err) + } + l.c.Log.DebugContext(ctx, "staging partial data tile", "tree_size", n, + "tile", tile, "size", len(dataTile), "gzip_size", len(gzipData)) l.m.SeqDataTileSize.Observe(float64(len(dataTile))) + l.m.SeqDataTileGzipSize.Observe(float64(len(gzipData))) tileUploads = append(tileUploads, &uploadAction{ - sunlight.TilePath(tile), dataTile, optsDataTile}) + sunlight.TilePath(tile), gzipData, optsDataTile}) } // Produce and stage new tree tiles. @@ -823,11 +834,15 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { // end up re-uploading the tiles from the previous one, which is harmless. 
if len(tileUploads) > 0 { stagingPath := stagingPath(tree.Tree) + gzipData, err := compress(stagedUploads) + if err != nil { + return fmtErrorf("couldn't compress staged uploads: %w", err) + } l.c.Log.DebugContext(ctx, "uploading staged tiles", "old_tree_size", oldSize, - "tree_size", tree.N, "path", stagingPath, "size", len(stagedUploads)) + "tree_size", tree.N, "path", stagingPath, "size", len(stagedUploads), "gzip_size", len(gzipData)) ctxStrict, cancel := context.WithTimeout(ctx, strictTimeout) defer cancel() - if err := l.c.Backend.Upload(ctxStrict, stagingPath, stagedUploads, optsStaging); err != nil { + if err := l.c.Backend.Upload(ctxStrict, stagingPath, gzipData, optsStaging); err != nil { return fmtErrorf("couldn't upload staged tiles: %w", err) } } @@ -1126,3 +1141,30 @@ func (l *Log) hashReader(overlay map[int64]tlog.Hash) tlog.HashReaderFunc { return list, nil } } + +func compress(data []byte) ([]byte, error) { + b := &bytes.Buffer{} + w := gzip.NewWriter(b) + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +const maxCompressRatio = 100 + +func fetchAndDecompress(ctx context.Context, backend Backend, key string) ([]byte, error) { + data, err := backend.Fetch(ctx, key) + if err != nil { + return nil, err + } + r, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + maxSize := int64(len(data)) * maxCompressRatio + return io.ReadAll(io.LimitReader(r, maxSize)) +} diff --git a/internal/ctlog/metrics.go b/internal/ctlog/metrics.go index 787bfc8..0d7ea6b 100644 --- a/internal/ctlog/metrics.go +++ b/internal/ctlog/metrics.go @@ -15,12 +15,13 @@ type metrics struct { ReqInFlight *prometheus.GaugeVec ReqDuration *prometheus.SummaryVec - SeqCount *prometheus.CounterVec - SeqPoolSize prometheus.Summary - SeqDuration prometheus.Summary - SeqLeafSize prometheus.Summary - SeqTiles prometheus.Counter - SeqDataTileSize prometheus.Summary + SeqCount *prometheus.CounterVec + SeqPoolSize prometheus.Summary + SeqDuration prometheus.Summary + SeqLeafSize prometheus.Summary + SeqTiles prometheus.Counter + SeqDataTileSize prometheus.Summary + SeqDataTileGzipSize prometheus.Summary TreeTime prometheus.Gauge TreeSize prometheus.Gauge @@ -115,6 +116,15 @@ func initMetrics() metrics { AgeBuckets: 6, }, ), + SeqDataTileGzipSize: prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "sequencing_data_tiles_gzip_bytes", + Help: "Compressed size of uploaded data tiles, including partials.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + MaxAge: 1 * time.Minute, + AgeBuckets: 6, + }, + ), TreeTime: prometheus.NewGauge( prometheus.GaugeOpts{ diff --git a/internal/ctlog/s3.go b/internal/ctlog/s3.go index 88944e0..5d41b8f 100644 --- a/internal/ctlog/s3.go +++ b/internal/ctlog/s3.go @@ -2,7 +2,6 @@ package ctlog import ( "bytes" - "compress/gzip" "context" "errors" "fmt" @@ -26,20 +25,12 @@ type S3Backend struct { keyPrefix string metrics []prometheus.Collector uploadSize prometheus.Summary - compressRatio prometheus.Summary hedgeRequests prometheus.Counter hedgeWins prometheus.Counter log *slog.Logger } func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix string, l *slog.Logger) (*S3Backend, error) { - counter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "s3_requests_total", - Help: "S3 HTTP requests performed, by method and response code.", - }, - []string{"method", "code"}, - ) duration := 
prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: "s3_request_duration_seconds", @@ -60,20 +51,12 @@ func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix strin uploadSize := prometheus.NewSummary( prometheus.SummaryOpts{ Name: "s3_upload_size_bytes", - Help: "S3 (compressed) body size in bytes for object puts.", + Help: "S3 body size in bytes for object puts.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, MaxAge: 1 * time.Minute, AgeBuckets: 6, }, ) - compressRatio := prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "s3_compress_ratio", - Help: "Ratio of compressed to uncompressed body size for compressible object puts.", - MaxAge: 1 * time.Minute, - AgeBuckets: 6, - }, - ) hedgeRequests := prometheus.NewCounter( prometheus.CounterOpts{ Name: "s3_hedges_total", @@ -88,7 +71,6 @@ func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix strin ) transport := http.RoundTripper(http.DefaultTransport.(*http.Transport).Clone()) - transport = promhttp.InstrumentRoundTripperCounter(counter, transport) transport = promhttp.InstrumentRoundTripperDuration(duration, transport) cfg, err := config.LoadDefaultConfig(ctx) @@ -111,12 +93,10 @@ func NewS3Backend(ctx context.Context, region, bucket, endpoint, keyPrefix strin o.Logger = awsLogger{log: l} o.ClientLogMode = aws.LogRequest | aws.LogResponse | aws.LogRetries }), - bucket: bucket, - keyPrefix: keyPrefix, - metrics: []prometheus.Collector{counter, duration, - uploadSize, compressRatio, hedgeRequests, hedgeWins}, + bucket: bucket, + keyPrefix: keyPrefix, + metrics: []prometheus.Collector{duration, uploadSize, hedgeRequests, hedgeWins}, uploadSize: uploadSize, - compressRatio: compressRatio, hedgeRequests: hedgeRequests, hedgeWins: hedgeWins, log: l, @@ -159,17 +139,7 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U contentType = aws.String(opts.ContentType) } var contentEncoding *string - if opts != nil && opts.Compress { - b := &bytes.Buffer{} - w := gzip.NewWriter(b) - if _, err := w.Write(data); err != nil { - return fmtErrorf("failed to compress %q: %w", key, err) - } - if err := w.Close(); err != nil { - return fmtErrorf("failed to compress %q: %w", key, err) - } - s.compressRatio.Observe(float64(b.Len()) / float64(len(data))) - data = b.Bytes() + if opts != nil && opts.Compressed { contentEncoding = aws.String("gzip") } var cacheControl *string @@ -210,7 +180,7 @@ func (s *S3Backend) Upload(ctx context.Context, key string, data []byte, opts *U cancel(errors.New("competing request succeeded")) } s.log.DebugContext(ctx, "S3 PUT", "key", key, "size", len(data), - "compress", contentEncoding != nil, "type", *contentType, + "compressed", contentEncoding != nil, "type", *contentType, "immutable", cacheControl != nil, "elapsed", time.Since(start), "err", err) s.uploadSize.Observe(float64(len(data))) @@ -232,14 +202,7 @@ func (s *S3Backend) Fetch(ctx context.Context, key string) ([]byte, error) { defer out.Body.Close() s.log.DebugContext(ctx, "S3 GET", "key", key, "size", out.ContentLength, "encoding", out.ContentEncoding) - body := out.Body - if out.ContentEncoding != nil && *out.ContentEncoding == "gzip" { - body, err = gzip.NewReader(out.Body) - if err != nil { - return nil, fmtErrorf("failed to decompress %q from S3: %w", key, err) - } - } - data, err := io.ReadAll(body) + data, err := io.ReadAll(out.Body) if err != nil { return nil, fmtErrorf("failed to read %q from S3: %w", key, err) } diff --git a/internal/ctlog/testlog_test.go 
b/internal/ctlog/testlog_test.go
index 3f14706..b3956dc 100644
--- a/internal/ctlog/testlog_test.go
+++ b/internal/ctlog/testlog_test.go
@@ -2,6 +2,7 @@ package ctlog_test

 import (
 	"bytes"
+	"compress/gzip"
 	"context"
 	"crypto/ecdsa"
 	"crypto/ed25519"
@@ -215,6 +216,10 @@ func (tl *TestLog) CheckLog(size int64) (sthTimestamp int64) {
 		}
 		b, err := tl.Config.Backend.Fetch(context.Background(), sunlight.TilePath(tile))
 		fatalIfErr(t, err)
+		r, err := gzip.NewReader(bytes.NewReader(b))
+		fatalIfErr(t, err)
+		b, err = io.ReadAll(r)
+		fatalIfErr(t, err)
 		for i := 0; i < tile.W; i++ {
 			e, rest, err := sunlight.ReadTileLeaf(b)
 			if err != nil {

From 0cc15768fe5a058b0e7439f96ebadc5759e1816a Mon Sep 17 00:00:00 2001
From: Filippo Valsorda
Date: Tue, 22 Apr 2025 15:17:22 +0200
Subject: [PATCH 02/10] all: add local POSIX filesystem backend

Closes #20
---
 README.md                |  5 ++-
 cmd/sunlight/main.go     | 25 ++++++++++--
 internal/ctlog/local.go  | 67 ++++++++++++++++++++++++++++++++
 internal/durable/path.go | 83 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 175 insertions(+), 5 deletions(-)
 create mode 100644 internal/ctlog/local.go
 create mode 100644 internal/durable/path.go

diff --git a/README.md b/README.md
index ca4e94d..394b149 100644
--- a/README.md
+++ b/README.md
@@ -53,9 +53,10 @@ operating a Sunlight instance:
   [#11](https://github.com/FiloSottile/sunlight/issues/11)) or overwriting
   protection (automatically enabled client-side on Tigris).

-  Eventual consistency is acceptable for this backend.
+  Eventual consistency is acceptable for this backend, but it must provide
+  strong durability guarantees.

-  Currently, S3 and S3-compatible APIs are supported.
+  Currently, S3-compatible APIs and local POSIX filesystems are supported.

 * A per-log deduplication cache, to return existing SCTs for previously
   submitted (pre-)certificates.
diff --git a/cmd/sunlight/main.go b/cmd/sunlight/main.go
index afea74b..53339ca 100644
--- a/cmd/sunlight/main.go
+++ b/cmd/sunlight/main.go
@@ -200,6 +200,12 @@ type LogConfig struct {
 	// going to be treated like a directory in many tools using S3.
 	S3KeyPrefix string

+	// LocalDirectory is the path to a local directory where the log will store
+	// its data. It must be dedicated to this specific log instance.
+	//
+	// Only one of S3Bucket or LocalDirectory can be set at the same time.
+	LocalDirectory string
+
 	// NotAfterStart is the start of the validity range for certificates
 	// accepted by this log instance, as an RFC 3339 date.
NotAfterStart string @@ -347,9 +353,22 @@ func main() { slog.String("log", lc.ShortName), })) - b, err := ctlog.NewS3Backend(ctx, lc.S3Region, lc.S3Bucket, lc.S3Endpoint, lc.S3KeyPrefix, logger) - if err != nil { - fatalError(logger, "failed to create backend", "err", err) + var b ctlog.Backend + switch { + case lc.S3Bucket != "" && lc.LocalDirectory != "": + fatalError(logger, "only one of S3Bucket or LocalDirectory can be set at the same time") + case lc.S3Bucket != "": + b, err = ctlog.NewS3Backend(ctx, lc.S3Region, lc.S3Bucket, lc.S3Endpoint, lc.S3KeyPrefix, logger) + if err != nil { + fatalError(logger, "failed to create backend", "err", err) + } + case lc.LocalDirectory != "": + b, err = ctlog.NewLocalBackend(ctx, lc.LocalDirectory, logger) + if err != nil { + fatalError(logger, "failed to create backend", "err", err) + } + default: + fatalError(logger, "neither S3Bucket nor LocalDirectory are set, one must be used") } r := x509util.NewPEMCertPool() diff --git a/internal/ctlog/local.go b/internal/ctlog/local.go new file mode 100644 index 0000000..7b04584 --- /dev/null +++ b/internal/ctlog/local.go @@ -0,0 +1,67 @@ +package ctlog + +import ( + "context" + "log/slog" + "os" + "path/filepath" + "time" + + "filippo.io/sunlight/internal/durable" + "github.com/prometheus/client_golang/prometheus" +) + +type LocalBackend struct { + dir string + metrics []prometheus.Collector + log *slog.Logger +} + +func NewLocalBackend(ctx context.Context, dir string, l *slog.Logger) (*LocalBackend, error) { + if fi, err := os.Stat(dir); err != nil { + return nil, fmtErrorf("failed to stat local backend directory %q: %w", dir, err) + } else if !fi.IsDir() { + return nil, fmtErrorf("local backend path %q is not a directory", dir) + } + return &LocalBackend{ + dir: dir, + metrics: []prometheus.Collector{}, + log: l, + }, nil +} + +var _ Backend = &LocalBackend{} + +func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error { + start := time.Now() + name, err := filepath.Localize(key) + if err != nil { + return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) + } + path := filepath.Join(s.dir, name) + if err := durable.MkdirAll(filepath.Dir(path), 0755); err != nil { + return fmtErrorf("failed to create directory %q: %w", filepath.Dir(path), err) + } + var perms os.FileMode = 0644 + if opts != nil && opts.Immutable { + perms = 0444 + } + err = durable.WriteFile(path, data, perms) + s.log.DebugContext(ctx, "local file write", "key", key, "size", len(data), + "path", path, "perms", perms, "elapsed", time.Since(start), "err", err) + return err +} + +func (s *LocalBackend) Fetch(ctx context.Context, key string) ([]byte, error) { + name, err := filepath.Localize(key) + if err != nil { + return nil, fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) + } + path := filepath.Join(s.dir, name) + s.log.DebugContext(ctx, "local file read", "key", key, "path", path) + return os.ReadFile(path) +} + +func (s *LocalBackend) Metrics() []prometheus.Collector { + return s.metrics +} diff --git a/internal/durable/path.go b/internal/durable/path.go new file mode 100644 index 0000000..d316191 --- /dev/null +++ b/internal/durable/path.go @@ -0,0 +1,83 @@ +// Package durable provides equivalent functionality to the os package, but with +// additional guarantees for durability based on [os.File.Sync]. 
+package durable + +import ( + "os" + "path/filepath" + "syscall" +) + +// WriteFile behaves like [os.WriteFile], but it also syncs the file contents +// and the directory containing the file to disk. +func WriteFile(name string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + _, err = f.Write(data) + if err == nil { + err = f.Sync() + } + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + if err == nil { + // Note that this is not necessary if the file was not just created. + // Our use of this function is mostly for creating new files, so we + // don't optimize for the case where the file already exists. + err = fsyncDirectory(name) + } + return err +} + +// MkdirAll behaves like [os.MkdirAll], but it also syncs the directory +// containing each created directory to disk. +func MkdirAll(path string, perm os.FileMode) error { + if dir, err := os.Stat(path); err == nil { + if dir.IsDir() { + return nil + } + return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR} + } + + path = filepath.Clean(path) + if parent := filepath.Dir(path); parent != path && parent != filepath.VolumeName(path) { + if err := MkdirAll(parent, perm); err != nil { + return err + } + } + + return Mkdir(path, perm) +} + +// Mkdir behaves like [os.Mkdir], but it also syncs the directory containing +// the created directory to disk. +func Mkdir(path string, perm os.FileMode) (err error) { + defer func() { + if err == nil { + err = fsyncDirectory(path) + } + }() + + // Do we need to sync path itself? Presumably not, since otherwise the + // synced parent directory would have a dangling entry. + return os.Mkdir(path, perm) +} + +// fsyncDirectory syncs the directory in which the directory entry for path +// resides. Otherwise, after a power failure the file at path may not exist. +// See https://github.com/sqlite/sqlite/blob/024818be2/src/os_unix.c#L3739-L3799 +// for confirmation that operating on a file, then opening the directory and +// calling fsync on it is the correct sequence of operations. +func fsyncDirectory(path string) error { + parent, err := os.Open(filepath.Dir(path)) + if err != nil { + return err + } + err = parent.Sync() + if err1 := parent.Close(); err1 != nil && err == nil { + err = err1 + } + return err +} From 7ac5d8e8794b8ed8cf49846c1330642872172838 Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Tue, 22 Apr 2025 17:07:47 +0200 Subject: [PATCH 03/10] internal/durable: improve resiliency to write-through errors --- internal/durable/path.go | 85 +++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/internal/durable/path.go b/internal/durable/path.go index d316191..7b7da32 100644 --- a/internal/durable/path.go +++ b/internal/durable/path.go @@ -1,5 +1,8 @@ // Package durable provides equivalent functionality to the os package, but with // additional guarantees for durability based on [os.File.Sync]. +// +// See https://wiki.postgresql.org/wiki/Fsync_Errors for a discussion of the +// reliability of fsync. package durable import ( @@ -10,30 +13,35 @@ import ( // WriteFile behaves like [os.WriteFile], but it also syncs the file contents // and the directory containing the file to disk. -func WriteFile(name string, data []byte, perm os.FileMode) error { +func WriteFile(name string, data []byte, perm os.FileMode) (err error) { + // After the file, sync the directory in which the entry resides. 
Otherwise, + // after a power failure the file may not exist. + // + // Keep the parent open during the operation, to prevent it from being + // evicted from the inode cache, which can discard a write-through error. + // + // Note that this is not necessary if the file already exists. Our use of + // this function is mostly for creating new files, so we don't optimize for + // the case where the file already exists. + parent, err := os.Open(filepath.Dir(name)) + if err != nil { + return &os.PathError{Op: "write", Path: name, Err: err} + } + defer fsyncAndClose(parent, &err) + f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) if err != nil { return err } + defer fsyncAndClose(f, &err) + _, err = f.Write(data) - if err == nil { - err = f.Sync() - } - if err1 := f.Close(); err1 != nil && err == nil { - err = err1 - } - if err == nil { - // Note that this is not necessary if the file was not just created. - // Our use of this function is mostly for creating new files, so we - // don't optimize for the case where the file already exists. - err = fsyncDirectory(name) - } return err } -// MkdirAll behaves like [os.MkdirAll], but it also syncs the directory -// containing each created directory to disk. -func MkdirAll(path string, perm os.FileMode) error { +// MkdirAll behaves like [os.MkdirAll], but it also syncs each affected +// directory to disk. +func MkdirAll(path string, perm os.FileMode) (err error) { if dir, err := os.Stat(path); err == nil { if dir.IsDir() { return nil @@ -48,36 +56,39 @@ func MkdirAll(path string, perm os.FileMode) error { } } + // This is a little inefficient because if we are creating two levels of + // directories, we will sync the intermediate directory twice. That's going + // to be rare in our use case, so we don't optimize for it. return Mkdir(path, perm) } // Mkdir behaves like [os.Mkdir], but it also syncs the directory containing // the created directory to disk. func Mkdir(path string, perm os.FileMode) (err error) { - defer func() { - if err == nil { - err = fsyncDirectory(path) - } - }() - - // Do we need to sync path itself? Presumably not, since otherwise the - // synced parent directory would have a dangling entry. - return os.Mkdir(path, perm) -} - -// fsyncDirectory syncs the directory in which the directory entry for path -// resides. Otherwise, after a power failure the file at path may not exist. -// See https://github.com/sqlite/sqlite/blob/024818be2/src/os_unix.c#L3739-L3799 -// for confirmation that operating on a file, then opening the directory and -// calling fsync on it is the correct sequence of operations. 
-func fsyncDirectory(path string) error {
 	parent, err := os.Open(filepath.Dir(path))
 	if err != nil {
+		return &os.PathError{Op: "mkdir", Path: path, Err: err}
+	}
+	defer fsyncAndClose(parent, &err)
+
+	if err := os.Mkdir(path, perm); err != nil {
 		return err
 	}
-	err = parent.Sync()
-	if err1 := parent.Close(); err1 != nil && err == nil {
-		err = err1
+
+	f, err := os.Open(path)
+	if err != nil {
+		return &os.PathError{Op: "mkdir", Path: path, Err: err}
+	}
+	defer fsyncAndClose(f, &err)
+
+	return nil
+}
+
+func fsyncAndClose(f *os.File, err *error) {
+	if *err == nil {
+		*err = f.Sync()
+	}
+	if err1 := f.Close(); err1 != nil && *err == nil {
+		*err = err1
 	}
-	return err
 }

From 3fe6d4de6f906a44571c9b64f7483466db180a93 Mon Sep 17 00:00:00 2001
From: Filippo Valsorda
Date: Wed, 23 Apr 2025 15:47:54 +0200
Subject: [PATCH 04/10] internal/durable: make WriteFile atomic

---
 internal/durable/path.go | 41 +++++++++++++++++++++++-------------
 1 file changed, 28 insertions(+), 13 deletions(-)

diff --git a/internal/durable/path.go b/internal/durable/path.go
index 7b7da32..5ef514b 100644
--- a/internal/durable/path.go
+++ b/internal/durable/path.go
@@ -1,8 +1,13 @@
 // Package durable provides equivalent functionality to the os package, but with
 // additional guarantees for durability based on [os.File.Sync].
 //
-// See https://wiki.postgresql.org/wiki/Fsync_Errors for a discussion of the
-// reliability of fsync.
+// Note that after an fsync error, files and caches may be in a number of
+// unreliable states, such that the only potentially safe course of action is
+// starting over without reading back the failed files.
+//
+// See also https://www.usenix.org/conference/atc20/presentation/rebello,
+// https://wiki.postgresql.org/wiki/Fsync_Errors, and
+// https://danluu.com/deconstruct-files/.
 package durable

 import (
@@ -11,26 +16,36 @@ import (
 	"syscall"
 )

-// WriteFile behaves like [os.WriteFile], but it also syncs the file contents
-// and the directory containing the file to disk.
+// WriteFile behaves somewhat like [os.WriteFile], but it also syncs the file
+// contents and the directory entry to disk before returning, and prevents
+// partial writes from being visible.
+//
+// If the file already exists, it takes the specified permissions.
 func WriteFile(name string, data []byte, perm os.FileMode) (err error) {
 	// After the file, sync the directory in which the entry resides. Otherwise,
-	// after a power failure the file may not exist.
+	// after a power failure the file may not exist or the rename may roll back.
 	//
 	// Keep the parent open during the operation, to prevent it from being
 	// evicted from the inode cache, which can discard a write-through error.
-	//
-	// Note that this is not necessary if the file already exists. Our use of
-	// this function is mostly for creating new files, so we don't optimize for
-	// the case where the file already exists.
- parent, err := os.Open(filepath.Dir(name)) + parent, err := os.OpenFile(filepath.Dir(name), os.O_RDONLY|syscall.O_DIRECTORY, 0) if err != nil { - return &os.PathError{Op: "write", Path: name, Err: err} + return &os.PathError{Op: "durablewrite", Path: name, Err: err} } defer fsyncAndClose(parent, &err) - f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + f, err := os.CreateTemp(filepath.Dir(name), "."+filepath.Base(name)) if err != nil { + return &os.PathError{Op: "durablewrite", Path: name, Err: err} + } + defer func() { + if err == nil { + err = os.Rename(f.Name(), name) + } else { + os.Remove(f.Name()) + } + }() + if err := f.Chmod(perm); err != nil { + f.Close() return err } defer fsyncAndClose(f, &err) @@ -65,7 +80,7 @@ func MkdirAll(path string, perm os.FileMode) (err error) { // Mkdir behaves like [os.Mkdir], but it also syncs the directory containing // the created directory to disk. func Mkdir(path string, perm os.FileMode) (err error) { - parent, err := os.Open(filepath.Dir(path)) + parent, err := os.OpenFile(filepath.Dir(path), os.O_RDONLY|syscall.O_DIRECTORY, 0) if err != nil { return &os.PathError{Op: "mkdir", Path: path, Err: err} } From 24d931c85d888acf4e6b47fcc21f45657df4f5a6 Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Wed, 23 Apr 2025 16:02:14 +0200 Subject: [PATCH 05/10] internal/ctlog: allow idempotent overwrites in local backend --- internal/ctlog/local.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/internal/ctlog/local.go b/internal/ctlog/local.go index 7b04584..40fad4b 100644 --- a/internal/ctlog/local.go +++ b/internal/ctlog/local.go @@ -1,7 +1,10 @@ package ctlog import ( + "bytes" "context" + "errors" + "io" "log/slog" "os" "path/filepath" @@ -45,6 +48,14 @@ func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts var perms os.FileMode = 0644 if opts != nil && opts.Immutable { perms = 0444 + if f, err := os.Open(path); err == nil { + defer f.Close() + if err := compareFile(f, data); err != nil { + return fmtErrorf("immutable file %q already exists and does not match: %w", path, err) + } + s.log.WarnContext(ctx, "local file already exists", "key", key, "path", path) + return nil + } } err = durable.WriteFile(path, data, perms) s.log.DebugContext(ctx, "local file write", "key", key, "size", len(data), @@ -65,3 +76,23 @@ func (s *LocalBackend) Fetch(ctx context.Context, key string) ([]byte, error) { func (s *LocalBackend) Metrics() []prometheus.Collector { return s.metrics } + +func compareFile(f *os.File, data []byte) error { + b := make([]byte, min(len(data), 16384)) + for { + n, err := f.Read(b) + if err != nil && err != io.EOF { + return err + } + if n > len(data) || !bytes.Equal(b[:n], data[:n]) { + return errors.New("file contents do not match") + } + data = data[n:] + if err == io.EOF { + if len(data) == 0 { + return nil + } + return errors.New("file contents do not match") + } + } +} From 7cc91b204a7cf454f2d463954afaed3ec5596879 Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Wed, 23 Apr 2025 16:09:50 +0200 Subject: [PATCH 06/10] internal/ctlog: add latency metric to local backend --- internal/ctlog/local.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/internal/ctlog/local.go b/internal/ctlog/local.go index 40fad4b..40e5370 100644 --- a/internal/ctlog/local.go +++ b/internal/ctlog/local.go @@ -15,28 +15,40 @@ import ( ) type LocalBackend struct { - dir string - metrics []prometheus.Collector - log 
*slog.Logger + dir string + metrics []prometheus.Collector + duration prometheus.SummaryVec + log *slog.Logger } func NewLocalBackend(ctx context.Context, dir string, l *slog.Logger) (*LocalBackend, error) { + duration := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "fs_op_duration_seconds", + Help: "Overall local backend operation latency.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.025, 0.9: 0.01, 0.99: 0.001}, + MaxAge: 1 * time.Minute, + AgeBuckets: 6, + }, + []string{"method"}, + ) if fi, err := os.Stat(dir); err != nil { return nil, fmtErrorf("failed to stat local backend directory %q: %w", dir, err) } else if !fi.IsDir() { return nil, fmtErrorf("local backend path %q is not a directory", dir) } return &LocalBackend{ - dir: dir, - metrics: []prometheus.Collector{}, - log: l, + dir: dir, + metrics: []prometheus.Collector{duration}, + duration: *duration, + log: l, }, nil } var _ Backend = &LocalBackend{} func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts *UploadOptions) error { - start := time.Now() + defer prometheus.NewTimer(s.duration.WithLabelValues("upload")).ObserveDuration() name, err := filepath.Localize(key) if err != nil { return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) @@ -57,13 +69,13 @@ func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts return nil } } - err = durable.WriteFile(path, data, perms) - s.log.DebugContext(ctx, "local file write", "key", key, "size", len(data), - "path", path, "perms", perms, "elapsed", time.Since(start), "err", err) - return err + s.log.DebugContext(ctx, "local file write", "key", key, + "size", len(data), "path", path, "perms", perms) + return durable.WriteFile(path, data, perms) } func (s *LocalBackend) Fetch(ctx context.Context, key string) ([]byte, error) { + defer prometheus.NewTimer(s.duration.WithLabelValues("fetch")).ObserveDuration() name, err := filepath.Localize(key) if err != nil { return nil, fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err) From 19382aadb635686dc830cf12754913b174035596 Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Wed, 23 Apr 2025 17:00:44 +0200 Subject: [PATCH 07/10] internal/ctlog: set filesystem immutable flag on immutable files --- internal/ctlog/local.go | 3 +++ internal/ctlog/local_compat.go | 6 ++++++ internal/ctlog/local_linux.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+) create mode 100644 internal/ctlog/local_compat.go create mode 100644 internal/ctlog/local_linux.go diff --git a/internal/ctlog/local.go b/internal/ctlog/local.go index 40e5370..f929f89 100644 --- a/internal/ctlog/local.go +++ b/internal/ctlog/local.go @@ -68,6 +68,9 @@ func (s *LocalBackend) Upload(ctx context.Context, key string, data []byte, opts s.log.WarnContext(ctx, "local file already exists", "key", key, "path", path) return nil } + // As a best effort, try to set the immutable flag if supported by the + // OS, and the process has the appropriate capabilities. 
+ defer setImmutable(path) } s.log.DebugContext(ctx, "local file write", "key", key, "size", len(data), "path", path, "perms", perms) diff --git a/internal/ctlog/local_compat.go b/internal/ctlog/local_compat.go new file mode 100644 index 0000000..07d213a --- /dev/null +++ b/internal/ctlog/local_compat.go @@ -0,0 +1,6 @@ +//go:build !linux || (!amd64 && !arm64) + +package ctlog + +func setImmutable(name string) {} +func unsetImmutable(name string) {} diff --git a/internal/ctlog/local_linux.go b/internal/ctlog/local_linux.go new file mode 100644 index 0000000..e17b71c --- /dev/null +++ b/internal/ctlog/local_linux.go @@ -0,0 +1,34 @@ +//go:build amd64 || arm64 + +package ctlog + +import ( + "os" + "syscall" + "unsafe" +) + +const _FS_IOC_SETFLAGS = uintptr(0x40086602) +const _FS_IMMUTABLE_FL = 0x00000010 + +func setImmutable(name string) { + f, err := os.Open(name) + if err != nil { + return + } + defer f.Close() + setFlags(f, _FS_IMMUTABLE_FL) +} + +func unsetImmutable(name string) { + f, err := os.Open(name) + if err != nil { + return + } + defer f.Close() + setFlags(f, 0) +} + +func setFlags(f *os.File, flags int32) { + syscall.Syscall(syscall.SYS_IOCTL, f.Fd(), _FS_IOC_SETFLAGS, uintptr(unsafe.Pointer(&flags))) +} From 2ab93ca34f6103ca997b1ff0cdbe01645bae18bb Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Wed, 23 Apr 2025 17:02:07 +0200 Subject: [PATCH 08/10] internal/ctlog: automatically delete staging bundles from local backend --- internal/ctlog/ctlog.go | 16 +++++++++++++++- internal/ctlog/ctlog_test.go | 18 ++++++++++++++++++ internal/ctlog/local.go | 12 ++++++++++++ internal/ctlog/metrics.go | 8 ++++++++ internal/ctlog/s3.go | 6 ++++++ internal/ctlog/testlog_test.go | 28 +++++++++++++++++++++++++++- 6 files changed, 86 insertions(+), 2 deletions(-) diff --git a/internal/ctlog/ctlog.go b/internal/ctlog/ctlog.go index 915338b..9e21a72 100644 --- a/internal/ctlog/ctlog.go +++ b/internal/ctlog/ctlog.go @@ -367,6 +367,9 @@ type Backend interface { // Fetch returns the value for a key. Fetch(ctx context.Context, key string) ([]byte, error) + // Discard suggests to the backend that the key can be deleted. + Discard(ctx context.Context, key string) error + // Metrics returns the metrics to register for this log. The metrics should // not be shared by any other logs. Metrics() []prometheus.Collector @@ -384,6 +387,7 @@ type UploadOptions struct { // Immutable is true if the data is never changed after being uploaded. // Note that the same value may still be re-uploaded, and must succeed. + // [Backend.Discard] can still be used on immutable entries. Immutable bool } @@ -829,11 +833,11 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { if err != nil { return fmtErrorf("couldn't marshal staged uploads: %w", err) } + stagingPath := stagingPath(tree.Tree) // Don't upload an empty staging bundle, as it would have the same path of // the previous one, but different (empty) content. Note that LoadLog might // end up re-uploading the tiles from the previous one, which is harmless. if len(tileUploads) > 0 { - stagingPath := stagingPath(tree.Tree) gzipData, err := compress(stagedUploads) if err != nil { return fmtErrorf("couldn't compress staged uploads: %w", err) @@ -887,6 +891,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { return fmtErrorf("couldn't upload checkpoint to object storage: %w", err) } + // Once the checkpoint is uploaded to object storage, we can safely delete the + // staging bundle, if any. 
No need to fail the sequencing if this fails.
+	if len(tileUploads) > 0 {
+		if err := l.c.Backend.Discard(ctx, stagingPath); err != nil {
+			l.c.Log.ErrorContext(ctx, "staging bundle discard failed",
+				"tree_size", tree.N, "err", err)
+			l.m.StagingDiscardErrors.Inc()
+		}
+	}
+
 	// At this point if the cache put fails, there's no reason to return errors
 	// to users. The only consequence of cache false negatives is duplicated
 	// leaves anyway. In fact, an error might cause the clients to resubmit,
diff --git a/internal/ctlog/ctlog_test.go b/internal/ctlog/ctlog_test.go
index f39e9a9..04a720d 100644
--- a/internal/ctlog/ctlog_test.go
+++ b/internal/ctlog/ctlog_test.go
@@ -15,6 +15,7 @@ import (
 	"reflect"
 	"slices"
 	"strconv"
+	"strings"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -204,6 +205,23 @@ func TestSequenceUploadPaths(t *testing.T) {
 	if !reflect.DeepEqual(keys, expected) {
 		t.Errorf("got %#v, expected %#v", keys, expected)
 	}
+
+	for _, key := range keys {
+		expectedImmutable := false
+		expectedDeleted := false
+		if key != "checkpoint" {
+			expectedImmutable = true
+		}
+		if strings.HasPrefix(key, "staging/") {
+			expectedDeleted = true
+		}
+		if tl.Config.Backend.(*MemoryBackend).del[key] != expectedDeleted {
+			t.Errorf("got deleted %v, expected %v for key %q", tl.Config.Backend.(*MemoryBackend).del[key], expectedDeleted, key)
+		}
+		if tl.Config.Backend.(*MemoryBackend).imm[key] != expectedImmutable {
+			t.Errorf("got immutable %v, expected %v for key %q", tl.Config.Backend.(*MemoryBackend).imm[key], expectedImmutable, key)
+		}
+	}
 }

 func TestDuplicates(t *testing.T) {
diff --git a/internal/ctlog/local.go b/internal/ctlog/local.go
index f929f89..f5e9998 100644
--- a/internal/ctlog/local.go
+++ b/internal/ctlog/local.go
@@ -88,6 +88,18 @@ func (s *LocalBackend) Fetch(ctx context.Context, key string) ([]byte, error) {
 	return os.ReadFile(path)
 }

+func (s *LocalBackend) Discard(ctx context.Context, key string) error {
+	defer prometheus.NewTimer(s.duration.WithLabelValues("discard")).ObserveDuration()
+	name, err := filepath.Localize(key)
+	if err != nil {
+		return fmtErrorf("failed to localize key %q as a filesystem path: %w", key, err)
+	}
+	path := filepath.Join(s.dir, name)
+	s.log.DebugContext(ctx, "local file delete", "key", key, "path", path)
+	unsetImmutable(path)
+	return os.Remove(path)
+}
+
 func (s *LocalBackend) Metrics() []prometheus.Collector {
 	return s.metrics
 }
diff --git a/internal/ctlog/metrics.go b/internal/ctlog/metrics.go
index 0d7ea6b..f4726d0 100644
--- a/internal/ctlog/metrics.go
+++ b/internal/ctlog/metrics.go
@@ -38,6 +38,8 @@ type metrics struct {
 	CacheGetDuration prometheus.Summary
 	CachePutDuration prometheus.Summary
 	CachePutErrors   prometheus.Counter
+
+	StagingDiscardErrors prometheus.Counter
 }

 func initMetrics() metrics {
@@ -206,6 +208,12 @@ func initMetrics() metrics {
 				Help: "Number of failed deduplication cache inserts.",
 			},
 		),
+		StagingDiscardErrors: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "staging_discard_errors_total",
+				Help: "Number of errors discarding staging entries.",
+			},
+		),
 	}
 }
diff --git a/internal/ctlog/s3.go b/internal/ctlog/s3.go
index 5d41b8f..a225950 100644
--- a/internal/ctlog/s3.go
+++ b/internal/ctlog/s3.go
@@ -209,6 +209,12 @@ func (s *S3Backend) Fetch(ctx context.Context, key string) ([]byte, error) {
 	return data, nil
 }

+func (s *S3Backend) Discard(ctx context.Context, key string) error {
+	// For remote backends, DELETE requests are not cost-effective and a waste
+	// of latency.
Lifecycle rules can be used to delete old staging bundles. + return nil +} + func (s *S3Backend) Metrics() []prometheus.Collector { return s.metrics } diff --git a/internal/ctlog/testlog_test.go b/internal/ctlog/testlog_test.go index b3956dc..bf3711a 100644 --- a/internal/ctlog/testlog_test.go +++ b/internal/ctlog/testlog_test.go @@ -449,6 +449,7 @@ type MemoryBackend struct { mu sync.Mutex m map[string][]byte imm map[string]bool + del map[string]bool uploads uint64 @@ -457,7 +458,7 @@ type MemoryBackend struct { func NewMemoryBackend(t testing.TB) *MemoryBackend { return &MemoryBackend{ - t: t, m: make(map[string][]byte), imm: make(map[string]bool), + t: t, m: make(map[string][]byte), imm: make(map[string]bool), del: make(map[string]bool), } } @@ -483,6 +484,9 @@ func (b *MemoryBackend) Upload(ctx context.Context, key string, data []byte, opt if b.imm[key] && !bytes.Equal(b.m[key], data) { b.t.Errorf("immutable key %q was modified", key) } + if b.del[key] { + b.t.Errorf("deleted key %q was re-uploaded", key) + } b.m[key] = data b.imm[key] = opts.Immutable return finalErr @@ -554,9 +558,31 @@ func (b *MemoryBackend) Fetch(ctx context.Context, key string) ([]byte, error) { if !ok { return nil, fmt.Errorf("key %q not found", key) } + if b.del[key] { + b.t.Errorf("deleted key %q was fetched", key) + return nil, fmt.Errorf("key %q not found", key) + } return data, nil } +func (b *MemoryBackend) Discard(ctx context.Context, key string) error { + if err := ctx.Err(); err != nil { + return err + } + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.m[key]; !ok { + b.t.Errorf("deleted missing key %q", key) + return fmt.Errorf("key %q not found", key) + } + if b.del[key] { + b.t.Errorf("deleted key %q was deleted again", key) + return fmt.Errorf("key %q not found", key) + } + b.del[key] = true + return nil +} + func (b *MemoryBackend) Metrics() []prometheus.Collector { return nil } type MemoryLockBackend struct { From 9e58e829fc3ba376d265ae9ec3d6bd1ee2efe586 Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Thu, 24 Apr 2025 11:11:25 +0200 Subject: [PATCH 09/10] internal/durable: delete tmp file if rename fails --- internal/durable/path.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/durable/path.go b/internal/durable/path.go index 5ef514b..bc8f146 100644 --- a/internal/durable/path.go +++ b/internal/durable/path.go @@ -37,13 +37,14 @@ func WriteFile(name string, data []byte, perm os.FileMode) (err error) { if err != nil { return &os.PathError{Op: "durablewrite", Path: name, Err: err} } - defer func() { + defer func(tmpname string) { if err == nil { - err = os.Rename(f.Name(), name) - } else { - os.Remove(f.Name()) + err = os.Rename(tmpname, name) } - }() + if err != nil { + os.Remove(tmpname) + } + }(f.Name()) if err := f.Chmod(perm); err != nil { f.Close() return err From d3ddb50ac09564d45a28b37279ae99a7242ba12f Mon Sep 17 00:00:00 2001 From: Filippo Valsorda Date: Thu, 24 Apr 2025 11:27:11 +0200 Subject: [PATCH 10/10] internal/durable: add strace samples --- internal/durable/path.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/internal/durable/path.go b/internal/durable/path.go index bc8f146..d504291 100644 --- a/internal/durable/path.go +++ b/internal/durable/path.go @@ -16,6 +16,19 @@ import ( "syscall" ) +// The sequence of syscalls for a happy path call to WriteFile looks like this: +// +// openat(AT_FDCWD, "dir", O_RDONLY|O_CLOEXEC|O_DIRECTORY) = 3 +// openat(AT_FDCWD, "dir/.file4057926228", 
O_RDWR|O_CREAT|O_EXCL|O_CLOEXEC, 0600) = 6 +// fchmod(6, 0644) = 0 +// write(6, "hello world", 11) = 11 +// fsync(6) = 0 +// close(6) = 0 +// renameat(AT_FDCWD, "dir/.file4057926228", AT_FDCWD, "dir/file") = 0 +// fsync(3) = 0 +// close(3) = 0 +// + // WriteFile behaves somewhat like [os.WriteFile], but it also syncs the file // contents and the directory entry to disk before returning, and prevents // partial writes from being visible. @@ -78,6 +91,17 @@ func MkdirAll(path string, perm os.FileMode) (err error) { return Mkdir(path, perm) } +// The sequence of syscalls for a happy path call to Mkdir looks like this: +// +// openat(AT_FDCWD, "dir1", O_RDONLY|O_CLOEXEC|O_DIRECTORY) = 3 +// mkdirat(AT_FDCWD, "dir1/dir2", 0755) = 0 +// openat(AT_FDCWD, "dir1/dir2", O_RDONLY|O_CLOEXEC) = 6 +// fsync(6) = 0 +// close(6) = 0 +// fsync(3) = 0 +// close(3) = 0 +// + // Mkdir behaves like [os.Mkdir], but it also syncs the directory containing // the created directory to disk. func Mkdir(path string, perm os.FileMode) (err error) {
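For completeness, a minimal usage sketch matching the traces above. The paths and contents ("dir", "dir/file", "hello world") come from the WriteFile trace; note that durable is an internal package, so it is only importable from within this module, and that WriteFile expects the parent directory to already exist.

package main

import (
	"log"

	"filippo.io/sunlight/internal/durable"
)

func main() {
	// Creates the directory, then fsyncs both it and its parent, per the
	// Mkdir trace above.
	if err := durable.MkdirAll("dir", 0755); err != nil {
		log.Fatal(err)
	}

	// Writes to a hidden temp file, fchmods and fsyncs it, renames it into
	// place, then fsyncs the parent directory, per the WriteFile trace above.
	if err := durable.WriteFile("dir/file", []byte("hello world"), 0644); err != nil {
		log.Fatal(err)
	}
}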