Sequential flow replication
For a sequential flow, each replica is attached to the leaves
of the previous one so that replicas are deployed sequentially
Stan Lagun committed Jun 13, 2017
1 parent 9ed8be6 commit 4a8d7b0
Showing 7 changed files with 237 additions and 42 deletions.
9 changes: 9 additions & 0 deletions docs/flows.md
@@ -63,6 +63,7 @@ destruction:

replicaSpace: optional-name
exported: true
sequential: true

parameters:
parameterName1:
@@ -351,6 +352,14 @@ another flow will "see" only its own replicas so the `Flow` resource can always
However, when the flow is run independently, it will not have any context and thus query replicas based on
replica-space alone, which means it will get all the replicas from all contexts.

### Sequential flows

By default, if a flow has more than one replica, the generated dependency graph has each replica subgraph attached
to the graph root vertex (the `Flow` vertex), so when the graph is deployed, the resources of all replicas are
created in parallel. However, in some cases it is desirable to deploy replicas sequentially, one by one. This can be
achieved by setting the `sequential` attribute of the `Flow` to `true`. For sequential flows, the root vertices of
each replica are attached to the leaf vertices of the previous replica, as shown in the example below.
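
For example, a flow that deploys its replicas one by one can be declared like this (a minimal sketch modeled on `examples/etcd/resdefs/scale-flow.yaml` from this commit; the flow name is illustrative and the `apiVersion`/`kind` header lines are omitted):

```yaml
metadata:
  name: my-sequential-flow

exported: true
sequential: true

construction:
  flow: my-sequential-flow
```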

## Scheduling flow deployments

When user runs `kubeac run something` the deployment does not happen immediately (unless there is also a `--deploy`
3 changes: 1 addition & 2 deletions examples/etcd/README.md
@@ -26,8 +26,7 @@ If omitted, `etcd` name is used by default.
`kubectl exec k8s-appcontroller kubeac run etcd-scale -n +1 --arg clusterName=my-cluster`

`-n +1` - adds one node to the cluster. Use `-n -1` to scale the cluster down by one node. In this case the last
added node is going to be deleted. At the moment it is only possible to scale cluster up by one node at a time.
However, any number of nodes can be removed. Note, that this can also remove nodes created upon initial deployment.
added node is going to be deleted. This flow can also remove nodes created upon initial deployment.

`--arg clusterName=my-cluster` - name of the cluster to scale (`etcd` if not specified).
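
Putting the pieces together, scaling the same cluster down by one node would look like this (illustrative, combining the flags documented above):

`kubectl exec k8s-appcontroller kubeac run etcd-scale -n -1 --arg clusterName=my-cluster`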

2 changes: 2 additions & 0 deletions examples/etcd/resdefs/scale-flow.yaml
@@ -4,6 +4,8 @@ metadata:
name: etcd-scale

exported: true
sequential: true

construction:
flow: etcd-scale
destruction:
3 changes: 3 additions & 0 deletions pkg/client/flows.go
@@ -46,6 +46,9 @@ type Flow struct {
// can only be triggered by other flows (including DEFAULT flow which is exported by default)
Exported bool `json:"exported,omitempty"`

// Flow replicas must be deployed sequentially, one by one
Sequential bool `json:"sequential,omitempty"`

// Parameters that the flow can accept (i.e. valid inputs for the flow)
Parameters map[string]FlowParameter `json:"parameters,omitempty"`

13 changes: 3 additions & 10 deletions pkg/resources/flow.go
@@ -78,20 +78,13 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D
args[arg] = val
}
}
fixedNumberOfReplicas := false
if replicaCount > 0 {
fixedNumberOfReplicas = f.context.Graph().Options().FixedNumberOfReplicas
} else if replicaCount == 0 {
fixedNumberOfReplicas = true
replicaCount = -1
}
options := interfaces.DependencyGraphOptions{
FlowName: f.originalName,
Args: args,
FlowInstanceName: f.context.GetArg("AC_ID"),
ReplicaCount: replicaCount,
Silent: silent,
FixedNumberOfReplicas: fixedNumberOfReplicas,
FixedNumberOfReplicas: true,
}

graph, err := f.context.Scheduler().BuildDependencyGraph(options)
@@ -131,7 +124,7 @@ func (f *flow) Create() error {
// Delete is called during flow destruction, which can happen only once, while Create ensures that at least one flow
// replica exists and as such can be called any number of times
func (f flow) Delete() error {
graph, err := f.buildDependencyGraph(-1, false)
graph, err := f.buildDependencyGraph(0, false)
if err != nil {
return err
}
@@ -145,7 +138,7 @@ func (f flow) Status(meta map[string]string) (interfaces.ResourceStatus, error)
graph := f.currentGraph
if graph == nil {
var err error
graph, err = f.buildDependencyGraph(0, true)
graph, err = f.buildDependencyGraph(-1, true)
if err != nil {
return interfaces.ResourceError, err
}
167 changes: 137 additions & 30 deletions pkg/scheduler/dependency_graph.go
@@ -769,27 +769,29 @@ func expandListExpression(expr string) []string {
return result
}

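// interimGraphVertex is a vertex candidate produced while walking the dependency
// tree of a single replica; per-replica vertices are merged into the final graph
// afterwards, either in parallel (mergeReplicas) or chained (concatenateReplicas)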
type interimGraphVertex struct {
dependency client.Dependency
scheduledResource *ScheduledResource
parentContext *graphContext
}

func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
resDefs map[string]client.ResourceDefinition,
dependencies map[string][]client.Dependency,
flow *client.Flow, replicas []client.Replica, useDestructionSelector bool) error {

type Block struct {
dependency client.Dependency
scheduledResource *ScheduledResource
parentContext *graphContext
}
blocks := map[string][]*Block{}
var vertices [][]interimGraphVertex
silent := rootContext.graph.Options().Silent

for _, replica := range replicas {
var replicaVertices []interimGraphVertex
replicaName := replica.ReplicaName()
replicaContext := sched.prepareContext(rootContext, nil, replicaName)
queue := list.New()
queue.PushFront(&Block{dependency: client.Dependency{Child: "flow/" + flow.Name}})
queue.PushFront(interimGraphVertex{dependency: client.Dependency{Child: "flow/" + flow.Name}})

for e := queue.Front(); e != nil; e = e.Next() {
parent := e.Value.(*Block)
parent := e.Value.(interimGraphVertex)

deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext)

@@ -815,44 +817,149 @@ }
}
sr.usedInReplicas = []string{replicaName}

block := &Block{
vertex := interimGraphVertex{
scheduledResource: sr,
dependency: dep,
parentContext: parentContext,
}

blocks[dep.Child] = append(blocks[dep.Child], block)
replicaVertices = append(replicaVertices, vertex)

if parent.scheduledResource != nil {
sr.Requires = append(sr.Requires, parent.scheduledResource.Key())
parent.scheduledResource.RequiredBy = append(parent.scheduledResource.RequiredBy, sr.Key())
sr.Meta[parent.dependency.Child] = dep.Meta
}
queue.PushBack(block)
queue.PushBack(vertex)
}
}
for _, block := range blocks {
for _, entry := range block {
key := entry.scheduledResource.Key()
existingSr := rootContext.graph.graph[key]
if existingSr == nil {
if !silent {
log.Printf("Adding resource %s to the dependency graph flow %s", key, flow.Name)
}
rootContext.graph.graph[key] = entry.scheduledResource
} else {
sched.updateContext(existingSr.context, entry.parentContext, entry.dependency)
existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...)
existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...)
existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...)
for metaKey, metaValue := range entry.scheduledResource.Meta {
existingSr.Meta[metaKey] = metaValue
}
vertices = append(vertices, replicaVertices)
}

if flow.Sequential {
sched.concatenateReplicas(vertices, rootContext, rootContext.graph.Options())
} else {
sched.mergeReplicas(vertices, rootContext, rootContext.graph.Options())
}
return nil
}

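// mergeReplicas merges the vertices of every replica into the shared dependency
// graph, so each replica subgraph hangs off the flow root and deploys in parallel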
func (sched *scheduler) mergeReplicas(vertices [][]interimGraphVertex, gc *graphContext,
options interfaces.DependencyGraphOptions) {

for _, replicaVertices := range vertices {
sched.mergeInterimGraphVertices(replicaVertices, gc.graph.graph, options)
}
}

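// concatenateReplicas builds a separate subgraph per replica and chains the
// subgraphs: the roots of each replica after the first are made to depend on the
// leaves of the previous replica, so that replicas deploy sequentially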
func (sched *scheduler) concatenateReplicas(vertices [][]interimGraphVertex, gc *graphContext,
options interfaces.DependencyGraphOptions) {
graph := gc.graph.graph
var previousReplicaGraph map[string]*ScheduledResource
for i, replicaVertices := range vertices {
replicaGraph := map[string]*ScheduledResource{}
sched.mergeInterimGraphVertices(replicaVertices, replicaGraph, options)

if i > 0 {
correctDuplicateResources(graph, replicaGraph, i)

for _, leafName := range getLeafs(previousReplicaGraph) {
for _, rootName := range getRoots(replicaGraph) {
root := replicaGraph[rootName]
leaf := previousReplicaGraph[leafName]
root.Requires = append(root.Requires, leafName)
leaf.RequiredBy = append(leaf.RequiredBy, rootName)
}
}
}
previousReplicaGraph = replicaGraph
for key, value := range replicaGraph {
graph[key] = value
}
}
}

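// correctDuplicateResources renames resources of a new replica subgraph whose keys
// collide with resources already present in the accumulated graph (e.g. resources
// shared between replicas): each such resource gets the next free numeric suffix,
// and all Requires/RequiredBy references within the replica are rewired to the new key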
func correctDuplicateResources(existingGraph, newGraph map[string]*ScheduledResource, index int) {
toReplace := map[string]*ScheduledResource{}
for key, sr := range newGraph {
if existingGraph[key] != nil {
toReplace[key] = sr
}
}
for key, sr := range toReplace {
sr.context.id = existingGraph[key].context.id
j := index + 1
suffix := sr.suffix
for {
sr.suffix = fmt.Sprintf("%s #%d", suffix, j)
if existingGraph[sr.Key()] == nil {
break
}
j++
}
for _, rKey := range sr.RequiredBy {
requires := newGraph[rKey].Requires
for i, rKey2 := range requires {
if rKey2 == key {
requires[i] = sr.Key()
break
}
}
}
for _, rKey := range sr.Requires {
requiredBy := newGraph[rKey].RequiredBy
for i, rKey2 := range requiredBy {
if rKey2 == key {
requiredBy[i] = sr.Key()
break
}
}
}
delete(newGraph, key)
newGraph[sr.Key()] = sr
}
}

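// getRoots returns the keys of vertices that have no dependencies of their own (graph roots)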
func getRoots(graph map[string]*ScheduledResource) []string {
var result []string
for key, sr := range graph {
if len(sr.Requires) == 0 {
result = append(result, key)
}
}
return result
}

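// getLeafs returns the keys of vertices that no other vertex depends on (graph leaves)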
func getLeafs(graph map[string]*ScheduledResource) []string {
var result []string
for key, sr := range graph {
if len(sr.RequiredBy) == 0 {
result = append(result, key)
}
}
return result
}

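// mergeInterimGraphVertices adds the interim vertices of one replica to the given
// graph; when a resource key already exists there, the two vertices are merged by
// combining their dependency links, replica usage and metadata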
func (sched *scheduler) mergeInterimGraphVertices(vertices []interimGraphVertex, graph map[string]*ScheduledResource,
options interfaces.DependencyGraphOptions) {

for _, entry := range vertices {
key := entry.scheduledResource.Key()
existingSr := graph[key]
if existingSr == nil {
if !options.Silent {
log.Printf("Adding resource %s to the dependency graph flow %s", key, options.FlowName)
}
graph[key] = entry.scheduledResource
} else {
sched.updateContext(existingSr.context, entry.parentContext, entry.dependency)
existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...)
existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...)
existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...)
for metaKey, metaValue := range entry.scheduledResource.Meta {
existingSr.Meta[metaKey] = metaValue
}
}
}
return nil
}

// getResourceDestructors builds a list of functions, each of them delete one of replica resources
82 changes: 82 additions & 0 deletions pkg/scheduler/flows_test.go
@@ -1466,3 +1466,85 @@ func TestDynamicDependencyReplication(t *testing.T) {

ensureReplicas(c, t, 7, 1)
}

// TestSequentialReplication tests that resources of sequentially replicated flows are created in the right order
func TestSequentialReplication(t *testing.T) {
replicaCount := 3
flow := mocks.MakeFlow("test")
flow.Flow.Sequential = true

c, fake := mocks.NewClientWithFake(
flow,
mocks.MakeResourceDefinition("pod/ready-$AC_NAME"),
mocks.MakeResourceDefinition("secret/secret"),
mocks.MakeResourceDefinition("job/ready-$AC_NAME"),
mocks.MakeDependency("flow/test", "pod/ready-$AC_NAME", "flow=test"),
mocks.MakeDependency("pod/ready-$AC_NAME", "secret/secret", "flow=test"),
mocks.MakeDependency("secret/secret", "job/ready-$AC_NAME", "flow=test"),
)

stopChan := make(chan struct{})
var deployed []string
fake.PrependReactor("create", "*",
func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
resource := action.GetResource().Resource
if resource != "replica" {
deployed = append(deployed, resource)
}

return false, nil, nil
})

depGraph, err := New(c, nil, 0).BuildDependencyGraph(
interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "test"})
if err != nil {
t.Fatal(err)
}

graph := depGraph.(*dependencyGraph).graph
if len(graph) != 3*replicaCount {
t.Error("wrong dependency graph length")
}

depGraph.Deploy(stopChan)
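	// secret/secret has no $AC_NAME in its name, so it is shared by all replicas and
	// created only once; each subsequent replica creates only its own pod and job,
	// strictly after the previous replica has finished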
expected := []string{"pods", "secrets", "jobs", "pods", "jobs", "pods", "jobs"}
if len(deployed) != len(expected) {
t.Fatal("invalid resource sequence", deployed)
}
for i, r := range deployed {
if expected[i] != r {
t.Fatal("invalid resource sequence")
}
}

ensureReplicas(c, t, replicaCount, replicaCount)
}

// TestSequentialReplicationWithSharedFlow tests that a flow consumed as a resource shared by replicas of a
// sequentially replicated flow is deployed only once
func TestSequentialReplicationWithSharedFlow(t *testing.T) {
replicaCount := 3
flow := mocks.MakeFlow("outer")
flow.Flow.Sequential = true

c := mocks.NewClient(
flow,
mocks.MakeFlow("inner"),
mocks.MakeResourceDefinition("job/ready-a$AC_NAME"),
mocks.MakeResourceDefinition("job/ready-b$AC_NAME"),
mocks.MakeDependency("flow/outer", "flow/inner", "flow=outer"),
mocks.MakeDependency("flow/inner", "job/ready-a$AC_NAME", "flow=outer"),
mocks.MakeDependency("flow/inner", "job/ready-b$AC_NAME", "flow=inner"),
)

stopChan := make(chan struct{})

depGraph, err := New(c, nil, 0).BuildDependencyGraph(
interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "outer"})
if err != nil {
t.Fatal(err)
}

depGraph.Deploy(stopChan)
ensureReplicas(c, t, replicaCount+1, replicaCount+1)
}
