Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skip replication to proxy cache project #16286

Merged
merged 1 commit into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/controller/replication/flow/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/reg/model"
"github.com/goharbor/harbor/src/pkg/task"
Expand Down Expand Up @@ -103,12 +104,29 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
}

func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error {
for i, resource := range srcResources {
src, err := json.Marshal(resource)
var taskCnt int
defer func() {
// if no task be created, mark execution done.
if taskCnt == 0 {
if err := c.executionMgr.MarkDone(ctx, c.executionID, "no resources need to be replicated"); err != nil {
logger.Errorf("failed to mark done for the execution %d: %v", c.executionID, err)
}
}
}()

for i, srcResource := range srcResources {
dstResource := dstResources[i]
// if dest resource should be skipped, ignore replicate.
if dstResource.Skip {
log.Warningf("skip create replication task because of dest limitation, src: %s, dst: %s", srcResource.Metadata, dstResource.Metadata)
continue
}

src, err := json.Marshal(srcResource)
if err != nil {
return err
}
dest, err := json.Marshal(dstResources[i])
dest, err := json.Marshal(dstResource)
if err != nil {
return err
}
Expand All @@ -127,11 +145,13 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [

if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{
"operation": "copy",
"resource_type": string(resource.Type),
"source_resource": getResourceName(resource),
"destination_resource": getResourceName(dstResources[i])}); err != nil {
"resource_type": string(srcResource.Type),
"source_resource": getResourceName(srcResource),
"destination_resource": getResourceName(dstResource)}); err != nil {
return err
}

taskCnt++
chlins marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
13 changes: 12 additions & 1 deletion src/controller/replication/flow/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func (c *copyFlowTestSuite) TestRun() {
},
Override: false,
},
{
Type: model.ResourceTypeArtifact,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "proxy/hello-world",
},
Vtags: []string{"latest"},
},
Override: false,
Skip: true,
},
}, nil)
adp.On("PrepareForPush", mock.Anything).Return(nil)

Expand All @@ -60,7 +71,7 @@ func (c *copyFlowTestSuite) TestRun() {
}, nil)

taskMgr := &testingTask.Manager{}
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil).Once()
policy := &repctlmodel.Policy{
SrcRegistry: &model.Registry{
Type: "TEST_FOR_COPY_FLOW",
Expand Down
4 changes: 3 additions & 1 deletion src/controller/replication/flow/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package flow

import (
"fmt"
"github.com/goharbor/harbor/src/lib/errors"
"path"
"strings"

"github.com/goharbor/harbor/src/lib/errors"

repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/lib/log"
adp "github.com/goharbor/harbor/src/pkg/reg/adapter"
Expand Down Expand Up @@ -138,6 +139,7 @@ func assembleDestinationResources(resources []*model.Resource,
Deleted: resource.Deleted,
IsDeleteTag: resource.IsDeleteTag,
Override: policy.Override,
Skip: resource.Skip,
}
res.Metadata = &model.ResourceMetadata{
Repository: &model.Repository{
Expand Down
30 changes: 23 additions & 7 deletions src/pkg/reg/adapter/harbor/base/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,19 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error {
return errors.Wrapf(err, "list projects with query %s", q)
}

existProjects := make(map[string]*Project)
proxyCacheProjects := make(map[string]bool)
existProjects := make(map[string]bool)
for _, p := range queryProjects {
existProjects[p.Name] = p
existProjects[p.Name] = true
// if project with registry_id, that means this is a proxy cache project.
if p.RegistryID > 0 {
chlins marked this conversation as resolved.
Show resolved Hide resolved
proxyCacheProjects[p.Name] = true
}
}

var notExistProjects []*Project
for _, p := range projects {
_, exist := existProjects[p.Name]
if !exist {
if !existProjects[p.Name] {
notExistProjects = append(notExistProjects, p)
}
}
Expand All @@ -205,6 +209,17 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error {
}
log.Debugf("project %s created", project.Name)
}

// do filter for proxy cache projects.
for _, res := range resources {
paths := strings.Split(res.Metadata.Repository.Name, "/")
chlins marked this conversation as resolved.
Show resolved Hide resolved
projectName := paths[0]
if proxyCacheProjects[projectName] {
// set resource skip flag to true if it's a proxy cache project.
res.Skip = true
}
}

return nil
}

Expand Down Expand Up @@ -295,9 +310,10 @@ func parsePublic(metadata map[string]interface{}) bool {

// Project model
type Project struct {
ID int64 `json:"project_id"`
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
ID int64 `json:"project_id"`
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
RegistryID int64 `json:"registry_id"`
}

func isLocalHarbor(url string) bool {
Expand Down
27 changes: 27 additions & 0 deletions src/pkg/reg/adapter/harbor/base/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,33 @@ func TestPrepareForPush(t *testing.T) {
},
})
require.Nil(t, err)

// project already exists and the type is proxy cache
server = test.NewServer(&test.RequestHandlerMapping{
Method: http.MethodGet,
Pattern: "/api/projects",
Handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`[{"name": "library", "registry_id": 1}]`))
},
})
registry = &model.Registry{
URL: server.URL,
}
adapter, err = New(registry)
require.Nil(t, err)
resources := []*model.Resource{
{
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
},
},
}
err = adapter.PrepareForPush(resources)
require.Nil(t, err)
require.True(t, resources[0].Skip)
}

func TestParsePublic(t *testing.T) {
Expand Down
17 changes: 17 additions & 0 deletions src/pkg/reg/model/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

package model

import (
"encoding/json"
"fmt"
)

// the resource type
const (
ResourceTypeArtifact = "artifact"
Expand All @@ -33,6 +38,9 @@ type Resource struct {
IsDeleteTag bool `json:"is_delete_tag"`
// indicate whether the resource can be overridden
Override bool `json:"override"`
// Skip is a flag for resource which satisfies replication rules but should
// be skipped because of other limits like when dest project's type is proxy cache.
Skip bool `json:"-"`
chlins marked this conversation as resolved.
Show resolved Hide resolved
}

// ResourceMetadata of resource
Expand All @@ -55,3 +63,12 @@ type Artifact struct {
Labels []string `json:"labels"`
Tags []string `json:"tags"`
}

func (r *ResourceMetadata) String() string {
data, err := json.Marshal(r)
if err == nil {
return string(data)
}

return fmt.Sprintf("repository: %+v, artifacts: %+v, tags: %+v", r.Repository, r.Artifacts, r.Vtags)
}