Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use batch to list the job id in the job queue to avoid crash redis #19444

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 26 additions & 8 deletions src/pkg/jobmonitor/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
// JobServicePool job service pool name
const JobServicePool = "JobService"

// batchSize the batch size to list the job in queue
const batchSize = 1000

// RedisClient defines the job service operations related to redis
type RedisClient interface {
// AllJobTypes returns all the job types registered in the job service
Expand Down Expand Up @@ -74,21 +77,36 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
var jobInfo struct {
ID string `json:"id"`
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
size, err := redis.Int64(conn.Do("LLEN", redisKeyJobQueue))
if err != nil {
log.Infof("fail to get the size of the queue")
return []string{}, err
}
if len(jobs) == 0 {
log.Infof("no pending job for job type %v", jobType)
if size == 0 {
return []string{}, nil
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue

// use batch to list the job in queue, because the too many object load from a list might cause the redis crash
for startIndex := int64(0); startIndex < int64(size); startIndex += batchSize {
endIndex := startIndex + batchSize
if endIndex > int64(size) {
endIndex = int64(size)
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, startIndex, endIndex))
if err != nil {
return []string{}, err
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue
}
if len(jobInfo.ID) > 0 {
jobIDs = append(jobIDs, jobInfo.ID)
}
}
jobIDs = append(jobIDs, jobInfo.ID)
}

log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
if err != nil {
Expand Down
79 changes: 63 additions & 16 deletions src/pkg/jobmonitor/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jobmonitor

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
Expand All @@ -34,51 +35,97 @@ type RedisClientTestSuite struct {
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
func (s *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
s.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
s.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: s.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
s.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
s.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
func (s *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
func (s *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
s.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")

s.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
s.Equal(0, len(remains))
}

func (s *RedisClientTestSuite) TestStopPendingJobs() {
redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", "{harbor_job_service_namespace}", "REPLICATION")
// create key and value
type jobInfo struct {
ID string `json:"id"`
Params string `json:"params"`
}
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
job := jobInfo{
ID: utils.GenerateRandomStringWithLen(10),
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}
// job without id
for i := 0; i < 10; i++ {
job := jobInfo{
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}

jobIDs, err := s.redisClient.StopPendingJobs(context.Background(), "REPLICATION")
if err != nil {
s.FailNow("failed to stop pending jobs", err)
}
suite.Equal(0, len(remains))
s.Assert().Equal(100, len(jobIDs))
}

func TestRedisClientTestSuite(t *testing.T) {
Expand Down