From 7fc0701017c98b6c26019556c8a7669418c86c4e Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Fri, 6 Feb 2026 18:39:08 +1100 Subject: [PATCH] feat: periodically snapshot git repo As with bundling. --- cachew.hcl | 4 + internal/config/config.go | 2 +- internal/gitclone/manager.go | 13 +-- internal/gitclone/manager_test.go | 3 +- internal/logging/logging.go | 4 +- internal/strategy/git/backend.go | 4 +- internal/strategy/git/bundle.go | 66 +++++---------- internal/strategy/git/git.go | 43 +++++++--- internal/strategy/git/snapshot.go | 45 +++++++++++ internal/strategy/git/snapshot_test.go | 106 +++++++++++++++++++++++++ 10 files changed, 225 insertions(+), 65 deletions(-) create mode 100644 internal/strategy/git/snapshot.go create mode 100644 internal/strategy/git/snapshot_test.go diff --git a/cachew.hcl b/cachew.hcl index a1f44db..90c5daf 100644 --- a/cachew.hcl +++ b/cachew.hcl @@ -8,10 +8,14 @@ # } url = "http://127.0.0.1:8080" +logging { + level = "debug" +} git { mirror-root = "./state/git-mirrors" bundle-interval = "24h" + snapshot-interval = "24h" } host "https://w3.org" {} diff --git a/internal/config/config.go b/internal/config/config.go index 935b389..88462f2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -70,7 +70,7 @@ func Split[GlobalConfig any](ast *hcl.AST) (global, providers *hcl.AST) { switch node := node.(type) { case *hcl.Block: if globals[node.Name] { - global.Entries = append(global.Entries, node.Body...) + global.Entries = append(global.Entries, node) } else { providers.Entries = append(providers.Entries, node) } diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index 49f9fa7..f332a2d 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -152,7 +152,8 @@ func (m *Manager) Config() Config { return m.config } -func (m *Manager) DiscoverExisting(_ context.Context) error { +func (m *Manager) DiscoverExisting(_ context.Context) ([]*Repository, error) { + var discovered []*Repository err := filepath.Walk(m.config.RootDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -205,14 +206,16 @@ func (m *Manager) DiscoverExisting(_ context.Context) error { m.clones[upstreamURL] = repo m.clonesMu.Unlock() + discovered = append(discovered, repo) + return fs.SkipDir }) if err != nil { - return errors.Wrap(err, "walk root directory") + return nil, errors.Wrap(err, "walk root directory") } - return nil + return discovered, nil } func (m *Manager) clonePathForURL(upstreamURL string) string { @@ -251,10 +254,10 @@ func (r *Repository) NeedsFetch(fetchInterval time.Duration) bool { return time.Since(r.lastFetch) >= fetchInterval } -func (r *Repository) WithReadLock(fn func()) { +func (r *Repository) WithReadLock(fn func() error) error { r.mu.RLock() defer r.mu.RUnlock() - fn() + return fn() } func WithReadLockReturn[T any](repo *Repository, fn func() (T, error)) (T, error) { diff --git a/internal/gitclone/manager_test.go b/internal/gitclone/manager_test.go index 907f6ca..f538ae9 100644 --- a/internal/gitclone/manager_test.go +++ b/internal/gitclone/manager_test.go @@ -138,8 +138,9 @@ func TestManager_DiscoverExisting(t *testing.T) { assert.NoError(t, os.WriteFile(filepath.Join(gitDir, "HEAD"), []byte("ref: refs/heads/main\n"), 0o644)) } - err = manager.DiscoverExisting(context.Background()) + discovered, err := manager.DiscoverExisting(context.Background()) assert.NoError(t, err) + assert.Equal(t, 3, len(discovered)) repo1 := manager.Get("https://github.com/user1/repo1") assert.NotZero(t, repo1) diff --git a/internal/logging/logging.go b/internal/logging/logging.go index 7ea5e3a..5158730 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -10,8 +10,8 @@ import ( ) type Config struct { - JSON bool `help:"Enable JSON logging."` - Level slog.Level `help:"Set the logging level." default:"info"` + JSON bool `hcl:"json,optional" help:"Enable JSON logging."` + Level slog.Level `hcl:"level" help:"Set the logging level." default:"info"` } type logKey struct{} diff --git a/internal/strategy/git/backend.go b/internal/strategy/git/backend.go index d17c3b3..ff43f87 100644 --- a/internal/strategy/git/backend.go +++ b/internal/strategy/git/backend.go @@ -57,7 +57,7 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo slog.String("backend_path", backendPath), slog.String("clone_path", repo.Path())) - repo.WithReadLock(func() { + repo.WithReadLock(func() error { //nolint:errcheck,gosec var stderrBuf bytes.Buffer handler := &cgi.Handler{ @@ -81,6 +81,8 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo slog.String("stderr", stderrBuf.String()), slog.String("path", backendPath)) } + + return nil }) } diff --git a/internal/strategy/git/bundle.go b/internal/strategy/git/bundle.go index 972a7b6..115609f 100644 --- a/internal/strategy/git/bundle.go +++ b/internal/strategy/git/bundle.go @@ -1,25 +1,27 @@ package git import ( + "bytes" "context" - "io" "log/slog" "net/http" "os/exec" "time" + "github.com/alecthomas/errors" + "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/gitclone" "github.com/block/cachew/internal/logging" ) -func (s *Strategy) generateAndUploadBundle(ctx context.Context, repo *gitclone.Repository) { +func (s *Strategy) generateAndUploadBundle(ctx context.Context, repo *gitclone.Repository) error { logger := logging.FromContext(ctx) + upstream := repo.UpstreamURL() - logger.InfoContext(ctx, "Generating bundle", - slog.String("upstream", repo.UpstreamURL())) + logger.InfoContext(ctx, "Bundle generation started", slog.String("upstream", upstream)) - cacheKey := cache.NewKey(repo.UpstreamURL() + ".bundle") + cacheKey := cache.NewKey(upstream + ".bundle") headers := http.Header{ "Content-Type": []string{"application/x-git-bundle"}, @@ -27,55 +29,29 @@ func (s *Strategy) generateAndUploadBundle(ctx context.Context, repo *gitclone.R ttl := 7 * 24 * time.Hour w, err := s.cache.Create(ctx, cacheKey, headers, ttl) if err != nil { - logger.ErrorContext(ctx, "Failed to create cache entry", - slog.String("upstream", repo.UpstreamURL()), - slog.String("error", err.Error())) - return + return errors.Wrap(err, "create cache entry") } defer w.Close() - repo.WithReadLock(func() { + err = errors.Wrap(repo.WithReadLock(func() error { + var stderr bytes.Buffer // Use --branches --remotes to include all branches but exclude tags (which can be massive) // #nosec G204 - repo.Path() is controlled by us cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", "-", "--branches", "--remotes") cmd.Stdout = w + cmd.Stderr = &stderr - stderrPipe, err := cmd.StderrPipe() - if err != nil { - logger.ErrorContext(ctx, "Failed to create stderr pipe", - slog.String("upstream", repo.UpstreamURL()), - slog.String("error", err.Error())) - return - } - - logger.DebugContext(ctx, "Starting bundle generation", - slog.String("upstream", repo.UpstreamURL()), - slog.String("path", repo.Path())) - - if err := cmd.Start(); err != nil { - logger.ErrorContext(ctx, "Failed to start bundle generation", - slog.String("upstream", repo.UpstreamURL()), - slog.String("error", err.Error())) - return - } - - stderr, _ := io.ReadAll(stderrPipe) //nolint:errcheck - - if err := cmd.Wait(); err != nil { - logger.ErrorContext(ctx, "Failed to generate bundle", - slog.String("upstream", repo.UpstreamURL()), - slog.String("error", err.Error()), - slog.String("stderr", string(stderr))) - return + if err := cmd.Run(); err != nil { + return errors.Wrapf(err, "bundle generation failed: %s", stderr.String()) } - if len(stderr) > 0 { - logger.DebugContext(ctx, "Bundle generation stderr", - slog.String("upstream", repo.UpstreamURL()), - slog.String("stderr", string(stderr))) - } + return nil + }), "generate bundle") + if err != nil { + logger.ErrorContext(ctx, "Bundle generation failed", slog.String("upstream", upstream), slog.String("error", err.Error())) + return err + } - logger.InfoContext(ctx, "Bundle uploaded successfully", - slog.String("upstream", repo.UpstreamURL())) - }) + logger.InfoContext(ctx, "Bundle generation completed", slog.String("upstream", upstream)) + return nil } diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index e19569c..fb9c843 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -31,6 +31,7 @@ type Config struct { FetchInterval time.Duration `hcl:"fetch-interval,optional" help:"How often to fetch from upstream in minutes." default:"15m"` RefCheckInterval time.Duration `hcl:"ref-check-interval,optional" help:"How long to cache ref checks." default:"10s"` BundleInterval time.Duration `hcl:"bundle-interval,optional" help:"How often to generate bundles. 0 disables bundling." default:"0"` + SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd snapshots. 0 disables snapshots." default:"0"` } type Strategy struct { @@ -79,10 +80,19 @@ func New(ctx context.Context, config Config, scheduler jobscheduler.Scheduler, c scheduler: scheduler.WithQueuePrefix("git"), } - if err := s.cloneManager.DiscoverExisting(ctx); err != nil { + existing, err := s.cloneManager.DiscoverExisting(ctx) + if err != nil { logger.WarnContext(ctx, "Failed to discover existing clones", slog.String("error", err.Error())) } + for _, repo := range existing { + if s.config.BundleInterval > 0 { + s.scheduleBundleJobs(repo) + } + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } + } s.proxy = &httputil.ReverseProxy{ Director: func(req *http.Request) { @@ -105,7 +115,8 @@ func New(ctx context.Context, config Config, scheduler jobscheduler.Scheduler, c "mirror_root", config.MirrorRoot, "fetch_interval", config.FetchInterval, "ref_check_interval", config.RefCheckInterval, - "bundle_interval", config.BundleInterval) + "bundle_interval", config.BundleInterval, + "snapshot_interval", config.SnapshotInterval) return s, nil } @@ -131,6 +142,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { return } + if strings.HasSuffix(pathValue, "/snapshot") { + s.handleSnapshotRequest(w, r, host, pathValue) + return + } + service := r.URL.Query().Get("service") isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack") @@ -189,27 +205,31 @@ func ExtractRepoPath(pathValue string) string { } func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + s.serveCachedArtifact(w, r, host, pathValue, "bundle") +} + +func (s *Strategy) serveCachedArtifact(w http.ResponseWriter, r *http.Request, host, pathValue, artifact string) { ctx := r.Context() logger := logging.FromContext(ctx) - logger.DebugContext(ctx, "Bundle request", + logger.DebugContext(ctx, artifact+" request", slog.String("host", host), slog.String("path", pathValue)) - pathValue = strings.TrimSuffix(pathValue, "/bundle") + pathValue = strings.TrimSuffix(pathValue, "/"+artifact) repoPath := ExtractRepoPath(pathValue) upstreamURL := "https://" + host + "/" + repoPath - cacheKey := cache.NewKey(upstreamURL + ".bundle") + cacheKey := cache.NewKey(upstreamURL + "." + artifact) reader, headers, err := s.cache.Open(ctx, cacheKey) if err != nil { if errors.Is(err, os.ErrNotExist) { - logger.DebugContext(ctx, "Bundle not found in cache", + logger.DebugContext(ctx, artifact+" not found in cache", slog.String("upstream", upstreamURL)) http.NotFound(w, r) return } - logger.ErrorContext(ctx, "Failed to open bundle from cache", + logger.ErrorContext(ctx, "Failed to open "+artifact+" from cache", slog.String("upstream", upstreamURL), slog.String("error", err.Error())) http.Error(w, "Internal server error", http.StatusInternalServerError) @@ -225,7 +245,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h _, err = io.Copy(w, reader) if err != nil { - logger.ErrorContext(ctx, "Failed to stream bundle", + logger.ErrorContext(ctx, "Failed to stream "+artifact, slog.String("upstream", upstreamURL), slog.String("error", err.Error())) } @@ -261,6 +281,10 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { if s.config.BundleInterval > 0 { s.scheduleBundleJobs(repo) } + + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } } func (s *Strategy) maybeBackgroundFetch(repo *gitclone.Repository) { @@ -301,7 +325,6 @@ func (s *Strategy) backgroundFetch(ctx context.Context, repo *gitclone.Repositor func (s *Strategy) scheduleBundleJobs(repo *gitclone.Repository) { s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "bundle-periodic", s.config.BundleInterval, func(ctx context.Context) error { - s.generateAndUploadBundle(ctx, repo) - return nil + return s.generateAndUploadBundle(ctx, repo) }) } diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go new file mode 100644 index 0000000..9d294fc --- /dev/null +++ b/internal/strategy/git/snapshot.go @@ -0,0 +1,45 @@ +package git + +import ( + "context" + "log/slog" + "net/http" + "time" + + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/cache" + "github.com/block/cachew/internal/gitclone" + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/snapshot" +) + +func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error { + logger := logging.FromContext(ctx) + upstream := repo.UpstreamURL() + + logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream)) + + cacheKey := cache.NewKey(upstream + ".snapshot") + ttl := 7 * 24 * time.Hour + excludePatterns := []string{"*.lock"} + + err := errors.Wrap(snapshot.Create(ctx, s.cache, cacheKey, repo.Path(), ttl, excludePatterns), "create snapshot") + if err != nil { + logger.ErrorContext(ctx, "Snapshot generation failed", slog.String("upstream", upstream), slog.String("error", err.Error())) + return err + } + + logger.InfoContext(ctx, "Snapshot generation completed", slog.String("upstream", upstream)) + return nil +} + +func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) { + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error { + return s.generateAndUploadSnapshot(ctx, repo) + }) +} + +func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + s.serveCachedArtifact(w, r, host, pathValue, "snapshot") +} diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go new file mode 100644 index 0000000..cf53709 --- /dev/null +++ b/internal/strategy/git/snapshot_test.go @@ -0,0 +1,106 @@ +package git_test + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" + "github.com/block/cachew/internal/jobscheduler" + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/strategy/git" +) + +func TestSnapshotHTTPEndpoint(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + mux := newTestMux() + + _, err = git.New(ctx, git.Config{ + MirrorRoot: tmpDir, + SnapshotInterval: 24 * time.Hour, + }, jobscheduler.New(ctx, jobscheduler.Config{}), memCache, mux) + assert.NoError(t, err) + + // Create a fake snapshot in the cache + upstreamURL := "https://github.com/org/repo" + cacheKey := cache.NewKey(upstreamURL + ".snapshot") + snapshotData := []byte("fake snapshot data") + + headers := make(map[string][]string) + headers["Content-Type"] = []string{"application/zstd"} + writer, err := memCache.Create(ctx, cacheKey, headers, 24*time.Hour) + assert.NoError(t, err) + _, err = writer.Write(snapshotData) + assert.NoError(t, err) + err = writer.Close() + assert.NoError(t, err) + + handler := mux.handlers["GET /git/{host}/{path...}"] + assert.NotZero(t, handler) + + // Test successful snapshot request + req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot", nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/repo/snapshot") + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 200, w.Code) + assert.Equal(t, "application/zstd", w.Header().Get("Content-Type")) + assert.Equal(t, snapshotData, w.Body.Bytes()) + + // Test snapshot not found + req = httptest.NewRequest(http.MethodGet, "/git/github.com/org/nonexistent/snapshot", nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/nonexistent/snapshot") + w = httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 404, w.Code) +} + +func TestSnapshotInterval(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + + tests := []struct { + name string + snapshotInterval time.Duration + }{ + { + name: "CustomInterval", + snapshotInterval: 1 * time.Hour, + }, + { + name: "DefaultInterval", + snapshotInterval: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + mux := newTestMux() + + s, err := git.New(ctx, git.Config{ + MirrorRoot: tmpDir, + SnapshotInterval: tt.snapshotInterval, + }, jobscheduler.New(ctx, jobscheduler.Config{}), memCache, mux) + assert.NoError(t, err) + assert.NotZero(t, s) + }) + } +}