14 changes: 7 additions & 7 deletions src/common/const.go
Expand Up @@ -46,10 +46,10 @@ const (
ResourceTypeProject = "p"
ResourceTypeRepository = "r"
ResourceTypeImage = "i"
ResourceTypeChart = "c"

ExtEndpoint = "ext_endpoint"
AUTHMode = "auth_mode"
PrimaryAuthMode = "primary_auth_mode"
DatabaseType = "database_type"
PostGreSQLHOST = "postgresql_host"
PostGreSQLPort = "postgresql_port"
Expand Down Expand Up @@ -136,15 +136,14 @@ const (
LDAPGroupAdminDn = "ldap_group_admin_dn"
LDAPGroupMembershipAttribute = "ldap_group_membership_attribute"
DefaultRegistryControllerEndpoint = "http://registryctl:8080"
WithChartMuseum = "with_chartmuseum"
ChartRepoURL = "chart_repository_url"
DefaultChartRepoURL = "http://chartmuseum:9999"
DefaultPortalURL = "http://portal:8080"
DefaultRegistryCtlURL = "http://registryctl:8080"
// Use this prefix to distinguish harbor user, the prefix contains a special character($), so it cannot be registered as a harbor user.
RobotPrefix = "robot$"
// System admin defined the robot name prefix.
RobotNamePrefix = "robot_name_prefix"
// Scanner robot name prefix
RobotScannerNamePrefix = "robot_scanner_name_prefix"
// Use this prefix to index user who tries to login with web hook token.
AuthProxyUserNamePrefix = "tokenreview$"
CoreConfigPath = "/api/v2.0/internalconfig"
Expand All @@ -155,9 +154,6 @@ const (

AuthProxyRediretPath = "/c/authproxy/redirect"

ChartUploadCtxKey = contextKey("chart_upload_event")
ChartDownloadCtxKey = contextKey("chart_download_event")

// Global notification enable configuration
NotificationEnable = "notification_enable"

Expand Down Expand Up @@ -220,10 +216,14 @@ const (
SkipAuditLogDatabase = "skip_audit_log_database"
// MaxAuditRetentionHour allowed in audit log purge
MaxAuditRetentionHour = 240000
// ScannerSkipUpdatePullTime
ScannerSkipUpdatePullTime = "scanner_skip_update_pulltime"

// SessionTimeout defines the web session timeout
SessionTimeout = "session_timeout"

// UIMaxLengthLimitedOfNumber is the max length that UI limited for type number
UIMaxLengthLimitedOfNumber = 10
// ExecutionStatusRefreshIntervalSeconds is the interval seconds for refreshing execution status
ExecutionStatusRefreshIntervalSeconds = "execution_status_refresh_interval_seconds"
)
49 changes: 23 additions & 26 deletions src/common/rbac/const.go
Expand Up @@ -35,32 +35,29 @@ const (

// const resource variables
const (
ResourceAll = Resource("*") // resource match any other resources
ResourceConfiguration = Resource("configuration") // project configuration compatible for portal only
ResourceHelmChart = Resource("helm-chart")
ResourceHelmChartVersion = Resource("helm-chart-version")
ResourceHelmChartVersionLabel = Resource("helm-chart-version-label")
ResourceLabel = Resource("label")
ResourceLog = Resource("log")
ResourceLdapUser = Resource("ldap-user")
ResourceMember = Resource("member")
ResourceMetadata = Resource("metadata")
ResourceQuota = Resource("quota")
ResourceRepository = Resource("repository")
ResourceTagRetention = Resource("tag-retention")
ResourceImmutableTag = Resource("immutable-tag")
ResourceRobot = Resource("robot")
ResourceNotificationPolicy = Resource("notification-policy")
ResourceScan = Resource("scan")
ResourceScanner = Resource("scanner")
ResourceArtifact = Resource("artifact")
ResourceTag = Resource("tag")
ResourceAccessory = Resource("accessory")
ResourceArtifactAddition = Resource("artifact-addition")
ResourceArtifactLabel = Resource("artifact-label")
ResourcePreatPolicy = Resource("preheat-policy")
ResourcePreatInstance = Resource("preheat-instance")
ResourceSelf = Resource("") // subresource for self
ResourceAll = Resource("*") // resource match any other resources
ResourceConfiguration = Resource("configuration") // project configuration compatible for portal only
ResourceLabel = Resource("label")
ResourceLog = Resource("log")
ResourceLdapUser = Resource("ldap-user")
ResourceMember = Resource("member")
ResourceMetadata = Resource("metadata")
ResourceQuota = Resource("quota")
ResourceRepository = Resource("repository")
ResourceTagRetention = Resource("tag-retention")
ResourceImmutableTag = Resource("immutable-tag")
ResourceRobot = Resource("robot")
ResourceNotificationPolicy = Resource("notification-policy")
ResourceScan = Resource("scan")
ResourceScanner = Resource("scanner")
ResourceArtifact = Resource("artifact")
ResourceTag = Resource("tag")
ResourceAccessory = Resource("accessory")
ResourceArtifactAddition = Resource("artifact-addition")
ResourceArtifactLabel = Resource("artifact-label")
ResourcePreatPolicy = Resource("preheat-policy")
ResourcePreatInstance = Resource("preheat-instance")
ResourceSelf = Resource("") // subresource for self

ResourceAuditLog = Resource("audit-log")
ResourceCatalog = Resource("catalog")
Expand Down
52 changes: 0 additions & 52 deletions src/common/rbac/project/rbac_role.go
Expand Up @@ -68,20 +68,6 @@ var (
{Resource: rbac.ResourceImmutableTag, Action: rbac.ActionDelete},
{Resource: rbac.ResourceImmutableTag, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChart, Action: rbac.ActionCreate}, // upload helm chart
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionRead}, // download helm chart
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionCreate}, // upload helm chart version
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionRead}, // read and download helm chart version
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionList},

{Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead},
{Resource: rbac.ResourceConfiguration, Action: rbac.ActionUpdate},

Expand Down Expand Up @@ -173,20 +159,6 @@ var (
{Resource: rbac.ResourceImmutableTag, Action: rbac.ActionDelete},
{Resource: rbac.ResourceImmutableTag, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChart, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionList},

{Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead},

{Resource: rbac.ResourceRobot, Action: rbac.ActionRead},
Expand Down Expand Up @@ -245,18 +217,6 @@ var (
{Resource: rbac.ResourceTagRetention, Action: rbac.ActionList},
{Resource: rbac.ResourceTagRetention, Action: rbac.ActionOperate},

{Resource: rbac.ResourceHelmChart, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionCreate},
{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionDelete},
{Resource: rbac.ResourceHelmChartVersionLabel, Action: rbac.ActionList},

{Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead},

{Resource: rbac.ResourceRobot, Action: rbac.ActionRead},
Expand Down Expand Up @@ -301,12 +261,6 @@ var (
{Resource: rbac.ResourceRepository, Action: rbac.ActionList},
{Resource: rbac.ResourceRepository, Action: rbac.ActionPull},

{Resource: rbac.ResourceHelmChart, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionList},

{Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead},

{Resource: rbac.ResourceRobot, Action: rbac.ActionRead},
Expand All @@ -332,12 +286,6 @@ var (
{Resource: rbac.ResourceRepository, Action: rbac.ActionList},
{Resource: rbac.ResourceRepository, Action: rbac.ActionPull},

{Resource: rbac.ResourceHelmChart, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionList},

{Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead},

{Resource: rbac.ResourceScan, Action: rbac.ActionRead},
Expand Down
6 changes: 0 additions & 6 deletions src/common/rbac/project/rbac_util.go
Expand Up @@ -30,12 +30,6 @@ var (
{Resource: rbac.ResourceRepository, Action: rbac.ActionList},
{Resource: rbac.ResourceRepository, Action: rbac.ActionPull},

{Resource: rbac.ResourceHelmChart, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChart, Action: rbac.ActionList},

{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionRead},
{Resource: rbac.ResourceHelmChartVersion, Action: rbac.ActionList},

{Resource: rbac.ResourceScan, Action: rbac.ActionRead},
{Resource: rbac.ResourceScanner, Action: rbac.ActionRead},

Expand Down
10 changes: 9 additions & 1 deletion src/common/security/robot/context.go
Expand Up @@ -93,7 +93,7 @@ func (s *SecurityContext) Can(ctx context.Context, action types.Action, resource
accesses = append(accesses, &types.Policy{
Action: a.Action,
Effect: a.Effect,
Resource: types.Resource(fmt.Sprintf("%s/%s", p.Scope, a.Resource)),
Resource: types.Resource(getPolicyResource(p, a)),
})
}
}
Expand Down Expand Up @@ -138,3 +138,11 @@ func filterRobotPolicies(p *models.Project, policies []*types.Policy) []*types.P
}
return results
}

// getPolicyResource to determine permissions for the project resource, the path should be /project instead of /project/project.
func getPolicyResource(perm *robot.Permission, pol *types.Policy) string {
if strings.HasPrefix(perm.Scope, robot.SCOPEPROJECT) && pol.Resource == rbac.ResourceProject {
return perm.Scope
}
return fmt.Sprintf("%s/%s", perm.Scope, pol.Resource)
}
86 changes: 86 additions & 0 deletions src/common/security/robot/context_test.go
Expand Up @@ -242,3 +242,89 @@ func Test_filterRobotPolicies(t *testing.T) {
})
}
}

func Test_getPolicyResource(t *testing.T) {
type args struct {
perm *robot.Permission
poli *types.Policy
}
tests := []struct {
name string
args args
want string
}{
{
"project resource",
args{
&robot.Permission{
Kind: "project",
Namespace: "library",
Access: []*types.Policy{
{
Resource: rbac.Resource(fmt.Sprintf("project/%d/repository", private.ProjectID)),
Action: rbac.ActionPush,
},
{
Resource: rbac.Resource(fmt.Sprintf("project/%d/repository", private.ProjectID)),
Action: rbac.ActionPull,
},
},
Scope: fmt.Sprintf("/project/%d", private.ProjectID),
},
&types.Policy{Resource: "project", Action: "pull", Effect: "allow"},
},
fmt.Sprintf("/project/%d", private.ProjectID),
},
{
"project resource",
args{
&robot.Permission{
Kind: "project",
Namespace: "library",
Access: []*types.Policy{
{
Resource: rbac.Resource(fmt.Sprintf("project/%d/repository", private.ProjectID)),
Action: rbac.ActionPush,
},
{
Resource: rbac.Resource(fmt.Sprintf("project/%d/repository", private.ProjectID)),
Action: rbac.ActionPull,
},
},
Scope: fmt.Sprintf("/project/%d", private.ProjectID),
},
&types.Policy{Resource: "repository", Action: "get", Effect: "allow"},
},
fmt.Sprintf("/project/%d/repository", private.ProjectID),
},
{
"system resource",
args{
&robot.Permission{
Kind: "project",
Namespace: "library",
Access: []*types.Policy{
{
Resource: rbac.Resource(fmt.Sprintf("project/%d/repository", private.ProjectID)),
Action: rbac.ActionPush,
},
{
Resource: rbac.Resource(fmt.Sprintf("project/%d/repository", private.ProjectID)),
Action: rbac.ActionPull,
},
},
Scope: "/system",
},
&types.Policy{Resource: "repository", Action: "get", Effect: "allow"},
},
"/system/repository",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getPolicyResource(tt.args.perm, tt.args.poli); !reflect.DeepEqual(got, tt.want) {
t.Errorf("getPolicyResource() = %v, want %v", got, tt.want)
}
})
}
}
1 change: 0 additions & 1 deletion src/common/utils/test/test.go
Expand Up @@ -122,7 +122,6 @@ func GetUnitTestConfig() map[string]interface{} {
common.LDAPGroupSearchScope: 2,
common.LDAPGroupAdminDn: "cn=harbor_users,ou=groups,dc=example,dc=com",
common.WithNotary: "false",
common.WithChartMuseum: "false",
common.SelfRegistration: "true",
common.WithTrivy: "true",
common.TokenServiceURL: "http://core:8080/service/token",
Expand Down
26 changes: 26 additions & 0 deletions src/common/utils/utils.go
Expand Up @@ -300,3 +300,29 @@ func NextSchedule(cron string, curTime time.Time) time.Time {
func CronParser() cronlib.Parser {
return cronlib.NewParser(cronlib.Second | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow)
}

// MostMatchSorter is a sorter for the most match, usually invoked in sort Less function
// usage:
//
// sort.Slice(input, func(i, j int) bool {
// return MostMatchSorter(input[i].GroupName, input[j].GroupName, matchWord)
// })
// a is the field to be used for sorting, b is the other field, matchWord is the word to be matched
// the return value is true if a is less than b
// for example, search with "user", input is {"harbor_user", "user", "users, "admin_user"}
// it returns with this order {"user", "users", "admin_user", "harbor_user"}

func MostMatchSorter(a, b string, matchWord string) bool {
// exact match always first
if a == matchWord {
return true
}
if b == matchWord {
return false
}
// sort by length, then sort by alphabet
if len(a) == len(b) {
return a < b
}
return len(a) < len(b)
}
42 changes: 42 additions & 0 deletions src/common/utils/utils_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/base64"
"net/http/httptest"
"reflect"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -394,3 +395,44 @@ func TestNextSchedule(t *testing.T) {
})
}
}

type UserGroupSearchItem struct {
GroupName string
}

func Test_sortMostMatch(t *testing.T) {
type args struct {
input []*UserGroupSearchItem
matchWord string
expected []*UserGroupSearchItem
}
tests := []struct {
name string
args args
}{
{"normal", args{[]*UserGroupSearchItem{
{GroupName: "user"}, {GroupName: "harbor_user"}, {GroupName: "admin_user"}, {GroupName: "users"},
}, "user", []*UserGroupSearchItem{
{GroupName: "user"}, {GroupName: "users"}, {GroupName: "admin_user"}, {GroupName: "harbor_user"},
}}},
{"duplicate_item", args{[]*UserGroupSearchItem{
{GroupName: "user"}, {GroupName: "user"}, {GroupName: "harbor_user"}, {GroupName: "admin_user"}, {GroupName: "users"},
}, "user", []*UserGroupSearchItem{
{GroupName: "user"}, {GroupName: "user"}, {GroupName: "users"}, {GroupName: "admin_user"}, {GroupName: "harbor_user"},
}}},
{"miss_exact_match", args{[]*UserGroupSearchItem{
{GroupName: "harbor_user"}, {GroupName: "admin_user"}, {GroupName: "users"},
}, "user", []*UserGroupSearchItem{
{GroupName: "users"}, {GroupName: "admin_user"}, {GroupName: "harbor_user"},
}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

sort.Slice(tt.args.input, func(i, j int) bool {
return MostMatchSorter(tt.args.input[i].GroupName, tt.args.input[j].GroupName, tt.args.matchWord)
})
assert.True(t, reflect.DeepEqual(tt.args.input, tt.args.expected))
})
}
}
24 changes: 20 additions & 4 deletions src/controller/artifact/controller.go
Expand Up @@ -19,6 +19,7 @@ import (
"context"
stderrors "errors"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -156,12 +157,12 @@ func (c *controller) Ensure(ctx context.Context, repository, digest string, opti
}
if option != nil {
for _, tag := range option.Tags {
if err = c.tagCtl.Ensure(ctx, artifact.RepositoryID, artifact.ID, tag); err != nil {
if _, err = c.tagCtl.Ensure(ctx, artifact.RepositoryID, artifact.ID, tag); err != nil {
return false, 0, err
}
}
for _, acc := range option.Accs {
if err = c.accessoryMgr.Ensure(ctx, artifact.ID, acc.ArtifactID, acc.Size, acc.Digest, acc.Type); err != nil {
if err = c.accessoryMgr.Ensure(ctx, artifact.Digest, artifact.RepositoryName, artifact.ID, acc.ArtifactID, acc.Size, acc.Digest, acc.Type); err != nil {
return false, 0, err
}
}
Expand Down Expand Up @@ -487,6 +488,21 @@ func (c *controller) copyDeeply(ctx context.Context, srcRepo, reference, dstRepo

// copy accessory if contains any
for _, acc := range srcArt.Accessories {
accs, err := c.accessoryMgr.List(ctx, q.New(q.KeyWords{"SubjectArtifactRepo": srcRepo, "SubjectArtifactDigest": acc.GetData().Digest}))
if err != nil {
return 0, err
}
// copy the fork which root is the accessory self with a temp array
// to avoid infinite recursion, disable this part in UT.
if os.Getenv("UTTEST") != "true" {
if len(accs) > 0 {
tmpDstAccs := make([]*accessorymodel.AccessoryData, 0)
_, err = c.copyDeeply(ctx, srcRepo, acc.GetData().Digest, dstRepo, true, false, &tmpDstAccs)
if err != nil {
return 0, err
}
}
}
dstAcc := &accessorymodel.AccessoryData{
Digest: acc.GetData().Digest,
Type: acc.GetData().Type,
Expand Down Expand Up @@ -567,8 +583,8 @@ func (c *controller) AddLabel(ctx context.Context, artifactID int64, labelID int
LabelID: labelID,
Ctx: ctx,
}
if err := e.Build(metaData); err == nil {
if err := e.Publish(); err != nil {
if err := e.Build(ctx, metaData); err == nil {
if err := e.Publish(ctx); err != nil {
log.Error(errors.Wrap(err, "mark label to resource handler: event publish"))
}
} else {
Expand Down
22 changes: 11 additions & 11 deletions src/controller/artifact/controller_test.go
Expand Up @@ -137,10 +137,10 @@ func (c *controllerTestSuite) TestAssembleArtifact() {
}, nil)
acc := &basemodel.Default{
Data: accessorymodel.AccessoryData{
ID: 1,
ArtifactID: 2,
SubArtifactID: 1,
Type: accessorymodel.TypeCosignSignature,
ID: 1,
ArtifactID: 2,
SubArtifactDigest: "sha256:123",
Type: accessorymodel.TypeCosignSignature,
},
}
c.accMgr.On("List", mock.Anything, mock.Anything).Return([]accessorymodel.Accessory{
Expand Down Expand Up @@ -250,7 +250,7 @@ func (c *controllerTestSuite) TestEnsure() {
c.artMgr.On("GetByDigest", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.NotFoundError(nil))
c.artMgr.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
c.abstractor.On("AbstractMetadata").Return(nil)
c.tagCtl.On("Ensure").Return(nil)
c.tagCtl.On("Ensure").Return(int64(1), nil)
c.accMgr.On("Ensure").Return(nil)
_, id, err := c.ctl.Ensure(orm.NewContext(nil, &ormtesting.FakeOrmer{}), "library/hello-world", digest, &ArtOption{
Tags: []string{"latest"},
Expand Down Expand Up @@ -546,10 +546,10 @@ func (c *controllerTestSuite) TestCopy() {
}, nil)
acc := &basemodel.Default{
Data: accessorymodel.AccessoryData{
ID: 1,
ArtifactID: 2,
SubArtifactID: 1,
Type: accessorymodel.TypeCosignSignature,
ID: 1,
ArtifactID: 2,
SubArtifactDigest: "sha256:418fb88ec412e340cdbef913b8ca1bbe8f9e8dc705f9617414c1f2c8db980180",
Type: accessorymodel.TypeCosignSignature,
},
}
c.accMgr.On("List", mock.Anything, mock.Anything).Return([]accessorymodel.Accessory{
Expand All @@ -563,8 +563,8 @@ func (c *controllerTestSuite) TestCopy() {
c.abstractor.On("AbstractMetadata").Return(nil)
c.artMgr.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
c.regCli.On("Copy", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
c.tagCtl.On("Ensure").Return(nil)
c.accMgr.On("Ensure", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
c.tagCtl.On("Ensure").Return(int64(1), nil)
c.accMgr.On("Ensure", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
_, err := c.ctl.Copy(orm.NewContext(nil, &ormtesting.FakeOrmer{}), "library/hello-world", "latest", "library/hello-world2")
c.Require().Nil(err)
}
Expand Down
6 changes: 3 additions & 3 deletions src/controller/artifact/model_test.go
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestUnmarshalJSONWithACC(t *testing.T) {
data := []byte(`[{"accessories":[{"artifact_id":9,"creation_time":"2022-01-20T09:18:50.993Z","digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","icon":"","id":4,"size":501,"subject_artifact_id":8,"type":"signature.cosign"}],
data := []byte(`[{"accessories":[{"artifact_id":9,"creation_time":"2022-01-20T09:18:50.993Z","digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","icon":"","id":4,"size":501,"subject_artifact_digest":"sha256:e4b315ad03a1d1d9ff0c111e648a1a91066c09ead8352d3d6a48fa971a82922c","type":"signature.cosign"}],
"addition_links":{"build_history":{"absolute":false,"href":"/api/v2.0/projects/source_project011642670285/repositories/redis/artifacts/sha256:e4b315ad03a1d1d9ff0c111e648a1a91066c09ead8352d3d6a48fa971a82922c/additions/build_history"},
"vulnerabilities":{"absolute":false,"href":"/api/v2.0/projects/source_project011642670285/repositories/redis/artifacts/sha256:e4b315ad03a1d1d9ff0c111e648a1a91066c09ead8352d3d6a48fa971a82922c/additions/vulnerabilities"}},
"digest":"sha256:e4b315ad03a1d1d9ff0c111e648a1a91066c09ead8352d3d6a48fa971a82922c",
Expand All @@ -31,7 +31,7 @@ func TestUnmarshalJSONWithACC(t *testing.T) {
}

func TestUnmarshalJSONWithACCPartial(t *testing.T) {
data := []byte(`[{"accessories":[{"artifact_id":9,"creation_time":"2022-01-20T09:18:50.993Z","digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","icon":"","id":4,"size":501,"subject_artifact_id":8,"type":"signature.cosign"}, {"artifact_id":2, "type":"signature.cosign"}],
data := []byte(`[{"accessories":[{"artifact_id":9,"creation_time":"2022-01-20T09:18:50.993Z","digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","icon":"","id":4,"size":501,"subject_artifact_digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","type":"signature.cosign"}, {"artifact_id":2, "type":"signature.cosign"}],
"digest":"sha256:e4b315ad03a1d1d9ff0c111e648a1a91066c09ead8352d3d6a48fa971a82922c","tags":[{"artifact_id":8,"id":6,"immutable":false,"name":"latest","pull_time":"2022-01-20T09:18:50.783Z","push_time":"2022-01-20T09:18:50.303Z","repository_id":5,"signed":false}],"type":"IMAGE"}]`)

var artifact []Artifact
Expand All @@ -47,7 +47,7 @@ func TestUnmarshalJSONWithACCPartial(t *testing.T) {
}

func TestUnmarshalJSONWithACCUnknownType(t *testing.T) {
data := []byte(`[{"accessories":[{"artifact_id":9,"creation_time":"2022-01-20T09:18:50.993Z","digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","icon":"","id":4,"size":501,"subject_artifact_id":8}],
data := []byte(`[{"accessories":[{"artifact_id":9,"creation_time":"2022-01-20T09:18:50.993Z","digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3","icon":"","id":4,"size":501,"subject_artifact_digest":"sha256:a7caa2636af890178a0b8c4cdbc47ced4dbdf29a1680e9e50823e85ce35b28d3"}],
"digest":"sha256:e4b315ad03a1d1d9ff0c111e648a1a91066c09ead8352d3d6a48fa971a82922c","tags":[{"artifact_id":8,"id":6,"immutable":false,"name":"latest","pull_time":"2022-01-20T09:18:50.783Z","push_time":"2022-01-20T09:18:50.303Z","repository_id":5,"signed":false}],"type":"IMAGE"}]`)

var artifact []Artifact
Expand Down
2 changes: 1 addition & 1 deletion src/controller/artifact/processor/processor.go
Expand Up @@ -44,7 +44,7 @@ type Processor interface {
AbstractMetadata(ctx context.Context, artifact *artifact.Artifact, manifest []byte) error
// AbstractAddition abstracts the addition of the artifact.
// The additions are different for different artifacts:
// build history for image; values.yaml, readme and dependencies for chart, etc
// build history for image;
AbstractAddition(ctx context.Context, artifact *artifact.Artifact, additionType string) (addition *Addition, err error)
}

Expand Down
2 changes: 1 addition & 1 deletion src/controller/event/handler/auditlog/auditlog_test.go
Expand Up @@ -81,7 +81,7 @@ func (suite *AuditLogHandlerTestSuite) TestSubscribeTagEvent() {

notifier.Subscribe(event.TopicCreateProject, suite.auditLogHandler)
// event data should implement the interface TopicEvent
ne.BuildAndPublish(&metadata.CreateProjectEventMetadata{
ne.BuildAndPublish(context.TODO(), &metadata.CreateProjectEventMetadata{
ProjectID: 1,
Project: "test",
Operator: "admin",
Expand Down
6 changes: 1 addition & 5 deletions src/controller/event/handler/init.go
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/goharbor/harbor/src/controller/event/handler/p2p"
"github.com/goharbor/harbor/src/controller/event/handler/replication"
"github.com/goharbor/harbor/src/controller/event/handler/webhook/artifact"
"github.com/goharbor/harbor/src/controller/event/handler/webhook/chart"
"github.com/goharbor/harbor/src/controller/event/handler/webhook/quota"
"github.com/goharbor/harbor/src/controller/event/handler/webhook/scan"
"github.com/goharbor/harbor/src/controller/event/metadata"
Expand All @@ -24,9 +23,6 @@ func init() {
_ = notifier.Subscribe(event.TopicPushArtifact, &artifact.Handler{})
_ = notifier.Subscribe(event.TopicPullArtifact, &artifact.Handler{})
_ = notifier.Subscribe(event.TopicDeleteArtifact, &artifact.Handler{})
_ = notifier.Subscribe(event.TopicUploadChart, &chart.Handler{})
_ = notifier.Subscribe(event.TopicDeleteChart, &chart.Handler{})
_ = notifier.Subscribe(event.TopicDownloadChart, &chart.Handler{})
_ = notifier.Subscribe(event.TopicQuotaExceed, &quota.Handler{})
_ = notifier.Subscribe(event.TopicQuotaWarning, &quota.Handler{})
_ = notifier.Subscribe(event.TopicScanningFailed, &scan.Handler{})
Expand Down Expand Up @@ -61,7 +57,7 @@ func init() {
_ = notifier.Subscribe(event.TopicPullArtifact, &internal.Handler{})
_ = notifier.Subscribe(event.TopicPushArtifact, &internal.Handler{})

_ = task.RegisterTaskStatusChangePostFunc(job.Replication, func(ctx context.Context, taskID int64, status string) error {
_ = task.RegisterTaskStatusChangePostFunc(job.ReplicationVendorType, func(ctx context.Context, taskID int64, status string) error {
notification.AddEvent(ctx, &metadata.ReplicationMetaData{
ReplicationTaskID: taskID,
Status: status,
Expand Down
23 changes: 23 additions & 0 deletions src/controller/event/handler/internal/artifact.go
Expand Up @@ -99,6 +99,9 @@ func (a *Handler) IsStateful() bool {
}

func (a *Handler) onPull(ctx context.Context, event *event.ArtifactEvent) error {
if config.ScannerSkipUpdatePullTime(ctx) && isScannerUser(ctx, event) {
return nil
}
// if duration is equal to 0 or negative, keep original sync mode.
if asyncFlushDuration <= 0 {
var tagName string
Expand Down Expand Up @@ -240,3 +243,23 @@ func (a *Handler) onPush(ctx context.Context, event *event.ArtifactEvent) error

return nil
}

// isScannerUser check if the current user is a scanner user by its prefix
// usually a scanner user should be named like `robot$<projectName>+<Scanner UUID (8byte)>-<Scanner Name>-<UUID>`
// verify it by the prefix `robot$<projectName>+<Scanner UUID (8byte)>`
func isScannerUser(ctx context.Context, event *event.ArtifactEvent) bool {
if len(event.Operator) == 0 {
return false
}
robotPrefix := config.RobotPrefix(ctx)
scannerPrefix := config.ScannerRobotPrefix(ctx)
prefix := fmt.Sprintf("%s%s+%s", robotPrefix, parseProjectName(event.Repository), scannerPrefix)
return strings.HasPrefix(event.Operator, prefix)
}

func parseProjectName(repoName string) string {
if strings.Contains(repoName, "/") {
return strings.Split(repoName, "/")[0]
}
return ""
}
60 changes: 58 additions & 2 deletions src/controller/event/handler/internal/artifact_test.go
Expand Up @@ -24,22 +24,28 @@ import (

common_dao "github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/controller/event"
"github.com/goharbor/harbor/src/controller/scanner"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg"
"github.com/goharbor/harbor/src/pkg/artifact"
_ "github.com/goharbor/harbor/src/pkg/config/db"
"github.com/goharbor/harbor/src/pkg/project"
"github.com/goharbor/harbor/src/pkg/repository/model"
"github.com/goharbor/harbor/src/pkg/tag"
tagmodel "github.com/goharbor/harbor/src/pkg/tag/model/tag"
scannerCtlMock "github.com/goharbor/harbor/src/testing/controller/scanner"
projectMock "github.com/goharbor/harbor/src/testing/pkg/project"
)

// ArtifactHandlerTestSuite is test suite for artifact handler.
type ArtifactHandlerTestSuite struct {
suite.Suite

ctx context.Context
handler *Handler
ctx context.Context
handler *Handler
projectManager project.Manager
scannerCtl scanner.Controller
}

// TestArtifactHandler tests ArtifactHandler.
Expand All @@ -53,6 +59,8 @@ func (suite *ArtifactHandlerTestSuite) SetupSuite() {
config.Init()
suite.handler = &Handler{}
suite.ctx = orm.NewContext(context.TODO(), beegoorm.NewOrm())
suite.projectManager = &projectMock.Manager{}
suite.scannerCtl = &scannerCtlMock.Controller{}

// mock artifact
_, err := pkg.ArtifactMgr.Create(suite.ctx, &artifact.Artifact{ID: 1, RepositoryID: 1})
Expand Down Expand Up @@ -143,3 +151,51 @@ func (suite *ArtifactHandlerTestSuite) TestOnPull() {
return int64(2) == repository.PullCount
}, 3*asyncFlushDuration, asyncFlushDuration/2, "wait for pull_count async update")
}

func (suite *ArtifactHandlerTestSuite) TestIsScannerUser() {
type args struct {
prefix string
event *event.ArtifactEvent
}
tests := []struct {
name string
args args
want bool
}{
{"normal_true", args{"robot$", &event.ArtifactEvent{Operator: "robot$library+scanner+Trivy-2e6240a1-f3be-11ec-8fba-0242ac1e0009", Repository: "library/nginx"}}, true},
{"no_scanner_prefix_false", args{"robot$", &event.ArtifactEvent{Operator: "robot$library+Trivy-2e6240a1-f3be-11ec-8fba-0242ac1e0009", Repository: "library/nginx"}}, false},
{"operator_empty", args{"robot$", &event.ArtifactEvent{Operator: "", Repository: "library/nginx"}}, false},
{"normal_user", args{"robot$", &event.ArtifactEvent{Operator: "Trivy_sample", Repository: "library/nginx"}}, false},
{"normal_user_with_robotname", args{"robot$", &event.ArtifactEvent{Operator: "robot_Trivy", Repository: "library/nginx"}}, false},
}

for _, tt := range tests {
suite.Run(tt.name, func() {
if got := isScannerUser(suite.ctx, tt.args.event); got != tt.want {
suite.Errorf(nil, "isScannerUser() = %v, want %v", got, tt.want)
}
})
}
}

func Test_parseProjectName(t *testing.T) {
type args struct {
repoName string
}
tests := []struct {
name string
args args
want string
}{
{"normal repo name", args{"library/nginx"}, "library"},
{"three levels of repository", args{"library/nginx/nginx"}, "library"},
{"repo name without project name", args{"nginx"}, ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseProjectName(tt.args.repoName); got != tt.want {
t.Errorf("parseProjectName() = %v, want %v", got, tt.want)
}
})
}
}
4 changes: 1 addition & 3 deletions src/controller/event/handler/replication/event/event.go
Expand Up @@ -21,11 +21,9 @@ const (
EventTypeArtifactPush = "artifact_push"
EventTypeArtifactDelete = "artifact_delete"
EventTypeTagDelete = "tag_delete"
EventTypeChartUpload = "chart_upload"
EventTypeChartDelete = "chart_delete"
)

// Event is the model that defines the image/chart pull/push event
// Event is the model that defines the image pull/push event
type Event struct {
Type string
Resource *model.Resource
Expand Down
3 changes: 1 addition & 2 deletions src/controller/event/handler/replication/event/handler.go
Expand Up @@ -37,8 +37,7 @@ func Handle(ctx context.Context, event *Event) error {
var policies []*repctlmodel.Policy
var err error
switch event.Type {
case EventTypeArtifactPush, EventTypeChartUpload, EventTypeTagDelete,
EventTypeArtifactDelete, EventTypeChartDelete:
case EventTypeArtifactPush, EventTypeTagDelete, EventTypeArtifactDelete:
policies, err = getRelatedPolicies(ctx, event.Resource)
default:
return fmt.Errorf("unsupported event type %s", event.Type)
Expand Down
11 changes: 6 additions & 5 deletions src/controller/event/handler/util/util.go
@@ -1,23 +1,23 @@
package util

import (
"context"
"errors"
"fmt"
"strings"

"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/distribution"
policy_model "github.com/goharbor/harbor/src/pkg/notification/policy/model"
"github.com/goharbor/harbor/src/pkg/notifier/event"
notifyModel "github.com/goharbor/harbor/src/pkg/notifier/model"
)

// SendHookWithPolicies send hook by publishing topic of specified target type(notify type)
func SendHookWithPolicies(policies []*policy_model.Policy, payload *notifyModel.Payload, eventType string) error {
func SendHookWithPolicies(ctx context.Context, policies []*policy_model.Policy, payload *notifyModel.Payload, eventType string) error {
// if global notification configured disabled, return directly
if !config.NotificationEnable(orm.Context()) {
if !config.NotificationEnable(ctx) {
log.Debug("notification feature is not enabled")
return nil
}
Expand All @@ -28,14 +28,15 @@ func SendHookWithPolicies(policies []*policy_model.Policy, payload *notifyModel.
for _, target := range targets {
evt := &event.Event{}
hookMetadata := &event.HookMetaData{
ProjectID: ply.ProjectID,
EventType: eventType,
PolicyID: ply.ID,
Payload: payload,
Target: &target,
}
// It should never affect evaluating other policies when one is failed, but error should return
if err := evt.Build(hookMetadata); err == nil {
if err := evt.Publish(); err != nil {
if err := evt.Build(ctx, hookMetadata); err == nil {
if err := evt.Publish(ctx); err != nil {
errRet = true
log.Errorf("failed to publish hook notify event: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/controller/event/handler/webhook/artifact/artifact.go
Expand Up @@ -88,7 +88,7 @@ func (a *Handler) handle(ctx context.Context, event *event.ArtifactEvent) error
return err
}

err = util.SendHookWithPolicies(policies, payload, event.EventType)
err = util.SendHookWithPolicies(ctx, policies, payload, event.EventType)
if err != nil {
return err
}
Expand Down
12 changes: 5 additions & 7 deletions src/controller/event/handler/webhook/artifact/replication.go
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notifier/model"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
Expand Down Expand Up @@ -46,12 +45,12 @@ func (r *ReplicationHandler) Handle(ctx context.Context, value interface{}) erro
return fmt.Errorf("nil replication event")
}

payload, project, err := constructReplicationPayload(rpEvent)
payload, project, err := constructReplicationPayload(ctx, rpEvent)
if err != nil {
return err
}

policies, err := notification.PolicyMgr.GetRelatedPolices(orm.Context(), project.ProjectID, rpEvent.EventType)
policies, err := notification.PolicyMgr.GetRelatedPolices(ctx, project.ProjectID, rpEvent.EventType)
if err != nil {
log.Errorf("failed to find policy for %s event: %v", rpEvent.EventType, err)
return err
Expand All @@ -60,7 +59,7 @@ func (r *ReplicationHandler) Handle(ctx context.Context, value interface{}) erro
log.Debugf("cannot find policy for %s event: %v", rpEvent.EventType, rpEvent)
return nil
}
err = util.SendHookWithPolicies(policies, payload, rpEvent.EventType)
err = util.SendHookWithPolicies(ctx, policies, payload, rpEvent.EventType)
if err != nil {
return err
}
Expand All @@ -72,8 +71,7 @@ func (r *ReplicationHandler) IsStateful() bool {
return false
}

func constructReplicationPayload(event *event.ReplicationEvent) (*model.Payload, *proModels.Project, error) {
ctx := orm.Context()
func constructReplicationPayload(ctx context.Context, event *event.ReplicationEvent) (*model.Payload, *proModels.Project, error) {
task, err := replication.Ctl.GetTask(ctx, event.ReplicationTaskID)
if err != nil {
log.Errorf("failed to get replication task %d: error: %v", event.ReplicationTaskID, err)
Expand Down Expand Up @@ -191,7 +189,7 @@ func constructReplicationPayload(event *event.ReplicationEvent) (*model.Payload,
payload.EventData.Replication.FailedArtifact = []*ctlModel.ArtifactInfo{failedArtifact}
}

prj, err := project.Ctl.GetByName(orm.Context(), prjName, project.Metadata(true))
prj, err := project.Ctl.GetByName(ctx, prjName, project.Metadata(true))
if err != nil {
log.Errorf("failed to get project %s, error: %v", prjName, err)
return nil, nil, err
Expand Down
Expand Up @@ -15,7 +15,6 @@
package artifact

import (
"context"
"testing"
"time"

Expand All @@ -28,6 +27,7 @@ import (
repctl "github.com/goharbor/harbor/src/controller/replication"
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/orm"
_ "github.com/goharbor/harbor/src/pkg/config/db"
_ "github.com/goharbor/harbor/src/pkg/config/inmemory"
"github.com/goharbor/harbor/src/pkg/notification"
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestReplicationHandler_Handle(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := handler.Handle(context.TODO(), tt.args.data)
err := handler.Handle(orm.Context(), tt.args.data)
if tt.wantErr {
require.NotNil(t, err, "Error: %s", err)
return
Expand Down
17 changes: 9 additions & 8 deletions src/controller/event/handler/webhook/artifact/retention.go
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notifier/model"
)
Expand Down Expand Up @@ -45,7 +44,7 @@ func (r *RetentionHandler) Handle(ctx context.Context, value interface{}) error
return nil
}

payload, dryRun, project, err := r.constructRetentionPayload(trEvent)
payload, dryRun, project, err := r.constructRetentionPayload(ctx, trEvent)
if err != nil {
return err
}
Expand All @@ -64,7 +63,7 @@ func (r *RetentionHandler) Handle(ctx context.Context, value interface{}) error
log.Debugf("cannot find policy for %s event: %v", trEvent.EventType, trEvent)
return nil
}
err = util.SendHookWithPolicies(policies, payload, trEvent.EventType)
err = util.SendHookWithPolicies(ctx, policies, payload, trEvent.EventType)
if err != nil {
return err
}
Expand All @@ -76,8 +75,7 @@ func (r *RetentionHandler) IsStateful() bool {
return false
}

func (r *RetentionHandler) constructRetentionPayload(event *event.RetentionEvent) (*model.Payload, bool, int64, error) {
ctx := orm.Context()
func (r *RetentionHandler) constructRetentionPayload(ctx context.Context, event *event.RetentionEvent) (*model.Payload, bool, int64, error) {
task, err := retention.Ctl.GetRetentionExecTask(ctx, event.TaskID)
if err != nil {
log.Errorf("failed to get retention task %d: error: %v", event.TaskID, err)
Expand Down Expand Up @@ -121,8 +119,8 @@ func (r *RetentionHandler) constructRetentionPayload(event *event.RetentionEvent
Operator: execution.Trigger,
EventData: &model.EventData{
Retention: &evtModel.Retention{
Total: task.Total,
Retained: task.Retained,
Total: event.Total,
Retained: event.Retained,
HarborHostname: hostname,
ProjectName: event.Deleted[0].Target.Namespace,
RetentionPolicyID: execution.PolicyID,
Expand All @@ -140,8 +138,11 @@ func (r *RetentionHandler) constructRetentionPayload(event *event.RetentionEvent
}
if len(target.Tags) != 0 {
deletedArtifact.NameAndTag = target.Repository + ":" + target.Tags[0]
} else {
// use digest if no tag
deletedArtifact.NameAndTag = target.Repository + "@" + target.Digest
}
payload.EventData.Retention.DeletedArtifact = []*evtModel.ArtifactInfo{deletedArtifact}
payload.EventData.Retention.DeletedArtifact = append(payload.EventData.Retention.DeletedArtifact, deletedArtifact)
}

for _, v := range md.Rules {
Expand Down
@@ -1,7 +1,6 @@
package artifact

import (
"context"
"os"
"testing"
"time"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/goharbor/harbor/src/controller/event"
"github.com/goharbor/harbor/src/controller/retention"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/notification"
policy_model "github.com/goharbor/harbor/src/pkg/notification/policy/model"
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestRetentionHandler_Handle(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := handler.Handle(context.TODO(), tt.args.data)
err := handler.Handle(orm.Context(), tt.args.data)
if tt.wantErr {
require.NotNil(t, err, "Error: %s", err)
return
Expand Down
122 changes: 0 additions & 122 deletions src/controller/event/handler/webhook/chart/chart.go

This file was deleted.

183 changes: 0 additions & 183 deletions src/controller/event/handler/webhook/chart/chart_test.go

This file was deleted.

5 changes: 2 additions & 3 deletions src/controller/event/handler/webhook/quota/quota.go
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/goharbor/harbor/src/controller/event/handler/util"
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/notification"
notifyModel "github.com/goharbor/harbor/src/pkg/notifier/model"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
Expand All @@ -48,7 +47,7 @@ func (qp *Handler) Handle(ctx context.Context, value interface{}) error {
return fmt.Errorf("nil quota event")
}

prj, err := project.Ctl.GetByName(orm.Context(), quotaEvent.Project.Name, project.Metadata(true))
prj, err := project.Ctl.GetByName(ctx, quotaEvent.Project.Name, project.Metadata(true))
if err != nil {
log.Errorf("failed to get project:%s, error: %v", quotaEvent.Project.Name, err)
return err
Expand All @@ -69,7 +68,7 @@ func (qp *Handler) Handle(ctx context.Context, value interface{}) error {
return err
}

err = util.SendHookWithPolicies(policies, payload, quotaEvent.EventType)
err = util.SendHookWithPolicies(ctx, policies, payload, quotaEvent.EventType)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion src/controller/event/handler/webhook/quota/quota_test.go
Expand Up @@ -26,6 +26,7 @@ import (
common_dao "github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/controller/event"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/orm"
_ "github.com/goharbor/harbor/src/pkg/config/inmemory"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notification/policy"
Expand Down Expand Up @@ -96,7 +97,7 @@ func (suite *QuotaPreprocessHandlerSuite) TearDownSuite() {
// TestHandle ...
func (suite *QuotaPreprocessHandlerSuite) TestHandle() {
handler := &Handler{}
err := handler.Handle(context.TODO(), suite.evt)
err := handler.Handle(orm.Context(), suite.evt)
suite.NoError(err)
}

Expand Down
13 changes: 4 additions & 9 deletions src/controller/event/handler/webhook/scan/scan.go
Expand Up @@ -18,16 +18,13 @@ import (
"context"
"time"

o "github.com/beego/beego/v2/client/orm"

"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/event"
"github.com/goharbor/harbor/src/controller/event/handler/util"
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/controller/scan"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notifier/model"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
Expand Down Expand Up @@ -66,17 +63,17 @@ func (si *Handler) Handle(ctx context.Context, value interface{}) error {
}

// Get project
prj, err := project.Ctl.Get(orm.Context(), e.Artifact.NamespaceID, project.Metadata(true))
prj, err := project.Ctl.Get(ctx, e.Artifact.NamespaceID, project.Metadata(true))
if err != nil {
return errors.Wrap(err, "scan preprocess handler")
}

payload, err := constructScanImagePayload(e, prj)
payload, err := constructScanImagePayload(ctx, e, prj)
if err != nil {
return errors.Wrap(err, "scan preprocess handler")
}

err = util.SendHookWithPolicies(policies, payload, e.EventType)
err = util.SendHookWithPolicies(ctx, policies, payload, e.EventType)
if err != nil {
return errors.Wrap(err, "scan preprocess handler")
}
Expand All @@ -89,7 +86,7 @@ func (si *Handler) IsStateful() bool {
return false
}

func constructScanImagePayload(event *event.ScanImageEvent, project *proModels.Project) (*model.Payload, error) {
func constructScanImagePayload(ctx context.Context, event *event.ScanImageEvent, project *proModels.Project) (*model.Payload, error) {
repoType := proModels.ProjectPrivate
if project.IsPublic() {
repoType = proModels.ProjectPublic
Expand Down Expand Up @@ -121,8 +118,6 @@ func constructScanImagePayload(event *event.ScanImageEvent, project *proModels.P
return nil, errors.Wrap(err, "construct scan payload")
}

ctx := orm.NewContext(context.TODO(), o.NewOrm())

art, err := artifact.Ctl.GetByReference(ctx, event.Artifact.Repository, event.Artifact.Digest, nil)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion src/controller/event/handler/webhook/scan/scan_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/goharbor/harbor/src/controller/event"
sc "github.com/goharbor/harbor/src/controller/scan"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/orm"
_ "github.com/goharbor/harbor/src/pkg/config/inmemory"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notification/policy"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (suite *ScanImagePreprocessHandlerSuite) TearDownSuite() {
func (suite *ScanImagePreprocessHandlerSuite) TestHandle() {
handler := &Handler{}

err := handler.Handle(context.TODO(), suite.evt)
err := handler.Handle(orm.Context(), suite.evt)
suite.NoError(err)
}

Expand Down
76 changes: 0 additions & 76 deletions src/controller/event/metadata/chart.go

This file was deleted.

73 changes: 0 additions & 73 deletions src/controller/event/metadata/chart_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions src/controller/event/metadata/retention.go
Expand Up @@ -25,6 +25,8 @@ func (r *RetentionMetaData) Resolve(evt *event.Event) error {
Status: r.Status,
Deleted: r.Deleted,
TaskID: r.TaskID,
Total: r.Total,
Retained: r.Retained,
}

evt.Topic = event2.TopicTagRetention
Expand Down
20 changes: 2 additions & 18 deletions src/controller/event/topic.go
Expand Up @@ -44,9 +44,6 @@ const (
// QuotaExceedTopic is topic for quota warning event, the usage reaches the warning bar of limitation, like 85%
TopicQuotaWarning = "QUOTA_WARNING"
TopicQuotaExceed = "QUOTA_EXCEED"
TopicUploadChart = "UPLOAD_CHART"
TopicDownloadChart = "DOWNLOAD_CHART"
TopicDeleteChart = "DELETE_CHART"
TopicReplication = "REPLICATION"
TopicArtifactLabeled = "ARTIFACT_LABELED"
TopicTagRetention = "TAG_RETENTION"
Expand Down Expand Up @@ -302,21 +299,6 @@ func (s *ScanImageEvent) String() string {
s.Artifact, s.Operator, s.OccurAt.Format("2006-01-02 15:04:05"))
}

// ChartEvent is chart related event data to publish
type ChartEvent struct {
EventType string
ProjectName string
ChartName string
Versions []string
OccurAt time.Time
Operator string
}

func (c *ChartEvent) String() string {
return fmt.Sprintf("ProjectName-%s ChartName-%s Versions-%s Operator-%s OccurAt-%s",
c.ProjectName, c.ChartName, c.Versions, c.Operator, c.OccurAt.Format("2006-01-02 15:04:05"))
}

// QuotaEvent is project quota related event data to publish
type QuotaEvent struct {
EventType string
Expand Down Expand Up @@ -371,6 +353,8 @@ type RetentionEvent struct {
OccurAt time.Time
Status string
Deleted []*selector.Result
Total int
Retained int
}

func (r *RetentionEvent) String() string {
Expand Down
4 changes: 2 additions & 2 deletions src/controller/gc/callback.go
Expand Up @@ -29,12 +29,12 @@ import (
)

func init() {
err := scheduler.RegisterCallbackFunc(SchedulerCallback, gcCallback)
err := scheduler.RegisterCallbackFunc(job.GarbageCollectionVendorType, gcCallback)
if err != nil {
log.Fatalf("failed to registry GC call back, %v", err)
}

if err := task.RegisterTaskStatusChangePostFunc(GCVendorType, gcTaskStatusChange); err != nil {
if err := task.RegisterTaskStatusChangePostFunc(job.GarbageCollectionVendorType, gcTaskStatusChange); err != nil {
log.Fatalf("failed to register the task status change post for the gc job, error %v", err)
}
}
Expand Down
32 changes: 10 additions & 22 deletions src/controller/gc/controller.go
Expand Up @@ -10,23 +10,11 @@ import (
"github.com/goharbor/harbor/src/pkg/task"
)

func init() {
// keep only the latest created 50 gc execution records
task.SetExecutionSweeperCount(GCVendorType, 50)
}

var (
// Ctl is a global garbage collection controller instance
Ctl = NewController()
)

const (
// SchedulerCallback ...
SchedulerCallback = "GARBAGE_COLLECTION"
// GCVendorType ...
GCVendorType = "GARBAGE_COLLECTION"
)

// Controller manages the tags
type Controller interface {
// Start start a manual gc job
Expand Down Expand Up @@ -79,12 +67,12 @@ func (c *controller) Start(ctx context.Context, policy Policy, trigger string) (
para["redis_url_reg"] = policy.ExtraAttrs["redis_url_reg"]
para["time_window"] = policy.ExtraAttrs["time_window"]

execID, err := c.exeMgr.Create(ctx, GCVendorType, -1, trigger, para)
execID, err := c.exeMgr.Create(ctx, job.GarbageCollectionVendorType, -1, trigger, para)
if err != nil {
return -1, err
}
_, err = c.taskMgr.Create(ctx, execID, &task.Job{
Name: job.GarbageCollection,
Name: job.GarbageCollectionVendorType,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},
Expand All @@ -103,14 +91,14 @@ func (c *controller) Stop(ctx context.Context, id int64) error {

// ExecutionCount ...
func (c *controller) ExecutionCount(ctx context.Context, query *q.Query) (int64, error) {
query.Keywords["VendorType"] = GCVendorType
query.Keywords["VendorType"] = job.GarbageCollectionVendorType
return c.exeMgr.Count(ctx, query)
}

// ListExecutions ...
func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Execution, error) {
query = q.MustClone(query)
query.Keywords["VendorType"] = GCVendorType
query.Keywords["VendorType"] = job.GarbageCollectionVendorType

execs, err := c.exeMgr.List(ctx, query)
if err != nil {
Expand All @@ -128,7 +116,7 @@ func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, er
execs, err := c.exeMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ID": id,
"VendorType": GCVendorType,
"VendorType": job.GarbageCollectionVendorType,
},
})
if err != nil {
Expand All @@ -146,7 +134,7 @@ func (c *controller) GetTask(ctx context.Context, id int64) (*Task, error) {
tasks, err := c.taskMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ID": id,
"VendorType": GCVendorType,
"VendorType": job.GarbageCollectionVendorType,
},
})
if err != nil {
Expand All @@ -162,7 +150,7 @@ func (c *controller) GetTask(ctx context.Context, id int64) (*Task, error) {
// ListTasks ...
func (c *controller) ListTasks(ctx context.Context, query *q.Query) ([]*Task, error) {
query = q.MustClone(query)
query.Keywords["VendorType"] = GCVendorType
query.Keywords["VendorType"] = job.GarbageCollectionVendorType
tks, err := c.taskMgr.List(ctx, query)
if err != nil {
return nil, err
Expand All @@ -185,7 +173,7 @@ func (c *controller) GetTaskLog(ctx context.Context, id int64) ([]byte, error) {

// GetSchedule ...
func (c *controller) GetSchedule(ctx context.Context) (*scheduler.Schedule, error) {
sch, err := c.schedulerMgr.ListSchedules(ctx, q.New(q.KeyWords{"VendorType": GCVendorType}))
sch, err := c.schedulerMgr.ListSchedules(ctx, q.New(q.KeyWords{"VendorType": job.GarbageCollectionVendorType}))
if err != nil {
return nil, err
}
Expand All @@ -202,12 +190,12 @@ func (c *controller) GetSchedule(ctx context.Context) (*scheduler.Schedule, erro
func (c *controller) CreateSchedule(ctx context.Context, cronType, cron string, policy Policy) (int64, error) {
extras := make(map[string]interface{})
extras["delete_untagged"] = policy.DeleteUntagged
return c.schedulerMgr.Schedule(ctx, GCVendorType, -1, cronType, cron, SchedulerCallback, policy, extras)
return c.schedulerMgr.Schedule(ctx, job.GarbageCollectionVendorType, -1, cronType, cron, job.GarbageCollectionVendorType, policy, extras)
}

// DeleteSchedule ...
func (c *controller) DeleteSchedule(ctx context.Context) error {
return c.schedulerMgr.UnScheduleByVendor(ctx, GCVendorType, -1)
return c.schedulerMgr.UnScheduleByVendor(ctx, job.GarbageCollectionVendorType, -1)
}

func convertExecution(exec *task.Execution) *Execution {
Expand Down
2 changes: 1 addition & 1 deletion src/controller/gc/controller_test.go
Expand Up @@ -80,7 +80,7 @@ func (g *gcCtrTestSuite) TestGetExecution() {
{
ID: 1,
Trigger: "Manual",
VendorType: GCVendorType,
VendorType: job.GarbageCollectionVendorType,
StatusMessage: "Success",
},
}, nil)
Expand Down
15 changes: 0 additions & 15 deletions src/controller/health/checker.go
Expand Up @@ -146,18 +146,6 @@ func registryCtlHealthChecker() health.Checker {
return PeriodicHealthChecker(checker, period)
}

func chartmuseumHealthChecker() health.Checker {
url, err := config.GetChartMuseumEndpoint()
if err != nil {
log.Errorf("failed to get the URL of chartmuseum: %v", err)
}
url = url + "/health"
timeout := 60 * time.Second
period := 10 * time.Second
checker := HTTPStatusCodeHealthChecker(http.MethodGet, url, nil, timeout, http.StatusOK)
return PeriodicHealthChecker(checker, period)
}

func notaryHealthChecker() health.Checker {
url := config.InternalNotaryEndpoint() + "/_notary_server/health"
timeout := 60 * time.Second
Expand Down Expand Up @@ -203,9 +191,6 @@ func RegisterHealthCheckers() {
registry["registryctl"] = registryCtlHealthChecker()
registry["database"] = databaseHealthChecker()
registry["redis"] = redisHealthChecker()
if config.WithChartMuseum() {
registry["chartmuseum"] = chartmuseumHealthChecker()
}
if config.WithNotary() {
registry["notary"] = notaryHealthChecker()
}
Expand Down
6 changes: 6 additions & 0 deletions src/controller/jobmonitor/monitor.go
Expand Up @@ -49,6 +49,7 @@ var skippedJobTypes = []string{
"IMAGE_REPLICATE",
"IMAGE_SCAN_ALL",
"IMAGE_GC",
"PURGE_AUDIT",
}

// MonitorController defines the worker pool operations
Expand All @@ -69,6 +70,7 @@ type MonitorController interface {
PauseJobQueues(ctx context.Context, jobType string) error
// ResumeJobQueues resume the job queue by type
ResumeJobQueues(ctx context.Context, jobType string) error
GetJobLog(ctx context.Context, jobID string) ([]byte, error)
}

type monitorController struct {
Expand Down Expand Up @@ -365,3 +367,7 @@ func (w *monitorController) resumeQueue(ctx context.Context, jobType string) err
}
return nil
}

func (w *monitorController) GetJobLog(ctx context.Context, jobID string) ([]byte, error) {
return w.taskManager.GetLogByJobID(ctx, jobID)
}
11 changes: 6 additions & 5 deletions src/controller/jobservice/schedule_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/goharbor/harbor/src/controller/purge"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/queuestatus"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/testing/mock"
Expand Down Expand Up @@ -50,27 +51,27 @@ func (s *ScheduleTestSuite) TestCreateSchedule() {

dataMap := make(map[string]interface{})
p := purge.JobPolicy{}
id, err := s.ctl.Create(nil, purge.VendorType, "Daily", "* * * * * *", purge.SchedulerCallback, p, dataMap)
id, err := s.ctl.Create(nil, job.PurgeAuditVendorType, "Daily", "* * * * * *", purge.SchedulerCallback, p, dataMap)
s.Nil(err)
s.Equal(int64(1), id)
}

func (s *ScheduleTestSuite) TestDeleteSchedule() {
s.scheduler.On("UnScheduleByVendor", mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.Nil(s.ctl.Delete(nil, purge.VendorType))
s.Nil(s.ctl.Delete(nil, job.PurgeAuditVendorType))
}

func (s *ScheduleTestSuite) TestGetSchedule() {
s.scheduler.On("ListSchedules", mock.Anything, mock.Anything).Return([]*scheduler.Schedule{
{
ID: 1,
VendorType: purge.VendorType,
VendorType: job.PurgeAuditVendorType,
},
}, nil).Once()

schedule, err := s.ctl.Get(nil, purge.VendorType)
schedule, err := s.ctl.Get(nil, job.PurgeAuditVendorType)
s.Nil(err)
s.Equal(purge.VendorType, schedule.VendorType)
s.Equal(job.PurgeAuditVendorType, schedule.VendorType)
}

func (s *ScheduleTestSuite) TestListSchedule() {
Expand Down
12 changes: 6 additions & 6 deletions src/controller/p2p/preheat/controller.go
Expand Up @@ -292,7 +292,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
len(schema.Trigger.Settings.Cron) > 0 {
// schedule and update policy
extras := make(map[string]interface{})
if _, err = c.scheduler.Schedule(ctx, job.P2PPreheat, id, "", schema.Trigger.Settings.Cron,
if _, err = c.scheduler.Schedule(ctx, job.P2PPreheatVendorType, id, "", schema.Trigger.Settings.Cron,
SchedulerCallback, TriggerParam{PolicyID: id}, extras); err != nil {
return 0, err
}
Expand All @@ -302,7 +302,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
}

if err != nil {
if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id); e != nil {
if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheatVendorType, id); e != nil {
return 0, errors.Wrap(e, err.Error())
}

Expand Down Expand Up @@ -375,7 +375,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche

// unschedule old
if needUn {
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, schema.ID)
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheatVendorType, schema.ID)
if err != nil {
return err
}
Expand All @@ -384,7 +384,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
// schedule new
if needSch {
extras := make(map[string]interface{})
if _, err := c.scheduler.Schedule(ctx, job.P2PPreheat, schema.ID, "", cron, SchedulerCallback,
if _, err := c.scheduler.Schedule(ctx, job.P2PPreheatVendorType, schema.ID, "", cron, SchedulerCallback,
TriggerParam{PolicyID: schema.ID}, extras); err != nil {
return err
}
Expand All @@ -408,7 +408,7 @@ func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
return err
}
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && len(s.Trigger.Settings.Cron) > 0 {
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id)
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheatVendorType, id)
if err != nil {
return err
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (c *controller) DeletePoliciesOfProject(ctx context.Context, project int64)
func (c *controller) deleteExecs(ctx context.Context, vendorID int64) error {
executions, err := c.executionMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": job.P2PPreheat,
"VendorType": job.P2PPreheatVendorType,
"VendorID": vendorID,
},
})
Expand Down
11 changes: 3 additions & 8 deletions src/controller/p2p/preheat/enforcer.go
Expand Up @@ -46,11 +46,6 @@ import (
"github.com/goharbor/harbor/src/pkg/task"
)

func init() {
// keep only the latest created 50 p2p preheat execution records
task.SetExecutionSweeperCount(job.P2PPreheat, 50)
}

const (
defaultSeverityCode = 99
extraAttrTotal = "totalCount"
Expand Down Expand Up @@ -383,7 +378,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
attrs[extraAttrTriggerSetting] = "-"
}

eid, err := de.executionMgr.Create(ctx, job.P2PPreheat, pl.ID, pl.Trigger.Type, attrs)
eid, err := de.executionMgr.Create(ctx, job.P2PPreheatVendorType, pl.ID, pl.Trigger.Type, attrs)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -458,7 +453,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can
}

j := &task.Job{
Name: job.P2PPreheat,
Name: job.P2PPreheatVendorType,
Parameters: job.Parameters{
preheat.PreheatParamProvider: instance,
preheat.PreheatParamImage: piData,
Expand All @@ -483,7 +478,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can

// getVulnerabilitySev gets the severity code value for the given artifact with allowlist option set
func (de *defaultEnforcer) getVulnerabilitySev(ctx context.Context, p *proModels.Project, art *artifact.Artifact) (uint, error) {
vulnerable, err := de.scanCtl.GetVulnerable(ctx, art, p.CVEAllowlist.CVESet())
vulnerable, err := de.scanCtl.GetVulnerable(ctx, art, p.CVEAllowlist.CVESet(), p.CVEAllowlist.IsExpired())
if err != nil {
if errors.IsNotFoundErr(err) {
// no vulnerability report
Expand Down
1 change: 1 addition & 0 deletions src/controller/p2p/preheat/enforcer_test.go
Expand Up @@ -111,6 +111,7 @@ func (suite *EnforcerTestSuite) SetupSuite() {
context.TODO(),
mock.AnythingOfType("*artifact.Artifact"),
mock.AnythingOfType("models.CVESet"),
mock.AnythingOfType("bool"),
).Return(&scan.Vulnerable{Severity: &low, ScanStatus: "Success"}, nil)

fakeProCtl := &project.Controller{}
Expand Down
18 changes: 16 additions & 2 deletions src/controller/proxy/controller.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
model_tag "github.com/goharbor/harbor/src/pkg/tag/model/tag"
)

const (
Expand Down Expand Up @@ -117,7 +118,17 @@ func (c *controller) EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagNam
if a == nil {
return fmt.Errorf("the artifact is not ready yet, failed to tag it to %v", tagName)
}
return tag.Ctl.Ensure(ctx, a.RepositoryID, a.Artifact.ID, tagName)
tagID, err := tag.Ctl.Ensure(ctx, a.RepositoryID, a.Artifact.ID, tagName)
if err != nil {
return err
}
// update the pull time of tag for the first time cache
return tag.Ctl.Update(ctx, &tag.Tag{
Tag: model_tag.Tag{
ID: tagID,
PullTime: time.Now(),
},
}, "PullTime")
}

func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool {
Expand Down Expand Up @@ -155,6 +166,9 @@ func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo,
remoteRepo := getRemoteRepo(art)
exist, desc, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD
if err != nil {
if errors.IsRateLimitError(err) && a != nil { // if rate limit, use local if it exists, otherwise return error
return true, nil, nil
}
return false, nil, err
}
if !exist || desc == nil {
Expand Down Expand Up @@ -239,7 +253,7 @@ func (c *controller) ProxyManifest(ctx context.Context, art lib.ArtifactInfo, re
}
}
if a != nil {
SendPullEvent(a, art.Tag, operator)
SendPullEvent(bCtx, a, art.Tag, operator)
}
}(operator.FromContext(ctx))

Expand Down
24 changes: 24 additions & 0 deletions src/controller/proxy/controller_test.go
Expand Up @@ -122,6 +122,30 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
p.Assert().False(result)
}

func (p *proxyControllerTestSuite) TestUseLocalManifest_429() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
desc := &distribution.Descriptor{Digest: digest.Digest(dig)}
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.remote.On("ManifestExist", mock.Anything, mock.Anything).Return(false, desc, errors.New("too many requests").WithCode(errors.RateLimitCode))
p.local.On("GetManifest", mock.Anything, mock.Anything).Return(nil, nil)
_, _, err := p.ctr.UseLocalManifest(ctx, art, p.remote)
p.Assert().NotNil(err)
errors.IsRateLimitError(err)
}

func (p *proxyControllerTestSuite) TestUseLocalManifest_429ToLocal() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
desc := &distribution.Descriptor{Digest: digest.Digest(dig)}
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.remote.On("ManifestExist", mock.Anything, mock.Anything).Return(false, desc, errors.New("too many requests").WithCode(errors.RateLimitCode))
p.local.On("GetManifest", mock.Anything, mock.Anything).Return(&artifact.Artifact{}, nil)
result, _, err := p.ctr.UseLocalManifest(ctx, art, p.remote)
p.Assert().Nil(err)
p.Assert().True(result)
}

func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
ctx := context.Background()
art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
Expand Down
4 changes: 2 additions & 2 deletions src/controller/proxy/local.go
Expand Up @@ -152,11 +152,11 @@ func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man di
}

// SendPullEvent send a pull image event
func SendPullEvent(a *artifact.Artifact, tag, operator string) {
func SendPullEvent(ctx context.Context, a *artifact.Artifact, tag, operator string) {
e := &metadata.PullArtifactEventMetadata{
Artifact: &a.Artifact,
Tag: tag,
Operator: operator,
}
event.BuildAndPublish(e)
event.BuildAndPublish(ctx, e)
}
6 changes: 2 additions & 4 deletions src/controller/purge/controller.go
Expand Up @@ -29,8 +29,6 @@ import (
const (
// SchedulerCallback ...
SchedulerCallback = "PURGE_AUDIT_LOG_CALLBACK"
// VendorType ...
VendorType = "PURGE_AUDIT_LOG"
)

// Ctrl a global purge controller instance
Expand Down Expand Up @@ -76,12 +74,12 @@ func (c *controller) Start(ctx context.Context, policy JobPolicy, trigger string
para[common.PurgeAuditRetentionHour] = policy.RetentionHour
para[common.PurgeAuditIncludeOperations] = policy.IncludeOperations

execID, err := c.exeMgr.Create(ctx, VendorType, -1, trigger, para)
execID, err := c.exeMgr.Create(ctx, job.PurgeAuditVendorType, -1, trigger, para)
if err != nil {
return -1, err
}
_, err = c.taskMgr.Create(ctx, execID, &task.Job{
Name: job.PurgeAudit,
Name: job.PurgeAuditVendorType,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},
Expand Down
23 changes: 12 additions & 11 deletions src/controller/replication/execution.go
Expand Up @@ -35,11 +35,6 @@ import (
"github.com/goharbor/harbor/src/pkg/task"
)

func init() {
// keep only the latest created 50 replication execution records
task.SetExecutionSweeperCount(job.Replication, 50)
}

// Ctl is a global replication controller instance
var Ctl = NewController()

Expand Down Expand Up @@ -109,7 +104,7 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
WithMessage("the policy %d is disabled", policy.ID)
}
// create an execution record
id, err := c.execMgr.Create(ctx, job.Replication, policy.ID, trigger)
id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -163,6 +158,12 @@ func (c *controller) markError(ctx context.Context, executionID int64, err error
}

func (c *controller) Stop(ctx context.Context, id int64) error {
// check whether the replication execution existed
_, err := c.GetExecution(ctx, id)
if err != nil {
return err
}

return c.execMgr.Stop(ctx, id)
}

Expand All @@ -185,7 +186,7 @@ func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Exe
func (c *controller) buildExecutionQuery(query *q.Query) *q.Query {
// as the following logic may change the content of the query, clone it first
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Replication
query.Keywords["VendorType"] = job.ReplicationVendorType
// convert the query keyword "PolicyID" or "policy_id" to the "VendorID"
if value, exist := query.Keywords["PolicyID"]; exist {
query.Keywords["VendorID"] = value
Expand All @@ -202,7 +203,7 @@ func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, er
execs, err := c.execMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ID": id,
"VendorType": job.Replication,
"VendorType": job.ReplicationVendorType,
},
})
if err != nil {
Expand All @@ -217,13 +218,13 @@ func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, er

func (c *controller) TaskCount(ctx context.Context, query *q.Query) (int64, error) {
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Replication
query.Keywords["VendorType"] = job.ReplicationVendorType
return c.taskMgr.Count(ctx, query)
}

func (c *controller) ListTasks(ctx context.Context, query *q.Query) ([]*Task, error) {
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Replication
query.Keywords["VendorType"] = job.ReplicationVendorType
tks, err := c.taskMgr.List(ctx, query)
if err != nil {
return nil, err
Expand All @@ -239,7 +240,7 @@ func (c *controller) GetTask(ctx context.Context, id int64) (*Task, error) {
tasks, err := c.taskMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ID": id,
"VendorType": job.Replication,
"VendorType": job.ReplicationVendorType,
},
})
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions src/controller/replication/execution_test.go
Expand Up @@ -105,6 +105,21 @@ func (r *replicationTestSuite) TestStart() {
}

func (r *replicationTestSuite) TestStop() {
r.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
VendorType: job.ReplicationVendorType,
VendorID: 1,
Status: job.RunningStatus.String(),
Metrics: &dao.Metrics{
TaskCount: 1,
RunningTaskCount: 1,
},
Trigger: task.ExecutionTriggerManual,
StartTime: time.Time{},
EndTime: time.Time{},
},
}, nil)
r.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
err := r.ctl.Stop(nil, 1)
r.Require().Nil(err)
Expand All @@ -123,7 +138,7 @@ func (r *replicationTestSuite) TestListExecutions() {
r.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
VendorType: job.Replication,
VendorType: job.ReplicationVendorType,
VendorID: 1,
Status: job.RunningStatus.String(),
Metrics: &dao.Metrics{
Expand All @@ -147,7 +162,7 @@ func (r *replicationTestSuite) TestGetExecution() {
r.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
VendorType: job.Replication,
VendorType: job.ReplicationVendorType,
VendorID: 1,
Status: job.RunningStatus.String(),
Metrics: &dao.Metrics{
Expand Down
2 changes: 1 addition & 1 deletion src/controller/replication/flow/copy.go
Expand Up @@ -132,7 +132,7 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
}

job := &task.Job{
Name: job.Replication,
Name: job.ReplicationVendorType,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},
Expand Down
11 changes: 0 additions & 11 deletions src/controller/replication/flow/copy_test.go
Expand Up @@ -42,16 +42,6 @@ func (c *copyFlowTestSuite) TestRun() {
},
}, nil)
adp.On("FetchArtifacts", mock.Anything).Return([]*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
Vtags: []string{"latest"},
},
Override: false,
},
{
Type: model.ResourceTypeArtifact,
Metadata: &model.ResourceMetadata{
Expand All @@ -61,7 +51,6 @@ func (c *copyFlowTestSuite) TestRun() {
Vtags: []string{"latest"},
},
Override: false,
Skip: true,
},
}, nil)
adp.On("PrepareForPush", mock.Anything).Return(nil)
Expand Down
2 changes: 1 addition & 1 deletion src/controller/replication/flow/deletion.go
Expand Up @@ -74,7 +74,7 @@ func (d *deletionFlow) createTasks(ctx context.Context, srcResources, dstResourc
}

job := &task.Job{
Name: job.Replication,
Name: job.ReplicationVendorType,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},
Expand Down
7 changes: 5 additions & 2 deletions src/controller/replication/flow/mock_adapter_factory_test.go
74 changes: 55 additions & 19 deletions src/controller/replication/flow/mock_adapter_test.go
55 changes: 7 additions & 48 deletions src/controller/replication/flow/stage.go
Expand Up @@ -56,57 +56,16 @@ func initialize(policy *repctlmodel.Policy) (adp.Adapter, adp.Adapter, error) {

// fetch resources from the source registry
func fetchResources(adapter adp.Adapter, policy *repctlmodel.Policy) ([]*model.Resource, error) {
var resTypes []string
for _, filter := range policy.Filters {
if filter.Type == model.FilterTypeResource {
resTypes = append(resTypes, filter.Value.(string))
}
}
if len(resTypes) == 0 {
info, err := adapter.Info()
if err != nil {
return nil, fmt.Errorf("failed to get the adapter info: %v", err)
}
resTypes = append(resTypes, info.SupportedResourceTypes...)
}

fetchArtifact := false
fetchChart := false
for _, resType := range resTypes {
if resType == model.ResourceTypeChart {
fetchChart = true
continue
}
fetchArtifact = true
}

var resources []*model.Resource
// artifacts
if fetchArtifact {
reg, ok := adapter.(adp.ArtifactRegistry)
if !ok {
return nil, fmt.Errorf("the adapter doesn't implement the ArtifactRegistry interface")
}
res, err := reg.FetchArtifacts(policy.Filters)
if err != nil {
return nil, fmt.Errorf("failed to fetch artifacts: %v", err)
}
resources = append(resources, res...)
log.Debug("fetch artifacts completed")
reg, ok := adapter.(adp.ArtifactRegistry)
if !ok {
return nil, fmt.Errorf("the adapter doesn't implement the ArtifactRegistry interface")
}
// charts
if fetchChart {
reg, ok := adapter.(adp.ChartRegistry)
if !ok {
return nil, fmt.Errorf("the adapter doesn't implement the ChartRegistry interface")
}
res, err := reg.FetchCharts(policy.Filters)
if err != nil {
return nil, fmt.Errorf("failed to fetch charts: %v", err)
}
resources = append(resources, res...)
log.Debug("fetch charts completed")
res, err := reg.FetchArtifacts(policy.Filters)
if err != nil {
return nil, fmt.Errorf("failed to fetch artifacts: %v", err)
}
resources = append(resources, res...)

log.Debug("fetch resources from the source registry completed")
return resources, nil
Expand Down
11 changes: 3 additions & 8 deletions src/controller/replication/flow/stage_test.go
Expand Up @@ -53,11 +53,6 @@ func (s *stageTestSuite) TestInitialize() {

func (s *stageTestSuite) TestFetchResources() {
adapter := &mockAdapter{}
adapter.On("Info").Return(&model.RegistryInfo{
SupportedResourceTypes: []string{
model.ResourceTypeArtifact,
},
}, nil)
adapter.On("FetchArtifacts", mock.Anything).Return([]*model.Resource{
{},
{},
Expand All @@ -72,7 +67,7 @@ func (s *stageTestSuite) TestFetchResources() {
func (s *stageTestSuite) TestAssembleSourceResources() {
resources := []*model.Resource{
{
Type: model.ResourceTypeChart,
Type: model.ResourceTypeArtifact,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
Expand All @@ -95,7 +90,7 @@ func (s *stageTestSuite) TestAssembleSourceResources() {
func (s *stageTestSuite) TestAssembleDestinationResources() {
resources := []*model.Resource{
{
Type: model.ResourceTypeChart,
Type: model.ResourceTypeArtifact,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
Expand All @@ -114,7 +109,7 @@ func (s *stageTestSuite) TestAssembleDestinationResources() {
res, err := assembleDestinationResources(resources, policy, "")
s.Require().Nil(err)
s.Len(res, 1)
s.Equal(model.ResourceTypeChart, res[0].Type)
s.Equal(model.ResourceTypeArtifact, res[0].Type)
s.Equal("test/hello-world", res[0].Metadata.Repository.Name)
s.Equal(1, len(res[0].Metadata.Vtags))
s.Equal("latest", res[0].Metadata.Vtags[0])
Expand Down
2 changes: 1 addition & 1 deletion src/controller/replication/mock_flow_controller_test.go
10 changes: 5 additions & 5 deletions src/controller/replication/policy.go
Expand Up @@ -121,7 +121,7 @@ func (c *controller) CreatePolicy(ctx context.Context, policy *model.Policy) (in
}
// create schedule if needed
if policy.IsScheduledTrigger() {
if _, err = c.scheduler.Schedule(ctx, job.Replication, id, "", policy.Trigger.Settings.Cron,
if _, err = c.scheduler.Schedule(ctx, job.ReplicationVendorType, id, "", policy.Trigger.Settings.Cron,
callbackFuncName, id, map[string]interface{}{}); err != nil {
return 0, err
}
Expand All @@ -134,7 +134,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy, pro
return err
}
// delete the schedule
if err := c.scheduler.UnScheduleByVendor(ctx, job.Replication, policy.ID); err != nil {
if err := c.scheduler.UnScheduleByVendor(ctx, job.ReplicationVendorType, policy.ID); err != nil {
return err
}

Expand All @@ -148,7 +148,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy, pro
}
// create schedule if needed
if policy.IsScheduledTrigger() {
if _, err := c.scheduler.Schedule(ctx, job.Replication, policy.ID, "", policy.Trigger.Settings.Cron,
if _, err := c.scheduler.Schedule(ctx, job.ReplicationVendorType, policy.ID, "", policy.Trigger.Settings.Cron,
callbackFuncName, policy.ID, map[string]interface{}{}); err != nil {
return err
}
Expand All @@ -175,11 +175,11 @@ func (c *controller) validatePolicy(ctx context.Context, policy *model.Policy) e

func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
// delete the executions
if err := c.execMgr.DeleteByVendor(ctx, job.Replication, id); err != nil {
if err := c.execMgr.DeleteByVendor(ctx, job.ReplicationVendorType, id); err != nil {
return err
}
// delete the schedule
if err := c.scheduler.UnScheduleByVendor(ctx, job.Replication, id); err != nil {
if err := c.scheduler.UnScheduleByVendor(ctx, job.ReplicationVendorType, id); err != nil {
return err
}
// delete the policy
Expand Down