Allow concurrency for tasks that do not collide
Two different concurrency modes are implemented. They are enabled by
setting "concurrency_mode" in the config file to either "project" or
"node".

When "project" concurrency is enabled, tasks will run in parallel if and
only if they do not share the same project id, with no regard to the
nodes/hosts that are affected.

When "node" concurrency is enabled, a task will run in parallel if and
only if the hosts affected by the tasks already running do not
intersect with the hosts that would be affected by the task in
question.

If "concurrency_mode" is not specified, no task will start before the
previous one has finished.

The collision check is based on the output from the "--list-hosts"
argument to ansible, which uses the hosts specified in the inventory.
Thus, if two different hostnames are used that point to the same node,
such as "127.0.0.1" and "localhost", there will be no collision and two
tasks may connect to the same node concurrently. If this behaviour is
not desired, make sure not to include aliases for hosts in the
inventory when enabling a concurrency mode.
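
For illustration, "ansible-playbook <playbook> --list-hosts" prints the
matched hosts indented under a "hosts (N):" heading, roughly like this
(hypothetical playbook and inventory; exact formatting varies between
Ansible versions):

playbook: site.yml

  play #1 (all): all
    pattern: ['all']
    hosts (2):
      192.168.1.10
      localhost

The new listPlaybookHosts() helper in runner.go captures the host lines
(the ones indented by six spaces) and stores them in t.hosts, which is
the set the "node" collision check compares.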

To restrict the number of tasks that run in parallel at the same time,
add "max_parallel_tasks" to the config file. This defaults to a humble
10 if not specified.
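
As a minimal sketch, enabling node-level concurrency with at most four
simultaneous tasks would look roughly like this in Semaphore's JSON
config file (hypothetical values; all other keys omitted):

{
    "concurrency_mode": "node",
    "max_parallel_tasks": 4
}

Omitting "concurrency_mode" keeps the serial behaviour described above,
and omitting "max_parallel_tasks" falls back to the default of 10.
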
Natanande committed May 29, 2017
1 parent f4425b7 commit 8bc1b7f
Showing 3 changed files with 193 additions and 44 deletions.
96 changes: 81 additions & 15 deletions api/tasks/pool.go
@@ -3,49 +3,115 @@ package tasks
 import (
 	"fmt"
 	"time"
+
+	"github.com/ansible-semaphore/semaphore/util"
 )

 type taskPool struct {
-	queue    []*task
-	register chan *task
-	running  *task
+	queue       []*task
+	register    chan *task
+	activeProj  map[int]*task
+	activeNodes map[string]*task
+	running     int
 }

 var pool = taskPool{
-	queue:    make([]*task, 0),
-	register: make(chan *task),
-	running:  nil,
+	queue:       make([]*task, 0),
+	register:    make(chan *task),
+	activeProj:  make(map[int]*task),
+	activeNodes: make(map[string]*task),
+	running:     0,
 }

+type resourceLock struct {
+	lock   bool
+	holder *task
+}
+
+var resourceLocker = make(chan *resourceLock)
+
 func (p *taskPool) run() {
-	ticker := time.NewTicker(10 * time.Second)
-
 	defer func() {
+		close(resourceLocker)
+	}()
+
+	ticker := time.NewTicker(5 * time.Second)
+
+	defer func() {
 		ticker.Stop()
 	}()

+	// Lock or unlock resources when running a task
+	go func (locker <-chan *resourceLock) {
+		for l := range locker {
+			t := l.holder
+			if l.lock {
+				if p.blocks(t) {
+					panic("Trying to lock an already locked resource!")
+				}
+				p.activeProj[t.projectID] = t
+				for _, node := range t.hosts {
+					p.activeNodes[node] = t
+				}
+				p.running += 1
+			} else {
+				if p.activeProj[t.projectID] == t {
+					delete(p.activeProj, t.projectID)
+				}
+				for _, node := range t.hosts {
+					delete(p.activeNodes, node)
+				}
+				p.running -= 1
+			}
+		}
+	}(resourceLocker)
+
 	for {
 		select {
 		case task := <-p.register:
 			fmt.Println(task)
-			if p.running == nil {
-				go task.run()
-				continue
-			}

+			go task.prepareRun()
 			p.queue = append(p.queue, task)
 		case <-ticker.C:
-			if len(p.queue) == 0 || p.running != nil {
+			if len(p.queue) == 0 {
 				continue
+			} else if t := p.queue[0]; t.task.Status != "error" && (!t.prepared || p.blocks(t)) {
+				p.queue = append(p.queue[1:], t)
+				continue
 			}

-			fmt.Println("Running a task.")
-			go pool.queue[0].run()
+			if t := pool.queue[0]; t.task.Status != "error" {
+				fmt.Println("Running a task.")
+				resourceLocker <- &resourceLock{lock: true, holder: t,}
+				go t.run()
+			}
			pool.queue = pool.queue[1:]
 		}
 	}
 }

+func (p *taskPool) blocks(t *task) bool {
+	if p.running >= util.Config.MaxParallelTasks {
+		return true
+	}
+	switch util.Config.ConcurrencyMode {
+	case "project":
+		return p.activeProj[t.projectID] != nil
+	case "node":
+		collision := false
+		for _, node := range t.hosts {
+			if p.activeNodes[node] != nil {
+				collision = true
+				break
+			}
+		}
+		return collision
+	default:
+		return p.running > 0
+	}
+}
+
 func StartRunner() {
 	pool.run()
 }
133 changes: 104 additions & 29 deletions api/tasks/runner.go
@@ -11,6 +11,7 @@ import (
 	"strconv"
 	"strings"
 	"time"
+	"regexp"

 	"github.com/ansible-semaphore/semaphore/db"
 	"github.com/ansible-semaphore/semaphore/util"
@@ -26,6 +27,8 @@ type task struct {
 	users []int
 	projectID int
 	alert bool
+	hosts []string
+	prepared bool
 	alert_chat string
 }

@@ -36,16 +39,11 @@ func (t *task) fail() {
 	t.sendTelegramAlert()
 }

-func (t *task) run() {
-	pool.running = t
+func (t *task) prepareRun() {
+	t.prepared = false

 	defer func() {
-		fmt.Println("Stopped running tasks")
-		pool.running = nil
-
-		now := time.Now()
-		t.task.End = &now
-		t.updateStatus()
+		fmt.Println("Stopped preparing task")

 		objType := "task"
 		desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
@@ -60,23 +58,16 @@ func (t *task) run() {
 		}
 	}()

+	t.log("Preparing: " + strconv.Itoa(t.task.ID))
+
 	if err := t.populateDetails(); err != nil {
 		t.log("Error: " + err.Error())
 		t.fail()
 		return
 	}

-	{
-		fmt.Println(t.users)
-		now := time.Now()
-		t.task.Status = "running"
-		t.task.Start = &now
-
-		t.updateStatus()
-	}
-
 	objType := "task"
-	desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is running"
+	desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is preparing"
 	if err := (db.Event{
 		ProjectID: &t.projectID,
 		ObjectType: &objType,
@@ -87,8 +78,7 @@ func (t *task) run() {
 		panic(err)
 	}

-	t.log("Started: " + strconv.Itoa(t.task.ID))
-	t.log("Run task with template: " + t.template.Alias + "\n")
+	t.log("Prepare task with template: " + t.template.Alias + "\n")

 	if err := t.installKey(t.repository.SshKey); err != nil {
 		t.log("Failed installing ssh key for repository access: " + err.Error())
@@ -110,6 +100,62 @@

 	// todo: write environment

+	if err := t.listPlaybookHosts(); err != nil {
+		t.log("Listing playbook hosts failed: " + err.Error())
+		t.fail()
+		return
+	}
+
+	t.prepared = true
+}
+
+func (t *task) run() {
+
+	defer func() {
+		fmt.Println("Stopped running tasks")
+		resourceLocker <- &resourceLock{lock: false, holder: t,}
+
+		now := time.Now()
+		t.task.End = &now
+		t.updateStatus()
+
+		objType := "task"
+		desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
+		if err := (db.Event{
+			ProjectID: &t.projectID,
+			ObjectType: &objType,
+			ObjectID: &t.task.ID,
+			Description: &desc,
+		}.Insert()); err != nil {
+			t.log("Fatal error inserting an event")
+			panic(err)
+		}
+	}()
+
+	{
+		fmt.Println(t.users)
+		now := time.Now()
+		t.task.Status = "running"
+		t.task.Start = &now
+
+		t.updateStatus()
+	}
+
+	objType := "task"
+	desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is running"
+	if err := (db.Event{
+		ProjectID: &t.projectID,
+		ObjectType: &objType,
+		ObjectID: &t.task.ID,
+		Description: &desc,
+	}.Insert()); err != nil {
+		t.log("Fatal error inserting an event")
+		panic(err)
+	}
+
+	t.log("Started: " + strconv.Itoa(t.task.ID))
+	t.log("Run task with template: " + t.template.Alias + "\n")
+
 	if err := t.runGalaxy(); err != nil {
 		t.log("Running galaxy failed: " + err.Error())
 		t.fail()
@@ -296,7 +342,42 @@ func (t *task) runGalaxy() error {
 	return cmd.Run()
 }

+func (t *task) listPlaybookHosts() error {
+	args, err := t.getPlaybookArgs()
+	if err != nil {
+		return err
+	}
+	args = append(args, "--list-hosts")
+
+	cmd := exec.Command("ansible-playbook", args...)
+	cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
+	cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)
+
+	out, err := cmd.Output()
+	re := regexp.MustCompile("(?m)^\\s{6}(.*)$")
+	matches := re.FindAllSubmatch(out, 20)
+	hosts := make([]string, len(matches))
+	for i, _ := range matches {
+		hosts[i] = string(matches[i][1])
+	}
+	t.hosts = hosts
+	return err
+}
+
 func (t *task) runPlaybook() error {
+	args, err := t.getPlaybookArgs()
+	if err != nil {
+		return err
+	}
+	cmd := exec.Command("ansible-playbook", args...)
+	cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
+	cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)
+
+	t.logCmd(cmd)
+	return cmd.Run()
+}
+
+func (t *task) getPlaybookArgs() ([]string, error) {
 	playbookName := t.task.Playbook
 	if len(playbookName) == 0 {
 		playbookName = t.template.Playbook
@@ -323,7 +404,7 @@ func (t *task) runPlaybook() error {
 		err := json.Unmarshal([]byte(t.environment.JSON), &js)
 		if err != nil {
 			t.log("JSON is not valid")
-			return err
+			return nil, err
 		}

 		args = append(args, "--extra-vars", t.environment.JSON)
@@ -334,7 +415,7 @@ func (t *task) runPlaybook() error {
 		err := json.Unmarshal([]byte(*t.template.Arguments), &extraArgs)
 		if err != nil {
 			t.log("Could not unmarshal arguments to []string")
-			return err
+			return nil, err
 		}
 	}

@@ -344,13 +425,7 @@ func (t *task) runPlaybook() error {
 		args = append(args, extraArgs...)
 		args = append(args, playbookName)
 	}
-
-	cmd := exec.Command("ansible-playbook", args...)
-	cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
-	cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)
-
-	t.logCmd(cmd)
-	return cmd.Run()
+	return args, nil
 }

 func (t *task) envVars(home string, pwd string, gitSSHCommand *string) []string {
8 changes: 8 additions & 0 deletions util/config.go
@@ -69,6 +69,10 @@ type configType struct {
 	TelegramAlert bool `json:"telegram_alert"`
 	TelegramChat string `json:"telegram_chat"`
 	TelegramToken string `json:"telegram_token"`
+
+	// task concurrency
+	ConcurrencyMode string `json:"concurrency_mode"`
+	MaxParallelTasks int `json:"max_parallel_tasks"`
 }

 var Config *configType
@@ -151,6 +155,10 @@ func init() {
 		Config.TmpPath = "/tmp/semaphore"
 	}

+	if Config.MaxParallelTasks < 1 {
+		Config.MaxParallelTasks = 10
+	}
+
 	var encryption []byte
 	encryption = nil

