Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cachew.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
# }

url = "http://127.0.0.1:8080"
logging {
level = "debug"
}

git {
mirror-root = "./state/git-mirrors"
bundle-interval = "24h"
snapshot-interval = "24h"
}

host "https://w3.org" {}
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Split[GlobalConfig any](ast *hcl.AST) (global, providers *hcl.AST) {
switch node := node.(type) {
case *hcl.Block:
if globals[node.Name] {
global.Entries = append(global.Entries, node.Body...)
global.Entries = append(global.Entries, node)
} else {
providers.Entries = append(providers.Entries, node)
}
Expand Down
13 changes: 8 additions & 5 deletions internal/gitclone/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (m *Manager) Config() Config {
return m.config
}

func (m *Manager) DiscoverExisting(_ context.Context) error {
func (m *Manager) DiscoverExisting(_ context.Context) ([]*Repository, error) {
var discovered []*Repository
err := filepath.Walk(m.config.RootDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
Expand Down Expand Up @@ -205,14 +206,16 @@ func (m *Manager) DiscoverExisting(_ context.Context) error {
m.clones[upstreamURL] = repo
m.clonesMu.Unlock()

discovered = append(discovered, repo)

return fs.SkipDir
})

if err != nil {
return errors.Wrap(err, "walk root directory")
return nil, errors.Wrap(err, "walk root directory")
}

return nil
return discovered, nil
}

func (m *Manager) clonePathForURL(upstreamURL string) string {
Expand Down Expand Up @@ -251,10 +254,10 @@ func (r *Repository) NeedsFetch(fetchInterval time.Duration) bool {
return time.Since(r.lastFetch) >= fetchInterval
}

func (r *Repository) WithReadLock(fn func()) {
func (r *Repository) WithReadLock(fn func() error) error {
r.mu.RLock()
defer r.mu.RUnlock()
fn()
return fn()
}

func WithReadLockReturn[T any](repo *Repository, fn func() (T, error)) (T, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/gitclone/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func TestManager_DiscoverExisting(t *testing.T) {
assert.NoError(t, os.WriteFile(filepath.Join(gitDir, "HEAD"), []byte("ref: refs/heads/main\n"), 0o644))
}

err = manager.DiscoverExisting(context.Background())
discovered, err := manager.DiscoverExisting(context.Background())
assert.NoError(t, err)
assert.Equal(t, 3, len(discovered))

repo1 := manager.Get("https://github.com/user1/repo1")
assert.NotZero(t, repo1)
Expand Down
4 changes: 2 additions & 2 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

type Config struct {
JSON bool `help:"Enable JSON logging."`
Level slog.Level `help:"Set the logging level." default:"info"`
JSON bool `hcl:"json,optional" help:"Enable JSON logging."`
Level slog.Level `hcl:"level" help:"Set the logging level." default:"info"`
}

type logKey struct{}
Expand Down
4 changes: 3 additions & 1 deletion internal/strategy/git/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo
slog.String("backend_path", backendPath),
slog.String("clone_path", repo.Path()))

repo.WithReadLock(func() {
repo.WithReadLock(func() error { //nolint:errcheck,gosec
var stderrBuf bytes.Buffer

handler := &cgi.Handler{
Expand All @@ -81,6 +81,8 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo
slog.String("stderr", stderrBuf.String()),
slog.String("path", backendPath))
}

return nil
})
}

Expand Down
66 changes: 21 additions & 45 deletions internal/strategy/git/bundle.go
Original file line number Diff line number Diff line change
@@ -1,81 +1,57 @@
package git

import (
"bytes"
"context"
"io"
"log/slog"
"net/http"
"os/exec"
"time"

"github.com/alecthomas/errors"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/gitclone"
"github.com/block/cachew/internal/logging"
)

func (s *Strategy) generateAndUploadBundle(ctx context.Context, repo *gitclone.Repository) {
func (s *Strategy) generateAndUploadBundle(ctx context.Context, repo *gitclone.Repository) error {
logger := logging.FromContext(ctx)
upstream := repo.UpstreamURL()

logger.InfoContext(ctx, "Generating bundle",
slog.String("upstream", repo.UpstreamURL()))
logger.InfoContext(ctx, "Bundle generation started", slog.String("upstream", upstream))

cacheKey := cache.NewKey(repo.UpstreamURL() + ".bundle")
cacheKey := cache.NewKey(upstream + ".bundle")

headers := http.Header{
"Content-Type": []string{"application/x-git-bundle"},
}
ttl := 7 * 24 * time.Hour
w, err := s.cache.Create(ctx, cacheKey, headers, ttl)
if err != nil {
logger.ErrorContext(ctx, "Failed to create cache entry",
slog.String("upstream", repo.UpstreamURL()),
slog.String("error", err.Error()))
return
return errors.Wrap(err, "create cache entry")
}
defer w.Close()

repo.WithReadLock(func() {
err = errors.Wrap(repo.WithReadLock(func() error {
var stderr bytes.Buffer
// Use --branches --remotes to include all branches but exclude tags (which can be massive)
// #nosec G204 - repo.Path() is controlled by us
cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", "-", "--branches", "--remotes")
cmd.Stdout = w
cmd.Stderr = &stderr

stderrPipe, err := cmd.StderrPipe()
if err != nil {
logger.ErrorContext(ctx, "Failed to create stderr pipe",
slog.String("upstream", repo.UpstreamURL()),
slog.String("error", err.Error()))
return
}

logger.DebugContext(ctx, "Starting bundle generation",
slog.String("upstream", repo.UpstreamURL()),
slog.String("path", repo.Path()))

if err := cmd.Start(); err != nil {
logger.ErrorContext(ctx, "Failed to start bundle generation",
slog.String("upstream", repo.UpstreamURL()),
slog.String("error", err.Error()))
return
}

stderr, _ := io.ReadAll(stderrPipe) //nolint:errcheck

if err := cmd.Wait(); err != nil {
logger.ErrorContext(ctx, "Failed to generate bundle",
slog.String("upstream", repo.UpstreamURL()),
slog.String("error", err.Error()),
slog.String("stderr", string(stderr)))
return
if err := cmd.Run(); err != nil {
return errors.Wrapf(err, "bundle generation failed: %s", stderr.String())
}

if len(stderr) > 0 {
logger.DebugContext(ctx, "Bundle generation stderr",
slog.String("upstream", repo.UpstreamURL()),
slog.String("stderr", string(stderr)))
}
return nil
}), "generate bundle")
if err != nil {
logger.ErrorContext(ctx, "Bundle generation failed", slog.String("upstream", upstream), slog.String("error", err.Error()))
return err
}

logger.InfoContext(ctx, "Bundle uploaded successfully",
slog.String("upstream", repo.UpstreamURL()))
})
logger.InfoContext(ctx, "Bundle generation completed", slog.String("upstream", upstream))
return nil
}
43 changes: 33 additions & 10 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
FetchInterval time.Duration `hcl:"fetch-interval,optional" help:"How often to fetch from upstream in minutes." default:"15m"`
RefCheckInterval time.Duration `hcl:"ref-check-interval,optional" help:"How long to cache ref checks." default:"10s"`
BundleInterval time.Duration `hcl:"bundle-interval,optional" help:"How often to generate bundles. 0 disables bundling." default:"0"`
SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd snapshots. 0 disables snapshots." default:"0"`
}

type Strategy struct {
Expand Down Expand Up @@ -79,10 +80,19 @@ func New(ctx context.Context, config Config, scheduler jobscheduler.Scheduler, c
scheduler: scheduler.WithQueuePrefix("git"),
}

if err := s.cloneManager.DiscoverExisting(ctx); err != nil {
existing, err := s.cloneManager.DiscoverExisting(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to discover existing clones",
slog.String("error", err.Error()))
}
for _, repo := range existing {
if s.config.BundleInterval > 0 {
s.scheduleBundleJobs(repo)
}
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
}

s.proxy = &httputil.ReverseProxy{
Director: func(req *http.Request) {
Expand All @@ -105,7 +115,8 @@ func New(ctx context.Context, config Config, scheduler jobscheduler.Scheduler, c
"mirror_root", config.MirrorRoot,
"fetch_interval", config.FetchInterval,
"ref_check_interval", config.RefCheckInterval,
"bundle_interval", config.BundleInterval)
"bundle_interval", config.BundleInterval,
"snapshot_interval", config.SnapshotInterval)

return s, nil
}
Expand All @@ -131,6 +142,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
return
}

if strings.HasSuffix(pathValue, "/snapshot") {
s.handleSnapshotRequest(w, r, host, pathValue)
return
}

service := r.URL.Query().Get("service")
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")

Expand Down Expand Up @@ -189,27 +205,31 @@ func ExtractRepoPath(pathValue string) string {
}

func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
s.serveCachedArtifact(w, r, host, pathValue, "bundle")
}

func (s *Strategy) serveCachedArtifact(w http.ResponseWriter, r *http.Request, host, pathValue, artifact string) {
ctx := r.Context()
logger := logging.FromContext(ctx)

logger.DebugContext(ctx, "Bundle request",
logger.DebugContext(ctx, artifact+" request",
slog.String("host", host),
slog.String("path", pathValue))

pathValue = strings.TrimSuffix(pathValue, "/bundle")
pathValue = strings.TrimSuffix(pathValue, "/"+artifact)
repoPath := ExtractRepoPath(pathValue)
upstreamURL := "https://" + host + "/" + repoPath
cacheKey := cache.NewKey(upstreamURL + ".bundle")
cacheKey := cache.NewKey(upstreamURL + "." + artifact)

reader, headers, err := s.cache.Open(ctx, cacheKey)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.DebugContext(ctx, "Bundle not found in cache",
logger.DebugContext(ctx, artifact+" not found in cache",
slog.String("upstream", upstreamURL))
http.NotFound(w, r)
return
}
logger.ErrorContext(ctx, "Failed to open bundle from cache",
logger.ErrorContext(ctx, "Failed to open "+artifact+" from cache",
slog.String("upstream", upstreamURL),
slog.String("error", err.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
Expand All @@ -225,7 +245,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h

_, err = io.Copy(w, reader)
if err != nil {
logger.ErrorContext(ctx, "Failed to stream bundle",
logger.ErrorContext(ctx, "Failed to stream "+artifact,
slog.String("upstream", upstreamURL),
slog.String("error", err.Error()))
}
Expand Down Expand Up @@ -261,6 +281,10 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
if s.config.BundleInterval > 0 {
s.scheduleBundleJobs(repo)
}

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
}

func (s *Strategy) maybeBackgroundFetch(repo *gitclone.Repository) {
Expand Down Expand Up @@ -301,7 +325,6 @@ func (s *Strategy) backgroundFetch(ctx context.Context, repo *gitclone.Repositor

func (s *Strategy) scheduleBundleJobs(repo *gitclone.Repository) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "bundle-periodic", s.config.BundleInterval, func(ctx context.Context) error {
s.generateAndUploadBundle(ctx, repo)
return nil
return s.generateAndUploadBundle(ctx, repo)
})
}
45 changes: 45 additions & 0 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package git

import (
"context"
"log/slog"
"net/http"
"time"

"github.com/alecthomas/errors"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/gitclone"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/snapshot"
)

// generateAndUploadSnapshot builds a snapshot of repo's on-disk clone and
// stores it in the strategy's cache.
//
// The cache key is derived from the repository's upstream URL with a
// ".snapshot" suffix, and the entry is created with a seven-day TTL. Paths
// matching "*.lock" are passed to snapshot.Create as exclude patterns.
// NOTE(review): the archive format and how the exclude patterns are matched
// are determined by snapshot.Create, which is not visible here.
//
// Start, failure and completion are logged with the upstream URL. On failure
// the error from snapshot.Create is wrapped with "create snapshot", logged,
// and returned; on success nil is returned.
func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error {
	// How long the cached snapshot is kept before it expires.
	const retention = 7 * 24 * time.Hour

	lg := logging.FromContext(ctx)
	url := repo.UpstreamURL()
	upstreamAttr := slog.String("upstream", url)

	lg.InfoContext(ctx, "Snapshot generation started", upstreamAttr)

	key := cache.NewKey(url + ".snapshot")
	skip := []string{"*.lock"}

	if err := errors.Wrap(snapshot.Create(ctx, s.cache, key, repo.Path(), retention, skip), "create snapshot"); err != nil {
		lg.ErrorContext(ctx, "Snapshot generation failed", upstreamAttr, slog.String("error", err.Error()))
		return err
	}

	lg.InfoContext(ctx, "Snapshot generation completed", upstreamAttr)
	return nil
}

// scheduleSnapshotJobs submits a periodic job to the strategy's scheduler
// that regenerates the snapshot for repo every s.config.SnapshotInterval.
//
// The job is submitted with the repository's upstream URL and the name
// "snapshot-periodic"; each run calls generateAndUploadSnapshot and returns
// its error to the scheduler.
func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) {
	job := func(ctx context.Context) error {
		return s.generateAndUploadSnapshot(ctx, repo)
	}
	s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, job)
}

// handleSnapshotRequest answers an HTTP request for a repository snapshot by
// delegating to serveCachedArtifact with the "snapshot" artifact kind.
func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
	const artifact = "snapshot"
	s.serveCachedArtifact(w, r, host, pathValue, artifact)
}
Loading