Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/shell-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func main() {

// Be a good parent - clean up after the child processes
// in case if shell-operator is a PID1.
executor.ReapLocked = false
go executor.Reap()

defaultOperator := shell_operator.DefaultOperator()
Expand Down
11 changes: 11 additions & 0 deletions pkg/executor/zombie_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Config struct {

var reapLogEntry = log.WithField("operator.component", "zombieReaper")

// Reap orphaned processes after unlocking
var ReapLocked = true

// Handle death of child (SIGCHLD) messages. Pushes the signal onto the
// notifications channel if there is a waiter.
func sigChildHandler(notifications chan os.Signal) {
Expand Down Expand Up @@ -64,9 +67,17 @@ func reapChildren(config Config) {

pid := config.Pid

// TODO RWMutex is not appropriate here: call to Lock blocks subsequent RLock calls. Reaper should wait for orpahs on 'zero locks' event similar to WaitGroup but without Wait limitations

// TODO may be allow syscall.Wait only after some condition is met (for example, after all Synchronization are done or after all converge tasks are done)?

for {
<-notifications

if ReapLocked {
continue
}

// Lock until all exec.Cmd are stopped.
ExecutorLock.Lock()

Expand Down
2 changes: 2 additions & 0 deletions pkg/hook/config/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ properties:
type: boolean
executeHookOnSynchronization:
type: boolean
waitForSynchronization:
type: boolean
resynchronizationPeriod:
type: string
nameSelector:
Expand Down
15 changes: 8 additions & 7 deletions pkg/hook/controller/hook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
)

type BindingExecutionInfo struct {
BindingContext []BindingContext
IncludeSnapshots []string
IncludeAllSnapshots bool
AllowFailure bool
QueueName string
Binding string
Group string
BindingContext []BindingContext
IncludeSnapshots []string
IncludeAllSnapshots bool
AllowFailure bool
QueueName string
Binding string
Group string
WaitForSynchronization bool
}

// В каждый хук надо будет положить этот объект.
Expand Down
40 changes: 22 additions & 18 deletions pkg/hook/controller/kubernetes_bindings_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
)

// A link between a hook and a kube monitor
// TODO replace "Useful fields" with OnKubernetesEventConfig
type KubernetesBindingToMonitorLink struct {
BindingName string
MonitorId string
// Useful fields to create a BindingContext
IncludeSnapshots []string
AllowFailure bool
JqFilter string
QueueName string
Group string
IncludeSnapshots []string
AllowFailure bool
JqFilter string
QueueName string
Group string
WaitForSynchronization bool
}

// KubernetesBindingsController handles kubernetes bindings for one hook.
Expand Down Expand Up @@ -79,13 +81,14 @@ func (c *kubernetesBindingsController) EnableKubernetesBindings() ([]BindingExec
return nil, fmt.Errorf("run monitor: %s", err)
}
c.BindingMonitorLinks[config.Monitor.Metadata.MonitorId] = &KubernetesBindingToMonitorLink{
MonitorId: config.Monitor.Metadata.MonitorId,
BindingName: config.BindingName,
IncludeSnapshots: config.IncludeSnapshotsFrom,
AllowFailure: config.AllowFailure,
JqFilter: config.Monitor.JqFilter,
QueueName: config.Queue,
Group: config.Group,
MonitorId: config.Monitor.Metadata.MonitorId,
BindingName: config.BindingName,
IncludeSnapshots: config.IncludeSnapshotsFrom,
AllowFailure: config.AllowFailure,
JqFilter: config.Monitor.JqFilter,
QueueName: config.Queue,
Group: config.Group,
WaitForSynchronization: config.WaitForSynchronization,
}

// There is no Synchronization event for 'v0' binding configuration.
Expand Down Expand Up @@ -144,12 +147,13 @@ func (c *kubernetesBindingsController) HandleEvent(kubeEvent KubeEvent) BindingE
bindingContext := ConvertKubeEventToBindingContext(kubeEvent, link)

return BindingExecutionInfo{
BindingContext: bindingContext,
IncludeSnapshots: link.IncludeSnapshots,
AllowFailure: link.AllowFailure,
QueueName: link.QueueName,
Binding: link.BindingName,
Group: link.Group,
BindingContext: bindingContext,
IncludeSnapshots: link.IncludeSnapshots,
AllowFailure: link.AllowFailure,
QueueName: link.QueueName,
Binding: link.BindingName,
Group: link.Group,
WaitForSynchronization: link.WaitForSynchronization,
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/hook/hook_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type OnKubernetesEventConfigV1 struct {
WatchEventTypes []WatchEventType `json:"watchEvent,omitempty"`
ExecuteHookOnEvents []WatchEventType `json:"executeHookOnEvent,omitempty"`
ExecuteHookOnSynchronization string `json:"executeHookOnSynchronization,omitempty"`
WaitForSynchronization string `json:"waitForSynchronization,omitempty"`
Mode KubeEventMode `json:"mode,omitempty"`
ApiVersion string `json:"apiVersion,omitempty"`
Kind string `json:"kind,omitempty"`
Expand Down Expand Up @@ -310,6 +311,13 @@ func (c *HookConfig) ConvertAndCheckV1() (err error) {
kubeConfig.ExecuteHookOnSynchronization = true
}

// Disable WaitForSynchronization makes sense only for named queues.
if kubeCfg.WaitForSynchronization == "false" && kubeCfg.Queue != "" {
kubeConfig.WaitForSynchronization = false
} else {
kubeConfig.WaitForSynchronization = true
}

c.OnKubernetesEvents = append(c.OnKubernetesEvents, kubeConfig)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/hook/types/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ type OnKubernetesEventConfig struct {
Queue string
Group string
ExecuteHookOnSynchronization bool
WaitForSynchronization bool
}
17 changes: 12 additions & 5 deletions pkg/shell-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (op *ShellOperator) TaskHandler(t task.Task) queue.TaskResult {

taskHook := op.HookManager.GetHook(hookMeta.HookName)
if taskHook.Config.Version == "v1" {
bcs := op.CombineBindingContextForHook(op.TaskQueues.GetByName(t.GetQueueName()), t)
bcs := op.CombineBindingContextForHook(op.TaskQueues.GetByName(t.GetQueueName()), t, nil)
if bcs != nil {
hookMeta.BindingContext = bcs
t.UpdateMetadata(hookMeta)
Expand Down Expand Up @@ -389,7 +389,7 @@ func (op *ShellOperator) TaskHandler(t task.Task) queue.TaskResult {
// Also, compacts sequences of binding contexts with similar group.
// If input task has no metadata, result will be nil.
// Metadata should implement HookNameAccessor and BindingContextAccessor interfaces.
func (op *ShellOperator) CombineBindingContextForHook(q *queue.TaskQueue, t task.Task) []BindingContext {
func (op *ShellOperator) CombineBindingContextForHook(q *queue.TaskQueue, t task.Task, stopCombineFn func(tsk task.Task) bool) []BindingContext {
if q == nil {
return nil
}
Expand All @@ -416,11 +416,18 @@ func (op *ShellOperator) CombineBindingContextForHook(q *queue.TaskQueue, t task
return
}
nextHookName := hm.(HookNameAccessor).GetHookName()
if nextHookName != hookName || t.GetType() != tsk.GetType() {
// Only tasks for the same hook and of the same type can be combined.
// Using stopCombineFn function more stricter combine rules can be defined.
if nextHookName == hookName && t.GetType() == tsk.GetType() {
if stopCombineFn != nil {
stopIterate = stopCombineFn(tsk)
}
} else {
stopIterate = true
return
}
otherTasks = append(otherTasks, tsk)
if !stopIterate {
otherTasks = append(otherTasks, tsk)
}
})

// no tasks found to combine
Expand Down
8 changes: 4 additions & 4 deletions pkg/shell-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) {
op.TaskQueues.GetByName("test_multiple_hooks").AddLast(tsk)
}

bcs := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_multiple_hooks"), currTask)
bcs := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_multiple_hooks"), currTask, nil)
g.Expect(bcs).Should(HaveLen(4))
}

Expand Down Expand Up @@ -172,7 +172,7 @@ func Test_CombineBindingContext_OneHook(t *testing.T) {
op.TaskQueues.GetByName("test_one_hook").AddLast(tsk)
}

bcs := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_one_hook"), currTask)
bcs := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_one_hook"), currTask, nil)
g.Expect(bcs).Should(BeNil())
}

Expand Down Expand Up @@ -284,7 +284,7 @@ func Test_CombineBindingContext_Group(t *testing.T) {
op.TaskQueues.GetByName("test_multiple_hooks").AddLast(tsk)
}

bcList := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_multiple_hooks"), currTask)
bcList := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_multiple_hooks"), currTask, nil)
g.Expect(bcList).Should(HaveLen(2))
}

Expand Down Expand Up @@ -443,7 +443,7 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) {
op.TaskQueues.GetByName("test_multiple_hooks").AddLast(tsk)
}

bcList := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_multiple_hooks"), currTask)
bcList := op.CombineBindingContextForHook(op.TaskQueues.GetByName("test_multiple_hooks"), currTask, nil)

g.Expect(bcList).Should(HaveLen(5))

Expand Down
10 changes: 10 additions & 0 deletions pkg/task/queue/queue_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,13 @@ func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue)) {
}
}
}

func (tqs *TaskQueueSet) Remove(name string) {
ts, exists := tqs.Queues[name]
if exists {
ts.Stop()
}
tqs.m.Lock()
defer tqs.m.Unlock()
delete(tqs.Queues, name)
}
73 changes: 9 additions & 64 deletions pkg/task/queue/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ config parameter.
var (
DelayOnQueueIsEmpty = 3 * time.Second
DelayOnFailedTask = 5 * time.Second
DelayOnRepeat = 25 * time.Millisecond
)

type QueueWatcher interface {
Expand Down Expand Up @@ -92,7 +93,6 @@ func (q *TaskQueue) AddFirst(t task.Task) {
q.m.Lock()
q.items = append([]task.Task{t}, q.items...)
q.m.Unlock()
q.queueChanged()
}

// RemoveFirst deletes a head element, so head is moved.
Expand All @@ -105,7 +105,6 @@ func (q *TaskQueue) RemoveFirst() (t task.Task) {
t = q.items[0]
q.items = q.items[1:]
q.m.Unlock()
q.queueChanged()
return t
}

Expand All @@ -124,7 +123,6 @@ func (q *TaskQueue) AddLast(task task.Task) {
q.m.Lock()
q.items = append(q.items, task)
q.m.Unlock()
q.queueChanged()
}

// RemoveLast deletes a tail element, so tail is moved.
Expand All @@ -141,7 +139,6 @@ func (q *TaskQueue) RemoveLast() (t task.Task) {
q.items = q.items[:len(q.items)-1]
}
q.m.Unlock()
q.queueChanged()
return t
}

Expand All @@ -167,7 +164,7 @@ func (q *TaskQueue) Get(id string) task.Task {
return nil
}

//
// AddAfter inserts a task after the task with specified id.
func (q *TaskQueue) AddAfter(id string, newTask task.Task) {
newItems := make([]task.Task, len(q.items)+1)

Expand All @@ -190,7 +187,7 @@ func (q *TaskQueue) AddAfter(id string, newTask task.Task) {
q.items = newItems
}

//
// AddBefore inserts a task before the task with specified id.
func (q *TaskQueue) AddBefore(id string, newTask task.Task) {
newItems := make([]task.Task, len(q.items)+1)

Expand All @@ -216,10 +213,6 @@ func (q *TaskQueue) AddBefore(id string, newTask task.Task) {
q.items = newItems
}

//func (q *TaskQueue) AddBefore(id string, newTask task.Task) {
// return
//}

// Remove finds element by id and deletes it.
func (q *TaskQueue) Remove(id string) (t task.Task) {
q.m.Lock()
Expand Down Expand Up @@ -319,7 +312,12 @@ func (q *TaskQueue) Start() {
}
})
q.Status = ""
case "Repeat":
// repeat a current task after a small delay
nextSleepDelay = DelayOnRepeat
q.Status = "repeat head task"
}

if taskRes.DelayBeforeNextTask != 0 {
nextSleepDelay = taskRes.DelayBeforeNextTask
q.Status = fmt.Sprintf("sleep for %s", nextSleepDelay.String())
Expand Down Expand Up @@ -414,60 +412,7 @@ func (q *TaskQueue) Filter(filterFn func(task.Task) bool) {
q.items = newItems
}

// Watcher functions

// AddWatcher adds queue watcher.
func (q *TaskQueue) AddWatcher(queueWatcher QueueWatcher) {
q.queueWatchers = append(q.queueWatchers, queueWatcher)
}

// queueChanged must be called every time the queue is changed.
func (q *TaskQueue) queueChanged() {
// TODO
// if q.changesSuspended { return }

if len(q.queueWatchers) == 0 {
return
}

if q.changesEnabled {
for _, watcher := range q.queueWatchers {
watcher.QueueChangeCallback()
}
} else {
q.changesCount++
}
// TODO
// Send changes signal asynchronously.
// go func(){
// for _, watcher := range q.watchers {
// watcher.Ch() <- true
// }
// }()
}

// TODO
//func (q *TaskQueue) DoWithSuspendedWatchers(action func (Task)) {
//
//
//}

// Включить вызов QueueChangeCallback при каждом изменении
// В паре с ChangesDisabled могут быть использованы, чтобы
// производить массовые изменения. Если runCallbackOnPreviousChanges true,
// то будет вызвана QueueChangeCallback
func (q *TaskQueue) ChangesEnable(runCallbackOnPreviousChanges bool) {
q.changesEnabled = true
if runCallbackOnPreviousChanges && q.changesCount > 0 {
q.changesCount = 0
q.queueChanged()
}
}

func (q *TaskQueue) ChangesDisable() {
q.changesEnabled = false
q.changesCount = 0
}
// TODO define mapping method with QueueAction to insert, modify and delete tasks.

// Dump tasks in queue to one line
func (q *TaskQueue) String() string {
Expand Down