Skip to content

Commit

Permalink
Merge branch 'release-1.5.0' into release_1.5.2
Browse files Browse the repository at this point in the history
  • Loading branch information
stonezdj(Daojun Zhang) committed Jul 2, 2018
2 parents 31dc774 + 8bbd84e commit 4fad4ea
Show file tree
Hide file tree
Showing 22 changed files with 228 additions and 84 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.5.0
v1.5.1
4 changes: 3 additions & 1 deletion src/jobservice/core/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ func createJobReq(kind string, isUnique bool, withHook bool) models.JobRequest {

type fakePool struct{}

func (f *fakePool) Start() {}
func (f *fakePool) Start() error {
return nil
}

func (f *fakePool) RegisterJob(name string, job interface{}) error {
return nil
Expand Down
28 changes: 25 additions & 3 deletions src/jobservice/job/impl/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"time"

"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/common"
Expand All @@ -19,6 +21,10 @@ import (
"github.com/vmware/harbor/src/jobservice/logger"
)

const (
maxRetryTimes = 5
)

//Context ...
type Context struct {
//System context
Expand Down Expand Up @@ -51,9 +57,25 @@ func NewContext(sysCtx context.Context, adminClient client.Client) *Context {

//Init ...
func (c *Context) Init() error {
configs, err := c.adminClient.GetCfgs()
if err != nil {
return err
var (
counter = 0
err error
configs map[string]interface{}
)

for counter == 0 || err != nil {
counter++
configs, err = c.adminClient.GetCfgs()
if err != nil {
logger.Errorf("Job context initialization error: %s\n", err.Error())
if counter < maxRetryTimes {
backoff := (int)(math.Pow(2, (float64)(counter))) + 2*counter + 5
logger.Infof("Retry in %d seconds", backoff)
time.Sleep(time.Duration(backoff) * time.Second)
} else {
return fmt.Errorf("job context initialization error: %s (%d times tried)", err.Error(), counter)
}
}
}

db := getDBFromConfig(configs)
Expand Down
7 changes: 2 additions & 5 deletions src/jobservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"errors"
"flag"
"fmt"

"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/jobservice/config"
Expand All @@ -22,15 +21,13 @@ func main() {

//Missing config file
if configPath == nil || utils.IsEmptyStr(*configPath) {
fmt.Println("Config file should be specified")
flag.Usage()
return
logger.Fatal("Config file should be specified")
}

//Load configurations
if err := config.DefaultConfig.Load(*configPath, true); err != nil {
fmt.Printf("Failed to load configurations with error: %s\n", err)
return
logger.Fatalf("Failed to load configurations with error: %s\n", err)
}

//Set job context initializer
Expand Down
5 changes: 4 additions & 1 deletion src/jobservice/pool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import "github.com/vmware/harbor/src/jobservice/models"
//More like a driver to transparent the lower queue.
type Interface interface {
//Start to serve
Start()
//
//Return:
// error if failed to start
Start() error

//Register job to the pool.
//
Expand Down
17 changes: 10 additions & 7 deletions src/jobservice/pool/message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"

Expand All @@ -18,6 +19,10 @@ import (
"github.com/vmware/harbor/src/jobservice/utils"
)

const (
msgServerRetryTimes = 5
)

//MessageServer implements the sub/pub mechanism via redis to do async message exchanging.
type MessageServer struct {
context context.Context
Expand All @@ -42,14 +47,13 @@ func (ms *MessageServer) Start() error {
logger.Info("Message server is stopped")
}()

//As we get one connection from the pool, don't try to close it.
conn := ms.redisPool.Get()
defer conn.Close()

conn := ms.redisPool.Get() //Get one backend connection!
psc := redis.PubSubConn{
Conn: conn,
}
defer psc.Close()

//Subscribe channel
err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(ms.namespace))...)
if err != nil {
return err
Expand All @@ -60,8 +64,7 @@ func (ms *MessageServer) Start() error {
for {
switch res := psc.Receive().(type) {
case error:
done <- res
return
done <- fmt.Errorf("error occurred when receiving from pub/sub channel of message server: %s", res.(error).Error())
case redis.Message:
m := &models.Message{}
if err := json.Unmarshal(res.Data, m); err != nil {
Expand Down Expand Up @@ -131,12 +134,12 @@ func (ms *MessageServer) Start() error {
case <-ms.context.Done():
err = errors.New("context exit")
case err = <-done:
return err
}
}

//Unsubscribe all
psc.Unsubscribe()

return <-done
}

Expand Down
42 changes: 39 additions & 3 deletions src/jobservice/pool/redis_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package pool
import (
"errors"
"fmt"
"math"
"time"

"github.com/garyburd/redigo/redis"
Expand All @@ -29,6 +30,8 @@ const (

//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute

pingRedisMaxTimes = 10
)

//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
Expand Down Expand Up @@ -80,13 +83,17 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re

//Start to serve
//Unblock action
func (gcwp *GoCraftWorkPool) Start() {
func (gcwp *GoCraftWorkPool) Start() error {
if gcwp.redisPool == nil ||
gcwp.pool == nil ||
gcwp.context.SystemContext == nil {
//report and exit
gcwp.context.ErrorChan <- errors.New("Redis worker pool can not start as it's not correctly configured")
return
return errors.New("Redis worker pool can not start as it's not correctly configured")
}

//Test the redis connection
if err := gcwp.ping(); err != nil {
return err
}

done := make(chan interface{}, 1)
Expand Down Expand Up @@ -130,8 +137,18 @@ func (gcwp *GoCraftWorkPool) Start() {
return
}

startTimes := 0
START_MSG_SERVER:
//Start message server
if err = gcwp.messageServer.Start(); err != nil {
logger.Errorf("Message server exits with error: %s\n", err.Error())
if startTimes < msgServerRetryTimes {
startTimes++
time.Sleep(time.Duration((int)(math.Pow(2, (float64)(startTimes)))+5) * time.Second)
logger.Infof("Restart message server (%d times)\n", startTimes)
goto START_MSG_SERVER
}

return
}
}()
Expand Down Expand Up @@ -177,6 +194,8 @@ func (gcwp *GoCraftWorkPool) Start() {

gcwp.pool.Stop()
}()

return nil
}

//RegisterJob is used to register the job to the pool.
Expand Down Expand Up @@ -593,6 +612,23 @@ func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc)
return next()
}

//Ping the redis server
func (gcwp *GoCraftWorkPool) ping() error {
conn := gcwp.redisPool.Get()
defer conn.Close()

var err error
for count := 1; count <= pingRedisMaxTimes; count++ {
if _, err = conn.Do("ping"); err == nil {
return nil
}

time.Sleep(time.Duration(count+4) * time.Second)
}

return fmt.Errorf("connect to redis server timeout: %s", err.Error())
}

//generate the job stats data
func generateResult(j *work.Job, jobKind string, isUnique bool) models.JobStats {
if j == nil {
Expand Down
36 changes: 24 additions & 12 deletions src/jobservice/runtime/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ func (bs *Bootstrap) LoadAndRun() {
}

//Start the pool
var backendPool pool.Interface
var (
backendPool pool.Interface
wpErr error
)
if config.DefaultConfig.PoolConfig.Backend == config.JobServicePoolBackendRedis {
backendPool = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
backendPool, wpErr = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
if wpErr != nil {
logger.Fatalf("Failed to load and run worker pool: %s\n", wpErr.Error())
}
} else {
logger.Fatalf("Worker pool backend '%s' is not supported", config.DefaultConfig.PoolConfig.Backend)
}

//Initialize controller
ctl := core.NewController(backendPool)

//Start the API server
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)
Expand All @@ -84,13 +91,14 @@ func (bs *Bootstrap) LoadAndRun() {
logSweeper := logger.NewSweeper(ctx, config.GetLogBasePath(), config.GetLogArchivePeriod())
logSweeper.Start()

//To indicate if any errors occurred
var err error
//Block here
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
select {
case <-sig:
case err := <-rootContext.ErrorChan:
logger.Errorf("Server error:%s\n", err)
case err = <-rootContext.ErrorChan:
}

//Call cancel to send termination signal to other interested parts.
Expand Down Expand Up @@ -118,6 +126,10 @@ func (bs *Bootstrap) LoadAndRun() {
rootContext.WG.Wait()
close <- true

if err != nil {
logger.Fatalf("Server exit with error: %s\n", err)
}

logger.Infof("Server gracefully exit")
}

Expand All @@ -144,7 +156,7 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configura
}

//Load and run the worker pool
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface {
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) (pool.Interface, error) {
redisPool := &redis.Pool{
MaxActive: 6,
MaxIdle: 6,
Expand All @@ -166,8 +178,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
//Register jobs here
if err := redisWorkerPool.RegisterJob(impl.KnownJobDemo, (*impl.DemoJob)(nil)); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
return nil, err
}
if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{
Expand All @@ -177,11 +188,12 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
job.ImageReplicate: (*replication.Replicator)(nil),
}); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
return nil, err
}

redisWorkerPool.Start()
if err := redisWorkerPool.Start(); err != nil {
return nil, err
}

return redisWorkerPool
return redisWorkerPool, nil
}
6 changes: 4 additions & 2 deletions src/ui/api/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package api

import (
"net/http"

"github.com/vmware/harbor/src/common"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"net/http"
)

// InternalAPI handles request of harbor admin...
Expand All @@ -44,7 +45,8 @@ func (ia *InternalAPI) Prepare() {
func (ia *InternalAPI) SyncRegistry() {
err := SyncRegistry(ia.ProjectMgr)
if err != nil {
ia.CustomAbort(http.StatusInternalServerError, "internal error")
ia.HandleInternalServerError(err.Error())
return
}
}

Expand Down
Loading

0 comments on commit 4fad4ea

Please sign in to comment.