diff --git a/internal/pkg/deploy/pipeline.go b/internal/pkg/deploy/pipeline.go index 9691da387bb..9f6ce2684ac 100644 --- a/internal/pkg/deploy/pipeline.go +++ b/internal/pkg/deploy/pipeline.go @@ -562,9 +562,9 @@ func (stg *PipelineStage) Deployments() ([]DeployAction, error) { prevActions = append(prevActions, approval) } - topo, err := graph.TopologicalSort(stg.buildDeploymentsGraph()) + topo, err := graph.TopologicalOrder(stg.buildDeploymentsGraph()) if err != nil { - return nil, fmt.Errorf("topological sort deployments: %v", err) + return nil, fmt.Errorf("find an ordering for deployments: %v", err) } var actions []DeployAction @@ -636,7 +636,7 @@ func (a *ManualApprovalAction) Name() string { } type ranker interface { - Rank(name string) (int, error) + Rank(name string) (int, bool) } // DeployAction represents a CodePipeline action of category "Deploy" for a cloudformation stack. @@ -658,7 +658,7 @@ func (a *DeployAction) Name() string { // StackName returns the name of the workload stack to create or update. func (a *DeployAction) StackName() string { - if a.override != nil { + if a.override != nil && a.override.StackName != "" { return a.override.StackName } return fmt.Sprintf("%s-%s-%s", a.appName, a.envName, a.name) @@ -666,7 +666,7 @@ func (a *DeployAction) StackName() string { // TemplatePath returns the path of the CloudFormation template file generated during the build phase. func (a *DeployAction) TemplatePath() string { - if a.override != nil { + if a.override != nil && a.override.TemplatePath != "" { return a.override.TemplatePath } @@ -676,7 +676,7 @@ func (a *DeployAction) TemplatePath() string { // TemplateConfigPath returns the path of the CloudFormation template config file generated during the build phase. func (a *DeployAction) TemplateConfigPath() string { - if a.override != nil { + if a.override != nil && a.override.TemplateConfig != "" { return a.override.TemplateConfig } diff --git a/internal/pkg/deploy/pipeline_test.go b/internal/pkg/deploy/pipeline_test.go index 47f892cd795..5c7776f79af 100644 --- a/internal/pkg/deploy/pipeline_test.go +++ b/internal/pkg/deploy/pipeline_test.go @@ -353,6 +353,78 @@ func TestPipelineStage_Init(t *testing.T) { }) } +func TestPipelineStage_Deployments(t *testing.T) { + testCases := map[string]struct { + stg *PipelineStage + + wantedRunOrder map[string]int + wantedErr error + }{ + "should return an error when the deployments contain a cycle": { + stg: func() *PipelineStage { + // Create a pipeline with a self-depending deployment. + var stg PipelineStage + stg.Init(&config.Environment{Name: "test"}, &manifest.PipelineStage{ + Name: "test", + Deployments: map[string]*manifest.Deployment{ + "api": { + DependsOn: []string{"api"}, + }, + }, + }, nil) + + return &stg + }(), + wantedErr: errors.New("find an ordering for deployments: graph contains a cycle: api"), + }, + "should return the expected run orders": { + stg: func() *PipelineStage { + // Create a pipeline with a manual approval and 4 deployments. + var stg PipelineStage + stg.Init(&config.Environment{Name: "test"}, &manifest.PipelineStage{ + Name: "test", + RequiresApproval: true, + Deployments: map[string]*manifest.Deployment{ + "frontend": { + DependsOn: []string{"orders", "payments"}, + }, + "orders": { + DependsOn: []string{"warehouse"}, + }, + "payments": nil, + "warehouse": nil, + }, + }, nil) + + return &stg + }(), + wantedRunOrder: map[string]int{ + "CreateOrUpdate-frontend-test": 4, + "CreateOrUpdate-orders-test": 3, + "CreateOrUpdate-payments-test": 2, + "CreateOrUpdate-warehouse-test": 2, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + deployments, err := tc.stg.Deployments() + + if tc.wantedErr != nil { + require.EqualError(t, err, tc.wantedErr.Error()) + } else { + require.NoError(t, err) + for _, deployment := range deployments { + wanted, ok := tc.wantedRunOrder[deployment.Name()] + require.True(t, ok, "expected deployment named %s to be created", deployment.Name()) + require.Equal(t, wanted, deployment.RunOrder(), "order for deployment %s does not match", deployment.Name()) + } + } + }) + } +} + type mockAction struct { order int } @@ -498,11 +570,11 @@ func TestDeployAction_TemplateConfigPath(t *testing.T) { type mockRanker struct { rank int - err error + ok bool } -func (m mockRanker) Rank(name string) (int, error) { - return m.rank, m.err +func (m mockRanker) Rank(name string) (int, bool) { + return m.rank, m.ok } func TestDeployAction_RunOrder(t *testing.T) { diff --git a/internal/pkg/graph/errors.go b/internal/pkg/graph/errors.go new file mode 100644 index 00000000000..5637a37f644 --- /dev/null +++ b/internal/pkg/graph/errors.go @@ -0,0 +1,21 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package graph + +import ( + "fmt" + "strings" +) + +type errCycle[V comparable] struct { + vertices []V +} + +func (e *errCycle[V]) Error() string { + ss := make([]string, len(e.vertices)) + for i, v := range e.vertices { + ss[i] = fmt.Sprintf("%v", v) + } + return fmt.Sprintf("graph contains a cycle: %s", strings.Join(ss, ", ")) +} diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index e6af52de685..9f94c3ebcdd 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -15,7 +15,8 @@ const ( // Graph represents a directed graph. type Graph[V comparable] struct { - vertices map[V]neighbors[V] + vertices map[V]neighbors[V] // Adjacency list for each vertex. + inDegrees map[V]int // Number of incoming edges for each vertex. } // Edge represents one edge of a directed graph. @@ -28,24 +29,65 @@ type neighbors[V comparable] map[V]bool // New initiates a new Graph. func New[V comparable](vertices ...V) *Graph[V] { - m := make(map[V]neighbors[V]) + adj := make(map[V]neighbors[V]) + inDegrees := make(map[V]int) for _, vertex := range vertices { - m[vertex] = make(neighbors[V]) + adj[vertex] = make(neighbors[V]) + inDegrees[vertex] = 0 } return &Graph[V]{ - vertices: m, + vertices: adj, + inDegrees: inDegrees, } } +// Neighbors returns the list of connected vertices from vtx. +func (g *Graph[V]) Neighbors(vtx V) []V { + neighbors, ok := g.vertices[vtx] + if !ok { + return nil + } + arr := make([]V, len(neighbors)) + i := 0 + for neighbor := range neighbors { + arr[i] = neighbor + i += 1 + } + return arr +} + // Add adds a connection between two vertices. func (g *Graph[V]) Add(edge Edge[V]) { from, to := edge.From, edge.To - // Add origin vertex if doesn't exist. if _, ok := g.vertices[from]; !ok { g.vertices[from] = make(neighbors[V]) } - // Add edge. + if _, ok := g.vertices[to]; !ok { + g.vertices[to] = make(neighbors[V]) + } + if _, ok := g.inDegrees[from]; !ok { + g.inDegrees[from] = 0 + } + if _, ok := g.inDegrees[to]; !ok { + g.inDegrees[to] = 0 + } + g.vertices[from][to] = true + g.inDegrees[to] += 1 +} + +// InDegree returns the number of incoming edges to vtx. +func (g *Graph[V]) InDegree(vtx V) int { + return g.inDegrees[vtx] +} + +// Remove deletes a connection between two vertices. +func (g *Graph[V]) Remove(edge Edge[V]) { + if _, ok := g.vertices[edge.From][edge.To]; !ok { + return + } + delete(g.vertices[edge.From], edge.To) + g.inDegrees[edge.To] -= 1 } type findCycleTempVars[V comparable] struct { @@ -82,6 +124,17 @@ func (g *Graph[V]) IsAcyclic() ([]V, bool) { return nil, true } +// Roots returns a slice of vertices with no incoming edges. +func (g *Graph[V]) Roots() []V { + var roots []V + for vtx, degree := range g.inDegrees { + if degree == 0 { + roots = append(roots, vtx) + } + } + return roots +} + func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool { temp.status[currVertex] = visiting for vertex := range g.vertices[currVertex] { @@ -100,21 +153,65 @@ func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool { return false } -// TopologicalSorter ranks vertices in a graph using topological sort. +// TopologicalSorter ranks vertices using Kahn's algorithm: https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm +// However, if two vertices can be scheduled in parallel then the same rank is returned. type TopologicalSorter[V comparable] struct { ranks map[V]int } // Rank returns the order of the vertex. The smallest order starts at 0. -// If the vertex does not exist in the graph, then returns an error. -func (s *TopologicalSorter[V]) Rank(vtx V) (int, error) { - // TODO(efekarakus): Implement me. - return 0, nil +// The second boolean return value is used to indicate whether the vertex exists in the graph. +func (alg *TopologicalSorter[V]) Rank(vtx V) (int, bool) { + r, ok := alg.ranks[vtx] + return r, ok +} + +func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) { + roots := g.Roots() + for _, root := range roots { + alg.ranks[root] = 0 // Explicitly set to 0 so that `_, ok := alg.ranks[vtx]` returns true instead of false. + } + for len(roots) > 0 { + var vtx V + vtx, roots = roots[0], roots[1:] + for _, neighbor := range g.Neighbors(vtx) { + if new, old := alg.ranks[vtx]+1, alg.ranks[neighbor]; new > old { + alg.ranks[neighbor] = new + } + g.Remove(Edge[V]{vtx, neighbor}) + if g.InDegree(neighbor) == 0 { + roots = append(roots, neighbor) + } + } + } } -// TopologicalSort determines whether the directed graph is acyclic, and if so then finds a topological order. +// TopologicalOrder determines whether the directed graph is acyclic, and if so then +// finds a topological-order, or a linear order, of the vertices. +// Note that this function will modify the original graph. +// +// If there is an edge from vertex V to U, then V must happen before U and results in rank of V < rank of U. +// When there are ties (two vertices can be scheduled in parallel), the vertices are given the same rank. // If the digraph contains a cycle, then an error is returned. -func TopologicalSort[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) { - // TODO(efekarakus): Implement me. - return nil, nil +// +// An example graph and their ranks is shown below to illustrate: +// . +//├── a rank: 0 +//│ ├── c rank: 1 +//│ │ └── f rank: 2 +//│ └── d rank: 1 +//└── b rank: 0 +// └── e rank: 1 +func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) { + if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic { + return nil, &errCycle[V]{ + vertices, + } + } + + topo := &TopologicalSorter[V]{ + ranks: make(map[V]int), + } + topo.traverse(digraph) + return topo, nil } diff --git a/internal/pkg/graph/graph_test.go b/internal/pkg/graph/graph_test.go index 4f689650c5b..71121718314 100644 --- a/internal/pkg/graph/graph_test.go +++ b/internal/pkg/graph/graph_test.go @@ -4,6 +4,7 @@ package graph import ( + "strings" "testing" "github.com/stretchr/testify/require" @@ -15,6 +16,8 @@ func TestGraph_Add(t *testing.T) { graph := New[string]() // WHEN + // A <-> B + // -> C graph.Add(Edge[string]{ From: "A", To: "B", @@ -29,11 +32,95 @@ func TestGraph_Add(t *testing.T) { }) // THEN - require.Equal(t, graph.vertices["A"], neighbors[string]{"B": true, "C": true}) - require.Equal(t, graph.vertices["B"], neighbors[string]{"A": true}) + require.ElementsMatch(t, []string{"B", "C"}, graph.Neighbors("A")) + require.ElementsMatch(t, []string{"A"}, graph.Neighbors("B")) }) } +func TestGraph_InDegree(t *testing.T) { + testCases := map[string]struct { + graph *Graph[rune] + + wanted map[rune]int + }{ + "should return 0 for nodes that don't exist in the graph": { + graph: New[rune](), + + wanted: map[rune]int{ + 'a': 0, + }, + }, + "should return number of incoming edges for complex graph": { + graph: func() *Graph[rune] { + g := New[rune]() + g.Add(Edge[rune]{'a', 'b'}) + g.Add(Edge[rune]{'b', 'a'}) + g.Add(Edge[rune]{'a', 'c'}) + g.Add(Edge[rune]{'b', 'c'}) + g.Add(Edge[rune]{'d', 'e'}) + return g + }(), + wanted: map[rune]int{ + 'a': 1, + 'b': 1, + 'c': 2, + 'd': 0, + 'e': 1, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + for vtx, wanted := range tc.wanted { + require.Equal(t, wanted, tc.graph.InDegree(vtx), "indegree for vertex %v does not match", vtx) + } + }) + } +} + +func TestGraph_Remove(t *testing.T) { + testCases := map[string]struct { + graph *Graph[rune] + + wantedNeighbors map[rune][]rune + wantedIndegrees map[rune]int + }{ + "edge deletion should be idempotent": { + graph: func() *Graph[rune] { + g := New[rune]() + g.Add(Edge[rune]{'a', 'b'}) + g.Add(Edge[rune]{'z', 'b'}) + g.Remove(Edge[rune]{'a', 'b'}) + g.Remove(Edge[rune]{'a', 'b'}) // Remove a second time. + return g + }(), + + wantedNeighbors: map[rune][]rune{ + 'a': nil, + 'b': nil, + 'z': {'b'}, + }, + wantedIndegrees: map[rune]int{ + 'a': 0, + 'z': 0, + 'b': 1, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + for vtx, wanted := range tc.wantedNeighbors { + require.ElementsMatch(t, wanted, tc.graph.Neighbors(vtx), "neighbors for vertex %v do not match", vtx) + } + for vtx, wanted := range tc.wantedIndegrees { + require.Equal(t, wanted, tc.graph.InDegree(vtx), "indegree for vertex %v does not match") + } + }) + } +} + func TestGraph_IsAcyclic(t *testing.T) { testCases := map[string]struct { graph Graph[string] @@ -91,3 +178,194 @@ func TestGraph_IsAcyclic(t *testing.T) { }) } } + +func TestGraph_Roots(t *testing.T) { + testCases := map[string]struct { + graph *Graph[int] + + wantedRoots []int + }{ + "should return nil if the graph is empty": { + graph: New[int](), + }, + "should return all the vertices if there are no edges in the graph": { + graph: New[int](1, 2, 3, 4, 5), + wantedRoots: []int{1, 2, 3, 4, 5}, + }, + "should return only vertices with no in degrees": { + graph: func() *Graph[int] { + g := New[int]() + g.Add(Edge[int]{ + From: 1, + To: 3, + }) + g.Add(Edge[int]{ + From: 2, + To: 3, + }) + g.Add(Edge[int]{ + From: 3, + To: 4, + }) + return g + }(), + + wantedRoots: []int{1, 2}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + require.ElementsMatch(t, tc.wantedRoots, tc.graph.Roots()) + }) + } +} + +func TestTopologicalOrder(t *testing.T) { + testCases := map[string]struct { + graph *Graph[string] + + wantedRanks map[string]int + wantedErrPrefix string + }{ + "should return an error when a cycle is detected": { + // frontend <-> backend + graph: func() *Graph[string] { + g := New("frontend", "backend") + g.Add(Edge[string]{ + From: "frontend", + To: "backend", + }) + g.Add(Edge[string]{ + From: "backend", + To: "frontend", + }) + return g + }(), + wantedErrPrefix: "graph contains a cycle: ", // the cycle can appear in any order as map traversals are not deterministic, so only check the prefix. + }, + "should return the ranks for a graph that looks like a bus": { + // vpc -> lb -> api + graph: func() *Graph[string] { + g := New[string]() + g.Add(Edge[string]{ + From: "vpc", + To: "lb", + }) + g.Add(Edge[string]{ + From: "lb", + To: "api", + }) + return g + }(), + + wantedRanks: map[string]int{ + "api": 2, + "lb": 1, + "vpc": 0, + }, + }, + "should return the ranks for a graph that looks like a tree": { + graph: func() *Graph[string] { + // vpc -> rds -> backend + // -> s3 -> api + // -> frontend + g := New[string]() + g.Add(Edge[string]{ + From: "vpc", + To: "rds", + }) + g.Add(Edge[string]{ + From: "vpc", + To: "s3", + }) + g.Add(Edge[string]{ + From: "rds", + To: "backend", + }) + g.Add(Edge[string]{ + From: "s3", + To: "api", + }) + g.Add(Edge[string]{ + From: "s3", + To: "frontend", + }) + return g + }(), + + wantedRanks: map[string]int{ + "api": 2, + "frontend": 2, + "backend": 2, + "s3": 1, + "rds": 1, + "vpc": 0, + }, + }, + "should return the ranks for a graph with multiple root nodes": { + graph: func() *Graph[string] { + // warehouse -> orders -> frontend + // payments -> + g := New[string]() + g.Add(Edge[string]{ + From: "payments", + To: "frontend", + }) + g.Add(Edge[string]{ + From: "warehouse", + To: "orders", + }) + g.Add(Edge[string]{ + From: "orders", + To: "frontend", + }) + return g + }(), + + wantedRanks: map[string]int{ + "frontend": 2, + "orders": 1, + "warehouse": 0, + "payments": 0, + }, + }, + "should find the longest path to a node": { + graph: func() *Graph[string] { + // a -> b -> c -> d -> f + // a -> e -> f + g := New[string]() + for _, edge := range []Edge[string]{{"a", "b"}, {"b", "c"}, {"c", "d"}, {"d", "f"}, {"a", "e"}, {"e", "f"}} { + g.Add(edge) + } + return g + }(), + wantedRanks: map[string]int{ + "a": 0, + "b": 1, + "e": 1, + "c": 2, + "d": 3, + "f": 4, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + topo, err := TopologicalOrder(tc.graph) + + if tc.wantedErrPrefix != "" { + require.Error(t, err) + require.True(t, strings.HasPrefix(err.Error(), tc.wantedErrPrefix)) + } else { + require.NoError(t, err) + + for vtx, wantedRank := range tc.wantedRanks { + rank, _ := topo.Rank(vtx) + require.Equal(t, wantedRank, rank, "expected rank for vertex %s does not match", vtx) + } + } + }) + } +} diff --git a/internal/pkg/manifest/pipeline.go b/internal/pkg/manifest/pipeline.go index 01377b9fb52..c544498cb24 100644 --- a/internal/pkg/manifest/pipeline.go +++ b/internal/pkg/manifest/pipeline.go @@ -184,10 +184,10 @@ type Build struct { // PipelineStage represents a stage in the pipeline manifest type PipelineStage struct { - Name string `yaml:"name"` - RequiresApproval bool `yaml:"requires_approval,omitempty"` - TestCommands []string `yaml:"test_commands,omitempty"` - Deployments Deployments + Name string `yaml:"name"` + RequiresApproval bool `yaml:"requires_approval,omitempty"` + TestCommands []string `yaml:"test_commands,omitempty"` + Deployments Deployments `yaml:"deployments,omitempty"` } // Deployments represent a directed graph of cloudformation deployments. diff --git a/internal/pkg/template/template.go b/internal/pkg/template/template.go index e247858af6e..7ef6e489ec1 100644 --- a/internal/pkg/template/template.go +++ b/internal/pkg/template/template.go @@ -114,7 +114,7 @@ func (t *Template) Parse(path string, data interface{}, options ...ParseOption) } buf := new(bytes.Buffer) if err := tpl.Execute(buf, data); err != nil { - return nil, fmt.Errorf("execute template %s with data %v: %w", path, data, err) + return nil, fmt.Errorf("execute template %s: %w", path, err) } return &Content{buf}, nil } diff --git a/internal/pkg/template/template_test.go b/internal/pkg/template/template_test.go index ec0554d2d85..bb41b432395 100644 --- a/internal/pkg/template/template_test.go +++ b/internal/pkg/template/template_test.go @@ -335,7 +335,7 @@ func TestTemplate_Parse(t *testing.T) { "templates/fake/manifest.yml": []byte(`{{.Name}}`), }, - wantedErr: fmt.Errorf("execute template %s with data %v", "/fake/manifest.yml", struct{}{}), + wantedErr: fmt.Errorf("execute template %s", "/fake/manifest.yml"), }, "valid template": { inPath: "/fake/manifest.yml",