Skip to content

Commit

Permalink
✨ (config): allow users to manually specify cpus for judging
Browse files Browse the repository at this point in the history
  • Loading branch information
AlphaNecron committed Oct 31, 2023
1 parent 10fcead commit f32b731
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 26 deletions.
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
type (
IglooConfig struct {
ID string
Debug bool `yaml:"-"`
CPUs []uint16
Parallelism uint16 `yaml:"-"`
Debug bool `yaml:"-"`
Key string
Parallelism int16
Storage StorageConfig
RabbitMQ RabbitMQConfig
// TODO: make caching optional
Expand Down
2 changes: 1 addition & 1 deletion igloo.example.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
id: igloo-1
parallelism: 3
cpus: [11,12,13,14,15]
rabbitmq:
username: guest
password: guest
Expand Down
4 changes: 2 additions & 2 deletions models/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ type (
Verdict CaseVerdict
}
FinalResult struct {
Points float32
MaxPoints float32
Points float64
MaxPoints float64
CompilerOutput string
Verdict FinalVerdict
}
Expand Down
2 changes: 1 addition & 1 deletion models/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type (
Language string
ProblemID string
TestCount uint16
PointsPerTest float32
PointsPerTest float64
Constraints Constraints
}

Expand Down
4 changes: 2 additions & 2 deletions runner/linux/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
)

type LinuxRunner struct {
cpu uint8
cpu uint16
container.Environment
}

Expand Down Expand Up @@ -73,7 +73,7 @@ func init() {
}
}

func New(cpu uint8) (r *LinuxRunner, e error) {
func New(cpu uint16) (r *LinuxRunner, e error) {
uid := os.Getuid()
if uid == 0 {
// fallback to 1536 on root
Expand Down
2 changes: 1 addition & 1 deletion runner/runner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ func init() {
_ = container.Init()
}

func New(cpu uint8) (Runner, error) {
func New(cpu uint16) (Runner, error) {
return runner.New(cpu)
}
4 changes: 2 additions & 2 deletions worker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func (jc *JudgeRunner) Compile(rt runtimes.Runtime, sub *models.Submission, ctx
return jc.runner.Compile(rt, srcCode, ctx)
}

func (jc *JudgeRunner) Run(rt runtimes.Runtime, sub *models.Submission, announce func(uint16), prog string, callback func(uint16, models.CaseResult) bool, ctx context.Context) (models.FinalVerdict, float32, error) {
func (jc *JudgeRunner) Run(rt runtimes.Runtime, sub *models.Submission, announce func(uint16), prog string, callback func(uint16, models.CaseResult) bool, ctx context.Context) (models.FinalVerdict, float64, error) {
cmd, args := rt.BuildExecCommand(prog)
c := sub.Constraints
// TODO: implement per language time limit
// TODO: store output file inside container to run checker there
out, e := utils.CreateRandomFile("output-")
var p float32 = 0
var p float64 = 0
if e != nil {
return models.InitError, p, e
}
Expand Down
10 changes: 5 additions & 5 deletions worker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
)

type JudgeRunner struct {
boundCpu uint8
boundCpu uint16
isBusy atomic.Bool
runner runner.Runner
}

func NewJudge(boundCpu int) (r *JudgeRunner) {
r = &JudgeRunner{boundCpu: uint8(boundCpu)}
_r, e := runner.New(uint8(boundCpu))
func NewJudge(boundCpu uint16) (r *JudgeRunner) {
r = &JudgeRunner{boundCpu: boundCpu}
_r, e := runner.New(boundCpu)
logger.Panic(e, "could not spawn runner for cpu %d", boundCpu)
r.runner = _r
return
Expand Down Expand Up @@ -44,7 +44,7 @@ func (jc *JudgeRunner) Judge(sub *models.Submission, ctx context.Context, announ
Verdict: fv,
CompilerOutput: compOut,
Points: p,
MaxPoints: float32(sub.TestCount) * sub.PointsPerTest,
MaxPoints: float64(sub.TestCount) * sub.PointsPerTest,
}
}
}
Expand Down
22 changes: 12 additions & 10 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math"
"net"
"runtime"
"slices"
"sync/atomic"
"time"
)
Expand All @@ -41,26 +42,27 @@ type (
)

func New(ctx context.Context) *JudgeWorker {
maxCpus := runtime.NumCPU()
if config.Config.Parallelism == -1 || int(config.Config.Parallelism) > maxCpus {
config.Config.Parallelism = int16(maxCpus / 2)
logger.Logger.Info().Msgf("automatically allocating %d cores for workers", config.Config.Parallelism)
} else if int16(maxCpus/2) < config.Config.Parallelism {
maxCpus := uint16(runtime.NumCPU())
config.Config.CPUs = slices.DeleteFunc(config.Config.CPUs, func(cpu uint16) bool {
return cpu >= maxCpus
})
slices.Sort(config.Config.CPUs)
config.Config.Parallelism = uint16(len(config.Config.CPUs))
if maxCpus/2 < config.Config.Parallelism {
logger.Logger.Warn().Msg("running with more than 50% logical cores is not recommended.")
}
offset := maxCpus - int(config.Config.Parallelism)
runners := make([]*_runner, config.Config.Parallelism)
for i := range runners {
_ctx, cancel := context.WithCancel(ctx)
runners[i] = &_runner{
JudgeRunner: NewJudge(offset + i),
JudgeRunner: NewJudge(config.Config.CPUs[i]),
c: make(<-chan amqp.Delivery, 1),
ctx: _ctx,
cancel: cancel,
}
runners[i].currentSubmission.Store(math.MaxUint32)
}
logger.Logger.Info().Msgf("initializing igloo with %d concurrent runner(s)", config.Config.Parallelism)
logger.Logger.Info().Uints16("cpus", config.Config.CPUs).Uint16("parallelism", config.Config.Parallelism).Msgf("initializing igloo")
return &JudgeWorker{
ctx: ctx,
pool: runners,
Expand Down Expand Up @@ -106,7 +108,7 @@ func (w *JudgeWorker) Connect() {
"BootedSince": sys.BootTimestamp,
"OS": sys.OS,
"Memory": int64(sys.Memory),
"Parallelism": config.Config.Parallelism,
"Parallelism": int(config.Config.Parallelism),
"Version": "0.0.1-prealpha",
})
logger.Panic(e, "failed to open queue for submissions")
Expand All @@ -120,7 +122,7 @@ func (w *JudgeWorker) Connect() {
}
logger.Panic(w.mqChan.Qos(int(config.Config.Parallelism), 0, true), "error whilst setting QoS")
for i := range w.pool {
w.pool[i].c, e = w.mqChan.Consume(w.mq.Name, fmt.Sprintf("%s#%d", w.mq.Name, i), false, false, false, false, nil)
w.pool[i].c, e = w.mqChan.Consume(w.mq.Name, fmt.Sprintf("%s#%d", w.mq.Name, w.pool[i].boundCpu), false, false, false, false, nil)
logger.Panic(e, "could not create a consumer for runner #%d", i)
}
//w.rc = w.mqChan.NotifyReturn(make(chan amqp.Return, 1))
Expand Down

0 comments on commit f32b731

Please sign in to comment.