From 751d64dfdcff8cd46001a37b5d1d2cb55dac7353 Mon Sep 17 00:00:00 2001 From: Vadim Markovtsev Date: Wed, 14 Jun 2017 11:49:11 +0200 Subject: [PATCH] Set the max number of driver instances to CPU cores --- pool.go | 21 ++++++++++++++++++++- pool_test.go | 3 ++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index b2b5d58..2303adc 100644 --- a/pool.go +++ b/pool.go @@ -3,6 +3,9 @@ package server import ( "fmt" "math" + "os" + "runtime" + "strconv" "sync/atomic" "time" @@ -14,6 +17,11 @@ var ( // DefaultPoolTimeout is the time a request to the DriverPool can wait // before getting a driver assigned. DefaultPoolTimeout = time.Second * 5 + + // DefaultMaxInstancesPerDriver is the maximum number of instances of + // the same driver which can be launched following the default + // scaling policy (see DefaultScalingPolicy()). + DefaultMaxInstancesPerDriver = runtime.NumCPU() ) // DriverPool controls a pool of drivers and balances requests among them, @@ -235,7 +243,7 @@ type ScalingPolicy interface { // DefaultScalingPolicy returns a new instance of the default scaling policy. // Instances returned by this function should not be reused. func DefaultScalingPolicy() ScalingPolicy { - return MovingAverage(10, MinMax(1, 10, AIMD(1, 0.5))) + return MovingAverage(10, MinMax(1, DefaultMaxInstancesPerDriver, AIMD(1, 0.5))) } type movingAverage struct { @@ -334,3 +342,14 @@ func (p *aimd) Scale(total, load int) int { return res } + +func init() { + // Try to read DefaultMaxInstancesPerDriver from the environment variable + defaultMaxInstancesPerDriverEnv := os.Getenv("BBLFSH_MAX_INSTANCES_PER_DRIVER") + if len(defaultMaxInstancesPerDriverEnv) > 0 { + maxInstances, err := strconv.Atoi(defaultMaxInstancesPerDriverEnv) + if err == nil && maxInstances > 0 { + DefaultMaxInstancesPerDriver = maxInstances + } + } +} diff --git a/pool_test.go b/pool_test.go index c6ebaa6..7e49110 100644 --- a/pool_test.go +++ b/pool_test.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "runtime" "sync" "testing" "time" @@ -121,7 +122,7 @@ func TestDriverPoolParallel(t *testing.T) { } wg.Wait() - require.Equal(10, dp.cur) + require.Equal(runtime.NumCPU(), dp.cur) time.Sleep(time.Second * 2) require.Equal(1, dp.cur)