From 17e80310b8f22bf77124c3c55526b3b112db1e09 Mon Sep 17 00:00:00 2001 From: Kirill Kapkov Date: Tue, 14 Apr 2026 11:26:15 +0100 Subject: [PATCH] Use constant 8 zstd workers Flush zstd writer between files to bound memory with multi-threaded compression. Without periodic flushes, multi-threaded zstd buffers input asynchronously, causing memory usage to grow up to the full uncompressed layer size. Declare cpu:8 execution_requirements for zstd OCI layer actions. ocitool create-layer spawns 8 zstd worker threads internally. Without declaring this cost, Bazel over-subscribes the machine by scheduling too many concurrent zstd layer builds. Add conditional execution_requirements so Bazel's scheduler accounts for the actual CPU usage (8 for zstd, default 1 for gzip). --- go/cmd/ocitool/createlayer_cmd.go | 30 +++++++++++++++++++++++++++++- oci/layer.bzl | 1 + 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/go/cmd/ocitool/createlayer_cmd.go b/go/cmd/ocitool/createlayer_cmd.go index 98c488c..de01e0c 100644 --- a/go/cmd/ocitool/createlayer_cmd.go +++ b/go/cmd/ocitool/createlayer_cmd.go @@ -22,6 +22,13 @@ import ( "github.com/urfave/cli/v2" ) +const zstdWorkers = 8 + +// Noticeably faster (~20-40%) than default '5', with ony ~2% larger resulting image size. +// And still ~17% smaller image size compared to gzip. +// See comparison https://datadoghq.atlassian.net/wiki/x/EoN6hwE +const zstdCompressionLevel = 3 + func CreateLayerCmd(c *cli.Context) error { config, err := parseConfig(c) if err != nil { @@ -37,8 +44,15 @@ func CreateLayerCmd(c *cli.Context) error { digester := digest.SHA256.Digester() wc := ociutil.NewWriterCounter(io.MultiWriter(out, digester.Hash())) + // flusher drains the compressor's internal buffers between files. + // With multi-threaded zstd (workers > 0), Write() is asynchronous and buffers + // input until workers process it. Without periodic flushes, memory usage can + // grow to the full uncompressed layer size. + type flusher interface{ Flush() error } + var compressWriter io.Writer var compressCloser io.Closer + var compressFlusher flusher var mediaType string switch config.CompressionMethod { case "gzip": @@ -47,9 +61,13 @@ func CreateLayerCmd(c *cli.Context) error { compressCloser = gzipWriter mediaType = ocispec.MediaTypeImageLayerGzip case "zstd": - zstdWriter := zstd.NewWriter(wc) + zstdWriter := zstd.NewWriterLevel(wc, zstdCompressionLevel) + if err := zstdWriter.SetNbWorkers(zstdWorkers); err != nil { + return fmt.Errorf("failed to set zstd workers: %w", err) + } compressWriter = zstdWriter compressCloser = zstdWriter + compressFlusher = zstdWriter mediaType = ocispec.MediaTypeImageLayerZstd default: return fmt.Errorf("uknown compress method %s", config.CompressionMethod) @@ -91,6 +109,11 @@ func CreateLayerCmd(c *cli.Context) error { if err != nil { return err } + if compressFlusher != nil { + if err := compressFlusher.Flush(); err != nil { + return fmt.Errorf("failed to flush compressor: %w", err) + } + } } hostPaths := make([]string, 0, len(config.FileMapping)) @@ -152,6 +175,11 @@ func CreateLayerCmd(c *cli.Context) error { ); err != nil { return err } + if compressFlusher != nil { + if err := compressFlusher.Flush(); err != nil { + return fmt.Errorf("failed to flush compressor: %w", err) + } + } } tarPaths := make([]string, 0, len(config.SymlinkMapping)) diff --git a/oci/layer.bzl b/oci/layer.bzl index e9d906b..1104395 100644 --- a/oci/layer.bzl +++ b/oci/layer.bzl @@ -77,6 +77,7 @@ def _impl(ctx): ["--symlink={}={}".format(k, v) for k, v in ctx.attr.symlinks.items()], inputs = ctx.files.files + ctx.files.file_map, mnemonic = "OCIImageCreateLayer", + execution_requirements = {"cpu:8": ""} if compression_method == "zstd" else {}, outputs = [ descriptor_file, output_file,