Skip to content

Commit

Permalink
Use batch to list the job id in the job queue to avoid crash redis (#19444)
Browse files Browse the repository at this point in the history

fixes: #19436

Signed-off-by: stonezdj <daojunz@vmware.com>
  • Loading branch information
stonezdj committed Oct 18, 2023
1 parent 84a85fb commit d030ab2
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 24 deletions.
34 changes: 26 additions & 8 deletions src/pkg/jobmonitor/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
// JobServicePool job service pool name
const JobServicePool = "JobService"

// batchSize is the batch size used when listing the jobs in the queue
const batchSize = 1000

// RedisClient defines the job service operations related to redis
type RedisClient interface {
// AllJobTypes returns all the job types registered in the job service
Expand Down Expand Up @@ -74,21 +77,36 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
var jobInfo struct {
ID string `json:"id"`
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
size, err := redis.Int64(conn.Do("LLEN", redisKeyJobQueue))
if err != nil {
log.Infof("fail to get the size of the queue")
return []string{}, err
}
if len(jobs) == 0 {
log.Infof("no pending job for job type %v", jobType)
if size == 0 {
return []string{}, nil
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue

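// list the jobs in the queue in batches, because loading too many objects from a list at once might crash redis
// NOTE(review): LRANGE treats both start and stop as inclusive, so each full batch below returns batchSize+1 items and the item at endIndex is read again by the next batch; endIndex should probably be startIndex + batchSize - 1 — confirm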
for startIndex := int64(0); startIndex < int64(size); startIndex += batchSize {
endIndex := startIndex + batchSize
if endIndex > int64(size) {
endIndex = int64(size)
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, startIndex, endIndex))
if err != nil {
return []string{}, err
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue
}
if len(jobInfo.ID) > 0 {
jobIDs = append(jobIDs, jobInfo.ID)
}
}
jobIDs = append(jobIDs, jobInfo.ID)
}

log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
if err != nil {
Expand Down
79 changes: 63 additions & 16 deletions src/pkg/jobmonitor/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jobmonitor

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
Expand All @@ -34,51 +35,97 @@ type RedisClientTestSuite struct {
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
func (s *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
s.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
s.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: s.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
s.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
s.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
func (s *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
func (s *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
s.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")

s.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
s.Equal(0, len(remains))
}

func (s *RedisClientTestSuite) TestStopPendingJobs() {
redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", "{harbor_job_service_namespace}", "REPLICATION")
// create key and value
type jobInfo struct {
ID string `json:"id"`
Params string `json:"params"`
}
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
job := jobInfo{
ID: utils.GenerateRandomStringWithLen(10),
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}
// jobs without an id; the final assertion expects them to be left out of the returned job IDs
for i := 0; i < 10; i++ {
job := jobInfo{
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}

jobIDs, err := s.redisClient.StopPendingJobs(context.Background(), "REPLICATION")
if err != nil {
s.FailNow("failed to stop pending jobs", err)
}
suite.Equal(0, len(remains))
s.Assert().Equal(100, len(jobIDs))
}

func TestRedisClientTestSuite(t *testing.T) {
Expand Down

0 comments on commit d030ab2

Please sign in to comment.