From b1afd2efb00d34d0177de1337a12c22548e628a2 Mon Sep 17 00:00:00 2001 From: chlins Date: Tue, 25 Jan 2022 21:23:10 +0800 Subject: [PATCH] fix: skip replication to proxy cache project Signed-off-by: chlins --- src/controller/replication/flow/copy.go | 32 +++++++++++++++---- src/controller/replication/flow/copy_test.go | 13 +++++++- src/controller/replication/flow/stage.go | 4 ++- src/pkg/reg/adapter/harbor/base/adapter.go | 30 +++++++++++++---- .../reg/adapter/harbor/base/adapter_test.go | 27 ++++++++++++++++ src/pkg/reg/model/resource.go | 17 ++++++++++ 6 files changed, 108 insertions(+), 15 deletions(-) diff --git a/src/controller/replication/flow/copy.go b/src/controller/replication/flow/copy.go index 243152d5f9f..f36f4eeb0dd 100644 --- a/src/controller/replication/flow/copy.go +++ b/src/controller/replication/flow/copy.go @@ -20,6 +20,7 @@ import ( repctlmodel "github.com/goharbor/harbor/src/controller/replication/model" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/pkg/reg/model" "github.com/goharbor/harbor/src/pkg/task" @@ -103,12 +104,29 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) { } func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error { - for i, resource := range srcResources { - src, err := json.Marshal(resource) + var taskCnt int + defer func() { + // if no task be created, mark execution done. + if taskCnt == 0 { + if err := c.executionMgr.MarkDone(ctx, c.executionID, "no resources need to be replicated"); err != nil { + logger.Errorf("failed to mark done for the execution %d: %v", c.executionID, err) + } + } + }() + + for i, srcResource := range srcResources { + dstResource := dstResources[i] + // if dest resource should be skipped, ignore replicate. + if dstResource.Skip { + log.Warningf("skip create replication task because of dest limitation, src: %s, dst: %s", srcResource.Metadata, dstResource.Metadata) + continue + } + + src, err := json.Marshal(srcResource) if err != nil { return err } - dest, err := json.Marshal(dstResources[i]) + dest, err := json.Marshal(dstResource) if err != nil { return err } @@ -127,11 +145,13 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [ if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{ "operation": "copy", - "resource_type": string(resource.Type), - "source_resource": getResourceName(resource), - "destination_resource": getResourceName(dstResources[i])}); err != nil { + "resource_type": string(srcResource.Type), + "source_resource": getResourceName(srcResource), + "destination_resource": getResourceName(dstResource)}); err != nil { return err } + + taskCnt++ } return nil } diff --git a/src/controller/replication/flow/copy_test.go b/src/controller/replication/flow/copy_test.go index 9ddfaa574a2..ef6dde86436 100644 --- a/src/controller/replication/flow/copy_test.go +++ b/src/controller/replication/flow/copy_test.go @@ -51,6 +51,17 @@ func (c *copyFlowTestSuite) TestRun() { }, Override: false, }, + { + Type: model.ResourceTypeArtifact, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: "proxy/hello-world", + }, + Vtags: []string{"latest"}, + }, + Override: false, + Skip: true, + }, }, nil) adp.On("PrepareForPush", mock.Anything).Return(nil) @@ -60,7 +71,7 @@ func (c *copyFlowTestSuite) TestRun() { }, nil) taskMgr := &testingTask.Manager{} - taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil).Once() policy := &repctlmodel.Policy{ SrcRegistry: &model.Registry{ Type: "TEST_FOR_COPY_FLOW", diff --git a/src/controller/replication/flow/stage.go b/src/controller/replication/flow/stage.go index 54259a91671..fa2666617ce 100644 --- a/src/controller/replication/flow/stage.go +++ b/src/controller/replication/flow/stage.go @@ -16,10 +16,11 @@ package flow import ( "fmt" - "github.com/goharbor/harbor/src/lib/errors" "path" "strings" + "github.com/goharbor/harbor/src/lib/errors" + repctlmodel "github.com/goharbor/harbor/src/controller/replication/model" "github.com/goharbor/harbor/src/lib/log" adp "github.com/goharbor/harbor/src/pkg/reg/adapter" @@ -138,6 +139,7 @@ func assembleDestinationResources(resources []*model.Resource, Deleted: resource.Deleted, IsDeleteTag: resource.IsDeleteTag, Override: policy.Override, + Skip: resource.Skip, } res.Metadata = &model.ResourceMetadata{ Repository: &model.Repository{ diff --git a/src/pkg/reg/adapter/harbor/base/adapter.go b/src/pkg/reg/adapter/harbor/base/adapter.go index 21d5523759d..5f616f599d6 100644 --- a/src/pkg/reg/adapter/harbor/base/adapter.go +++ b/src/pkg/reg/adapter/harbor/base/adapter.go @@ -182,15 +182,19 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error { return errors.Wrapf(err, "list projects with query %s", q) } - existProjects := make(map[string]*Project) + proxyCacheProjects := make(map[string]bool) + existProjects := make(map[string]bool) for _, p := range queryProjects { - existProjects[p.Name] = p + existProjects[p.Name] = true + // if project with registry_id, that means this is a proxy cache project. + if p.RegistryID > 0 { + proxyCacheProjects[p.Name] = true + } } var notExistProjects []*Project for _, p := range projects { - _, exist := existProjects[p.Name] - if !exist { + if !existProjects[p.Name] { notExistProjects = append(notExistProjects, p) } } @@ -205,6 +209,17 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error { } log.Debugf("project %s created", project.Name) } + + // do filter for proxy cache projects. + for _, res := range resources { + paths := strings.Split(res.Metadata.Repository.Name, "/") + projectName := paths[0] + if proxyCacheProjects[projectName] { + // set resource skip flag to true if it's a proxy cache project. + res.Skip = true + } + } + return nil } @@ -295,9 +310,10 @@ func parsePublic(metadata map[string]interface{}) bool { // Project model type Project struct { - ID int64 `json:"project_id"` - Name string `json:"name"` - Metadata map[string]interface{} `json:"metadata"` + ID int64 `json:"project_id"` + Name string `json:"name"` + Metadata map[string]interface{} `json:"metadata"` + RegistryID int64 `json:"registry_id"` } func isLocalHarbor(url string) bool { diff --git a/src/pkg/reg/adapter/harbor/base/adapter_test.go b/src/pkg/reg/adapter/harbor/base/adapter_test.go index 3c542389968..b260b33914a 100644 --- a/src/pkg/reg/adapter/harbor/base/adapter_test.go +++ b/src/pkg/reg/adapter/harbor/base/adapter_test.go @@ -170,6 +170,33 @@ func TestPrepareForPush(t *testing.T) { }, }) require.Nil(t, err) + + // project already exists and the type is proxy cache + server = test.NewServer(&test.RequestHandlerMapping{ + Method: http.MethodGet, + Pattern: "/api/projects", + Handler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`[{"name": "library", "registry_id": 1}]`)) + }, + }) + registry = &model.Registry{ + URL: server.URL, + } + adapter, err = New(registry) + require.Nil(t, err) + resources := []*model.Resource{ + { + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: "library/hello-world", + }, + }, + }, + } + err = adapter.PrepareForPush(resources) + require.Nil(t, err) + require.True(t, resources[0].Skip) } func TestParsePublic(t *testing.T) { diff --git a/src/pkg/reg/model/resource.go b/src/pkg/reg/model/resource.go index 1d22b83d703..48153a1483c 100644 --- a/src/pkg/reg/model/resource.go +++ b/src/pkg/reg/model/resource.go @@ -14,6 +14,11 @@ package model +import ( + "encoding/json" + "fmt" +) + // the resource type const ( ResourceTypeArtifact = "artifact" @@ -33,6 +38,9 @@ type Resource struct { IsDeleteTag bool `json:"is_delete_tag"` // indicate whether the resource can be overridden Override bool `json:"override"` + // Skip is a flag for resource which satisfies replication rules but should + // be skipped because of other limits like when dest project's type is proxy cache. + Skip bool `json:"-"` } // ResourceMetadata of resource @@ -55,3 +63,12 @@ type Artifact struct { Labels []string `json:"labels"` Tags []string `json:"tags"` } + +func (r *ResourceMetadata) String() string { + data, err := json.Marshal(r) + if err == nil { + return string(data) + } + + return fmt.Sprintf("repository: %+v, artifacts: %+v, tags: %+v", r.Repository, r.Artifacts, r.Vtags) +}