From 87ca533b832f6bee0bdc5736a04437800b66decd Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Thu, 4 Sep 2025 16:18:24 +0300 Subject: [PATCH] Refactor controller to use `fluxcd/pkg/artifact` Signed-off-by: Stefan Prodan --- go.mod | 13 +- go.sum | 22 +- internal/controller/bucket_controller.go | 4 +- internal/controller/bucket_controller_test.go | 4 +- .../controller/gitrepository_controller.go | 3 +- .../gitrepository_controller_fuzz_test.go | 2 +- .../gitrepository_controller_test.go | 4 +- internal/controller/helmchart_controller.go | 2 +- .../controller/helmchart_controller_test.go | 94 +- .../controller/helmrepository_controller.go | 4 +- .../helmrepository_controller_test.go | 4 +- .../controller/ocirepository_controller.go | 2 +- .../ocirepository_controller_test.go | 4 +- internal/controller/suite_test.go | 23 +- internal/digest/digest.go | 52 -- internal/digest/digest_test.go | 71 -- internal/digest/writer.go | 71 -- internal/digest/writer_test.go | 128 --- internal/storage/storage.go | 733 --------------- internal/storage/storage_test.go | 864 ------------------ main.go | 148 +-- 21 files changed, 163 insertions(+), 2089 deletions(-) delete mode 100644 internal/digest/digest.go delete mode 100644 internal/digest/digest_test.go delete mode 100644 internal/digest/writer.go delete mode 100644 internal/digest/writer_test.go delete mode 100644 internal/storage/storage.go delete mode 100644 internal/storage/storage_test.go diff --git a/go.mod b/go.mod index c70642968..25bb2aa18 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,13 @@ require ( github.com/Masterminds/semver/v3 v3.4.0 github.com/cyphar/filepath-securejoin v0.4.1 github.com/distribution/distribution/v3 v3.0.0 - github.com/docker/cli v28.3.3+incompatible + github.com/docker/cli v28.4.0+incompatible github.com/docker/go-units v0.5.0 github.com/elazarl/goproxy v1.7.2 github.com/fluxcd/cli-utils v0.36.0-flux.15 github.com/fluxcd/pkg/apis/event v0.19.0 github.com/fluxcd/pkg/apis/meta v1.21.0 + github.com/fluxcd/pkg/artifact v0.2.0 github.com/fluxcd/pkg/auth v0.30.0 github.com/fluxcd/pkg/cache v0.11.0 github.com/fluxcd/pkg/git v0.36.0 @@ -31,10 +32,9 @@ require ( github.com/fluxcd/pkg/gittestserver v0.20.0 github.com/fluxcd/pkg/helmtestserver v0.29.0 github.com/fluxcd/pkg/http/transport v0.7.0 - github.com/fluxcd/pkg/lockedfile v0.7.0 github.com/fluxcd/pkg/masktoken v0.8.0 - github.com/fluxcd/pkg/oci v0.54.0 - github.com/fluxcd/pkg/runtime v0.82.0 + github.com/fluxcd/pkg/oci v0.55.0 + github.com/fluxcd/pkg/runtime v0.83.0 github.com/fluxcd/pkg/sourceignore v0.14.0 github.com/fluxcd/pkg/ssh v0.21.0 github.com/fluxcd/pkg/tar v0.14.0 @@ -53,7 +53,6 @@ require ( github.com/notaryproject/notation-go v1.3.2 github.com/onsi/gomega v1.38.2 github.com/opencontainers/go-digest v1.0.0 - github.com/opencontainers/go-digest/blake3 v0.0.0-20250116041648-1e56c6daea3b github.com/opencontainers/image-spec v1.1.1 github.com/ory/dockertest/v3 v3.12.0 github.com/otiai10/copy v1.14.1 @@ -62,7 +61,7 @@ require ( github.com/sigstore/cosign/v2 v2.5.2 github.com/sigstore/sigstore v1.9.5 github.com/sirupsen/logrus v1.9.3 - github.com/spf13/pflag v1.0.7 + github.com/spf13/pflag v1.0.10 golang.org/x/crypto v0.41.0 golang.org/x/oauth2 v0.30.0 golang.org/x/sync v0.16.0 @@ -193,6 +192,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fluxcd/gitkit v0.6.0 // indirect github.com/fluxcd/pkg/apis/acl v0.9.0 // indirect + github.com/fluxcd/pkg/lockedfile v0.7.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect @@ -296,6 +296,7 @@ require ( github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/oleiade/reflections v1.1.0 // indirect + github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a // indirect github.com/opencontainers/runc v1.2.4 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/otiai10/mint v1.6.3 // indirect diff --git a/go.sum b/go.sum index c740d4b4e..0261b171b 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= -github.com/docker/cli v28.3.3+incompatible h1:fp9ZHAr1WWPGdIWBM1b3zLtgCF+83gRdVMTJsUeiyAo= -github.com/docker/cli v28.3.3+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/cli v28.4.0+incompatible h1:RBcf3Kjw2pMtwui5V0DIMdyeab8glEw5QY0UUU4C9kY= +github.com/docker/cli v28.4.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI= @@ -378,6 +378,8 @@ github.com/fluxcd/pkg/apis/event v0.19.0 h1:ZJU2voontkzp5rNYA4JMOu40S4tRcrWi4Do5 github.com/fluxcd/pkg/apis/event v0.19.0/go.mod h1:deuIyUb6lh+Z1Ccvwwxhm1wNM3kpSo+vF1IgRnpaZfQ= github.com/fluxcd/pkg/apis/meta v1.21.0 h1:R+bN02chcs0HUmyVDQhqe/FHmYLjipVDMLnyYfNX850= github.com/fluxcd/pkg/apis/meta v1.21.0/go.mod h1:XUAEUgT4gkWDAEN79E141tmL+v4SV50tVZ/Ojpc/ueg= +github.com/fluxcd/pkg/artifact v0.2.0 h1:y4j+c2v1qzXEgtQSAQbqAvvvdaUckQ7NxaWWobhNgm4= +github.com/fluxcd/pkg/artifact v0.2.0/go.mod h1:+L19/j8WPJ/blBZ/BFE+NhX6dja9Na1kTJkvZgbblbY= github.com/fluxcd/pkg/auth v0.30.0 h1:7JMnY1ClArvOsadt6hOxceu8Q2hLsYHFMt0DV3BQl4Q= github.com/fluxcd/pkg/auth v0.30.0/go.mod h1:me38o1nDfSLw6YvnkT9Ce/zqJZICZSA7j5pNMR3JUbc= github.com/fluxcd/pkg/cache v0.11.0 h1:fsE8S+una21fSNw4MDXGUIf0Gf1J+pqa4RbsVKf2aTI= @@ -396,10 +398,10 @@ github.com/fluxcd/pkg/lockedfile v0.7.0 h1:tmzW2GeMGuJMiCcVloXVd1vKZ92anm9WGkRgO github.com/fluxcd/pkg/lockedfile v0.7.0/go.mod h1:AzCV/h1N3hi/KtUDUCUgS8hl1+a1y+I6pmRo25dxdK0= github.com/fluxcd/pkg/masktoken v0.8.0 h1:Dm5xIVNbg0s6zNttjDvimaG38bKsXwxBVo5b+D7ThVU= github.com/fluxcd/pkg/masktoken v0.8.0/go.mod h1:Gc73ALOqIe+5Gj2V3JggMNiYcBiZ9bNNDYBE9R5XTTg= -github.com/fluxcd/pkg/oci v0.54.0 h1:s9INS1xocek9Lijob/Pq8xGx+TUA1NInmImY1Cw1DQA= -github.com/fluxcd/pkg/oci v0.54.0/go.mod h1:Z0QAwiC3E8aG4ggFGub1lKhIS++rfcMmrrUt4VSEQ38= -github.com/fluxcd/pkg/runtime v0.82.0 h1:VdPPRJtj8/rcBdqY7GZSffoxe5elFHt+ymwQHNbPOlc= -github.com/fluxcd/pkg/runtime v0.82.0/go.mod h1:rIDynMhU5upbn8ce3bXQhH5L6vtDw5MELycvtJG/+og= +github.com/fluxcd/pkg/oci v0.55.0 h1:7/EpGRv/5KtWFu9/bXozxR4Nu3V76TNuuN/0lII51G8= +github.com/fluxcd/pkg/oci v0.55.0/go.mod h1:roi2GxtkGBcOYCXnPw1VJvxllgAZ/pqTCCSm9bZY9Bs= +github.com/fluxcd/pkg/runtime v0.83.0 h1:XzpwKzo7GqfBE/BKpxG5B4U7cUnojnB407S9Dpp6oLU= +github.com/fluxcd/pkg/runtime v0.83.0/go.mod h1:r8KLvXRguKtpLAa66fA19rIbwPViXm8az038IUabYvw= github.com/fluxcd/pkg/sourceignore v0.14.0 h1:ZiZzbXtXb/Qp7I7JCStsxOlX8ri8rWwCvmvIrJ0UzQQ= github.com/fluxcd/pkg/sourceignore v0.14.0/go.mod h1:E3zKvyTyB+oQKqm/2I/jS6Rrt3B7fNuig/4bY2vi3bg= github.com/fluxcd/pkg/ssh v0.21.0 h1:ZmyF0n9je0cTTkOpvFVgIhmdx9qtswnVE60TK4IzJh0= @@ -809,8 +811,8 @@ github.com/open-policy-agent/opa v1.5.1 h1:LTxxBJusMVjfs67W4FoRcnMfXADIGFMzpqnfk github.com/open-policy-agent/opa v1.5.1/go.mod h1:bYbS7u+uhTI+cxHQIpzvr5hxX0hV7urWtY+38ZtjMgk= github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be h1:f2PlhC9pm5sqpBZFvnAoKj+KzXRzbjFMA+TqXfJdgho= github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= -github.com/opencontainers/go-digest/blake3 v0.0.0-20250116041648-1e56c6daea3b h1:nAiL9bmUK4IzFrKoVMRykv0iYGdoit5vpbPaVCZ+fI4= -github.com/opencontainers/go-digest/blake3 v0.0.0-20250116041648-1e56c6daea3b/go.mod h1:kqQaIc6bZstKgnGpL7GD5dWoLKbA6mH1Y9ULjGImBnM= +github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a h1:IAncDmJeD90l6+YR1Gf6r0HrmnRmOatzPfUpMS80ZTI= +github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a/go.mod h1:kqQaIc6bZstKgnGpL7GD5dWoLKbA6mH1Y9ULjGImBnM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/opencontainers/runc v1.2.4 h1:yWFgLkghp71D76Fa0l349yAl5g4Gse7DPYNlvkQ9Eiw= @@ -953,8 +955,8 @@ github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cA github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/pflag v1.0.7 h1:vN6T9TfwStFPFM5XzjsvmzZkLuaLX+HS+0SeFLRgU6M= -github.com/spf13/pflag v1.0.7/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE= diff --git a/internal/controller/bucket_controller.go b/internal/controller/bucket_controller.go index c855eac23..7fe881be6 100644 --- a/internal/controller/bucket_controller.go +++ b/internal/controller/bucket_controller.go @@ -44,6 +44,8 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/apis/meta" + intdigest "github.com/fluxcd/pkg/artifact/digest" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/runtime/conditions" @@ -59,12 +61,10 @@ import ( "github.com/fluxcd/source-controller/internal/bucket/azure" "github.com/fluxcd/source-controller/internal/bucket/gcp" "github.com/fluxcd/source-controller/internal/bucket/minio" - intdigest "github.com/fluxcd/source-controller/internal/digest" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/index" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" ) // maxConcurrentBucketFetches is the upper bound on the goroutines used to diff --git a/internal/controller/bucket_controller_test.go b/internal/controller/bucket_controller_test.go index 8770588b5..00ed46cb7 100644 --- a/internal/controller/bucket_controller_test.go +++ b/internal/controller/bucket_controller_test.go @@ -38,6 +38,8 @@ import ( kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/fluxcd/pkg/apis/meta" + intdigest "github.com/fluxcd/pkg/artifact/digest" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" @@ -45,13 +47,11 @@ import ( "github.com/fluxcd/pkg/runtime/patch" sourcev1 "github.com/fluxcd/source-controller/api/v1" - intdigest "github.com/fluxcd/source-controller/internal/digest" "github.com/fluxcd/source-controller/internal/index" gcsmock "github.com/fluxcd/source-controller/internal/mock/gcs" s3mock "github.com/fluxcd/source-controller/internal/mock/s3" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" ) // Environment variable to set the GCP Storage host for the GCP client. diff --git a/internal/controller/gitrepository_controller.go b/internal/controller/gitrepository_controller.go index a80001165..1208c8ae0 100644 --- a/internal/controller/gitrepository_controller.go +++ b/internal/controller/gitrepository_controller.go @@ -49,6 +49,7 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/git" "github.com/fluxcd/pkg/git/gogit" @@ -59,7 +60,6 @@ import ( "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" rreconcile "github.com/fluxcd/pkg/runtime/reconcile" - "github.com/fluxcd/pkg/sourceignore" sourcev1 "github.com/fluxcd/source-controller/api/v1" @@ -67,7 +67,6 @@ import ( "github.com/fluxcd/source-controller/internal/features" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" "github.com/fluxcd/source-controller/internal/util" ) diff --git a/internal/controller/gitrepository_controller_fuzz_test.go b/internal/controller/gitrepository_controller_fuzz_test.go index d87a8f68b..c9c136820 100644 --- a/internal/controller/gitrepository_controller_fuzz_test.go +++ b/internal/controller/gitrepository_controller_fuzz_test.go @@ -59,12 +59,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/manager" + intstorage "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/runtime/controller" "github.com/fluxcd/pkg/runtime/testenv" sourcev1 "github.com/fluxcd/source-controller/api/v1" - intstorage "github.com/fluxcd/source-controller/internal/storage" ) var ( diff --git a/internal/controller/gitrepository_controller_test.go b/internal/controller/gitrepository_controller_test.go index 1876fa007..f9f7a591d 100644 --- a/internal/controller/gitrepository_controller_test.go +++ b/internal/controller/gitrepository_controller_test.go @@ -48,6 +48,7 @@ import ( kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/git" "github.com/fluxcd/pkg/git/github" @@ -64,7 +65,6 @@ import ( "github.com/fluxcd/source-controller/internal/features" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" ) const ( @@ -1503,6 +1503,8 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { server, err := testserver.NewTempArtifactServer() g.Expect(err).NotTo(HaveOccurred()) + server.Start() + defer server.Stop() storage, err := newTestStorage(server.HTTPServer) g.Expect(err).NotTo(HaveOccurred()) defer os.RemoveAll(storage.BasePath) diff --git a/internal/controller/helmchart_controller.go b/internal/controller/helmchart_controller.go index ef5a995f0..e969bf67a 100644 --- a/internal/controller/helmchart_controller.go +++ b/internal/controller/helmchart_controller.go @@ -55,6 +55,7 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/git" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" @@ -75,7 +76,6 @@ import ( "github.com/fluxcd/source-controller/internal/oci/notation" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" "github.com/fluxcd/source-controller/internal/util" ) diff --git a/internal/controller/helmchart_controller_test.go b/internal/controller/helmchart_controller_test.go index dd23c5fee..190a9f8b5 100644 --- a/internal/controller/helmchart_controller_test.go +++ b/internal/controller/helmchart_controller_test.go @@ -34,6 +34,8 @@ import ( "testing" "time" + "github.com/fluxcd/pkg/artifact/config" + "github.com/fluxcd/pkg/artifact/digest" "github.com/notaryproject/notation-core-go/signature/cose" "github.com/notaryproject/notation-core-go/testhelper" "github.com/notaryproject/notation-go" @@ -61,6 +63,7 @@ import ( kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/helmtestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" @@ -77,7 +80,6 @@ import ( snotation "github.com/fluxcd/source-controller/internal/oci/notation" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" ) func TestHelmChartReconciler_deleteBeforeFinalizer(t *testing.T) { @@ -571,14 +573,22 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) { tmpDir := t.TempDir() - storage, err := storage.New(tmpDir, "example.com", retentionTTL, retentionRecords) + opts := &config.Options{ + StoragePath: tmpDir, + StorageAddress: "example.com", + StorageAdvAddress: "example.com", + ArtifactRetentionTTL: retentionTTL, + ArtifactRetentionRecords: retentionRecords, + ArtifactDigestAlgo: digest.Canonical.String(), + } + st, err := storage.New(opts) g.Expect(err).ToNot(HaveOccurred()) gitArtifact := &meta.Artifact{ Revision: "mock-ref/abcdefg12345678", Path: "mock.tgz", } - g.Expect(storage.Archive(gitArtifact, "testdata/charts", nil)).To(Succeed()) + g.Expect(st.Archive(gitArtifact, "testdata/charts", nil)).To(Succeed()) tests := []struct { name string @@ -785,7 +795,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), EventRecorder: record.NewFakeRecorder(32), - Storage: storage, + Storage: st, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -1115,14 +1125,14 @@ func TestHelmChartReconciler_buildFromHelmRepository(t *testing.T) { clientBuilder.WithObjects(tt.secret.DeepCopy()) } - storage, err := newTestStorage(server) + testStorage, err := newTestStorage(server) g.Expect(err).ToNot(HaveOccurred()) r := &HelmChartReconciler{ Client: clientBuilder.Build(), EventRecorder: record.NewFakeRecorder(32), Getters: testGetters, - Storage: storage, + Storage: testStorage, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -1188,14 +1198,22 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) { metadata, err := loadTestChartToOCI(chartData, testRegistryServer, "", "", "") g.Expect(err).NotTo(HaveOccurred()) - storage, err := storage.New(tmpDir, "example.com", retentionTTL, retentionRecords) + opts := &config.Options{ + StoragePath: tmpDir, + StorageAddress: "example.com", + StorageAdvAddress: "example.com", + ArtifactRetentionTTL: retentionTTL, + ArtifactRetentionRecords: retentionRecords, + ArtifactDigestAlgo: digest.Canonical.String(), + } + st, err := storage.New(opts) g.Expect(err).ToNot(HaveOccurred()) cachedArtifact := &meta.Artifact{ Revision: "0.1.0", Path: metadata.Name + "-" + metadata.Version + ".tgz", } - g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) + g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) tests := []struct { name string @@ -1273,7 +1291,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) { assertFunc: func(g *WithT, obj *sourcev1.HelmChart, build chart.Build) { g.Expect(build.Name).To(Equal(metadata.Name)) g.Expect(build.Version).To(Equal(metadata.Version)) - g.Expect(build.Path).To(Equal(storage.LocalPath(*cachedArtifact.DeepCopy()))) + g.Expect(build.Path).To(Equal(st.LocalPath(*cachedArtifact.DeepCopy()))) g.Expect(build.Path).To(BeARegularFile()) g.Expect(build.ValuesFiles).To(BeEmpty()) }, @@ -1292,7 +1310,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) { assertFunc: func(g *WithT, obj *sourcev1.HelmChart, build chart.Build) { g.Expect(build.Name).To(Equal(metadata.Name)) g.Expect(build.Version).To(Equal(metadata.Version)) - g.Expect(build.Path).ToNot(Equal(storage.LocalPath(*cachedArtifact.DeepCopy()))) + g.Expect(build.Path).ToNot(Equal(st.LocalPath(*cachedArtifact.DeepCopy()))) g.Expect(build.Path).To(BeARegularFile()) }, cleanFunc: func(g *WithT, build *chart.Build) { @@ -1356,7 +1374,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) { Client: clientBuilder.Build(), EventRecorder: record.NewFakeRecorder(32), Getters: testGetters, - Storage: storage, + Storage: st, RegistryClientGenerator: registry.ClientGenerator, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -1411,24 +1429,32 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { tmpDir := t.TempDir() - storage, err := storage.New(tmpDir, "example.com", retentionTTL, retentionRecords) + opts := &config.Options{ + StoragePath: tmpDir, + StorageAddress: "example.com", + StorageAdvAddress: "example.com", + ArtifactRetentionTTL: retentionTTL, + ArtifactRetentionRecords: retentionRecords, + ArtifactDigestAlgo: digest.Canonical.String(), + } + st, err := storage.New(opts) g.Expect(err).ToNot(HaveOccurred()) chartsArtifact := &meta.Artifact{ Revision: "mock-ref/abcdefg12345678", Path: "mock.tgz", } - g.Expect(storage.Archive(chartsArtifact, "testdata/charts", nil)).To(Succeed()) + g.Expect(st.Archive(chartsArtifact, "testdata/charts", nil)).To(Succeed()) yamlArtifact := &meta.Artifact{ Revision: "9876abcd", Path: "values.yaml", } - g.Expect(storage.CopyFromPath(yamlArtifact, "testdata/charts/helmchart/values.yaml")).To(Succeed()) + g.Expect(st.CopyFromPath(yamlArtifact, "testdata/charts/helmchart/values.yaml")).To(Succeed()) cachedArtifact := &meta.Artifact{ Revision: "0.1.0", Path: "cached.tgz", } - g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) + g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) tests := []struct { name string @@ -1518,7 +1544,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { assertFunc: func(g *WithT, build chart.Build) { g.Expect(build.Name).To(Equal("helmchart")) g.Expect(build.Version).To(Equal("0.1.0")) - g.Expect(build.Path).To(Equal(storage.LocalPath(*cachedArtifact.DeepCopy()))) + g.Expect(build.Path).To(Equal(st.LocalPath(*cachedArtifact.DeepCopy()))) g.Expect(build.Path).To(BeARegularFile()) g.Expect(build.ValuesFiles).To(BeEmpty()) }, @@ -1535,7 +1561,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { assertFunc: func(g *WithT, build chart.Build) { g.Expect(build.Name).To(Equal("helmchart")) g.Expect(build.Version).To(Equal("0.1.0")) - g.Expect(build.Path).To(Equal(storage.LocalPath(*cachedArtifact.DeepCopy()))) + g.Expect(build.Path).To(Equal(st.LocalPath(*cachedArtifact.DeepCopy()))) g.Expect(build.Path).To(BeARegularFile()) g.Expect(build.ValuesFiles).To(Equal([]string{"values.yaml", "override.yaml"})) }, @@ -1553,7 +1579,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { assertFunc: func(g *WithT, build chart.Build) { g.Expect(build.Name).To(Equal("helmchart")) g.Expect(build.Version).To(Equal("0.1.0")) - g.Expect(build.Path).ToNot(Equal(storage.LocalPath(*cachedArtifact.DeepCopy()))) + g.Expect(build.Path).ToNot(Equal(st.LocalPath(*cachedArtifact.DeepCopy()))) g.Expect(build.Path).To(BeARegularFile()) g.Expect(build.ValuesFiles).To(BeEmpty()) }, @@ -1590,7 +1616,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { WithStatusSubresource(&sourcev1.HelmChart{}). Build(), EventRecorder: record.NewFakeRecorder(32), - Storage: storage, + Storage: st, Getters: testGetters, RegistryClientGenerator: registry.ClientGenerator, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -2898,19 +2924,26 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t metadata, err := loadTestChartToOCI(chartData, server, "", "", "") g.Expect(err).NotTo(HaveOccurred()) - storage, err := storage.New(tmpDir, server.registryHost, retentionTTL, retentionRecords) + opts := &config.Options{ + StoragePath: tmpDir, + StorageAddress: server.registryHost, + ArtifactRetentionTTL: retentionTTL, + ArtifactRetentionRecords: retentionRecords, + ArtifactDigestAlgo: digest.Canonical.String(), + } + st, err := storage.New(opts) g.Expect(err).ToNot(HaveOccurred()) cachedArtifact := &meta.Artifact{ Revision: "0.1.0", Path: metadata.Name + "-" + metadata.Version + ".tgz", } - g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) + g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) certTuple := testhelper.GetRSASelfSignedSigningCertTuple("notation self-signed certs for testing") certs := []*x509.Certificate{certTuple.Cert} - signer, err := signer.New(certTuple.PrivateKey, certs) + sg, err := signer.New(certTuple.PrivateKey, certs) g.Expect(err).ToNot(HaveOccurred()) policyDocument := trustpolicy.Document{ @@ -3120,7 +3153,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t Client: clientBuilder.Build(), EventRecorder: record.NewFakeRecorder(32), Getters: testGetters, - Storage: storage, + Storage: st, RegistryClientGenerator: registry.ClientGenerator, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -3162,7 +3195,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t ArtifactReference: artifact, } - _, err = notation.Sign(ctx, signer, repo, signOptions) + _, err = notation.Sign(ctx, sg, repo, signOptions) g.Expect(err).ToNot(HaveOccurred()) } @@ -3222,14 +3255,21 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureCosign(t *tes metadata, err := loadTestChartToOCI(chartData, server, "", "", "") g.Expect(err).NotTo(HaveOccurred()) - storage, err := storage.New(tmpDir, server.registryHost, retentionTTL, retentionRecords) + opts := &config.Options{ + StoragePath: tmpDir, + StorageAddress: server.registryHost, + ArtifactRetentionTTL: retentionTTL, + ArtifactRetentionRecords: retentionRecords, + ArtifactDigestAlgo: digest.Canonical.String(), + } + st, err := storage.New(opts) g.Expect(err).ToNot(HaveOccurred()) cachedArtifact := &meta.Artifact{ Revision: "0.1.0", Path: metadata.Name + "-" + metadata.Version + ".tgz", } - g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) + g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed()) pf := func(b bool) ([]byte, error) { return []byte("cosign-password"), nil @@ -3365,7 +3405,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureCosign(t *tes Client: clientBuilder.Build(), EventRecorder: record.NewFakeRecorder(32), Getters: testGetters, - Storage: storage, + Storage: st, RegistryClientGenerator: registry.ClientGenerator, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } diff --git a/internal/controller/helmrepository_controller.go b/internal/controller/helmrepository_controller.go index 9e052b34d..06c4494cf 100644 --- a/internal/controller/helmrepository_controller.go +++ b/internal/controller/helmrepository_controller.go @@ -42,6 +42,8 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/apis/meta" + intdigest "github.com/fluxcd/pkg/artifact/digest" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" "github.com/fluxcd/pkg/runtime/jitter" @@ -51,14 +53,12 @@ import ( sourcev1 "github.com/fluxcd/source-controller/api/v1" "github.com/fluxcd/source-controller/internal/cache" - intdigest "github.com/fluxcd/source-controller/internal/digest" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/helm/getter" "github.com/fluxcd/source-controller/internal/helm/repository" intpredicates "github.com/fluxcd/source-controller/internal/predicates" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" ) // helmRepositoryReadyCondition contains the information required to summarize a diff --git a/internal/controller/helmrepository_controller_test.go b/internal/controller/helmrepository_controller_test.go index 3791294e6..d76c58a42 100644 --- a/internal/controller/helmrepository_controller_test.go +++ b/internal/controller/helmrepository_controller_test.go @@ -43,6 +43,8 @@ import ( kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/fluxcd/pkg/apis/meta" + intdigest "github.com/fluxcd/pkg/artifact/digest" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/helmtestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" @@ -51,12 +53,10 @@ import ( sourcev1 "github.com/fluxcd/source-controller/api/v1" "github.com/fluxcd/source-controller/internal/cache" - intdigest "github.com/fluxcd/source-controller/internal/digest" "github.com/fluxcd/source-controller/internal/helm/repository" intpredicates "github.com/fluxcd/source-controller/internal/predicates" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" ) func TestHelmRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { diff --git a/internal/controller/ocirepository_controller.go b/internal/controller/ocirepository_controller.go index e39230551..a91c8a51b 100644 --- a/internal/controller/ocirepository_controller.go +++ b/internal/controller/ocirepository_controller.go @@ -50,6 +50,7 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/oci" @@ -77,7 +78,6 @@ import ( "github.com/fluxcd/source-controller/internal/oci/notation" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" - "github.com/fluxcd/source-controller/internal/storage" "github.com/fluxcd/source-controller/internal/util" ) diff --git a/internal/controller/ocirepository_controller_test.go b/internal/controller/ocirepository_controller_test.go index e2cea947d..6ea35e962 100644 --- a/internal/controller/ocirepository_controller_test.go +++ b/internal/controller/ocirepository_controller_test.go @@ -60,6 +60,8 @@ import ( kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/fluxcd/pkg/apis/meta" + intdigest "github.com/fluxcd/pkg/artifact/digest" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/git" "github.com/fluxcd/pkg/oci" @@ -69,11 +71,9 @@ import ( "github.com/fluxcd/pkg/tar" sourcev1 "github.com/fluxcd/source-controller/api/v1" - intdigest "github.com/fluxcd/source-controller/internal/digest" serror "github.com/fluxcd/source-controller/internal/error" snotation "github.com/fluxcd/source-controller/internal/oci/notation" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" - "github.com/fluxcd/source-controller/internal/storage" testproxy "github.com/fluxcd/source-controller/tests/proxy" ) diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index eeb166fb5..ad0365616 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -32,6 +32,10 @@ import ( "testing" "time" + "github.com/distribution/distribution/v3/configuration" + dockerRegistry "github.com/distribution/distribution/v3/registry" + _ "github.com/distribution/distribution/v3/registry/auth/htpasswd" + _ "github.com/distribution/distribution/v3/registry/storage/driver/inmemory" "github.com/foxcpp/go-mockdns" "github.com/phayes/freeport" "github.com/sirupsen/logrus" @@ -45,11 +49,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" - "github.com/distribution/distribution/v3/configuration" - dockerRegistry "github.com/distribution/distribution/v3/registry" - _ "github.com/distribution/distribution/v3/registry/auth/htpasswd" - _ "github.com/distribution/distribution/v3/registry/storage/driver/inmemory" - + "github.com/fluxcd/pkg/artifact/config" + "github.com/fluxcd/pkg/artifact/digest" + "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/runtime/controller" "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/testenv" @@ -57,7 +59,6 @@ import ( sourcev1 "github.com/fluxcd/source-controller/api/v1" "github.com/fluxcd/source-controller/internal/cache" - "github.com/fluxcd/source-controller/internal/storage" // +kubebuilder:scaffold:imports ) @@ -432,7 +433,15 @@ func initTestTLS() { } func newTestStorage(s *testserver.HTTPServer) (*storage.Storage, error) { - st, err := storage.New(s.Root(), s.URL(), retentionTTL, retentionRecords) + opts := &config.Options{ + StoragePath: s.Root(), + StorageAddress: s.URL(), + StorageAdvAddress: s.URL(), + ArtifactRetentionTTL: retentionTTL, + ArtifactRetentionRecords: retentionRecords, + ArtifactDigestAlgo: digest.Canonical.String(), + } + st, err := storage.New(opts) if err != nil { return nil, err } diff --git a/internal/digest/digest.go b/internal/digest/digest.go deleted file mode 100644 index 6b1117398..000000000 --- a/internal/digest/digest.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2022 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package digest - -import ( - "crypto" - _ "crypto/sha1" - _ "crypto/sha256" - _ "crypto/sha512" - "fmt" - - "github.com/opencontainers/go-digest" - _ "github.com/opencontainers/go-digest/blake3" -) - -const ( - SHA1 digest.Algorithm = "sha1" -) - -var ( - // Canonical is the primary digest algorithm used to calculate checksums. - Canonical = digest.SHA256 -) - -func init() { - // Register SHA-1 algorithm for support of e.g. Git commit SHAs. - digest.RegisterAlgorithm(SHA1, crypto.SHA1) -} - -// AlgorithmForName returns the digest algorithm for the given name, or an -// error of type digest.ErrDigestUnsupported if the algorithm is unavailable. -func AlgorithmForName(name string) (digest.Algorithm, error) { - a := digest.Algorithm(name) - if !a.Available() { - return "", fmt.Errorf("%w: %s", digest.ErrDigestUnsupported, name) - } - return a, nil -} diff --git a/internal/digest/digest_test.go b/internal/digest/digest_test.go deleted file mode 100644 index 3030c2d11..000000000 --- a/internal/digest/digest_test.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2022 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package digest - -import ( - "errors" - "testing" - - . "github.com/onsi/gomega" - "github.com/opencontainers/go-digest" -) - -func TestAlgorithmForName(t *testing.T) { - tests := []struct { - name string - want digest.Algorithm - wantErr error - }{ - { - name: "sha256", - want: digest.SHA256, - }, - { - name: "sha384", - want: digest.SHA384, - }, - { - name: "sha512", - want: digest.SHA512, - }, - { - name: "blake3", - want: digest.BLAKE3, - }, - { - name: "sha1", - want: SHA1, - }, - { - name: "not-available", - wantErr: digest.ErrDigestUnsupported, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - got, err := AlgorithmForName(tt.name) - if tt.wantErr != nil { - g.Expect(err).To(HaveOccurred()) - g.Expect(errors.Is(err, tt.wantErr)).To(BeTrue()) - return - } - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(got).To(Equal(tt.want)) - }) - } -} diff --git a/internal/digest/writer.go b/internal/digest/writer.go deleted file mode 100644 index 4783f8b84..000000000 --- a/internal/digest/writer.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2022 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package digest - -import ( - "fmt" - "io" - - "github.com/opencontainers/go-digest" -) - -// MultiDigester is a digester that writes to multiple digesters to calculate -// the checksum of different algorithms. -type MultiDigester struct { - d map[digest.Algorithm]digest.Digester -} - -// NewMultiDigester returns a new MultiDigester that writes to newly -// initialized digesters for the given algorithms. If a provided algorithm is -// not available, it returns a digest.ErrDigestUnsupported error. -func NewMultiDigester(algos ...digest.Algorithm) (*MultiDigester, error) { - d := make(map[digest.Algorithm]digest.Digester, len(algos)) - for _, a := range algos { - if _, ok := d[a]; ok { - continue - } - if !a.Available() { - return nil, fmt.Errorf("%w: %s", digest.ErrDigestUnsupported, a) - } - d[a] = a.Digester() - } - return &MultiDigester{d: d}, nil -} - -// Write writes p to all underlying digesters. -func (w *MultiDigester) Write(p []byte) (n int, err error) { - for _, d := range w.d { - n, err = d.Hash().Write(p) - if err != nil { - return - } - if n != len(p) { - err = io.ErrShortWrite - return - } - } - return len(p), nil -} - -// Digest returns the digest of the data written to the digester of the given -// algorithm, or an empty digest if the algorithm is not available. -func (w *MultiDigester) Digest(algo digest.Algorithm) digest.Digest { - if d, ok := w.d[algo]; ok { - return d.Digest() - } - return "" -} diff --git a/internal/digest/writer_test.go b/internal/digest/writer_test.go deleted file mode 100644 index 9ae63b882..000000000 --- a/internal/digest/writer_test.go +++ /dev/null @@ -1,128 +0,0 @@ -/* -Copyright 2022 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package digest - -import ( - "crypto/rand" - "testing" - - . "github.com/onsi/gomega" - "github.com/opencontainers/go-digest" -) - -func TestNewMultiDigester(t *testing.T) { - t.Run("constructs a MultiDigester", func(t *testing.T) { - g := NewWithT(t) - - d, err := NewMultiDigester(Canonical, digest.SHA512) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(d.d).To(HaveLen(2)) - }) - - t.Run("returns an error if an algorithm is not available", func(t *testing.T) { - g := NewWithT(t) - - _, err := NewMultiDigester(digest.Algorithm("not-available")) - g.Expect(err).To(HaveOccurred()) - }) -} - -func TestMultiDigester_Write(t *testing.T) { - t.Run("writes to all digesters", func(t *testing.T) { - g := NewWithT(t) - - d, err := NewMultiDigester(Canonical, digest.SHA512) - g.Expect(err).ToNot(HaveOccurred()) - - n, err := d.Write([]byte("hello")) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(n).To(Equal(5)) - - n, err = d.Write([]byte(" world")) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(n).To(Equal(6)) - - g.Expect(d.Digest(Canonical)).To(BeEquivalentTo("sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9")) - g.Expect(d.Digest(digest.SHA512)).To(BeEquivalentTo("sha512:309ecc489c12d6eb4cc40f50c902f2b4d0ed77ee511a7c7a9bcd3ca86d4cd86f989dd35bc5ff499670da34255b45b0cfd830e81f605dcf7dc5542e93ae9cd76f")) - }) -} - -func TestMultiDigester_Digest(t *testing.T) { - t.Run("returns the digest for the given algorithm", func(t *testing.T) { - g := NewWithT(t) - - d, err := NewMultiDigester(Canonical, digest.SHA512) - g.Expect(err).ToNot(HaveOccurred()) - - g.Expect(d.Digest(Canonical)).To(BeEquivalentTo("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")) - g.Expect(d.Digest(digest.SHA512)).To(BeEquivalentTo("sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e")) - }) - - t.Run("returns an empty digest if the algorithm is not supported", func(t *testing.T) { - g := NewWithT(t) - - d, err := NewMultiDigester(Canonical, digest.SHA512) - g.Expect(err).ToNot(HaveOccurred()) - - g.Expect(d.Digest(digest.Algorithm("not-available"))).To(BeEmpty()) - }) -} - -func benchmarkMultiDigesterWrite(b *testing.B, algos []digest.Algorithm, pSize int64) { - md, err := NewMultiDigester(algos...) - if err != nil { - b.Fatal(err) - } - - p := make([]byte, pSize) - if _, err = rand.Read(p); err != nil { - b.Fatal(err) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - md.Write(p) - } -} - -func BenchmarkMultiDigester_Write(b *testing.B) { - const pSize = 1024 * 2 - - b.Run("sha1", func(b *testing.B) { - benchmarkMultiDigesterWrite(b, []digest.Algorithm{SHA1}, pSize) - }) - - b.Run("sha256", func(b *testing.B) { - benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256}, pSize) - }) - - b.Run("blake3", func(b *testing.B) { - benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.BLAKE3}, pSize) - }) - - b.Run("sha256+sha384", func(b *testing.B) { - benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256, digest.SHA384}, pSize) - }) - - b.Run("sha256+sha512", func(b *testing.B) { - benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256, digest.SHA512}, pSize) - }) - - b.Run("sha256+blake3", func(b *testing.B) { - benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256, digest.BLAKE3}, pSize) - }) -} diff --git a/internal/storage/storage.go b/internal/storage/storage.go deleted file mode 100644 index 46d31a2bd..000000000 --- a/internal/storage/storage.go +++ /dev/null @@ -1,733 +0,0 @@ -/* -Copyright 2025 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package storage - -import ( - "archive/tar" - "compress/gzip" - "context" - "fmt" - "io" - "io/fs" - "net/url" - "os" - "path" - "path/filepath" - "sort" - "strings" - "time" - - securejoin "github.com/cyphar/filepath-securejoin" - "github.com/fluxcd/pkg/apis/meta" - "github.com/go-git/go-git/v5/plumbing/format/gitignore" - "github.com/opencontainers/go-digest" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kerrors "k8s.io/apimachinery/pkg/util/errors" - - "github.com/fluxcd/pkg/lockedfile" - "github.com/fluxcd/pkg/oci" - "github.com/fluxcd/pkg/sourceignore" - pkgtar "github.com/fluxcd/pkg/tar" - - intdigest "github.com/fluxcd/source-controller/internal/digest" -) - -const GarbageCountLimit = 1000 - -const ( - // defaultFileMode is the permission mode applied to files inside an artifact archive. - defaultFileMode int64 = 0o600 - // defaultDirMode is the permission mode applied to all directories inside an artifact archive. - defaultDirMode int64 = 0o750 - // defaultExeFileMode is the permission mode applied to executable files inside an artifact archive. - defaultExeFileMode int64 = 0o700 -) - -// Storage manages artifacts -type Storage struct { - // BasePath is the local directory path where the source artifacts are stored. - BasePath string `json:"basePath"` - - // Hostname is the file server host name used to compose the artifacts URIs. - Hostname string `json:"hostname"` - - // ArtifactRetentionTTL is the duration of time that artifacts will be kept - // in storage before being garbage collected. - ArtifactRetentionTTL time.Duration `json:"artifactRetentionTTL"` - - // ArtifactRetentionRecords is the maximum number of artifacts to be kept in - // storage after a garbage collection. - ArtifactRetentionRecords int `json:"artifactRetentionRecords"` -} - -// New creates the storage helper for a given path and hostname. -func New(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) { - if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() { - return nil, fmt.Errorf("invalid dir path: %s", basePath) - } - return &Storage{ - BasePath: basePath, - Hostname: hostname, - ArtifactRetentionTTL: artifactRetentionTTL, - ArtifactRetentionRecords: artifactRetentionRecords, - }, nil -} - -// NewArtifactFor returns a new meta.Artifact. -func (s Storage) NewArtifactFor(kind string, metadata metav1.Object, revision, fileName string) meta.Artifact { - path := ArtifactPath(kind, metadata.GetNamespace(), metadata.GetName(), fileName) - artifact := meta.Artifact{ - Path: path, - Revision: revision, - } - s.SetArtifactURL(&artifact) - return artifact -} - -// SetArtifactURL sets the URL on the given meta.Artifact. -func (s Storage) SetArtifactURL(artifact *meta.Artifact) { - if artifact.Path == "" { - return - } - format := "http://%s/%s" - if strings.HasPrefix(s.Hostname, "http://") || strings.HasPrefix(s.Hostname, "https://") { - format = "%s/%s" - } - artifact.URL = fmt.Sprintf(format, s.Hostname, strings.TrimLeft(artifact.Path, "/")) -} - -// SetHostname sets the hostname of the given URL string to the current Storage.Hostname and returns the result. -func (s Storage) SetHostname(URL string) string { - u, err := url.Parse(URL) - if err != nil { - return "" - } - u.Host = s.Hostname - return u.String() -} - -// MkdirAll calls os.MkdirAll for the given meta.Artifact base dir. -func (s Storage) MkdirAll(artifact meta.Artifact) error { - dir := filepath.Dir(s.LocalPath(artifact)) - return os.MkdirAll(dir, 0o700) -} - -// Remove calls os.Remove for the given meta.Artifact path. -func (s Storage) Remove(artifact meta.Artifact) error { - return os.Remove(s.LocalPath(artifact)) -} - -// RemoveAll calls os.RemoveAll for the given meta.Artifact base dir. -func (s Storage) RemoveAll(artifact meta.Artifact) (string, error) { - var deletedDir string - dir := filepath.Dir(s.LocalPath(artifact)) - // Check if the dir exists. - _, err := os.Stat(dir) - if err == nil { - deletedDir = dir - } - return deletedDir, os.RemoveAll(dir) -} - -// RemoveAllButCurrent removes all files for the given meta.Artifact base dir, excluding the current one. -func (s Storage) RemoveAllButCurrent(artifact meta.Artifact) ([]string, error) { - deletedFiles := []string{} - localPath := s.LocalPath(artifact) - dir := filepath.Dir(localPath) - var errors []string - _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - errors = append(errors, err.Error()) - return nil - } - - if path != localPath && !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink { - if err := os.Remove(path); err != nil { - errors = append(errors, info.Name()) - } else { - // Collect the successfully deleted file paths. - deletedFiles = append(deletedFiles, path) - } - } - return nil - }) - - if len(errors) > 0 { - return deletedFiles, fmt.Errorf("failed to remove files: %s", strings.Join(errors, " ")) - } - return deletedFiles, nil -} - -// getGarbageFiles returns all files that need to be garbage collected for the given artifact. -// Garbage files are determined based on the below flow: -// 1. collect all artifact files with an expired ttl -// 2. if we satisfy maxItemsToBeRetained, then return -// 3. else, collect all artifact files till the latest n files remain, where n=maxItemsToBeRetained -func (s Storage) getGarbageFiles(artifact meta.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) (garbageFiles []string, _ error) { - localPath := s.LocalPath(artifact) - dir := filepath.Dir(localPath) - artifactFilesWithCreatedTs := make(map[time.Time]string) - // sortedPaths contain all files sorted according to their created ts. - sortedPaths := []string{} - now := time.Now().UTC() - totalArtifactFiles := 0 - var errors []string - creationTimestamps := []time.Time{} - _ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error { - if err != nil { - errors = append(errors, err.Error()) - return nil - } - if totalArtifactFiles >= totalCountLimit { - return fmt.Errorf("reached file walking limit, already walked over: %d", totalArtifactFiles) - } - info, err := d.Info() - if err != nil { - errors = append(errors, err.Error()) - return nil - } - createdAt := info.ModTime().UTC() - diff := now.Sub(createdAt) - // Compare the time difference between now and the time at which the file was created - // with the provided TTL. Delete if the difference is greater than the TTL. Since the - // below logic just deals with determining if an artifact needs to be garbage collected, - // we avoid all lock files, adding them at the end to the list of garbage files. - expired := diff > ttl - if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink && filepath.Ext(path) != ".lock" { - if path != localPath && expired { - garbageFiles = append(garbageFiles, path) - } - totalArtifactFiles += 1 - artifactFilesWithCreatedTs[createdAt] = path - creationTimestamps = append(creationTimestamps, createdAt) - } - return nil - - }) - if len(errors) > 0 { - return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ",")) - } - - // We already collected enough garbage files to satisfy the no. of max - // items that are supposed to be retained, so exit early. - if totalArtifactFiles-len(garbageFiles) < maxItemsToBeRetained { - return garbageFiles, nil - } - - // sort all timestamps in ascending order. - sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) }) - for _, ts := range creationTimestamps { - path, ok := artifactFilesWithCreatedTs[ts] - if !ok { - return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts) - } - sortedPaths = append(sortedPaths, path) - } - - var collected int - noOfGarbageFiles := len(garbageFiles) - for _, path := range sortedPaths { - if path != localPath && filepath.Ext(path) != ".lock" && !stringInSlice(path, garbageFiles) { - // If we previously collected some garbage files with an expired ttl, then take that into account - // when checking whether we need to remove more files to satisfy the max no. of items allowed - // in the filesystem, along with the no. of files already removed in this loop. - if noOfGarbageFiles > 0 { - if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained { - garbageFiles = append(garbageFiles, path) - collected += 1 - } - } else { - if len(sortedPaths)-collected > maxItemsToBeRetained { - garbageFiles = append(garbageFiles, path) - collected += 1 - } - } - } - } - - return garbageFiles, nil -} - -// GarbageCollect removes all garbage files in the artifact dir according to the provided -// retention options. -func (s Storage) GarbageCollect(ctx context.Context, artifact meta.Artifact, timeout time.Duration) ([]string, error) { - delFilesChan := make(chan []string) - errChan := make(chan error) - // Abort if it takes more than the provided timeout duration. - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - go func() { - garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL) - if err != nil { - errChan <- err - return - } - var errors []error - var deleted []string - if len(garbageFiles) > 0 { - for _, file := range garbageFiles { - err := os.Remove(file) - if err != nil { - errors = append(errors, err) - } else { - deleted = append(deleted, file) - } - // If a lock file exists for this garbage artifact, remove that too. - lockFile := file + ".lock" - if _, err = os.Lstat(lockFile); err == nil { - err = os.Remove(lockFile) - if err != nil { - errors = append(errors, err) - } - } - } - } - if len(errors) > 0 { - errChan <- kerrors.NewAggregate(errors) - return - } - delFilesChan <- deleted - }() - - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case delFiles := <-delFilesChan: - return delFiles, nil - case err := <-errChan: - return nil, err - } - } -} - -func stringInSlice(a string, list []string) bool { - for _, b := range list { - if b == a { - return true - } - } - return false -} - -// ArtifactExist returns a boolean indicating whether the meta.Artifact exists in storage and is a regular file. -func (s Storage) ArtifactExist(artifact meta.Artifact) bool { - fi, err := os.Lstat(s.LocalPath(artifact)) - if err != nil { - return false - } - return fi.Mode().IsRegular() -} - -// VerifyArtifact verifies if the Digest of the meta.Artifact matches the digest -// of the file in Storage. It returns an error if the digests don't match, or -// if it can't be verified. -func (s Storage) VerifyArtifact(artifact meta.Artifact) error { - if artifact.Digest == "" { - return fmt.Errorf("artifact has no digest") - } - - d, err := digest.Parse(artifact.Digest) - if err != nil { - return fmt.Errorf("failed to parse artifact digest '%s': %w", artifact.Digest, err) - } - - f, err := os.Open(s.LocalPath(artifact)) - if err != nil { - return err - } - defer f.Close() - - verifier := d.Verifier() - if _, err = io.Copy(verifier, f); err != nil { - return err - } - if !verifier.Verified() { - return fmt.Errorf("computed digest doesn't match '%s'", d.String()) - } - return nil -} - -// ArchiveFileFilter must return true if a file should not be included in the archive after inspecting the given path -// and/or os.FileInfo. -type ArchiveFileFilter func(p string, fi os.FileInfo) bool - -// SourceIgnoreFilter returns an ArchiveFileFilter that filters out files matching sourceignore.VCSPatterns and any of -// the provided patterns. -// If an empty gitignore.Pattern slice is given, the matcher is set to sourceignore.NewDefaultMatcher. -func SourceIgnoreFilter(ps []gitignore.Pattern, domain []string) ArchiveFileFilter { - matcher := sourceignore.NewDefaultMatcher(ps, domain) - if len(ps) > 0 { - ps = append(sourceignore.VCSPatterns(domain), ps...) - matcher = sourceignore.NewMatcher(ps) - } - return func(p string, fi os.FileInfo) bool { - return matcher.Match(strings.Split(p, string(filepath.Separator)), fi.IsDir()) - } -} - -// Archive atomically archives the given directory as a tarball to the given meta.Artifact path, excluding -// directories and any ArchiveFileFilter matches. While archiving, any environment specific data (for example, -// the user and group name) is stripped from file headers. -// If successful, it sets the digest and last update time on the artifact. -func (s Storage) Archive(artifact *meta.Artifact, dir string, filter ArchiveFileFilter) (err error) { - if f, err := os.Stat(dir); os.IsNotExist(err) || !f.IsDir() { - return fmt.Errorf("invalid dir path: %s", dir) - } - - localPath := s.LocalPath(*artifact) - tf, err := os.CreateTemp(filepath.Split(localPath)) - if err != nil { - return err - } - tmpName := tf.Name() - defer func() { - if err != nil { - os.Remove(tmpName) - } - }() - - d := intdigest.Canonical.Digester() - sz := &writeCounter{} - mw := io.MultiWriter(d.Hash(), tf, sz) - - gw := gzip.NewWriter(mw) - tw := tar.NewWriter(gw) - if err := filepath.Walk(dir, func(p string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - // Ignore anything that is not a file or directories e.g. symlinks - if m := fi.Mode(); !(m.IsRegular() || m.IsDir()) { - return nil - } - - // Skip filtered files - if filter != nil && filter(p, fi) { - return nil - } - - header, err := tar.FileInfoHeader(fi, p) - if err != nil { - return err - } - - // The name needs to be modified to maintain directory structure - // as tar.FileInfoHeader only has access to the base name of the file. - // Ref: https://golang.org/src/archive/tar/common.go?#L626 - relFilePath := p - if filepath.IsAbs(dir) { - relFilePath, err = filepath.Rel(dir, p) - if err != nil { - return err - } - } - sanitizeHeader(relFilePath, header) - - if err := tw.WriteHeader(header); err != nil { - return err - } - - if !fi.Mode().IsRegular() { - return nil - } - f, err := os.Open(p) - if err != nil { - f.Close() - return err - } - if _, err := io.Copy(tw, f); err != nil { - f.Close() - return err - } - return f.Close() - }); err != nil { - tw.Close() - gw.Close() - tf.Close() - return err - } - - if err := tw.Close(); err != nil { - gw.Close() - tf.Close() - return err - } - if err := gw.Close(); err != nil { - tf.Close() - return err - } - if err := tf.Close(); err != nil { - return err - } - - if err := os.Chmod(tmpName, 0o600); err != nil { - return err - } - - if err := oci.RenameWithFallback(tmpName, localPath); err != nil { - return err - } - - artifact.Digest = d.Digest().String() - artifact.LastUpdateTime = metav1.Now() - artifact.Size = &sz.written - - return nil -} - -// AtomicWriteFile atomically writes the io.Reader contents to the meta.Artifact path. -// If successful, it sets the digest and last update time on the artifact. -func (s Storage) AtomicWriteFile(artifact *meta.Artifact, reader io.Reader, mode os.FileMode) (err error) { - localPath := s.LocalPath(*artifact) - tf, err := os.CreateTemp(filepath.Split(localPath)) - if err != nil { - return err - } - tfName := tf.Name() - defer func() { - if err != nil { - os.Remove(tfName) - } - }() - - d := intdigest.Canonical.Digester() - sz := &writeCounter{} - mw := io.MultiWriter(tf, d.Hash(), sz) - - if _, err := io.Copy(mw, reader); err != nil { - tf.Close() - return err - } - if err := tf.Close(); err != nil { - return err - } - - if err := os.Chmod(tfName, mode); err != nil { - return err - } - - if err := oci.RenameWithFallback(tfName, localPath); err != nil { - return err - } - - artifact.Digest = d.Digest().String() - artifact.LastUpdateTime = metav1.Now() - artifact.Size = &sz.written - - return nil -} - -// Copy atomically copies the io.Reader contents to the meta.Artifact path. -// If successful, it sets the digest and last update time on the artifact. -func (s Storage) Copy(artifact *meta.Artifact, reader io.Reader) (err error) { - localPath := s.LocalPath(*artifact) - tf, err := os.CreateTemp(filepath.Split(localPath)) - if err != nil { - return err - } - tfName := tf.Name() - defer func() { - if err != nil { - os.Remove(tfName) - } - }() - - d := intdigest.Canonical.Digester() - sz := &writeCounter{} - mw := io.MultiWriter(tf, d.Hash(), sz) - - if _, err := io.Copy(mw, reader); err != nil { - tf.Close() - return err - } - if err := tf.Close(); err != nil { - return err - } - - if err := oci.RenameWithFallback(tfName, localPath); err != nil { - return err - } - - artifact.Digest = d.Digest().String() - artifact.LastUpdateTime = metav1.Now() - artifact.Size = &sz.written - - return nil -} - -// CopyFromPath atomically copies the contents of the given path to the path of the meta.Artifact. -// If successful, the digest and last update time on the artifact is set. -func (s Storage) CopyFromPath(artifact *meta.Artifact, path string) (err error) { - f, err := os.Open(path) - if err != nil { - return err - } - defer func() { - if cerr := f.Close(); cerr != nil && err == nil { - err = cerr - } - }() - err = s.Copy(artifact, f) - return err -} - -// CopyToPath copies the contents in the (sub)path of the given artifact to the given path. -func (s Storage) CopyToPath(artifact *meta.Artifact, subPath, toPath string) error { - // create a tmp directory to store artifact - tmp, err := os.MkdirTemp("", "flux-include-") - if err != nil { - return err - } - defer os.RemoveAll(tmp) - - // read artifact file content - localPath := s.LocalPath(*artifact) - f, err := os.Open(localPath) - if err != nil { - return err - } - defer f.Close() - - // untar the artifact - untarPath := filepath.Join(tmp, "unpack") - if err = pkgtar.Untar(f, untarPath, pkgtar.WithMaxUntarSize(-1)); err != nil { - return err - } - - // create the destination parent dir - if err = os.MkdirAll(filepath.Dir(toPath), os.ModePerm); err != nil { - return err - } - - // copy the artifact content to the destination dir - fromPath, err := securejoin.SecureJoin(untarPath, subPath) - if err != nil { - return err - } - if err := oci.RenameWithFallback(fromPath, toPath); err != nil { - return err - } - return nil -} - -// Symlink creates or updates a symbolic link for the given meta.Artifact and returns the URL for the symlink. -func (s Storage) Symlink(artifact meta.Artifact, linkName string) (string, error) { - localPath := s.LocalPath(artifact) - dir := filepath.Dir(localPath) - link := filepath.Join(dir, linkName) - tmpLink := link + ".tmp" - - if err := os.Remove(tmpLink); err != nil && !os.IsNotExist(err) { - return "", err - } - - if err := os.Symlink(localPath, tmpLink); err != nil { - return "", err - } - - if err := os.Rename(tmpLink, link); err != nil { - return "", err - } - - return fmt.Sprintf("http://%s/%s", s.Hostname, filepath.Join(filepath.Dir(artifact.Path), linkName)), nil -} - -// Lock creates a file lock for the given meta.Artifact. -func (s Storage) Lock(artifact meta.Artifact) (unlock func(), err error) { - lockFile := s.LocalPath(artifact) + ".lock" - mutex := lockedfile.MutexAt(lockFile) - return mutex.Lock() -} - -// LocalPath returns the secure local path of the given artifact (that is: relative to the Storage.BasePath). -func (s Storage) LocalPath(artifact meta.Artifact) string { - if artifact.Path == "" { - return "" - } - path, err := securejoin.SecureJoin(s.BasePath, artifact.Path) - if err != nil { - return "" - } - return path -} - -// writeCounter is an implementation of io.Writer that only records the number -// of bytes written. -type writeCounter struct { - written int64 -} - -func (wc *writeCounter) Write(p []byte) (int, error) { - n := len(p) - wc.written += int64(n) - return n, nil -} - -// sanitizeHeader modifies the tar.Header to be relative to the root of the -// archive and removes any environment specific data. -func sanitizeHeader(relP string, h *tar.Header) { - // Modify the name to be relative to the root of the archive, - // this ensures we maintain the same structure when extracting. - h.Name = relP - - // We want to remove any environment specific data as well, this - // ensures the checksum is purely content based. - h.Gid = 0 - h.Uid = 0 - h.Uname = "" - h.Gname = "" - h.ModTime = time.Time{} - h.AccessTime = time.Time{} - h.ChangeTime = time.Time{} - - // Override the mode to be the default for the type of file. - setDefaultMode(h) -} - -// setDefaultMode sets the default mode for the given header. -func setDefaultMode(h *tar.Header) { - if h.FileInfo().IsDir() { - h.Mode = defaultDirMode - return - } - - if h.FileInfo().Mode().IsRegular() { - mode := h.FileInfo().Mode() - if mode&os.ModeType == 0 && mode&0o111 != 0 { - h.Mode = defaultExeFileMode - return - } - h.Mode = defaultFileMode - return - } -} - -// ArtifactDir returns the artifact dir path in the form of -// '//'. -func ArtifactDir(kind, namespace, name string) string { - kind = strings.ToLower(kind) - return path.Join(kind, namespace, name) -} - -// ArtifactPath returns the artifact path in the form of -// '//name>/'. -func ArtifactPath(kind, namespace, name, filename string) string { - return path.Join(ArtifactDir(kind, namespace, name), filename) -} diff --git a/internal/storage/storage_test.go b/internal/storage/storage_test.go deleted file mode 100644 index 6890e9388..000000000 --- a/internal/storage/storage_test.go +++ /dev/null @@ -1,864 +0,0 @@ -/* -Copyright 2025 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package storage - -import ( - "archive/tar" - "bytes" - "compress/gzip" - "context" - "errors" - "fmt" - "io" - "math/rand" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/go-git/go-git/v5/plumbing/format/gitignore" - . "github.com/onsi/gomega" - - "github.com/fluxcd/pkg/apis/meta" -) - -func TestStorageConstructor(t *testing.T) { - dir := t.TempDir() - - if _, err := New("/nonexistent", "hostname", time.Minute, 2); err == nil { - t.Fatal("nonexistent path was allowable in storage constructor") - } - - f, err := os.CreateTemp(dir, "") - if err != nil { - t.Fatalf("while creating temporary file: %v", err) - } - f.Close() - - if _, err := New(f.Name(), "hostname", time.Minute, 2); err == nil { - os.Remove(f.Name()) - t.Fatal("file path was accepted as basedir") - } - os.Remove(f.Name()) - - if _, err := New(dir, "hostname", time.Minute, 2); err != nil { - t.Fatalf("Valid path did not successfully return: %v", err) - } -} - -// walks a tar.gz and looks for paths with the basename. It does not match -// symlinks properly at this time because that's painful. -func walkTar(tarFile string, match string, dir bool) (int64, int64, bool, error) { - f, err := os.Open(tarFile) - if err != nil { - return 0, 0, false, fmt.Errorf("could not open file: %w", err) - } - defer f.Close() - - gzr, err := gzip.NewReader(f) - if err != nil { - return 0, 0, false, fmt.Errorf("could not unzip file: %w", err) - } - defer gzr.Close() - - tr := tar.NewReader(gzr) - for { - header, err := tr.Next() - if err == io.EOF { - break - } else if err != nil { - return 0, 0, false, fmt.Errorf("corrupt tarball reading header: %w", err) - } - - switch header.Typeflag { - case tar.TypeDir: - if header.Name == match && dir { - return 0, header.Mode, true, nil - } - case tar.TypeReg: - if header.Name == match { - return header.Size, header.Mode, true, nil - } - default: - // skip - } - } - - return 0, 0, false, nil -} - -func TestStorage_Archive(t *testing.T) { - dir := t.TempDir() - - storage, err := New(dir, "hostname", time.Minute, 2) - if err != nil { - t.Fatalf("error while bootstrapping storage: %v", err) - } - - type dummyFile struct { - content []byte - mode int64 - } - - createFiles := func(files map[string]dummyFile) (dir string, err error) { - dir = t.TempDir() - for name, df := range files { - absPath := filepath.Join(dir, name) - if err = os.MkdirAll(filepath.Dir(absPath), 0o750); err != nil { - return - } - f, err := os.Create(absPath) - if err != nil { - return "", fmt.Errorf("could not create file %q: %w", absPath, err) - } - if n, err := f.Write(df.content); err != nil { - f.Close() - return "", fmt.Errorf("could not write %d bytes to file %q: %w", n, f.Name(), err) - } - f.Close() - - if df.mode != 0 { - if err = os.Chmod(absPath, os.FileMode(df.mode)); err != nil { - return "", fmt.Errorf("could not chmod file %q: %w", absPath, err) - } - } - } - return - } - - matchFiles := func(t *testing.T, storage *Storage, artifact meta.Artifact, files map[string]dummyFile, dirs []string) { - t.Helper() - for name, df := range files { - mustExist := !(name[0:1] == "!") - if !mustExist { - name = name[1:] - } - s, m, exist, err := walkTar(storage.LocalPath(artifact), name, false) - if err != nil { - t.Fatalf("failed reading tarball: %v", err) - } - if bs := int64(len(df.content)); s != bs { - t.Fatalf("%q size %v != %v", name, s, bs) - } - if exist != mustExist { - if mustExist { - t.Errorf("could not find file %q in tarball", name) - } else { - t.Errorf("tarball contained excluded file %q", name) - } - } - expectMode := df.mode - if expectMode == 0 { - expectMode = defaultFileMode - } - if exist && m != expectMode { - t.Fatalf("%q mode %v != %v", name, m, expectMode) - } - } - for _, name := range dirs { - mustExist := !(name[0:1] == "!") - if !mustExist { - name = name[1:] - } - _, m, exist, err := walkTar(storage.LocalPath(artifact), name, true) - if err != nil { - t.Fatalf("failed reading tarball: %v", err) - } - if exist != mustExist { - if mustExist { - t.Errorf("could not find dir %q in tarball", name) - } else { - t.Errorf("tarball contained excluded file %q", name) - } - } - if exist && m != defaultDirMode { - t.Fatalf("%q mode %v != %v", name, m, defaultDirMode) - } - - } - } - - tests := []struct { - name string - files map[string]dummyFile - filter ArchiveFileFilter - want map[string]dummyFile - wantDirs []string - wantErr bool - }{ - { - name: "no filter", - files: map[string]dummyFile{ - ".git/config": {}, - "file.jpg": {content: []byte(`contents`)}, - "manifest.yaml": {}, - }, - filter: nil, - want: map[string]dummyFile{ - ".git/config": {}, - "file.jpg": {content: []byte(`contents`)}, - "manifest.yaml": {}, - }, - }, - { - name: "exclude VCS", - files: map[string]dummyFile{ - ".git/config": {}, - "manifest.yaml": {}, - }, - wantDirs: []string{ - "!.git", - }, - filter: SourceIgnoreFilter(nil, nil), - want: map[string]dummyFile{ - "!.git/config": {}, - "manifest.yaml": {}, - }, - }, - { - name: "custom", - files: map[string]dummyFile{ - ".git/config": {}, - "custom": {}, - "horse.jpg": {}, - }, - filter: SourceIgnoreFilter([]gitignore.Pattern{ - gitignore.ParsePattern("custom", nil), - }, nil), - want: map[string]dummyFile{ - "!git/config": {}, - "!custom": {}, - "horse.jpg": {}, - }, - wantErr: false, - }, - { - name: "including directories", - files: map[string]dummyFile{ - "test/.gitkeep": {}, - }, - filter: SourceIgnoreFilter([]gitignore.Pattern{ - gitignore.ParsePattern("custom", nil), - }, nil), - wantDirs: []string{ - "test", - }, - wantErr: false, - }, - { - name: "sets default file modes", - files: map[string]dummyFile{ - "test/file": { - mode: 0o666, - }, - "test/executable": { - mode: 0o777, - }, - }, - want: map[string]dummyFile{ - "test/file": { - mode: defaultFileMode, - }, - "test/executable": { - mode: defaultExeFileMode, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - dir, err := createFiles(tt.files) - if err != nil { - t.Error(err) - return - } - defer os.RemoveAll(dir) - artifact := meta.Artifact{ - Path: filepath.Join(randStringRunes(10), randStringRunes(10), randStringRunes(10)+".tar.gz"), - } - if err := storage.MkdirAll(artifact); err != nil { - t.Fatalf("artifact directory creation failed: %v", err) - } - if err := storage.Archive(&artifact, dir, tt.filter); (err != nil) != tt.wantErr { - t.Errorf("Archive() error = %v, wantErr %v", err, tt.wantErr) - } - matchFiles(t, storage, artifact, tt.want, tt.wantDirs) - }) - } -} - -func TestStorage_Remove(t *testing.T) { - t.Run("removes file", func(t *testing.T) { - g := NewWithT(t) - - dir := t.TempDir() - - s, err := New(dir, "", 0, 0) - g.Expect(err).ToNot(HaveOccurred()) - - artifact := meta.Artifact{ - Path: filepath.Join(dir, "test.txt"), - } - g.Expect(s.MkdirAll(artifact)).To(Succeed()) - g.Expect(s.AtomicWriteFile(&artifact, bytes.NewReader([]byte("test")), 0o600)).To(Succeed()) - g.Expect(s.ArtifactExist(artifact)).To(BeTrue()) - - g.Expect(s.Remove(artifact)).To(Succeed()) - g.Expect(s.ArtifactExist(artifact)).To(BeFalse()) - }) - - t.Run("error if file does not exist", func(t *testing.T) { - g := NewWithT(t) - - dir := t.TempDir() - - s, err := New(dir, "", 0, 0) - g.Expect(err).ToNot(HaveOccurred()) - - artifact := meta.Artifact{ - Path: filepath.Join(dir, "test.txt"), - } - - err = s.Remove(artifact) - g.Expect(err).To(HaveOccurred()) - g.Expect(errors.Is(err, os.ErrNotExist)).To(BeTrue()) - }) -} - -func TestStorageRemoveAllButCurrent(t *testing.T) { - t.Run("bad directory in archive", func(t *testing.T) { - dir := t.TempDir() - - s, err := New(dir, "hostname", time.Minute, 2) - if err != nil { - t.Fatalf("Valid path did not successfully return: %v", err) - } - - if _, err := s.RemoveAllButCurrent(meta.Artifact{Path: filepath.Join(dir, "really", "nonexistent")}); err == nil { - t.Fatal("Did not error while pruning non-existent path") - } - }) - - t.Run("collect names of deleted items", func(t *testing.T) { - g := NewWithT(t) - dir := t.TempDir() - - s, err := New(dir, "hostname", time.Minute, 2) - g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") - - artifact := meta.Artifact{ - Path: filepath.Join("foo", "bar", "artifact1.tar.gz"), - } - - // Create artifact dir and artifacts. - artifactDir := filepath.Join(dir, "foo", "bar") - g.Expect(os.MkdirAll(artifactDir, 0o750)).NotTo(HaveOccurred()) - current := []string{ - filepath.Join(artifactDir, "artifact1.tar.gz"), - } - wantDeleted := []string{ - filepath.Join(artifactDir, "file1.txt"), - filepath.Join(artifactDir, "file2.txt"), - } - createFile := func(files []string) { - for _, c := range files { - f, err := os.Create(c) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(f.Close()).ToNot(HaveOccurred()) - } - } - createFile(current) - createFile(wantDeleted) - _, err = s.Symlink(artifact, "latest.tar.gz") - g.Expect(err).ToNot(HaveOccurred(), "failed to create symlink") - - deleted, err := s.RemoveAllButCurrent(artifact) - g.Expect(err).ToNot(HaveOccurred(), "failed to remove all but current") - g.Expect(deleted).To(Equal(wantDeleted)) - }) -} - -func TestStorageRemoveAll(t *testing.T) { - tests := []struct { - name string - artifactPath string - createArtifactPath bool - wantDeleted string - }{ - { - name: "delete non-existent path", - artifactPath: filepath.Join("foo", "bar", "artifact1.tar.gz"), - createArtifactPath: false, - wantDeleted: "", - }, - { - name: "delete existing path", - artifactPath: filepath.Join("foo", "bar", "artifact1.tar.gz"), - createArtifactPath: true, - wantDeleted: filepath.Join("foo", "bar"), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - dir := t.TempDir() - - s, err := New(dir, "hostname", time.Minute, 2) - g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") - - artifact := meta.Artifact{ - Path: tt.artifactPath, - } - - if tt.createArtifactPath { - g.Expect(os.MkdirAll(filepath.Join(dir, tt.artifactPath), 0o750)).ToNot(HaveOccurred()) - } - - deleted, err := s.RemoveAll(artifact) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(deleted).To(ContainSubstring(tt.wantDeleted), "unexpected deleted path") - }) - } -} - -func TestStorageCopyFromPath(t *testing.T) { - type File struct { - Name string - Content []byte - } - - dir := t.TempDir() - - storage, err := New(dir, "hostname", time.Minute, 2) - if err != nil { - t.Fatalf("error while bootstrapping storage: %v", err) - } - - createFile := func(file *File) (absPath string, err error) { - dir = t.TempDir() - absPath = filepath.Join(dir, file.Name) - if err = os.MkdirAll(filepath.Dir(absPath), 0o750); err != nil { - return - } - f, err := os.Create(absPath) - if err != nil { - return "", fmt.Errorf("could not create file %q: %w", absPath, err) - } - if n, err := f.Write(file.Content); err != nil { - f.Close() - return "", fmt.Errorf("could not write %d bytes to file %q: %w", n, f.Name(), err) - } - f.Close() - return - } - - matchFile := func(t *testing.T, storage *Storage, artifact meta.Artifact, file *File, expectMismatch bool) { - c, err := os.ReadFile(storage.LocalPath(artifact)) - if err != nil { - t.Fatalf("failed reading file: %v", err) - } - if (string(c) != string(file.Content)) != expectMismatch { - t.Errorf("artifact content does not match and not expecting mismatch, got: %q, want: %q", string(c), string(file.Content)) - } - } - - tests := []struct { - name string - file *File - want *File - expectMismatch bool - }{ - { - name: "content match", - file: &File{ - Name: "manifest.yaml", - Content: []byte(`contents`), - }, - want: &File{ - Name: "manifest.yaml", - Content: []byte(`contents`), - }, - }, - { - name: "content not match", - file: &File{ - Name: "manifest.yaml", - Content: []byte(`contents`), - }, - want: &File{ - Name: "manifest.yaml", - Content: []byte(`mismatch contents`), - }, - expectMismatch: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - absPath, err := createFile(tt.file) - if err != nil { - t.Error(err) - return - } - artifact := meta.Artifact{ - Path: filepath.Join(randStringRunes(10), randStringRunes(10), randStringRunes(10)), - } - if err := storage.MkdirAll(artifact); err != nil { - t.Fatalf("artifact directory creation failed: %v", err) - } - if err := storage.CopyFromPath(&artifact, absPath); err != nil { - t.Errorf("CopyFromPath() error = %v", err) - } - matchFile(t, storage, artifact, tt.want, tt.expectMismatch) - }) - } -} - -func TestStorage_getGarbageFiles(t *testing.T) { - artifactFolder := filepath.Join("foo", "bar") - tests := []struct { - name string - artifactPaths []string - createPause time.Duration - ttl time.Duration - maxItemsToBeRetained int - totalCountLimit int - wantDeleted []string - }{ - { - name: "delete files based on maxItemsToBeRetained", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - filepath.Join(artifactFolder, "artifact5.tar.gz"), - }, - createPause: time.Millisecond * 10, - ttl: time.Minute * 2, - totalCountLimit: 10, - maxItemsToBeRetained: 2, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - }, - }, - { - name: "delete files based on maxItemsToBeRetained, ignore lock files", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact1.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - filepath.Join(artifactFolder, "artifact5.tar.gz"), - }, - createPause: time.Millisecond * 10, - ttl: time.Minute * 2, - totalCountLimit: 10, - maxItemsToBeRetained: 2, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - }, - }, - { - name: "delete files based on ttl", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - filepath.Join(artifactFolder, "artifact5.tar.gz"), - }, - createPause: time.Second * 1, - ttl: time.Second*3 + time.Millisecond*500, - totalCountLimit: 10, - maxItemsToBeRetained: 4, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - }, - }, - { - name: "delete files based on ttl, ignore lock files", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact1.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - filepath.Join(artifactFolder, "artifact5.tar.gz"), - }, - createPause: time.Second * 1, - ttl: time.Second*3 + time.Millisecond*500, - totalCountLimit: 10, - maxItemsToBeRetained: 4, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - }, - }, - { - name: "delete files based on ttl and maxItemsToBeRetained", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - filepath.Join(artifactFolder, "artifact5.tar.gz"), - filepath.Join(artifactFolder, "artifact6.tar.gz"), - }, - createPause: time.Second * 1, - ttl: time.Second*5 + time.Millisecond*500, - totalCountLimit: 10, - maxItemsToBeRetained: 4, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - }, - }, - { - name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - filepath.Join(artifactFolder, "artifact5.tar.gz"), - filepath.Join(artifactFolder, "artifact6.tar.gz"), - }, - createPause: time.Millisecond * 500, - ttl: time.Millisecond * 500, - totalCountLimit: 3, - maxItemsToBeRetained: 2, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - dir := t.TempDir() - - s, err := New(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained) - g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") - - artifact := meta.Artifact{ - Path: tt.artifactPaths[len(tt.artifactPaths)-1], - } - g.Expect(os.MkdirAll(filepath.Join(dir, artifactFolder), 0o750)).ToNot(HaveOccurred()) - for _, artifactPath := range tt.artifactPaths { - f, err := os.Create(filepath.Join(dir, artifactPath)) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(f.Close()).ToNot(HaveOccurred()) - time.Sleep(tt.createPause) - } - - deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl) - g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files") - g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths))) - for _, wantDeletedPath := range tt.wantDeleted { - present := false - for _, deletedPath := range deletedPaths { - if strings.Contains(deletedPath, wantDeletedPath) { - present = true - break - } - } - if !present { - g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath)) - } - } - }) - } -} - -func TestStorage_GarbageCollect(t *testing.T) { - artifactFolder := filepath.Join("foo", "bar") - tests := []struct { - name string - artifactPaths []string - wantCollected []string - wantDeleted []string - wantErr string - ctxTimeout time.Duration - }{ - { - name: "garbage collects", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact1.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - }, - wantCollected: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - }, - wantDeleted: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact1.tar.gz.lock"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz.lock"), - }, - ctxTimeout: time.Second * 1, - }, - { - name: "garbage collection fails with context timeout", - artifactPaths: []string{ - filepath.Join(artifactFolder, "artifact1.tar.gz"), - filepath.Join(artifactFolder, "artifact2.tar.gz"), - filepath.Join(artifactFolder, "artifact3.tar.gz"), - filepath.Join(artifactFolder, "artifact4.tar.gz"), - }, - wantErr: "context deadline exceeded", - ctxTimeout: time.Nanosecond * 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - dir := t.TempDir() - - s, err := New(dir, "hostname", time.Second*2, 2) - g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") - - artifact := meta.Artifact{ - Path: tt.artifactPaths[len(tt.artifactPaths)-1], - } - g.Expect(os.MkdirAll(filepath.Join(dir, artifactFolder), 0o750)).ToNot(HaveOccurred()) - for i, artifactPath := range tt.artifactPaths { - f, err := os.Create(filepath.Join(dir, artifactPath)) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(f.Close()).ToNot(HaveOccurred()) - if i != len(tt.artifactPaths)-1 { - time.Sleep(time.Second * 1) - } - } - - collectedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout) - if tt.wantErr == "" { - g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files") - } else { - g.Expect(err).To(HaveOccurred()) - g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) - } - if len(tt.wantCollected) > 0 { - g.Expect(len(tt.wantCollected)).To(Equal(len(collectedPaths))) - for _, wantCollectedPath := range tt.wantCollected { - present := false - for _, collectedPath := range collectedPaths { - if strings.Contains(collectedPath, wantCollectedPath) { - g.Expect(collectedPath).ToNot(BeAnExistingFile()) - present = true - break - } - } - if present == false { - g.Fail(fmt.Sprintf("expected file to be garbage collected, still exists: %s", wantCollectedPath)) - } - } - } - for _, delFile := range tt.wantDeleted { - g.Expect(filepath.Join(dir, delFile)).ToNot(BeAnExistingFile()) - } - }) - } -} - -func TestStorage_VerifyArtifact(t *testing.T) { - g := NewWithT(t) - - dir := t.TempDir() - s, err := New(dir, "", 0, 0) - g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") - - g.Expect(os.WriteFile(filepath.Join(dir, "artifact"), []byte("test"), 0o600)).To(Succeed()) - - t.Run("artifact without digest", func(t *testing.T) { - g := NewWithT(t) - - err := s.VerifyArtifact(meta.Artifact{}) - g.Expect(err).To(HaveOccurred()) - g.Expect(err).To(MatchError("artifact has no digest")) - }) - - t.Run("artifact with invalid digest", func(t *testing.T) { - g := NewWithT(t) - - err := s.VerifyArtifact(meta.Artifact{Digest: "invalid"}) - g.Expect(err).To(HaveOccurred()) - g.Expect(err).To(MatchError("failed to parse artifact digest 'invalid': invalid checksum digest format")) - }) - - t.Run("artifact with invalid path", func(t *testing.T) { - g := NewWithT(t) - - err := s.VerifyArtifact(meta.Artifact{ - Digest: "sha256:9ba7a35ce8acd3557fe30680ef193ca7a36bb5dc62788f30de7122a0a5beab69", - Path: "invalid", - }) - g.Expect(err).To(HaveOccurred()) - g.Expect(errors.Is(err, os.ErrNotExist)).To(BeTrue()) - }) - - t.Run("artifact with digest mismatch", func(t *testing.T) { - g := NewWithT(t) - - err := s.VerifyArtifact(meta.Artifact{ - Digest: "sha256:9ba7a35ce8acd3557fe30680ef193ca7a36bb5dc62788f30de7122a0a5beab69", - Path: "artifact", - }) - g.Expect(err).To(HaveOccurred()) - g.Expect(err).To(MatchError("computed digest doesn't match 'sha256:9ba7a35ce8acd3557fe30680ef193ca7a36bb5dc62788f30de7122a0a5beab69'")) - }) - - t.Run("artifact with digest match", func(t *testing.T) { - g := NewWithT(t) - - err := s.VerifyArtifact(meta.Artifact{ - Digest: "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", - Path: "artifact", - }) - g.Expect(err).ToNot(HaveOccurred()) - }) -} - -var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz1234567890") - -func randStringRunes(n int) string { - b := make([]rune, n) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return string(b) -} diff --git a/main.go b/main.go index 9bfb4e351..cb019e6e4 100644 --- a/main.go +++ b/main.go @@ -18,8 +18,6 @@ package main import ( "fmt" - "net" - "net/http" "os" "time" @@ -39,6 +37,10 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + artcfg "github.com/fluxcd/pkg/artifact/config" + artdigest "github.com/fluxcd/pkg/artifact/digest" + artsrv "github.com/fluxcd/pkg/artifact/server" + artstore "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" pkgcache "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/git" @@ -54,13 +56,11 @@ import ( "github.com/fluxcd/pkg/runtime/probes" sourcev1 "github.com/fluxcd/source-controller/api/v1" - intstorage "github.com/fluxcd/source-controller/internal/storage" // +kubebuilder:scaffold:imports "github.com/fluxcd/source-controller/internal/cache" "github.com/fluxcd/source-controller/internal/controller" - intdigest "github.com/fluxcd/source-controller/internal/digest" "github.com/fluxcd/source-controller/internal/features" "github.com/fluxcd/source-controller/internal/helm" "github.com/fluxcd/source-controller/internal/helm/registry" @@ -96,32 +96,27 @@ func main() { ) var ( - metricsAddr string - eventsAddr string - healthAddr string - storagePath string - storageAddr string - storageAdvAddr string - concurrent int - requeueDependency time.Duration - helmIndexLimit int64 - helmChartLimit int64 - helmChartFileLimit int64 - clientOptions client.Options - logOptions logger.Options - leaderElectionOptions leaderelection.Options - rateLimiterOptions helper.RateLimiterOptions - featureGates feathelper.FeatureGates - watchOptions helper.WatchOptions - intervalJitterOptions jitter.IntervalOptions - helmCacheMaxSize int - helmCacheTTL string - helmCachePurgeInterval string - artifactRetentionTTL time.Duration - artifactRetentionRecords int - artifactDigestAlgo string - tokenCacheOptions pkgcache.TokenFlags - defaultServiceAccount string + metricsAddr string + eventsAddr string + healthAddr string + concurrent int + requeueDependency time.Duration + helmIndexLimit int64 + helmChartLimit int64 + helmChartFileLimit int64 + artifactOptions artcfg.Options + clientOptions client.Options + logOptions logger.Options + leaderElectionOptions leaderelection.Options + rateLimiterOptions helper.RateLimiterOptions + featureGates feathelper.FeatureGates + watchOptions helper.WatchOptions + intervalJitterOptions jitter.IntervalOptions + helmCacheMaxSize int + helmCacheTTL string + helmCachePurgeInterval string + tokenCacheOptions pkgcache.TokenFlags + defaultServiceAccount string ) flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"), @@ -129,12 +124,6 @@ func main() { flag.StringVar(&eventsAddr, "events-addr", envOrDefault("EVENTS_ADDR", ""), "The address of the events receiver.") flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.") - flag.StringVar(&storagePath, "storage-path", envOrDefault("STORAGE_PATH", ""), - "The local storage path.") - flag.StringVar(&storageAddr, "storage-addr", envOrDefault("STORAGE_ADDR", ":9090"), - "The address the static file server binds to.") - flag.StringVar(&storageAdvAddr, "storage-adv-addr", envOrDefault("STORAGE_ADV_ADDR", ""), - "The advertised address of the static file server.") flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.") flag.Int64Var(&helmIndexLimit, "helm-index-max-size", helm.MaxIndexSize, "The max allowed size in bytes of a Helm repository index file.") @@ -154,15 +143,10 @@ func main() { "The list of key exchange algorithms to use for ssh connections, arranged from most preferred to the least.") flag.StringSliceVar(&git.HostKeyAlgos, "ssh-hostkey-algos", []string{}, "The list of hostkey algorithms to use for ssh connections, arranged from most preferred to the least.") - flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second, - "The duration of time that artifacts from previous reconciliations will be kept in storage before being garbage collected.") - flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2, - "The maximum number of artifacts to be kept in storage after a garbage collection.") - flag.StringVar(&artifactDigestAlgo, "artifact-digest-algo", intdigest.Canonical.String(), - "The algorithm to use to calculate the digest of artifacts.") flag.StringVar(&defaultServiceAccount, auth.ControllerFlagDefaultServiceAccount, "", "Default service account to use for workload identity when not specified in resources.") + artifactOptions.BindFlags(flag.CommandLine) clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) leaderElectionOptions.BindFlags(flag.CommandLine) @@ -210,7 +194,19 @@ func main() { metrics := helper.NewMetrics(mgr, metrics.MustMakeRecorder(), sourcev1.SourceFinalizer) cacheRecorder := cache.MustMakeMetrics() eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName) - storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactDigestAlgo) + + algo, err := artdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo) + if err != nil { + setupLog.Error(err, "unable to configure canonical digest algorithm") + os.Exit(1) + } + artdigest.Canonical = algo + + storage, err := artstore.New(&artifactOptions) + if err != nil { + setupLog.Error(err, "unable to configure artifact storage") + os.Exit(1) + } mustSetupHelmLimits(helmIndexLimit, helmChartLimit, helmChartFileLimit) helmIndexCache, helmIndexCacheItemTTL := mustInitHelmCache(helmCacheMaxSize, helmCacheTTL, helmCachePurgeInterval) @@ -315,7 +311,11 @@ func main() { // to handle that. <-mgr.Elected() - startFileServer(storage.BasePath, storageAddr) + // Start the artifact server if running as leader. + if err := artsrv.Start(ctx, &artifactOptions); err != nil { + setupLog.Error(err, "artifact server error") + os.Exit(1) + } }() setupLog.Info("starting manager") @@ -325,17 +325,6 @@ func main() { } } -func startFileServer(path string, address string) { - setupLog.Info("starting file server") - fs := http.FileServer(http.Dir(path)) - mux := http.NewServeMux() - mux.Handle("/", fs) - err := http.ListenAndServe(address, mux) - if err != nil { - setupLog.Error(err, "file server error") - } -} - func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder { eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName) if err != nil { @@ -450,55 +439,6 @@ func mustInitHelmCache(maxSize int, itemTTL, purgeInterval string) (*cache.Cache return cache.New(maxSize, interval), ttl } -func mustInitStorage(path string, - storageAdvAddr string, - artifactRetentionTTL time.Duration, - artifactRetentionRecords int, - artifactDigestAlgo string) *intstorage.Storage { - if storageAdvAddr == "" { - storageAdvAddr = determineAdvStorageAddr(storageAdvAddr) - } - - if artifactDigestAlgo != intdigest.Canonical.String() { - algo, err := intdigest.AlgorithmForName(artifactDigestAlgo) - if err != nil { - setupLog.Error(err, "unable to configure canonical digest algorithm") - os.Exit(1) - } - intdigest.Canonical = algo - } - - storage, err := intstorage.New(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords) - if err != nil { - setupLog.Error(err, "unable to initialise storage") - os.Exit(1) - } - return storage -} - -func determineAdvStorageAddr(storageAddr string) string { - host, port, err := net.SplitHostPort(storageAddr) - if err != nil { - setupLog.Error(err, "unable to parse storage address") - os.Exit(1) - } - switch host { - case "": - host = "localhost" - case "0.0.0.0": - host = os.Getenv("HOSTNAME") - if host == "" { - hn, err := os.Hostname() - if err != nil { - setupLog.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid") - os.Exit(1) - } - host = hn - } - } - return net.JoinHostPort(host, port) -} - func envOrDefault(envName, defaultValue string) string { ret := os.Getenv(envName) if ret != "" {