From 3b837ea80ba62e6d8328e8c6494ce024d52b4c1a Mon Sep 17 00:00:00 2001 From: Saquib Mian Date: Fri, 10 Nov 2023 13:06:30 -0500 Subject: [PATCH] Extract all `bufsync.*Func`s into a `bufsync.Handler` (#2554) This also improves the testing to be blackbox; we invoke sync and assert on the end-state. --- private/buf/bufsync/backfill_tags_test.go | 101 ++--- private/buf/bufsync/bufsync.go | 176 ++++----- private/buf/bufsync/bufsync_test.go | 238 ++++++++++++ private/buf/bufsync/clock.go | 27 ++ private/buf/bufsync/commits_to_sync_test.go | 114 ++---- private/buf/bufsync/main_test.go | 91 ----- private/buf/bufsync/prepare_sync_test.go | 36 +- private/buf/bufsync/syncer.go | 177 ++++----- .../command/alpha/repo/reposync/reposync.go | 358 +---------------- .../alpha/repo/reposync/sync_handler.go | 362 ++++++++++++++++++ private/pkg/git/git.go | 2 +- private/pkg/git/repository.go | 24 +- private/pkg/git/repository_test.go | 7 +- 13 files changed, 883 insertions(+), 830 deletions(-) create mode 100644 private/buf/bufsync/bufsync_test.go create mode 100644 private/buf/bufsync/clock.go delete mode 100644 private/buf/bufsync/main_test.go create mode 100644 private/buf/cmd/buf/command/alpha/repo/reposync/sync_handler.go diff --git a/private/buf/bufsync/backfill_tags_test.go b/private/buf/bufsync/backfill_tags_test.go index 8737965257..be9c2d5aac 100644 --- a/private/buf/bufsync/backfill_tags_test.go +++ b/private/buf/bufsync/backfill_tags_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package bufsync +package bufsync_test import ( "context" @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/bufbuild/buf/private/buf/bufsync" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" "github.com/bufbuild/buf/private/pkg/command" "github.com/bufbuild/buf/private/pkg/git" @@ -36,76 +37,58 @@ func TestBackfilltags(t *testing.T) { moduleIdentityInHEAD, err := bufmoduleref.NewModuleIdentity("buf.build", "acme", "foo") require.NoError(t, err) prepareGitRepoBackfillTags(t, repoDir, moduleIdentityInHEAD) - mockBSRChecker := newMockSyncGitChecker() + mockHandler := newMockSyncHandler() // prepare the top 5 commits as syncable commits, mark the rest as if they were already synced var ( commitCount int allCommitsHashes []string - startSyncHash git.Hash fakeNowCommitLimitTime time.Time // to be sent as a fake clock and discard "old" commits ) require.NoError(t, repo.ForEachCommit(func(commit git.Commit) error { allCommitsHashes = append(allCommitsHashes, commit.Hash().Hex()) commitCount++ - if commitCount == 5 { - startSyncHash = commit.Hash() + if commitCount == 6 { + // mark this commit as synced; nothing after this needs to be marked because syncer + // won't travel past this + mockHandler.setSyncPoint( + defaultBranchName, + commit.Hash(), + moduleIdentityInHEAD, + ) } - if commitCount > 5 { - if commitCount == 15 { - // have the time limit at the commit 15 looking back - fakeNowCommitLimitTime = commit.Committer().Timestamp() - } - mockBSRChecker.markSynced(commit.Hash().Hex()) + if commitCount == 15 { + // have the time limit at the commit 15 looking back + fakeNowCommitLimitTime = commit.Committer().Timestamp() } return nil })) - mockTagsBackfiller := newMockTagsBackfiller() - mockClock := &mockClock{now: fakeNowCommitLimitTime.Add(lookbackTimeLimit)} const moduleDir = "." 
// module is at the git root repo - testSyncer := syncer{ - repo: repo, - storageGitProvider: storagegit.NewProvider(repo.Objects()), - logger: zaptest.NewLogger(t), - sortedModulesDirsForSync: []string{moduleDir}, - modulesDirsToIdentityOverrideForSync: map[string]bufmoduleref.ModuleIdentity{moduleDir: nil}, - syncedGitCommitChecker: mockBSRChecker.checkFunc(), - commitsToTags: make(map[string][]string), - modulesDirsToBranchesToIdentities: make(map[string]map[string]bufmoduleref.ModuleIdentity), - modulesToBranchesExpectedSyncPoints: make(map[string]map[string]string), - modulesIdentitiesToCommitsSyncedCache: make(map[string]map[string]struct{}), - tagsBackfiller: mockTagsBackfiller.backfillFunc(), - errorHandler: &mockErrorHandler{}, - } - require.NoError(t, testSyncer.prepareSync(context.Background())) - require.NoError(t, testSyncer.backfillTags( - context.Background(), - moduleDir, - moduleIdentityInHEAD, - defaultBranchName, - startSyncHash, - mockClock, - )) - // in total the repo has at least 20 commits, we expect to backfill 11 of them... + syncer, err := bufsync.NewSyncer( + zaptest.NewLogger(t), + &mockClock{now: fakeNowCommitLimitTime.Add(bufsync.LookbackTimeLimit)}, + repo, + storagegit.NewProvider(repo.Objects()), + mockHandler, + bufsync.SyncerWithModule(moduleDir, nil), + ) + require.NoError(t, err) + require.NoError(t, syncer.Sync(context.Background())) + // in total the repo has at least 20 commits, we expect to backfill 11 of them + // and sync the next 4 commits assert.GreaterOrEqual(t, len(allCommitsHashes), 20) - assert.Len(t, mockTagsBackfiller.backfilledCommitsToTags, 11) + assert.Len(t, mockHandler.tagsByHash, 15) // as follows: for i, commitHash := range allCommitsHashes { - if i < 4 { - // the 4 most recent should not be backfilling anything, those are unsynced commits that will - // be synced by another func. - assert.NotContains(t, mockTagsBackfiller.backfilledCommitsToTags, commitHash) - } else if i < 15 { + if i < 15 { + // Between 0-4, the tags should be synced. // Between 5-15 the tags should be backfilled. // - // The commit #5 is the git start sync point, which will also be handled by sync because it's - // sometimes already synced and sometimes not. It's handled by both sync and backfill tags. - // // The func it's backfilling more than 5 commits, because it needs to backfill until both // conditions are met, at least 5 commits and at least 24 hours. 
-			assert.Contains(t, mockTagsBackfiller.backfilledCommitsToTags, commitHash)
+			assert.Contains(t, mockHandler.tagsByHash, commitHash)
 		} else {
 			// past the #15 the commits are too old, we don't backfill back there
-			assert.NotContains(t, mockTagsBackfiller.backfilledCommitsToTags, commitHash)
+			assert.NotContains(t, mockHandler.tagsByHash, commitHash)
 		}
 	}
 }
@@ -141,25 +124,3 @@ func prepareGitRepoBackfillTags(t *testing.T, repoDir string, moduleIdentity buf
 	time.Sleep(1 * time.Second)
 	doEmptyCommitAndTag(15)
 }
-
-type mockTagsBackfiller struct {
-	backfilledCommitsToTags map[string]struct{}
-}
-
-func newMockTagsBackfiller() mockTagsBackfiller {
-	return mockTagsBackfiller{backfilledCommitsToTags: make(map[string]struct{})}
-}
-
-func (b *mockTagsBackfiller) backfillFunc() TagsBackfiller {
-	return func(_ context.Context, _ bufmoduleref.ModuleIdentity, alreadySyncedHash git.Hash, _, _ git.Ident, _ []string) (string, error) {
-		// we don't really test which tags were backfilled, only which commits had its tags backfilled
-		b.backfilledCommitsToTags[alreadySyncedHash.Hex()] = struct{}{}
-		return "some-BSR-commit-name", nil
-	}
-}
-
-type mockClock struct {
-	now time.Time
-}
-
-func (c *mockClock) Now() time.Time { return c.now }
diff --git a/private/buf/bufsync/bufsync.go b/private/buf/bufsync/bufsync.go
index a4af5bdc41..53ad547fb3 100644
--- a/private/buf/bufsync/bufsync.go
+++ b/private/buf/bufsync/bufsync.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref"
 	"github.com/bufbuild/buf/private/pkg/git"
@@ -149,27 +150,84 @@ type ErrorHandler interface {
 	) error
 }
 
-// Syncer syncs a modules in a git.Repository.
+// Handler is a handler for Syncer. It controls the way in which Syncer handles errors, provides
+// any information the Syncer needs to Sync commits, and receives ModuleCommits that should be
+// synced.
+type Handler interface {
+	ErrorHandler
+
+	// SyncModuleCommit is invoked to sync a module commit. If an error is returned, sync will abort.
+	SyncModuleCommit(ctx context.Context, commit ModuleCommit) error
+
+	// ResolveSyncPoint is invoked to resolve a sync point for a particular module at a particular branch.
+	// If no sync point is found, this function returns nil. If an error is returned, sync will abort.
+	ResolveSyncPoint(
+		ctx context.Context,
+		module bufmoduleref.ModuleIdentity,
+		branch string,
+	) (git.Hash, error)
+
+	// CheckSyncedGitCommits is invoked when syncing branches to know which commit hashes from a set
+	// are already synced in the BSR. It is expected to return the subset of those hashes that are
+	// already synced. If an error is returned, sync will abort.
+	CheckSyncedGitCommits(
+		ctx context.Context,
+		module bufmoduleref.ModuleIdentity,
+		commitHashes map[string]struct{},
+	) (map[string]struct{}, error)
+
+	// GetModuleReleaseBranch is invoked before syncing, to gather release branch names for all the
+	// modules that are about to be synced. If the BSR module does not exist, the implementation should
+	// return an `ErrModuleDoesNotExist` error.
+	GetModuleReleaseBranch(
+		ctx context.Context,
+		module bufmoduleref.ModuleIdentity,
+	) (string, error)
+
+	// BackfillTags is invoked when a commit with valid modules is found within a lookback threshold
+	// past the start sync point for that module. The Syncer assumes that the "old" commit is already
+	// synced, so it will attempt to backfill existing tags using that git hash, in case they were
+	// recently created or moved there.
+ // + // A common scenario is SemVer releases: a commit is pushed to the default Git branch, the sync + // process triggers and completes, and some minutes later that commit is tagged "v1.2.3". The next + // time the sync command runs, this backfiller would pick such tag and backfill it to the correct + // BSR commit. + // + // It's expected to return the BSR commit name to which the tags were backfilled. + BackfillTags( + ctx context.Context, + module bufmoduleref.ModuleIdentity, + alreadySyncedHash git.Hash, + author git.Ident, + committer git.Ident, + tags []string, + ) (string, error) +} + +// Syncer syncs modules in a git.Repository. type Syncer interface { - // Sync syncs the repository using the provided SyncFunc. It processes commits in reverse - // topological order, loads any configured named modules, extracts any Git metadata for that - // commit, and invokes SyncFunc with a ModuleCommit. - Sync(context.Context, SyncFunc) error + // Sync syncs the repository. It processes commits in reverse topological order, loads any + // configured named modules, extracts any Git metadata for that commit, and invokes + // Handler#SyncModuleCommit with a ModuleCommit. + Sync(context.Context) error } // NewSyncer creates a new Syncer. func NewSyncer( logger *zap.Logger, + clock Clock, repo git.Repository, storageGitProvider storagegit.Provider, - errorHandler ErrorHandler, + handler Handler, options ...SyncerOption, ) (Syncer, error) { return newSyncer( logger, + clock, repo, storageGitProvider, - errorHandler, + handler, options..., ) } @@ -177,10 +235,10 @@ func NewSyncer( // SyncerOption configures the creation of a new Syncer. type SyncerOption func(*syncer) error -// SyncerWithRemote configures a Syncer with a resumption using a SyncPointResolver. -func SyncerWithRemote(remoteName string) SyncerOption { +// SyncerWithGitRemote configures a Syncer to sync commits from particular Git remote. +func SyncerWithGitRemote(gitRemoteName string) SyncerOption { return func(s *syncer) error { - s.remote = remoteName + s.gitRemoteName = gitRemoteName return nil } } @@ -208,43 +266,6 @@ func SyncerWithModule(moduleDir string, identityOverride bufmoduleref.ModuleIden } } -// SyncerWithResumption configures a Syncer with a resumption using a SyncPointResolver. -func SyncerWithResumption(resolver SyncPointResolver) SyncerOption { - return func(s *syncer) error { - s.syncPointResolver = resolver - return nil - } -} - -// SyncerWithGitCommitChecker configures a git commit checker, to know if a module has a given git -// hash alrady synced in a BSR instance. -func SyncerWithGitCommitChecker(checker SyncedGitCommitChecker) SyncerOption { - return func(s *syncer) error { - s.syncedGitCommitChecker = checker - return nil - } -} - -// SyncerWithModuleDefaultBranchGetter configures a getter for modules' default branch, to protect -// those branches syncs in cases like a Git history rewrite. If this option is not passed, the -// syncer treats the BSR default branch as any other branch. -func SyncerWithModuleDefaultBranchGetter(getter ModuleDefaultBranchGetter) SyncerOption { - return func(s *syncer) error { - s.moduleDefaultBranchGetter = getter - return nil - } -} - -// SyncerWithTagsBackfiller configures a tags backfiller for older, already synced commits. If this -// option is not passed, the syncer won't try to sync older tags past each module's start sync -// points on all branches. 
-func SyncerWithTagsBackfiller(backfiller TagsBackfiller) SyncerOption { - return func(s *syncer) error { - s.tagsBackfiller = backfiller - return nil - } -} - // SyncerWithAllBranches sets the syncer to sync all branches. Be default the syncer only processes // commits in the current checked out branch. func SyncerWithAllBranches() SyncerOption { @@ -254,56 +275,6 @@ func SyncerWithAllBranches() SyncerOption { } } -// SyncFunc is invoked by Syncer to process a sync point. If an error is returned, -// sync will abort. -type SyncFunc func(ctx context.Context, commit ModuleCommit) error - -// SyncPointResolver is invoked by Syncer to resolve a syncpoint for a particular module -// at a particular branch. If no syncpoint is found, this function returns nil. If an error -// is returned, sync will abort. -type SyncPointResolver func( - ctx context.Context, - module bufmoduleref.ModuleIdentity, - branch string, -) (git.Hash, error) - -// SyncedGitCommitChecker is invoked when syncing branches to know which commits hashes from a set -// are already synced inthe BSR. It expects to receive the commit hashes that are synced already. If -// an error is returned, sync will abort. -type SyncedGitCommitChecker func( - ctx context.Context, - module bufmoduleref.ModuleIdentity, - commitHashes map[string]struct{}, -) (map[string]struct{}, error) - -// ModuleDefaultBranchGetter is invoked before syncing, to gather default branch names for all the -// modules that are about to be synced. If the BSR module does not exist, the implementation should -// return `ModuleDoesNotExistErr` error. -type ModuleDefaultBranchGetter func( - ctx context.Context, - module bufmoduleref.ModuleIdentity, -) (string, error) - -// TagsBackfiller is invoked when a commit with valid modules is found within a lookback threshold -// past the start sync point for such module. The Syncer assumes that the "old" commit is already -// synced, so it will attempt to backfill existing tags using that git hash, in case they were -// recently created or moved there. -// -// A common scenario is SemVer releases: a commit is pushed to the default Git branch, the sync -// process triggers and completes, and some minutes later that commit is tagged "v1.2.3". The next -// time the sync command runs, this backfiller would pick such tag and backfill it to the correct -// BSR commit. -// -// It's expected to return the BSR commit name to which the tags were backfilled. -type TagsBackfiller func( - ctx context.Context, - module bufmoduleref.ModuleIdentity, - alreadySyncedHash git.Hash, - author git.Ident, - committer git.Ident, - tags []string, -) (string, error) - // ModuleCommit is a module at a particular commit. type ModuleCommit interface { // Branch is the git branch that this module is sourced from. @@ -320,3 +291,14 @@ type ModuleCommit interface { // Bucket is the bucket for the module. Bucket() storage.ReadBucket } + +// Clock provides the current time. +type Clock interface { + // Now provides the current time. + Now() time.Time +} + +// NewRealClock returns a Clock that returns the current time using time#Now(). +func NewRealClock() Clock { + return newClock() +} diff --git a/private/buf/bufsync/bufsync_test.go b/private/buf/bufsync/bufsync_test.go new file mode 100644 index 0000000000..ac183f21df --- /dev/null +++ b/private/buf/bufsync/bufsync_test.go @@ -0,0 +1,238 @@ +// Copyright 2020-2023 Buf Technologies, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufsync_test + +import ( + "bytes" + "context" + "errors" + "io" + "os" + "path" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/bufbuild/buf/private/buf/bufsync" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/pkg/command" + "github.com/bufbuild/buf/private/pkg/git" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +// scaffoldGitRepository returns an initialized git repository with a single commit, and returns the +// repository and its directory. +func scaffoldGitRepository(t *testing.T, defaultBranchName string) (git.Repository, string) { + runner := command.NewRunner() + repoDir := scaffoldGitRepositoryDir(t, runner, defaultBranchName) + dotGitPath := path.Join(repoDir, git.DotGitDir) + repo, err := git.OpenRepository( + context.Background(), + dotGitPath, + runner, + git.OpenRepositoryWithDefaultBranch(defaultBranchName), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, repo.Close()) + }) + return repo, repoDir +} + +// scaffoldGitRepositoryDir prepares a git repository with an initial README, and a single commit. +// It returns the directory where the local git repo is. 
+func scaffoldGitRepositoryDir(t *testing.T, runner command.Runner, defaultBranchName string) string { + repoDir := t.TempDir() + + // setup repo + runInDir(t, runner, repoDir, "git", "init", "--initial-branch", defaultBranchName) + runInDir(t, runner, repoDir, "git", "config", "user.name", "Buf TestBot") + runInDir(t, runner, repoDir, "git", "config", "user.email", "testbot@buf.build") + + // write and commit a README file + writeFiles(t, repoDir, map[string]string{"README.md": "This is a scaffold repository.\n"}) + runInDir(t, runner, repoDir, "git", "add", ".") + runInDir(t, runner, repoDir, "git", "commit", "-m", "Write README") + + return repoDir +} + +func runInDir(t *testing.T, runner command.Runner, dir string, cmd string, args ...string) { + stderr := bytes.NewBuffer(nil) + err := runner.Run( + context.Background(), + cmd, + command.RunWithArgs(args...), + command.RunWithDir(dir), + command.RunWithStderr(stderr), + ) + if err != nil { + t.Logf("run %q", strings.Join(append([]string{cmd}, args...), " ")) + _, err := io.Copy(os.Stderr, stderr) + require.NoError(t, err) + } + require.NoError(t, err) +} + +func writeFiles(t *testing.T, directoryPath string, pathToContents map[string]string) { + for path, contents := range pathToContents { + require.NoError(t, os.MkdirAll(filepath.Join(directoryPath, filepath.Dir(path)), 0700)) + require.NoError(t, os.WriteFile(filepath.Join(directoryPath, path), []byte(contents), 0600)) + } +} + +type mockClock struct { + now time.Time +} + +func (c *mockClock) Now() time.Time { return c.now } + +type mockSyncHandler struct { + syncedCommitsSHAs map[string]struct{} + commitsByBranch map[string][]bufsync.ModuleCommit + hashByTag map[string]git.Hash + tagsByHash map[string][]string + manualSyncPointByModuleByBranch map[string]map[string]git.Hash +} + +func newMockSyncHandler() *mockSyncHandler { + return &mockSyncHandler{ + syncedCommitsSHAs: make(map[string]struct{}), + commitsByBranch: make(map[string][]bufsync.ModuleCommit), + hashByTag: make(map[string]git.Hash), + tagsByHash: make(map[string][]string), + manualSyncPointByModuleByBranch: make(map[string]map[string]git.Hash), + } +} + +func (c *mockSyncHandler) setSyncPoint(branch string, hash git.Hash, identity bufmoduleref.ModuleIdentity) { + branchSyncpoints, ok := c.manualSyncPointByModuleByBranch[branch] + if !ok { + branchSyncpoints = make(map[string]git.Hash) + c.manualSyncPointByModuleByBranch[branch] = branchSyncpoints + } + branchSyncpoints[identity.IdentityString()] = hash + c.syncedCommitsSHAs[hash.Hex()] = struct{}{} +} + +func (c *mockSyncHandler) HandleReadModuleError( + readErr *bufsync.ReadModuleError, +) bufsync.LookbackDecisionCode { + if readErr.Code() == bufsync.ReadModuleErrorCodeUnexpectedName { + return bufsync.LookbackDecisionCodeOverride + } + return bufsync.LookbackDecisionCodeSkip +} + +func (c *mockSyncHandler) InvalidBSRSyncPoint( + identity bufmoduleref.ModuleIdentity, + branch string, + gitHash git.Hash, + isDefaultBranch bool, + err error, +) error { + return errors.New("unimplemented") +} + +func (c *mockSyncHandler) BackfillTags( + ctx context.Context, + module bufmoduleref.ModuleIdentity, + alreadySyncedHash git.Hash, + author git.Ident, + committer git.Ident, + tags []string, +) (string, error) { + for _, tag := range tags { + if previousHash, ok := c.hashByTag[tag]; ok { + // clear previous tag + c.tagsByHash[previousHash.Hex()] = slices.DeleteFunc( + c.tagsByHash[previousHash.Hex()], + func(previousTag string) bool { + return previousTag == tag + }, + ) + } + 
c.hashByTag[tag] = alreadySyncedHash + } + c.tagsByHash[alreadySyncedHash.Hex()] = tags + return "some-BSR-commit-name", nil +} + +func (c *mockSyncHandler) GetModuleReleaseBranch( + ctx context.Context, + module bufmoduleref.ModuleIdentity, +) (string, error) { + // hardcoded default branch + return bufmoduleref.Main, nil +} + +func (c *mockSyncHandler) ResolveSyncPoint( + ctx context.Context, + module bufmoduleref.ModuleIdentity, + branch string, +) (git.Hash, error) { + // if we have commits from SyncModuleCommit, prefer that over + // manually set sync point + if branch, ok := c.commitsByBranch[branch]; ok && len(branch) > 0 { + // everything here is synced; return tip of branch + return branch[len(branch)-1].Commit().Hash(), nil + } + if branch, ok := c.manualSyncPointByModuleByBranch[branch]; ok { + if syncPoint, ok := branch[module.IdentityString()]; ok { + return syncPoint, nil + } + } + return nil, nil +} + +func (c *mockSyncHandler) SyncModuleCommit( + ctx context.Context, + commit bufsync.ModuleCommit, +) error { + c.setSyncPoint( + commit.Branch(), + commit.Commit().Hash(), + commit.Identity(), + ) + // append-only, no backfill; good enough for now! + c.commitsByBranch[commit.Branch()] = append(c.commitsByBranch[commit.Branch()], commit) + _, err := c.BackfillTags( + ctx, + commit.Identity(), + commit.Commit().Hash(), + commit.Commit().Author(), + commit.Commit().Committer(), + commit.Tags(), + ) + return err +} + +func (c *mockSyncHandler) CheckSyncedGitCommits( + ctx context.Context, + module bufmoduleref.ModuleIdentity, + commitHashes map[string]struct{}, +) (map[string]struct{}, error) { + syncedHashes := make(map[string]struct{}) + for hash := range commitHashes { + if _, isSynced := c.syncedCommitsSHAs[hash]; isSynced { + syncedHashes[hash] = struct{}{} + } + } + return syncedHashes, nil +} + +var _ bufsync.Handler = (*mockSyncHandler)(nil) diff --git a/private/buf/bufsync/clock.go b/private/buf/bufsync/clock.go new file mode 100644 index 0000000000..42dfed6a51 --- /dev/null +++ b/private/buf/bufsync/clock.go @@ -0,0 +1,27 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufsync + +import "time" + +type clock struct{} + +func newClock() *clock { + return &clock{} +} + +func (*clock) Now() time.Time { + return time.Now() +} diff --git a/private/buf/bufsync/commits_to_sync_test.go b/private/buf/bufsync/commits_to_sync_test.go index f9b1599f6e..fc6689d297 100644 --- a/private/buf/bufsync/commits_to_sync_test.go +++ b/private/buf/bufsync/commits_to_sync_test.go @@ -12,19 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package bufsync +package bufsync_test import ( "context" - "errors" "fmt" "testing" + "github.com/bufbuild/buf/private/buf/bufsync" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" "github.com/bufbuild/buf/private/pkg/command" - "github.com/bufbuild/buf/private/pkg/git" "github.com/bufbuild/buf/private/pkg/storage/storagegit" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -37,7 +35,8 @@ func TestCommitsToSyncWithNoPreviousSyncPoints(t *testing.T) { require.NoError(t, err) const defaultBranchName = "main" repo, repoDir := scaffoldGitRepository(t, defaultBranchName) - prepareGitRepoSyncWithNoPreviousSyncPoints(t, repoDir, moduleIdentityInHEAD, defaultBranchName) + runner := command.NewRunner() + prepareGitRepoSyncWithNoPreviousSyncPoints(t, runner, repoDir, moduleIdentityInHEAD, defaultBranchName) type testCase struct { name string branch string @@ -65,110 +64,51 @@ func TestCommitsToSyncWithNoPreviousSyncPoints(t *testing.T) { expectedCommits: 1, }, } + handler := newMockSyncHandler() // use same handler for all test cases for _, withOverride := range []bool{false, true} { - mockBSRChecker := newMockSyncGitChecker() for _, tc := range testCases { func(tc testCase) { t.Run(fmt.Sprintf("%s/override_%t", tc.name, withOverride), func(t *testing.T) { + // check out the branch to sync + runInDir(t, runner, repoDir, "git", "checkout", tc.branch) const moduleDir = "." - moduleDirsToIdentityOverride := make(map[string]bufmoduleref.ModuleIdentity) - if withOverride { - moduleDirsToIdentityOverride[moduleDir] = moduleIdentityOverride - } else { - moduleDirsToIdentityOverride[moduleDir] = nil - } - testSyncer := syncer{ - repo: repo, - storageGitProvider: storagegit.NewProvider(repo.Objects()), - logger: zaptest.NewLogger(t), - errorHandler: &mockErrorHandler{}, - modulesDirsToIdentityOverrideForSync: moduleDirsToIdentityOverride, - sortedModulesDirsForSync: []string{"."}, - syncAllBranches: true, - syncedGitCommitChecker: mockBSRChecker.checkFunc(), - commitsToTags: make(map[string][]string), - modulesDirsToBranchesToIdentities: make(map[string]map[string]bufmoduleref.ModuleIdentity), - modulesToBranchesExpectedSyncPoints: make(map[string]map[string]string), - modulesIdentitiesToCommitsSyncedCache: make(map[string]map[string]struct{}), - } - require.NoError(t, testSyncer.prepareSync(context.Background())) - var moduleIdentity bufmoduleref.ModuleIdentity + var opts []bufsync.SyncerOption if withOverride { - moduleIdentity = moduleIdentityOverride + opts = append(opts, bufsync.SyncerWithModule(moduleDir, moduleIdentityOverride)) } else { - moduleIdentity = moduleIdentityInHEAD + opts = append(opts, bufsync.SyncerWithModule(moduleDir, nil)) } - syncableCommits, err := testSyncer.branchSyncableCommits( - context.Background(), - moduleDir, - moduleIdentity, - tc.branch, - "", // no expected git sync point + syncer, err := bufsync.NewSyncer( + zaptest.NewLogger(t), + bufsync.NewRealClock(), + repo, + storagegit.NewProvider(repo.Objects()), + handler, + opts..., ) require.NoError(t, err) - require.Len(t, syncableCommits, tc.expectedCommits) - for _, syncableCommit := range syncableCommits { - assert.NotEmpty(t, syncableCommit.commit.Hash().Hex()) - mockBSRChecker.markSynced(syncableCommit.commit.Hash().Hex()) - assert.NotNil(t, syncableCommit.module) - // no need to assert syncableCommit.module.ModuleIdentity, it's not renamed because it's - // not used when syncing. 
- } + require.NoError(t, syncer.Sync(context.Background())) + syncedCommits := handler.commitsByBranch[tc.branch] + require.Len(t, syncedCommits, tc.expectedCommits) }) }(tc) } } } -type mockErrorHandler struct{} - -func (*mockErrorHandler) HandleReadModuleError(readErr *ReadModuleError) LookbackDecisionCode { - if readErr.code == ReadModuleErrorCodeUnexpectedName { - return LookbackDecisionCodeOverride - } - return LookbackDecisionCodeSkip -} - -func (*mockErrorHandler) InvalidBSRSyncPoint(bufmoduleref.ModuleIdentity, string, git.Hash, bool, error) error { - return errors.New("unimplemented") -} - -type mockSyncedGitChecker struct { - syncedCommitsSHAs map[string]struct{} -} - -func newMockSyncGitChecker() mockSyncedGitChecker { - return mockSyncedGitChecker{syncedCommitsSHAs: make(map[string]struct{})} -} - -func (c *mockSyncedGitChecker) markSynced(gitHash string) { - c.syncedCommitsSHAs[gitHash] = struct{}{} -} - -func (c *mockSyncedGitChecker) checkFunc() SyncedGitCommitChecker { - return func( - _ context.Context, - _ bufmoduleref.ModuleIdentity, - commitHashes map[string]struct{}, - ) (map[string]struct{}, error) { - syncedHashes := make(map[string]struct{}) - for hash := range commitHashes { - if _, isSynced := c.syncedCommitsSHAs[hash]; isSynced { - syncedHashes[hash] = struct{}{} - } - } - return syncedHashes, nil - } -} - // prepareGitRepoSyncWithNoPreviousSyncPoints writes and pushes commits in the repo with the // following commits: // // | o-o----------o-----------------o (master) // | └o-o (foo) └o--------o (bar) // | └o (baz) -func prepareGitRepoSyncWithNoPreviousSyncPoints(t *testing.T, repoDir string, moduleIdentity bufmoduleref.ModuleIdentity, defaultBranchName string) { - runner := command.NewRunner() +func prepareGitRepoSyncWithNoPreviousSyncPoints( + t *testing.T, + runner command.Runner, + repoDir string, + moduleIdentity bufmoduleref.ModuleIdentity, + defaultBranchName string, +) { var allBranches = []string{defaultBranchName, "foo", "bar", "baz"} var commitsCounter int diff --git a/private/buf/bufsync/main_test.go b/private/buf/bufsync/main_test.go deleted file mode 100644 index f151ed4a7d..0000000000 --- a/private/buf/bufsync/main_test.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2020-2023 Buf Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package bufsync - -import ( - "bytes" - "context" - "io" - "os" - "path" - "path/filepath" - "strings" - "testing" - - "github.com/bufbuild/buf/private/pkg/command" - "github.com/bufbuild/buf/private/pkg/git" - "github.com/stretchr/testify/require" -) - -// scaffoldGitRepository returns an initialized git repository with a single commit, and returns the -// repository and its directory. 
-func scaffoldGitRepository(t *testing.T, defaultBranchName string) (git.Repository, string) { - runner := command.NewRunner() - repoDir := scaffoldGitRepositoryDir(t, runner, defaultBranchName) - dotGitPath := path.Join(repoDir, git.DotGitDir) - repo, err := git.OpenRepository( - context.Background(), - dotGitPath, - runner, - git.OpenRepositoryWithDefaultBranch(defaultBranchName), - ) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, repo.Close()) - }) - return repo, repoDir -} - -// scaffoldGitRepositoryDir prepares a git repository with an initial README, and a single commit. -// It returns the directory where the local git repo is. -func scaffoldGitRepositoryDir(t *testing.T, runner command.Runner, defaultBranchName string) string { - repoDir := t.TempDir() - - // setup repo - runInDir(t, runner, repoDir, "git", "init", "--initial-branch", defaultBranchName) - runInDir(t, runner, repoDir, "git", "config", "user.name", "Buf TestBot") - runInDir(t, runner, repoDir, "git", "config", "user.email", "testbot@buf.build") - - // write and commit a README file - writeFiles(t, repoDir, map[string]string{"README.md": "This is a scaffold repository.\n"}) - runInDir(t, runner, repoDir, "git", "add", ".") - runInDir(t, runner, repoDir, "git", "commit", "-m", "Write README") - - return repoDir -} - -func runInDir(t *testing.T, runner command.Runner, dir string, cmd string, args ...string) { - stderr := bytes.NewBuffer(nil) - err := runner.Run( - context.Background(), - cmd, - command.RunWithArgs(args...), - command.RunWithDir(dir), - command.RunWithStderr(stderr), - ) - if err != nil { - t.Logf("run %q", strings.Join(append([]string{cmd}, args...), " ")) - _, err := io.Copy(os.Stderr, stderr) - require.NoError(t, err) - } - require.NoError(t, err) -} - -func writeFiles(t *testing.T, directoryPath string, pathToContents map[string]string) { - for path, contents := range pathToContents { - require.NoError(t, os.MkdirAll(filepath.Join(directoryPath, filepath.Dir(path)), 0700)) - require.NoError(t, os.WriteFile(filepath.Join(directoryPath, path), []byte(contents), 0600)) - } -} diff --git a/private/buf/bufsync/prepare_sync_test.go b/private/buf/bufsync/prepare_sync_test.go index 16d8c10eaf..110438978a 100644 --- a/private/buf/bufsync/prepare_sync_test.go +++ b/private/buf/bufsync/prepare_sync_test.go @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package bufsync +package bufsync_test import ( "context" "fmt" "testing" + "github.com/bufbuild/buf/private/buf/bufsync" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" "github.com/bufbuild/buf/private/pkg/command" "github.com/bufbuild/buf/private/pkg/storage/storagegit" @@ -76,24 +77,25 @@ func TestPrepareSyncDuplicateIdentities(t *testing.T) { for moduleDir := range tc.modulesIdentitiesInHEAD { moduleDirs = append(moduleDirs, moduleDir) } - testSyncer := syncer{ - repo: repo, - storageGitProvider: storagegit.NewProvider(repo.Objects()), - logger: zaptest.NewLogger(t), - sortedModulesDirsForSync: moduleDirs, - modulesDirsToIdentityOverrideForSync: tc.modulesOverrides, - commitsToTags: make(map[string][]string), - modulesDirsToBranchesToIdentities: make(map[string]map[string]bufmoduleref.ModuleIdentity), - modulesToBranchesExpectedSyncPoints: make(map[string]map[string]string), - modulesIdentitiesToCommitsSyncedCache: make(map[string]map[string]struct{}), - errorHandler: &mockErrorHandler{}, + var opts []bufsync.SyncerOption + for moduleDir, identityOverride := range tc.modulesOverrides { + opts = append(opts, bufsync.SyncerWithModule(moduleDir, identityOverride)) } - prepareErr := testSyncer.prepareSync(context.Background()) - require.Error(t, prepareErr) - assert.Contains(t, prepareErr.Error(), repeatedIdentity.IdentityString()) - assert.Contains(t, prepareErr.Error(), defaultBranchName) + syncer, err := bufsync.NewSyncer( + zaptest.NewLogger(t), + bufsync.NewRealClock(), + repo, + storagegit.NewProvider(repo.Objects()), + newMockSyncHandler(), + opts..., + ) + require.NoError(t, err) + err = syncer.Sync(context.Background()) + require.Error(t, err) + assert.Contains(t, err.Error(), repeatedIdentity.IdentityString()) + assert.Contains(t, err.Error(), defaultBranchName) for _, moduleDir := range moduleDirs { - assert.Contains(t, prepareErr.Error(), moduleDir) + assert.Contains(t, err.Error(), moduleDir) } }) }(tc) diff --git a/private/buf/bufsync/syncer.go b/private/buf/bufsync/syncer.go index 8f9627ccb1..7633d2e7a4 100644 --- a/private/buf/bufsync/syncer.go +++ b/private/buf/bufsync/syncer.go @@ -34,26 +34,23 @@ import ( ) const ( - // lookbackCommitsLimit is the amount of commits that we will look back before the start sync + // LookbackCommitsLimit is the amount of commits that we will look back before the start sync // point to backfill old git tags. We might allow customizing this value in the future. - lookbackCommitsLimit = 5 - // lookbackTimeLimit is how old we will look back (git commit timestamps) before the start sync + LookbackCommitsLimit = 5 + // LookbackTimeLimit is how old we will look back (git commit timestamps) before the start sync // point to backfill old git tags. We might allow customizing this value in the future. 
- lookbackTimeLimit = 24 * time.Hour + LookbackTimeLimit = 24 * time.Hour ) type syncer struct { - logger *zap.Logger - repo git.Repository - storageGitProvider storagegit.Provider - errorHandler ErrorHandler - syncedGitCommitChecker SyncedGitCommitChecker - moduleDefaultBranchGetter ModuleDefaultBranchGetter - syncPointResolver SyncPointResolver - tagsBackfiller TagsBackfiller + logger *zap.Logger + repo git.Repository + storageGitProvider storagegit.Provider + handler Handler + clock Clock // flags received on creation - remote string + gitRemoteName string sortedModulesDirsForSync []string modulesDirsToIdentityOverrideForSync map[string]bufmoduleref.ModuleIdentity // moduleDir:moduleIdentityOverride syncAllBranches bool @@ -74,49 +71,42 @@ type syncer struct { // cache "unsynced" git commits, because during the sync process we will be syncing new git // commits, which then will be added also to this cache. (moduleIdentity:commits) modulesIdentitiesToCommitsSyncedCache map[string]map[string]struct{} - // modulesBSRDefaultBranch holds the branch name that's set as default branch in the BSR. This + // modulesBSRReleaseBranch holds the branch name that's set as release branch in the BSR. This // branch tracks "the main|prod BSR commits", which requires some additional protection like not // allowing Git history rewrites. (moduleIdentity:branch) - modulesBSRDefaultBranch map[string]string + modulesBSRReleaseBranch map[string]string } func newSyncer( logger *zap.Logger, + clock Clock, repo git.Repository, storageGitProvider storagegit.Provider, - errorHandler ErrorHandler, + handler Handler, options ...SyncerOption, ) (Syncer, error) { s := &syncer{ logger: logger, + clock: clock, repo: repo, storageGitProvider: storageGitProvider, - errorHandler: errorHandler, + handler: handler, modulesDirsToIdentityOverrideForSync: make(map[string]bufmoduleref.ModuleIdentity), commitsToTags: make(map[string][]string), modulesDirsToBranchesToIdentities: make(map[string]map[string]bufmoduleref.ModuleIdentity), modulesToBranchesExpectedSyncPoints: make(map[string]map[string]string), modulesIdentitiesToCommitsSyncedCache: make(map[string]map[string]struct{}), - modulesBSRDefaultBranch: make(map[string]string), + modulesBSRReleaseBranch: make(map[string]string), } for _, opt := range options { if err := opt(s); err != nil { return nil, err } } - if s.moduleDefaultBranchGetter == nil { - s.logger.Warn( - "no module default branch getter, the default branch validation will be skipped for all modules and branches", - zap.String("default_git_branch", s.repo.DefaultBranch()), - ) - } - if s.syncedGitCommitChecker == nil { - s.logger.Warn("no sync git commit checker, all branches will attempt to sync from the start") - } return s, nil } -func (s *syncer) Sync(ctx context.Context, syncFunc SyncFunc) error { +func (s *syncer) Sync(ctx context.Context) error { if err := s.prepareSync(ctx); err != nil { return fmt.Errorf("sync preparation: %w", err) } @@ -154,7 +144,7 @@ func (s *syncer) Sync(ctx context.Context, syncFunc SyncFunc) error { zap.String("branch", branch), ) } - if err := s.syncModuleInBranch(ctx, moduleDir, moduleIdentity, branch, expectedSyncPoint, syncFunc); err != nil { + if err := s.syncModuleInBranch(ctx, moduleDir, moduleIdentity, branch, expectedSyncPoint); err != nil { return fmt.Errorf("sync module %s in branch %s: %w", moduleDir, branch, err) } } @@ -175,41 +165,44 @@ func (s *syncer) prepareSync(ctx context.Context) error { if err := s.repo.ForEachBranch(func(branch string, _ git.Hash) error { 
 		allBranches[branch] = struct{}{}
 		return nil
-	}, git.ForEachBranchWithRemote(s.remote)); err != nil {
+	}, git.ForEachBranchWithRemote(s.gitRemoteName)); err != nil {
 		return fmt.Errorf("looping over repository branches: %w", err)
 	}
 	remoteErrMsg := "on local branches"
-	if s.remote != "" {
-		remoteErrMsg = fmt.Sprintf("on remote %s", s.remote)
+	if s.gitRemoteName != "" {
+		remoteErrMsg = fmt.Sprintf("on git remote %s", s.gitRemoteName)
 	}
 	// sync default git branch, make sure it's present
-	defaultBranch := s.repo.DefaultBranch()
-	if _, defaultBranchPresent := allBranches[defaultBranch]; !defaultBranchPresent {
-		return fmt.Errorf("default branch %s is not present %s", defaultBranch, remoteErrMsg)
+	gitDefaultBranch := s.repo.DefaultBranch()
+	if _, gitDefaultBranchPresent := allBranches[gitDefaultBranch]; !gitDefaultBranchPresent {
+		return fmt.Errorf("default branch %s is not present %s", gitDefaultBranch, remoteErrMsg)
 	}
-	s.logger.Debug("default git branch", zap.String("name", defaultBranch))
+	s.logger.Debug("default git branch", zap.String("name", gitDefaultBranch))
 	var branchesToSync []string
 	if s.syncAllBranches {
 		// sync all branches
 		branchesToSync = stringutil.MapToSlice(allBranches)
 	} else {
 		// sync current branch, make sure it's present
-		currentBranch := s.repo.CurrentBranch()
+		currentBranch, err := s.repo.CurrentBranch(ctx)
+		if err != nil {
+			return fmt.Errorf("determine checked out branch: %w", err)
+		}
 		if _, currentBranchPresent := allBranches[currentBranch]; !currentBranchPresent {
 			return fmt.Errorf("current branch %s is not present %s", currentBranch, remoteErrMsg)
 		}
-		branchesToSync = append(branchesToSync, defaultBranch, currentBranch)
+		branchesToSync = append(branchesToSync, gitDefaultBranch, currentBranch)
 		s.logger.Debug("current git branch", zap.String("name", currentBranch))
 	}
 	var sortedBranchesForSync []string
 	for _, branch := range branchesToSync {
-		if branch == defaultBranch {
+		if branch == gitDefaultBranch {
 			continue // default branch will be injected manually
 		}
 		sortedBranchesForSync = append(sortedBranchesForSync, branch)
 	}
 	sort.Strings(sortedBranchesForSync)
-	s.sortedBranchesForSync = append([]string{defaultBranch}, sortedBranchesForSync...) // default first, then the rest A-Z
+	s.sortedBranchesForSync = append([]string{gitDefaultBranch}, sortedBranchesForSync...) // default first, then the rest A-Z
 	for _, moduleDir := range s.sortedModulesDirsForSync {
 		s.modulesDirsToBranchesToIdentities[moduleDir] = make(map[string]bufmoduleref.ModuleIdentity)
 		for _, branch := range s.sortedBranchesForSync {
@@ -221,7 +214,7 @@ func (s *syncer) prepareSync(ctx context.Context) error {
 	for _, branch := range branchesToSync {
 		headCommit, err := s.repo.HEADCommit(
 			git.HEADCommitWithBranch(branch),
-			git.HEADCommitWithRemote(s.remote),
+			git.HEADCommitWithRemote(s.gitRemoteName),
 		)
 		if err != nil {
 			return fmt.Errorf("reading head commit for branch %s: %w", branch, err)
@@ -289,33 +282,27 @@ func (s *syncer) prepareSync(ctx context.Context) error {
 		return duplicatedIdentitiesErr
 	}
 	// (4) Populate default branches for all module identities (from all branches).
- if s.moduleDefaultBranchGetter != nil { - for _, moduleIdentity := range allModulesIdentitiesForSync { - bsrDefaultBranch, err := s.moduleDefaultBranchGetter(ctx, moduleIdentity) - if err != nil { - if errors.Is(err, ErrModuleDoesNotExist) { - s.logger.Warn( - "no default branch for module", - zap.String("module", moduleIdentity.IdentityString()), - zap.Error(err), - ) - continue - } - return fmt.Errorf("get default branch for BSR module %s: %w", moduleIdentity.IdentityString(), err) + for _, moduleIdentity := range allModulesIdentitiesForSync { + bsrReleaseBranch, err := s.handler.GetModuleReleaseBranch(ctx, moduleIdentity) + if err != nil { + if errors.Is(err, ErrModuleDoesNotExist) { + s.logger.Warn( + "no default branch for module", + zap.String("module", moduleIdentity.IdentityString()), + zap.Error(err), + ) + continue } - s.modulesBSRDefaultBranch[moduleIdentity.IdentityString()] = bsrDefaultBranch + return fmt.Errorf("get default branch for BSR module %s: %w", moduleIdentity.IdentityString(), err) } + s.modulesBSRReleaseBranch[moduleIdentity.IdentityString()] = bsrReleaseBranch } return nil } // resolveSyncPoint resolves a sync point for a particular module identity and branch. func (s *syncer) resolveSyncPoint(ctx context.Context, moduleIdentity bufmoduleref.ModuleIdentity, branch string) (git.Hash, error) { - // If resumption is not enabled, we can bail early. - if s.syncPointResolver == nil { - return nil, nil - } - syncPoint, err := s.syncPointResolver(ctx, moduleIdentity, branch) + syncPoint, err := s.handler.ResolveSyncPoint(ctx, moduleIdentity, branch) if err != nil { return nil, fmt.Errorf("resolve sync point for module %s: %w", moduleIdentity.IdentityString(), err) } @@ -326,7 +313,7 @@ func (s *syncer) resolveSyncPoint(ctx context.Context, moduleIdentity bufmoduler // Validate that the commit pointed to by the sync point exists in the git repo. 
if _, err := s.repo.Objects().Commit(syncPoint); err != nil { isDefaultBranch := branch == s.repo.DefaultBranch() - return nil, s.errorHandler.InvalidBSRSyncPoint(moduleIdentity, branch, syncPoint, isDefaultBranch, err) + return nil, s.handler.InvalidBSRSyncPoint(moduleIdentity, branch, syncPoint, isDefaultBranch, err) } return syncPoint, nil } @@ -351,33 +338,31 @@ func (s *syncer) syncModuleInBranch( moduleIdentity bufmoduleref.ModuleIdentity, branch string, expectedSyncPoint string, - syncFunc SyncFunc, ) error { commitsForSync, err := s.branchSyncableCommits(ctx, moduleDir, moduleIdentity, branch, expectedSyncPoint) if err != nil { return fmt.Errorf("finding commits to sync: %w", err) } // first sync tags in old commits - if s.tagsBackfiller != nil { - var startSyncPoint git.Hash - if len(commitsForSync) == 0 { - // no commits to sync for this branch, backfill from HEAD - headCommit, err := s.repo.HEADCommit( - git.HEADCommitWithBranch(branch), - git.HEADCommitWithRemote(s.remote), - ) - if err != nil { - return fmt.Errorf("read HEAD commit for branch %s: %w", branch, err) - } - startSyncPoint = headCommit.Hash() - } else { - // backfill from the first commit to sync - startSyncPoint = commitsForSync[0].commit.Hash() - } - if err := s.backfillTags(ctx, moduleDir, moduleIdentity, branch, startSyncPoint, &realClock{}); err != nil { - return fmt.Errorf("sync looking back for branch %s: %w", branch, err) + var startSyncPoint git.Hash + if len(commitsForSync) == 0 { + // no commits to sync for this branch, backfill from HEAD + headCommit, err := s.repo.HEADCommit( + git.HEADCommitWithBranch(branch), + git.HEADCommitWithRemote(s.gitRemoteName), + ) + if err != nil { + return fmt.Errorf("read HEAD commit for branch %s: %w", branch, err) } + startSyncPoint = headCommit.Hash() + } else { + // backfill from the first commit to sync + startSyncPoint = commitsForSync[0].commit.Hash() } + if err := s.backfillTags(ctx, moduleDir, moduleIdentity, branch, startSyncPoint); err != nil { + return fmt.Errorf("sync looking back for branch %s: %w", branch, err) + } + // now sync targetModuleIdentity := moduleIdentity.IdentityString() // all syncable modules in the branch have the same target logger := s.logger.With( zap.String("module directory", branch), @@ -396,7 +381,7 @@ func (s *syncer) syncModuleInBranch( if builtModule == nil { return fmt.Errorf("syncable commit %s has no built module to sync", commitHash) } - if err := syncFunc( + if err := s.handler.SyncModuleCommit( ctx, newModuleCommit( branch, @@ -455,7 +440,7 @@ func (s *syncer) branchSyncableCommits( // we expected a different sync point for this branch, it's ok to stop as long as it's not a // default branch switch branch { - case s.modulesBSRDefaultBranch[targetModuleIdentity]: + case s.modulesBSRReleaseBranch[targetModuleIdentity]: return fmt.Errorf( "BSR default branch protection: "+ "found synced git commit %s for branch %s, but expected sync point was %s, "+ @@ -487,7 +472,7 @@ func (s *syncer) branchSyncableCommits( commitsForSync = append(commitsForSync, &syncableCommit{commit: commit, module: builtModule}) return nil } - decision := s.errorHandler.HandleReadModuleError(readErr) + decision := s.handler.HandleReadModuleError(readErr) switch decision { case LookbackDecisionCodeFail: return fmt.Errorf("read module error: %w", readErr) @@ -512,7 +497,7 @@ func (s *syncer) branchSyncableCommits( eachCommitFunc, git.ForEachCommitWithBranchStartPoint( branch, - git.ForEachCommitWithBranchStartPointWithRemote(s.remote), + 
git.ForEachCommitWithBranchStartPointWithRemote(s.gitRemoteName), ), ); err != nil && !errors.Is(err, stopLoopErr) { return nil, err @@ -532,9 +517,6 @@ func (s *syncer) branchSyncableCommits( // isGitCommitSynced checks if a commit hash is already synced to a BSR module. func (s *syncer) isGitCommitSynced(ctx context.Context, moduleIdentity bufmoduleref.ModuleIdentity, commitHash string) (bool, error) { - if s.syncedGitCommitChecker == nil { - return false, nil - } modIdentity := moduleIdentity.IdentityString() // check local cache first if syncedModuleCommits, ok := s.modulesIdentitiesToCommitsSyncedCache[modIdentity]; ok { @@ -543,7 +525,7 @@ func (s *syncer) isGitCommitSynced(ctx context.Context, moduleIdentity bufmodule } } // not in the cache, request BSR check - syncedModuleCommits, err := s.syncedGitCommitChecker(ctx, moduleIdentity, map[string]struct{}{commitHash: {}}) + syncedModuleCommits, err := s.handler.CheckSyncedGitCommits(ctx, moduleIdentity, map[string]struct{}{commitHash: {}}) if err != nil { return false, err } @@ -641,11 +623,10 @@ func (s *syncer) backfillTags( moduleIdentity bufmoduleref.ModuleIdentity, branch string, syncStartHash git.Hash, - clock clock, ) error { var ( lookbackCommitsCount int - timeLimit = clock.Now().Add(-lookbackTimeLimit) + timeLimit = s.clock.Now().Add(-LookbackTimeLimit) stopLoopErr = errors.New("stop loop") logger = s.logger.With( zap.String("branch", branch), @@ -658,7 +639,7 @@ func (s *syncer) backfillTags( lookbackCommitsCount++ // For the lookback into older commits to stop, both lookback limits (amount of commits and // timespan) need to be met. - if lookbackCommitsCount > lookbackCommitsLimit && + if lookbackCommitsCount > LookbackCommitsLimit && oldCommit.Committer().Timestamp().Before(timeLimit) { return stopLoopErr } @@ -672,7 +653,7 @@ func (s *syncer) backfillTags( if _, readErr := s.readModuleAt( ctx, branch, oldCommit, moduleDir, readModuleAtWithExpectedModuleIdentity(moduleIdentity.IdentityString()), - ); readErr != nil && s.errorHandler.HandleReadModuleError(readErr) != LookbackDecisionCodeOverride { + ); readErr != nil && s.handler.HandleReadModuleError(readErr) != LookbackDecisionCodeOverride { // read module failed, and the error handler would not have overwritten it return nil } @@ -682,7 +663,7 @@ func (s *syncer) backfillTags( ) // Valid module in this commit to backfill tags. If backfilling the tags fails, we'll // WARN+continue to not block actual pending commits to sync in this run. - bsrCommitName, err := s.tagsBackfiller(ctx, moduleIdentity, oldCommit.Hash(), oldCommit.Author(), oldCommit.Committer(), tagsToBackfill) + bsrCommitName, err := s.handler.BackfillTags(ctx, moduleIdentity, oldCommit.Hash(), oldCommit.Author(), oldCommit.Committer(), tagsToBackfill) if err != nil { logger.Warn("backfill older tags failed", zap.Error(err)) return nil @@ -726,16 +707,6 @@ type syncableCommit struct { module *bufmodulebuild.BuiltModule } -// clock allows embedding a custom time.Now implementation, so it's easier to test. -type clock interface { - Now() time.Time -} - -// realClock returns the real time.Now. 
-type realClock struct{} - -func (*realClock) Now() time.Time { return time.Now() } - func syncableCommitsHashes(syncableCommits []*syncableCommit) []string { var hashes []string for _, sCommit := range syncableCommits { diff --git a/private/buf/cmd/buf/command/alpha/repo/reposync/reposync.go b/private/buf/cmd/buf/command/alpha/repo/reposync/reposync.go index c3f107abad..bc5e089946 100644 --- a/private/buf/cmd/buf/command/alpha/repo/reposync/reposync.go +++ b/private/buf/cmd/buf/command/alpha/repo/reposync/reposync.go @@ -20,28 +20,19 @@ import ( "fmt" "strings" - "connectrpc.com/connect" "github.com/bufbuild/buf/private/buf/bufcli" "github.com/bufbuild/buf/private/buf/bufsync" "github.com/bufbuild/buf/private/bufpkg/bufanalysis" - "github.com/bufbuild/buf/private/bufpkg/bufcas" - "github.com/bufbuild/buf/private/bufpkg/bufcas/bufcasalpha" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" - "github.com/bufbuild/buf/private/gen/proto/connect/buf/alpha/registry/v1alpha1/registryv1alpha1connect" - registryv1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" "github.com/bufbuild/buf/private/pkg/app/appcmd" "github.com/bufbuild/buf/private/pkg/app/appflag" "github.com/bufbuild/buf/private/pkg/command" - "github.com/bufbuild/buf/private/pkg/connectclient" "github.com/bufbuild/buf/private/pkg/git" "github.com/bufbuild/buf/private/pkg/normalpath" - "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storagegit" "github.com/bufbuild/buf/private/pkg/stringutil" "github.com/spf13/cobra" "github.com/spf13/pflag" - "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -195,11 +186,7 @@ func sync( return fmt.Errorf("create connect client %w", err) } syncerOptions := []bufsync.SyncerOption{ - bufsync.SyncerWithRemote(remoteName), - bufsync.SyncerWithResumption(syncPointResolver(clientConfig)), - bufsync.SyncerWithGitCommitChecker(syncGitCommitChecker(clientConfig)), - bufsync.SyncerWithModuleDefaultBranchGetter(defaultBranchGetter(clientConfig)), - bufsync.SyncerWithTagsBackfiller(tagsBackfiller(clientConfig)), + bufsync.SyncerWithGitRemote(remoteName), } if allBranches { syncerOptions = append(syncerOptions, bufsync.SyncerWithAllBranches()) @@ -230,346 +217,21 @@ func sync( } syncer, err := bufsync.NewSyncer( container.Logger(), + bufsync.NewRealClock(), repo, storageProvider, - newErrorHandler(container.Logger(), modulesDirsWithOverrides), - syncerOptions..., - ) - if err != nil { - return fmt.Errorf("new syncer: %w", err) - } - return syncer.Sync(ctx, func(ctx context.Context, moduleCommit bufsync.ModuleCommit) error { - syncPoint, err := pushOrCreate( - ctx, + newSyncHandler( + container.Logger(), clientConfig, + container, repo, - moduleCommit.Commit(), - moduleCommit.Branch(), - moduleCommit.Tags(), - moduleCommit.Identity(), - moduleCommit.Bucket(), createWithVisibility, - ) - if err != nil { - // We failed to push. We fail hard on this because the error may be recoverable - // (i.e., the BSR may be down) and we should re-attempt this commit. 
- return fmt.Errorf( - "failed to push or create %s at %s: %w", - moduleCommit.Identity().IdentityString(), - moduleCommit.Commit().Hash(), - err, - ) - } - _, err = container.Stderr().Write([]byte( - // from local -> to remote - // :: -> : - fmt.Sprintf( - "%s:%s:%s -> %s:%s\n", - moduleCommit.Directory(), moduleCommit.Branch(), moduleCommit.Commit().Hash().Hex(), - moduleCommit.Identity().IdentityString(), syncPoint.BsrCommitName, - )), - ) - return err - }) -} - -func syncPointResolver(clientConfig *connectclient.Config) bufsync.SyncPointResolver { - return func(ctx context.Context, module bufmoduleref.ModuleIdentity, branch string) (git.Hash, error) { - service := connectclient.Make(clientConfig, module.Remote(), registryv1alpha1connect.NewSyncServiceClient) - syncPoint, err := service.GetGitSyncPoint(ctx, connect.NewRequest(®istryv1alpha1.GetGitSyncPointRequest{ - Owner: module.Owner(), - Repository: module.Repository(), - Branch: branch, - })) - if err != nil { - if connect.CodeOf(err) == connect.CodeNotFound { - // No syncpoint - return nil, nil - } - return nil, fmt.Errorf("get git sync point: %w", err) - } - hash, err := git.NewHashFromHex(syncPoint.Msg.GetSyncPoint().GitCommitHash) - if err != nil { - return nil, fmt.Errorf( - "invalid sync point from BSR %q: %w", - syncPoint.Msg.GetSyncPoint().GetGitCommitHash(), - err, - ) - } - return hash, nil - } -} - -func syncGitCommitChecker(clientConfig *connectclient.Config) bufsync.SyncedGitCommitChecker { - return func(ctx context.Context, module bufmoduleref.ModuleIdentity, commitHashes map[string]struct{}) (map[string]struct{}, error) { - service := connectclient.Make(clientConfig, module.Remote(), registryv1alpha1connect.NewLabelServiceClient) - res, err := service.GetLabelsInNamespace(ctx, connect.NewRequest(®istryv1alpha1.GetLabelsInNamespaceRequest{ - RepositoryOwner: module.Owner(), - RepositoryName: module.Repository(), - LabelNamespace: registryv1alpha1.LabelNamespace_LABEL_NAMESPACE_GIT_COMMIT, - LabelNames: stringutil.MapToSlice(commitHashes), - })) - if err != nil { - if connect.CodeOf(err) == connect.CodeNotFound { - // Repo is not created - return nil, nil - } - return nil, fmt.Errorf("get labels in namespace: %w", err) - } - syncedHashes := make(map[string]struct{}) - for _, label := range res.Msg.Labels { - syncedHash := label.LabelName.Name - if _, expected := commitHashes[syncedHash]; !expected { - return nil, fmt.Errorf("received unexpected synced hash %q, expected %v", syncedHash, commitHashes) - } - syncedHashes[syncedHash] = struct{}{} - } - return syncedHashes, nil - } -} - -func defaultBranchGetter(clientConfig *connectclient.Config) bufsync.ModuleDefaultBranchGetter { - return func(ctx context.Context, module bufmoduleref.ModuleIdentity) (string, error) { - service := connectclient.Make(clientConfig, module.Remote(), registryv1alpha1connect.NewRepositoryServiceClient) - res, err := service.GetRepositoryByFullName(ctx, connect.NewRequest(®istryv1alpha1.GetRepositoryByFullNameRequest{ - FullName: module.Owner() + "/" + module.Repository(), - })) - if err != nil { - if connect.CodeOf(err) == connect.CodeNotFound { - // Repo is not created - return "", bufsync.ErrModuleDoesNotExist - } - return "", fmt.Errorf("get repository by full name %q: %w", module.IdentityString(), err) - } - return res.Msg.Repository.DefaultBranch, nil - } -} - -func tagsBackfiller(clientConfig *connectclient.Config) bufsync.TagsBackfiller { - return func( - ctx context.Context, - module bufmoduleref.ModuleIdentity, - alreadySyncedHash 
git.Hash,
- author git.Ident,
- committer git.Ident,
- tags []string,
- ) (string, error) {
- service := connectclient.Make(clientConfig, module.Remote(), registryv1alpha1connect.NewSyncServiceClient)
- res, err := service.AttachGitTags(ctx, connect.NewRequest(&registryv1alpha1.AttachGitTagsRequest{
- Owner: module.Owner(),
- Repository: module.Repository(),
- Hash: alreadySyncedHash.Hex(),
- Author: &registryv1alpha1.GitIdentity{
- Name: author.Name(),
- Email: author.Email(),
- Time: timestamppb.New(author.Timestamp()),
- },
- Committer: &registryv1alpha1.GitIdentity{
- Name: committer.Name(),
- Email: committer.Email(),
- Time: timestamppb.New(committer.Timestamp()),
- },
- Tags: tags,
- }))
- if err != nil {
- if connect.CodeOf(err) == connect.CodeNotFound {
- // Repo is not created
- return "", bufsync.ErrModuleDoesNotExist
- }
- return "", fmt.Errorf("attach git tags to module %q: %w", module.IdentityString(), err)
- }
- return res.Msg.GetBsrCommitName(), nil
- }
-}
-
-type syncErrorHandler struct {
- logger *zap.Logger
- modulesDirsWithIdentityOverride map[string]struct{}
-}
-
-func newErrorHandler(
- logger *zap.Logger,
- modulesDirsWithIdentityOverride map[string]struct{},
-) bufsync.ErrorHandler {
- return &syncErrorHandler{
- logger: logger,
- modulesDirsWithIdentityOverride: modulesDirsWithIdentityOverride,
- }
-}
-
-func (h *syncErrorHandler) HandleReadModuleError(err *bufsync.ReadModuleError) bufsync.LookbackDecisionCode {
- switch err.Code() {
- case bufsync.ReadModuleErrorCodeModuleNotFound,
- bufsync.ReadModuleErrorCodeInvalidModuleConfig,
- bufsync.ReadModuleErrorCodeBuildModule:
- // if the module cannot be found, has an invalid config, or cannot build, we can just skip the
- // commit.
- return bufsync.LookbackDecisionCodeSkip
- case bufsync.ReadModuleErrorCodeUnnamedModule,
- bufsync.ReadModuleErrorCodeUnexpectedName:
- // if the module has an unexpected or no name, we should override the module identity only if it
- // was passed explicitly as an identity override, otherwise skip the commit.
- if _, hasExplicitOverride := h.modulesDirsWithIdentityOverride[err.ModuleDir()]; hasExplicitOverride {
- return bufsync.LookbackDecisionCodeOverride
- }
- return bufsync.LookbackDecisionCodeSkip
- }
- // any unhandled scenarios? just fail the sync
- return bufsync.LookbackDecisionCodeFail
-}
-
-func (h *syncErrorHandler) InvalidBSRSyncPoint(
- module bufmoduleref.ModuleIdentity,
- branch string,
- syncPoint git.Hash,
- isGitDefaultBranch bool,
- err error,
-) error {
- // The most likely culprit for an invalid sync point is a rebase, where the last known commit has
- // been garbage collected. In this case, let's present a better error message.
- //
- // This is not trivial scenario if the branch that's been rebased is a long-lived branch (like
- // main) whose artifacts are consumed by other branches, as we may fail to sync those commits if
- // we continue.
- //
- // For now we simply error if this happens in the default branch, and WARN+skip for the other
- // branches. We may want to provide a flag in the future for forcing sync to continue despite
- // this.
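An aside on the LookbackDecisionCode values returned by HandleReadModuleError above (and by its replacement on the new handler later in this patch): the syncer side of that contract lives in bufsync/syncer.go and is not part of this hunk. Purely to illustrate what the three codes mean, here is a hypothetical sketch of how a caller might branch on the result; the helper name and return shape are invented for the example:

package example

import (
	"fmt"

	"github.com/bufbuild/buf/private/buf/bufsync"
)

// decideCommit is a hypothetical helper, not the real syncer code. It shows
// the intent of each decision code: skip the commit, sync it with the
// configured identity override, or abort the sync.
func decideCommit(handler bufsync.Handler, readErr *bufsync.ReadModuleError) (syncCommit bool, useIdentityOverride bool, err error) {
	switch handler.HandleReadModuleError(readErr) {
	case bufsync.LookbackDecisionCodeOverride:
		// Sync the commit, but use the identity override configured for this module directory.
		return true, true, nil
	case bufsync.LookbackDecisionCodeSkip:
		// Leave this commit out of the sync for this module directory and keep going.
		return false, false, nil
	case bufsync.LookbackDecisionCodeFail:
		// Stop the whole sync.
		return false, false, fmt.Errorf("cannot read module in directory %q", readErr.ModuleDir())
	default:
		return false, false, fmt.Errorf("unhandled lookback decision code")
	}
}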
- if errors.Is(err, git.ErrObjectNotFound) {
- if isGitDefaultBranch {
- return fmt.Errorf(
- "last synced git commit %q for default branch %q in module %q is not found in the git repo, did you rebase or reset your default branch?",
- syncPoint.Hex(), branch, module.IdentityString(),
- )
- }
- h.logger.Warn(
- "last synced git commit not found in the git repo for a non-default branch",
- zap.String("module", module.IdentityString()),
- zap.String("branch", branch),
- zap.String("last synced git commit", syncPoint.Hex()),
- )
- return nil
- }
- // Other error, let's abort sync.
- return fmt.Errorf(
- "invalid sync point %q for branch %q in module %q: %w",
- syncPoint.Hex(), branch, module.IdentityString(), err,
- )
-}
-
-func pushOrCreate(
- ctx context.Context,
- clientConfig *connectclient.Config,
- repo git.Repository,
- commit git.Commit,
- branch string,
- tags []string,
- moduleIdentity bufmoduleref.ModuleIdentity,
- moduleBucket storage.ReadBucket,
- createWithVisibility string,
-) (*registryv1alpha1.GitSyncPoint, error) {
- modulePin, err := push(
- ctx,
- clientConfig,
- repo,
- commit,
- branch,
- tags,
- moduleIdentity,
- moduleBucket,
- )
- if err != nil {
- // We rely on Push* returning a NotFound error to denote the repository is not created.
- // This technically could be a NotFound error for some other entity than the repository
- // in question, however if it is, then this Create call will just fail as the repository
- // is already created, and there is no side effect. The 99% case is that a NotFound
- // error is because the repository does not exist, and we want to avoid having to do
- // a GetRepository RPC call for every call to push --create.
- if createWithVisibility != "" && connect.CodeOf(err) == connect.CodeNotFound {
- if err := create(ctx, clientConfig, moduleIdentity, createWithVisibility); err != nil {
- return nil, fmt.Errorf("create repo: %w", err)
- }
- return push(
- ctx,
- clientConfig,
- repo,
- commit,
- branch,
- tags,
- moduleIdentity,
- moduleBucket,
- )
- }
- return nil, fmt.Errorf("push: %w", err)
- }
- return modulePin, nil
-}
-
-func push(
- ctx context.Context,
- clientConfig *connectclient.Config,
- repo git.Repository,
- commit git.Commit,
- branch string,
- tags []string,
- moduleIdentity bufmoduleref.ModuleIdentity,
- moduleBucket storage.ReadBucket,
-) (*registryv1alpha1.GitSyncPoint, error) {
- service := connectclient.Make(clientConfig, moduleIdentity.Remote(), registryv1alpha1connect.NewSyncServiceClient)
- fileSet, err := bufcas.NewFileSetForBucket(ctx, moduleBucket)
- if err != nil {
- return nil, err
- }
- protoManifestBlob, protoBlobs, err := bufcas.FileSetToProtoManifestBlobAndBlobs(fileSet)
- if err != nil {
- return nil, err
- }
- resp, err := service.SyncGitCommit(ctx, connect.NewRequest(&registryv1alpha1.SyncGitCommitRequest{
- Owner: moduleIdentity.Owner(),
- Repository: moduleIdentity.Repository(),
- Manifest: bufcasalpha.BlobToAlpha(protoManifestBlob),
- Blobs: bufcasalpha.BlobsToAlpha(protoBlobs),
- Hash: commit.Hash().Hex(),
- Branch: branch,
- Tags: tags,
- Author: &registryv1alpha1.GitIdentity{
- Name: commit.Author().Name(),
- Email: commit.Author().Email(),
- Time: timestamppb.New(commit.Author().Timestamp()),
- },
- Committer: &registryv1alpha1.GitIdentity{
- Name: commit.Committer().Name(),
- Email: commit.Committer().Email(),
- Time: timestamppb.New(commit.Committer().Timestamp()),
- },
- }))
- if err != nil {
- return nil, err
- }
- return resp.Msg.SyncPoint, nil
-}
-
-func 
create(
- ctx context.Context,
- clientConfig *connectclient.Config,
- moduleIdentity bufmoduleref.ModuleIdentity,
- visibility string,
-) error {
- service := connectclient.Make(clientConfig, moduleIdentity.Remote(), registryv1alpha1connect.NewRepositoryServiceClient)
- visiblity, err := bufcli.VisibilityFlagToVisibility(visibility)
- if err != nil {
- return err
- }
- fullName := moduleIdentity.Owner() + "/" + moduleIdentity.Repository()
- _, err = service.CreateRepositoryByFullName(
- ctx,
- connect.NewRequest(&registryv1alpha1.CreateRepositoryByFullNameRequest{
- FullName: fullName,
- Visibility: visiblity,
- }),
- )
- if err != nil && connect.CodeOf(err) == connect.CodeAlreadyExists {
- return connect.NewError(connect.CodeInternal, fmt.Errorf("expected repository %s to be missing but found the repository to already exist", fullName))
+ return fmt.Errorf("new syncer: %w", err)
}
- return err
+ return syncer.Sync(ctx)
}
diff --git a/private/buf/cmd/buf/command/alpha/repo/reposync/sync_handler.go b/private/buf/cmd/buf/command/alpha/repo/reposync/sync_handler.go
new file mode 100644
index 0000000000..324998f935
--- /dev/null
+++ b/private/buf/cmd/buf/command/alpha/repo/reposync/sync_handler.go
@@ -0,0 +1,362 @@
+// Copyright 2020-2023 Buf Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
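The new sync_handler.go below is the BSR-backed implementation of bufsync.Handler, consolidating what used to be separate SyncerOption funcs and the standalone error handler. The Handler interface definition itself lives in the bufsync package and is not reproduced in this hunk; judging only from the methods implemented below, it presumably looks roughly like this sketch:

package bufsync

import (
	"context"

	"github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref"
	"github.com/bufbuild/buf/private/pkg/git"
)

// Handler is everything the syncer delegates to its caller: resolving and
// validating BSR sync points, checking which git commits are already synced,
// backfilling tags, pushing module commits, and deciding what to do when a
// commit's module cannot be read. Sketch only; bufsync.go has the
// authoritative definition, and ModuleCommit, ReadModuleError, and
// LookbackDecisionCode are the package's existing types.
type Handler interface {
	ResolveSyncPoint(ctx context.Context, module bufmoduleref.ModuleIdentity, branch string) (git.Hash, error)
	CheckSyncedGitCommits(ctx context.Context, module bufmoduleref.ModuleIdentity, commitHashes map[string]struct{}) (map[string]struct{}, error)
	GetModuleReleaseBranch(ctx context.Context, module bufmoduleref.ModuleIdentity) (string, error)
	BackfillTags(ctx context.Context, module bufmoduleref.ModuleIdentity, alreadySyncedHash git.Hash, author git.Ident, committer git.Ident, tags []string) (string, error)
	SyncModuleCommit(ctx context.Context, moduleCommit ModuleCommit) error
	HandleReadModuleError(readErr *ReadModuleError) LookbackDecisionCode
	InvalidBSRSyncPoint(module bufmoduleref.ModuleIdentity, branch string, syncPoint git.Hash, isGitDefaultBranch bool, err error) error
}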
+
+package reposync
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "connectrpc.com/connect"
+ "github.com/bufbuild/buf/private/buf/bufcli"
+ "github.com/bufbuild/buf/private/buf/bufsync"
+ "github.com/bufbuild/buf/private/bufpkg/bufcas"
+ "github.com/bufbuild/buf/private/bufpkg/bufcas/bufcasalpha"
+ "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref"
+ "github.com/bufbuild/buf/private/gen/proto/connect/buf/alpha/registry/v1alpha1/registryv1alpha1connect"
+ registryv1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1"
+ "github.com/bufbuild/buf/private/pkg/app/appflag"
+ "github.com/bufbuild/buf/private/pkg/connectclient"
+ "github.com/bufbuild/buf/private/pkg/git"
+ "github.com/bufbuild/buf/private/pkg/storage"
+ "github.com/bufbuild/buf/private/pkg/stringutil"
+ "go.uber.org/zap"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+type syncHandler struct {
+ logger *zap.Logger
+ clientConfig *connectclient.Config
+ container appflag.Container
+ repo git.Repository
+ createWithVisibility string
+ modulesDirsWithIdentityOverride map[string]struct{}
+}
+
+func newSyncHandler(
+ logger *zap.Logger,
+ clientConfig *connectclient.Config,
+ container appflag.Container,
+ repo git.Repository,
+ createWithVisibility string,
+ modulesDirsWithIdentityOverride map[string]struct{},
+) bufsync.Handler {
+ return &syncHandler{
+ logger: logger,
+ clientConfig: clientConfig,
+ container: container,
+ repo: repo,
+ createWithVisibility: createWithVisibility,
+ modulesDirsWithIdentityOverride: modulesDirsWithIdentityOverride,
+ }
+}
+
+func (h *syncHandler) ResolveSyncPoint(ctx context.Context, module bufmoduleref.ModuleIdentity, branch string) (git.Hash, error) {
+ service := connectclient.Make(h.clientConfig, module.Remote(), registryv1alpha1connect.NewSyncServiceClient)
+ syncPoint, err := service.GetGitSyncPoint(ctx, connect.NewRequest(&registryv1alpha1.GetGitSyncPointRequest{
+ Owner: module.Owner(),
+ Repository: module.Repository(),
+ Branch: branch,
+ }))
+ if err != nil {
+ if connect.CodeOf(err) == connect.CodeNotFound {
+ // No syncpoint
+ return nil, nil
+ }
+ return nil, fmt.Errorf("get git sync point: %w", err)
+ }
+ hash, err := git.NewHashFromHex(syncPoint.Msg.GetSyncPoint().GitCommitHash)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "invalid sync point from BSR %q: %w",
+ syncPoint.Msg.GetSyncPoint().GetGitCommitHash(),
+ err,
+ )
+ }
+ return hash, nil
+}
+
+func (h *syncHandler) CheckSyncedGitCommits(ctx context.Context, module bufmoduleref.ModuleIdentity, commitHashes map[string]struct{}) (map[string]struct{}, error) {
+ service := connectclient.Make(h.clientConfig, module.Remote(), registryv1alpha1connect.NewLabelServiceClient)
+ res, err := service.GetLabelsInNamespace(ctx, connect.NewRequest(&registryv1alpha1.GetLabelsInNamespaceRequest{
+ RepositoryOwner: module.Owner(),
+ RepositoryName: module.Repository(),
+ LabelNamespace: registryv1alpha1.LabelNamespace_LABEL_NAMESPACE_GIT_COMMIT,
+ LabelNames: stringutil.MapToSlice(commitHashes),
+ }))
+ if err != nil {
+ if connect.CodeOf(err) == connect.CodeNotFound {
+ // Repo is not created
+ return nil, nil
+ }
+ return nil, fmt.Errorf("get labels in namespace: %w", err)
+ }
+ syncedHashes := make(map[string]struct{})
+ for _, label := range res.Msg.Labels {
+ syncedHash := label.LabelName.Name
+ if _, expected := commitHashes[syncedHash]; !expected {
+ return nil, fmt.Errorf("received unexpected synced hash %q, expected %v", syncedHash, commitHashes)
+ }
+ syncedHashes[syncedHash] = 
struct{}{}
+ }
+ return syncedHashes, nil
+}
+
+func (h *syncHandler) GetModuleReleaseBranch(ctx context.Context, module bufmoduleref.ModuleIdentity) (string, error) {
+ service := connectclient.Make(h.clientConfig, module.Remote(), registryv1alpha1connect.NewRepositoryServiceClient)
+ res, err := service.GetRepositoryByFullName(ctx, connect.NewRequest(&registryv1alpha1.GetRepositoryByFullNameRequest{
+ FullName: module.Owner() + "/" + module.Repository(),
+ }))
+ if err != nil {
+ if connect.CodeOf(err) == connect.CodeNotFound {
+ // Repo is not created
+ return "", bufsync.ErrModuleDoesNotExist
+ }
+ return "", fmt.Errorf("get repository by full name %q: %w", module.IdentityString(), err)
+ }
+ return res.Msg.Repository.DefaultBranch, nil
+}
+
+func (h *syncHandler) BackfillTags(
+ ctx context.Context,
+ module bufmoduleref.ModuleIdentity,
+ alreadySyncedHash git.Hash,
+ author git.Ident,
+ committer git.Ident,
+ tags []string,
+) (string, error) {
+ service := connectclient.Make(h.clientConfig, module.Remote(), registryv1alpha1connect.NewSyncServiceClient)
+ res, err := service.AttachGitTags(ctx, connect.NewRequest(&registryv1alpha1.AttachGitTagsRequest{
+ Owner: module.Owner(),
+ Repository: module.Repository(),
+ Hash: alreadySyncedHash.Hex(),
+ Author: &registryv1alpha1.GitIdentity{
+ Name: author.Name(),
+ Email: author.Email(),
+ Time: timestamppb.New(author.Timestamp()),
+ },
+ Committer: &registryv1alpha1.GitIdentity{
+ Name: committer.Name(),
+ Email: committer.Email(),
+ Time: timestamppb.New(committer.Timestamp()),
+ },
+ Tags: tags,
+ }))
+ if err != nil {
+ if connect.CodeOf(err) == connect.CodeNotFound {
+ // Repo is not created
+ return "", bufsync.ErrModuleDoesNotExist
+ }
+ return "", fmt.Errorf("attach git tags to module %q: %w", module.IdentityString(), err)
+ }
+ return res.Msg.GetBsrCommitName(), nil
+}
+
+func (h *syncHandler) SyncModuleCommit(ctx context.Context, moduleCommit bufsync.ModuleCommit) error {
+ syncPoint, err := h.pushOrCreate(
+ ctx,
+ moduleCommit.Commit(),
+ moduleCommit.Branch(),
+ moduleCommit.Tags(),
+ moduleCommit.Identity(),
+ moduleCommit.Bucket(),
+ )
+ if err != nil {
+ // We failed to push. We fail hard on this because the error may be recoverable
+ // (i.e., the BSR may be down) and we should re-attempt this commit.
+ return fmt.Errorf(
+ "failed to push or create %s at %s: %w",
+ moduleCommit.Identity().IdentityString(),
+ moduleCommit.Commit().Hash(),
+ err,
+ )
+ }
+ _, err = h.container.Stderr().Write([]byte(
+ // from local -> to remote
+ // <module directory>:<branch>:<git commit hash> -> <module identity>:<BSR commit name>
+ fmt.Sprintf(
+ "%s:%s:%s -> %s:%s\n",
+ moduleCommit.Directory(), moduleCommit.Branch(), moduleCommit.Commit().Hash().Hex(),
+ moduleCommit.Identity().IdentityString(), syncPoint.BsrCommitName,
+ )),
+ )
+ return err
+}
+
+func (h *syncHandler) HandleReadModuleError(err *bufsync.ReadModuleError) bufsync.LookbackDecisionCode {
+ switch err.Code() {
+ case bufsync.ReadModuleErrorCodeModuleNotFound,
+ bufsync.ReadModuleErrorCodeInvalidModuleConfig,
+ bufsync.ReadModuleErrorCodeBuildModule:
+ // if the module cannot be found, has an invalid config, or cannot build, we can just skip the
+ // commit.
+ return bufsync.LookbackDecisionCodeSkip
+ case bufsync.ReadModuleErrorCodeUnnamedModule,
+ bufsync.ReadModuleErrorCodeUnexpectedName:
+ // if the module has an unexpected or no name, we should override the module identity only if it
+ // was passed explicitly as an identity override, otherwise skip the commit.
+ if _, hasExplicitOverride := h.modulesDirsWithIdentityOverride[err.ModuleDir()]; hasExplicitOverride { + return bufsync.LookbackDecisionCodeOverride + } + return bufsync.LookbackDecisionCodeSkip + } + // any unhandled scenarios? just fail the sync + return bufsync.LookbackDecisionCodeFail +} + +func (h *syncHandler) InvalidBSRSyncPoint( + module bufmoduleref.ModuleIdentity, + branch string, + syncPoint git.Hash, + isGitDefaultBranch bool, + err error, +) error { + // The most likely culprit for an invalid sync point is a rebase, where the last known commit has + // been garbage collected. In this case, let's present a better error message. + // + // This is not trivial scenario if the branch that's been rebased is a long-lived branch (like + // main) whose artifacts are consumed by other branches, as we may fail to sync those commits if + // we continue. + // + // For now we simply error if this happens in the default branch, and WARN+skip for the other + // branches. We may want to provide a flag in the future for forcing sync to continue despite + // this. + if errors.Is(err, git.ErrObjectNotFound) { + if isGitDefaultBranch { + return fmt.Errorf( + "last synced git commit %q for default branch %q in module %q is not found in the git repo, did you rebase or reset your default branch?", + syncPoint.Hex(), branch, module.IdentityString(), + ) + } + h.logger.Warn( + "last synced git commit not found in the git repo for a non-default branch", + zap.String("module", module.IdentityString()), + zap.String("branch", branch), + zap.String("last synced git commit", syncPoint.Hex()), + ) + return nil + } + // Other error, let's abort sync. + return fmt.Errorf( + "invalid sync point %q for branch %q in module %q: %w", + syncPoint.Hex(), branch, module.IdentityString(), err, + ) +} + +func (h *syncHandler) pushOrCreate( + ctx context.Context, + commit git.Commit, + branch string, + tags []string, + moduleIdentity bufmoduleref.ModuleIdentity, + moduleBucket storage.ReadBucket, +) (*registryv1alpha1.GitSyncPoint, error) { + modulePin, err := h.push( + ctx, + commit, + branch, + tags, + moduleIdentity, + moduleBucket, + ) + if err != nil { + // We rely on Push* returning a NotFound error to denote the repository is not created. + // This technically could be a NotFound error for some other entity than the repository + // in question, however if it is, then this Create call will just fail as the repository + // is already created, and there is no side effect. The 99% case is that a NotFound + // error is because the repository does not exist, and we want to avoid having to do + // a GetRepository RPC call for every call to push --create. 
+ if h.createWithVisibility != "" && connect.CodeOf(err) == connect.CodeNotFound {
+ if err := h.create(ctx, moduleIdentity); err != nil {
+ return nil, fmt.Errorf("create repo: %w", err)
+ }
+ return h.push(
+ ctx,
+ commit,
+ branch,
+ tags,
+ moduleIdentity,
+ moduleBucket,
+ )
+ }
+ return nil, fmt.Errorf("push: %w", err)
+ }
+ return modulePin, nil
+}
+
+func (h *syncHandler) push(
+ ctx context.Context,
+ commit git.Commit,
+ branch string,
+ tags []string,
+ moduleIdentity bufmoduleref.ModuleIdentity,
+ moduleBucket storage.ReadBucket,
+) (*registryv1alpha1.GitSyncPoint, error) {
+ service := connectclient.Make(h.clientConfig, moduleIdentity.Remote(), registryv1alpha1connect.NewSyncServiceClient)
+ fileSet, err := bufcas.NewFileSetForBucket(ctx, moduleBucket)
+ if err != nil {
+ return nil, err
+ }
+ protoManifestBlob, protoBlobs, err := bufcas.FileSetToProtoManifestBlobAndBlobs(fileSet)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := service.SyncGitCommit(ctx, connect.NewRequest(&registryv1alpha1.SyncGitCommitRequest{
+ Owner: moduleIdentity.Owner(),
+ Repository: moduleIdentity.Repository(),
+ Manifest: bufcasalpha.BlobToAlpha(protoManifestBlob),
+ Blobs: bufcasalpha.BlobsToAlpha(protoBlobs),
+ Hash: commit.Hash().Hex(),
+ Branch: branch,
+ Tags: tags,
+ Author: &registryv1alpha1.GitIdentity{
+ Name: commit.Author().Name(),
+ Email: commit.Author().Email(),
+ Time: timestamppb.New(commit.Author().Timestamp()),
+ },
+ Committer: &registryv1alpha1.GitIdentity{
+ Name: commit.Committer().Name(),
+ Email: commit.Committer().Email(),
+ Time: timestamppb.New(commit.Committer().Timestamp()),
+ },
+ }))
+ if err != nil {
+ return nil, err
+ }
+ return resp.Msg.SyncPoint, nil
+}
+
+func (h *syncHandler) create(
+ ctx context.Context,
+ moduleIdentity bufmoduleref.ModuleIdentity,
+) error {
+ service := connectclient.Make(h.clientConfig, moduleIdentity.Remote(), registryv1alpha1connect.NewRepositoryServiceClient)
+ visiblity, err := bufcli.VisibilityFlagToVisibility(h.createWithVisibility)
+ if err != nil {
+ return err
+ }
+ fullName := moduleIdentity.Owner() + "/" + moduleIdentity.Repository()
+ _, err = service.CreateRepositoryByFullName(
+ ctx,
+ connect.NewRequest(&registryv1alpha1.CreateRepositoryByFullNameRequest{
+ FullName: fullName,
+ Visibility: visiblity,
+ }),
+ )
+ if err != nil && connect.CodeOf(err) == connect.CodeAlreadyExists {
+ return connect.NewError(connect.CodeInternal, fmt.Errorf("expected repository %s to be missing but found the repository to already exist", fullName))
+ }
+ return err
+}
diff --git a/private/pkg/git/git.go b/private/pkg/git/git.go
index d3bfd368b0..ac93b04ac0 100644
--- a/private/pkg/git/git.go
+++ b/private/pkg/git/git.go
@@ -272,7 +272,7 @@ type Repository interface {
// remote named `origin`). It can be customized via the `OpenRepositoryWithDefaultBranch` option.
DefaultBranch() string
// CurrentBranch is the current checked out branch.
- CurrentBranch() string
+ CurrentBranch(ctx context.Context) (string, error)
// ForEachBranch ranges over branches in the repository in an undefined order.
ForEachBranch(f func(branch string, headHash Hash) error, options ...ForEachBranchOption) error // ForEachCommit ranges over commits in reverse topological order, going backwards in time always diff --git a/private/pkg/git/repository.go b/private/pkg/git/repository.go index 835d0f1b9a..c3bd2e756e 100644 --- a/private/pkg/git/repository.go +++ b/private/pkg/git/repository.go @@ -35,10 +35,10 @@ type openRepositoryOpts struct { } type repository struct { - gitDirPath string - defaultBranch string - checkedOutBranch string - objectReader *objectReader + gitDirPath string + defaultBranch string + objectReader *objectReader + runner command.Runner // packedOnce controls the fields below related to reading the `packed-refs` file packedOnce sync.Once @@ -77,15 +77,11 @@ func openGitRepository( return nil, fmt.Errorf("automatically determine default branch: %w", err) } } - checkedOutBranch, err := detectCheckedOutBranch(ctx, gitDirPath, runner) - if err != nil { - return nil, fmt.Errorf("automatically determine checked out branch: %w", err) - } return &repository{ - gitDirPath: gitDirPath, - defaultBranch: opts.defaultBranch, - checkedOutBranch: checkedOutBranch, - objectReader: reader, + gitDirPath: gitDirPath, + defaultBranch: opts.defaultBranch, + objectReader: reader, + runner: runner, }, nil } @@ -157,8 +153,8 @@ func (r *repository) DefaultBranch() string { return r.defaultBranch } -func (r *repository) CurrentBranch() string { - return r.checkedOutBranch +func (r *repository) CurrentBranch(ctx context.Context) (string, error) { + return detectCheckedOutBranch(ctx, r.gitDirPath, r.runner) } func (r *repository) ForEachCommit(f func(Commit) error, options ...ForEachCommitOption) error { diff --git a/private/pkg/git/repository_test.go b/private/pkg/git/repository_test.go index ecc407d18e..11a4e22750 100644 --- a/private/pkg/git/repository_test.go +++ b/private/pkg/git/repository_test.go @@ -15,6 +15,7 @@ package git_test import ( + "context" "testing" "github.com/bufbuild/buf/private/pkg/git" @@ -235,9 +236,11 @@ func TestForEachBranch(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() repo := gittest.ScaffoldGitRepository(t) - assert.Equal(t, gittest.DefaultBranch, repo.CurrentBranch()) + currentBranch, err := repo.CurrentBranch(context.Background()) + require.NoError(t, err) + assert.Equal(t, gittest.DefaultBranch, currentBranch) branches := make(map[string]struct{}) - err := repo.ForEachBranch(func(branch string, headHash git.Hash) error { + err = repo.ForEachBranch(func(branch string, headHash git.Hash) error { require.NotEmpty(t, branch) if _, alreadySeen := branches[branch]; alreadySeen { assert.Fail(t, "duplicate branch", branch)
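As the updated test above exercises, the checked-out branch is no longer detected once when the repository is opened; CurrentBranch now runs git on demand via the stored command.Runner, so the Repository method takes a context and can return an error. A hedged sketch of the call-site migration (the package and helper name are invented for illustration):

package example

import (
	"context"
	"fmt"

	"github.com/bufbuild/buf/private/pkg/git"
)

// currentBranchOf shows the new call shape. Before this change the branch was
// detected in openGitRepository and exposed as CurrentBranch() string; now
// detection happens at call time, so callers pass a context and handle the error.
func currentBranchOf(ctx context.Context, repo git.Repository) (string, error) {
	branch, err := repo.CurrentBranch(ctx)
	if err != nil {
		return "", fmt.Errorf("determine checked out branch: %w", err)
	}
	return branch, nil
}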