Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(cherry-pick) Remove job status track information from redis after stop the job in the queue #19305

Merged
merged 1 commit into from Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/pkg/jobmonitor/redis.go
Expand Up @@ -23,6 +23,7 @@
"github.com/gomodule/redigo/redis"

"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/lib/log"
libRedis "github.com/goharbor/harbor/src/lib/redis"
Expand Down Expand Up @@ -93,6 +94,10 @@
if err != nil {
return []string{}, err
}
go func() {
// the amount of jobIDs maybe large, so use goroutine to remove the job status tracking info
r.removeJobStatusInRedis(ctx, jobIDs)
}()

Check warning on line 100 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L97-L100

Added lines #L97 - L100 were not covered by tests
if ret < 1 {
// no job in queue removed
return []string{}, fmt.Errorf("no job in the queue removed")
Expand All @@ -102,6 +107,27 @@
return jobIDs, nil
}

// removeJobStatusInRedis remove job status track information from redis, to avoid performance impact when the jobIDs is too large, use batch to remove
func (r *redisClientImpl) removeJobStatusInRedis(ctx context.Context, jobIDs []string) {
conn := r.redisPool.Get()
defer conn.Close()
for _, id := range jobIDs {
namespace := fmt.Sprintf("{%s}", r.namespace)
redisKeyStatus := rds.KeyJobStats(namespace, id)
log.Debugf("delete job status info for job id:%v, key:%v", id, redisKeyStatus)
_, err := conn.Do("DEL", redisKeyStatus)
if err != nil {
log.Warningf("failed to delete the job status info for job %v, %v, continue", id, err)
}

Check warning on line 121 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L120-L121

Added lines #L120 - L121 were not covered by tests
redisKeyInProgress := rds.KeyJobTrackInProgress(namespace)
log.Debugf("delete inprogress info for key:%v, job id:%v", id, redisKeyInProgress)
_, err = conn.Do("HDEL", redisKeyInProgress, id)
if err != nil {
log.Warningf("failed to delete the job info in %v for job %v, %v, continue", rds.KeyJobTrackInProgress(namespace), id, err)
}

Check warning on line 127 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L126-L127

Added lines #L126 - L127 were not covered by tests
}
}

func (r *redisClientImpl) AllJobTypes(ctx context.Context) ([]string, error) {
conn := r.redisPool.Get()
defer conn.Close()
Expand Down
86 changes: 86 additions & 0 deletions src/pkg/jobmonitor/redis_test.go
@@ -0,0 +1,86 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobmonitor

import (
"context"
"fmt"
"os"
"testing"

"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/suite"

"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/config"
)

type RedisClientTestSuite struct {
suite.Suite
redisClient redisClientImpl
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
}
suite.Equal(0, len(remains))
}

func TestRedisClientTestSuite(t *testing.T) {
suite.Run(t, &RedisClientTestSuite{})
}