Skip to content
Merged
12 changes: 6 additions & 6 deletions internal/pkg/deploy/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,9 @@ func (stg *PipelineStage) Deployments() ([]DeployAction, error) {
prevActions = append(prevActions, approval)
}

topo, err := graph.TopologicalSort(stg.buildDeploymentsGraph())
topo, err := graph.TopologicalOrder(stg.buildDeploymentsGraph())
if err != nil {
return nil, fmt.Errorf("topological sort deployments: %v", err)
return nil, fmt.Errorf("find an ordering for deployments: %v", err)
}

var actions []DeployAction
Expand Down Expand Up @@ -636,7 +636,7 @@ func (a *ManualApprovalAction) Name() string {
}

type ranker interface {
Rank(name string) (int, error)
Rank(name string) (int, bool)
}

// DeployAction represents a CodePipeline action of category "Deploy" for a cloudformation stack.
Expand All @@ -658,15 +658,15 @@ func (a *DeployAction) Name() string {

// StackName returns the name of the workload stack to create or update.
func (a *DeployAction) StackName() string {
if a.override != nil {
if a.override != nil && a.override.StackName != "" {
return a.override.StackName
}
return fmt.Sprintf("%s-%s-%s", a.appName, a.envName, a.name)
}

// TemplatePath returns the path of the CloudFormation template file generated during the build phase.
func (a *DeployAction) TemplatePath() string {
if a.override != nil {
if a.override != nil && a.override.TemplatePath != "" {
return a.override.TemplatePath
}

Expand All @@ -676,7 +676,7 @@ func (a *DeployAction) TemplatePath() string {

// TemplateConfigPath returns the path of the CloudFormation template config file generated during the build phase.
func (a *DeployAction) TemplateConfigPath() string {
if a.override != nil {
if a.override != nil && a.override.TemplateConfig != "" {
return a.override.TemplateConfig
}

Expand Down
78 changes: 75 additions & 3 deletions internal/pkg/deploy/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,78 @@ func TestPipelineStage_Init(t *testing.T) {
})
}

func TestPipelineStage_Deployments(t *testing.T) {
testCases := map[string]struct {
stg *PipelineStage

wantedRunOrder map[string]int
wantedErr error
}{
"should return an error when the deployments contain a cycle": {
stg: func() *PipelineStage {
// Create a pipeline with a self-depending deployment.
var stg PipelineStage
stg.Init(&config.Environment{Name: "test"}, &manifest.PipelineStage{
Name: "test",
Deployments: map[string]*manifest.Deployment{
"api": {
DependsOn: []string{"api"},
},
},
}, nil)

return &stg
}(),
wantedErr: errors.New("find an ordering for deployments: graph contains a cycle: api"),
},
"should return the expected run orders": {
stg: func() *PipelineStage {
// Create a pipeline with a manual approval and 4 deployments.
var stg PipelineStage
stg.Init(&config.Environment{Name: "test"}, &manifest.PipelineStage{
Name: "test",
RequiresApproval: true,
Deployments: map[string]*manifest.Deployment{
"frontend": {
DependsOn: []string{"orders", "payments"},
},
"orders": {
DependsOn: []string{"warehouse"},
},
"payments": nil,
"warehouse": nil,
},
}, nil)

return &stg
}(),
wantedRunOrder: map[string]int{
"CreateOrUpdate-frontend-test": 4,
"CreateOrUpdate-orders-test": 3,
"CreateOrUpdate-payments-test": 2,
"CreateOrUpdate-warehouse-test": 2,
},
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
deployments, err := tc.stg.Deployments()

if tc.wantedErr != nil {
require.EqualError(t, err, tc.wantedErr.Error())
} else {
require.NoError(t, err)
for _, deployment := range deployments {
wanted, ok := tc.wantedRunOrder[deployment.Name()]
require.True(t, ok, "expected deployment named %s to be created", deployment.Name())
require.Equal(t, wanted, deployment.RunOrder(), "order for deployment %s does not match", deployment.Name())
}
}
})
}
}

type mockAction struct {
order int
}
Expand Down Expand Up @@ -498,11 +570,11 @@ func TestDeployAction_TemplateConfigPath(t *testing.T) {

type mockRanker struct {
rank int
err error
ok bool
}

func (m mockRanker) Rank(name string) (int, error) {
return m.rank, m.err
func (m mockRanker) Rank(name string) (int, bool) {
return m.rank, m.ok
}

func TestDeployAction_RunOrder(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions internal/pkg/graph/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package graph

import (
"fmt"
"strings"
)

type errCycle[V comparable] struct {
vertices []V
}

func (e *errCycle[V]) Error() string {
ss := make([]string, len(e.vertices))
for i, v := range e.vertices {
ss[i] = fmt.Sprintf("%v", v)
}
return fmt.Sprintf("graph contains a cycle: %s", strings.Join(ss, ", "))
}
127 changes: 112 additions & 15 deletions internal/pkg/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ const (

// Graph represents a directed graph.
type Graph[V comparable] struct {
vertices map[V]neighbors[V]
vertices map[V]neighbors[V] // Adjacency list for each vertex.
inDegrees map[V]int // Number of incoming edges for each vertex.
}

// Edge represents one edge of a directed graph.
Expand All @@ -28,24 +29,65 @@ type neighbors[V comparable] map[V]bool

// New initiates a new Graph.
func New[V comparable](vertices ...V) *Graph[V] {
m := make(map[V]neighbors[V])
adj := make(map[V]neighbors[V])
inDegrees := make(map[V]int)
for _, vertex := range vertices {
m[vertex] = make(neighbors[V])
adj[vertex] = make(neighbors[V])
inDegrees[vertex] = 0
}
return &Graph[V]{
vertices: m,
vertices: adj,
inDegrees: inDegrees,
}
}

// Neighbors returns the list of connected vertices from vtx.
func (g *Graph[V]) Neighbors(vtx V) []V {
neighbors, ok := g.vertices[vtx]
if !ok {
return nil
}
arr := make([]V, len(neighbors))
i := 0
for neighbor := range neighbors {
arr[i] = neighbor
i += 1
}
Comment on lines +50 to +55
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
arr := make([]V, len(neighbors))
i := 0
for neighbor := range neighbors {
arr[i] = neighbor
i += 1
}
var arr []V
for neighbor := range neighbors {
arr = append(arr, neighbor)
}

return arr
}

// Add adds a connection between two vertices.
func (g *Graph[V]) Add(edge Edge[V]) {
from, to := edge.From, edge.To
// Add origin vertex if doesn't exist.
if _, ok := g.vertices[from]; !ok {
g.vertices[from] = make(neighbors[V])
}
// Add edge.
if _, ok := g.vertices[to]; !ok {
g.vertices[to] = make(neighbors[V])
}
if _, ok := g.inDegrees[from]; !ok {
g.inDegrees[from] = 0
}
if _, ok := g.inDegrees[to]; !ok {
g.inDegrees[to] = 0
}

g.vertices[from][to] = true
g.inDegrees[to] += 1
}

// InDegree returns the number of incoming edges to vtx.
func (g *Graph[V]) InDegree(vtx V) int {
return g.inDegrees[vtx]
}

// Remove deletes a connection between two vertices.
func (g *Graph[V]) Remove(edge Edge[V]) {
if _, ok := g.vertices[edge.From][edge.To]; !ok {
return
}
delete(g.vertices[edge.From], edge.To)
g.inDegrees[edge.To] -= 1
}

type findCycleTempVars[V comparable] struct {
Expand Down Expand Up @@ -82,6 +124,17 @@ func (g *Graph[V]) IsAcyclic() ([]V, bool) {
return nil, true
}

// Roots returns a slice of vertices with no incoming edges.
func (g *Graph[V]) Roots() []V {
var roots []V
for vtx, degree := range g.inDegrees {
if degree == 0 {
roots = append(roots, vtx)
}
}
return roots
}

func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool {
temp.status[currVertex] = visiting
for vertex := range g.vertices[currVertex] {
Expand All @@ -100,21 +153,65 @@ func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool {
return false
}

// TopologicalSorter ranks vertices in a graph using topological sort.
// TopologicalSorter ranks vertices using Kahn's algorithm: https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm
// However, if two vertices can be scheduled in parallel then the same rank is returned.
type TopologicalSorter[V comparable] struct {
ranks map[V]int
}

// Rank returns the order of the vertex. The smallest order starts at 0.
// If the vertex does not exist in the graph, then returns an error.
func (s *TopologicalSorter[V]) Rank(vtx V) (int, error) {
// TODO(efekarakus): Implement me.
return 0, nil
// The second boolean return value is used to indicate whether the vertex exists in the graph.
func (alg *TopologicalSorter[V]) Rank(vtx V) (int, bool) {
r, ok := alg.ranks[vtx]
return r, ok
}

func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) {
roots := g.Roots()
for _, root := range roots {
alg.ranks[root] = 0 // Explicitly set to 0 so that `_, ok := alg.ranks[vtx]` returns true instead of false.
}
for len(roots) > 0 {
var vtx V
vtx, roots = roots[0], roots[1:]
for _, neighbor := range g.Neighbors(vtx) {
if new, old := alg.ranks[vtx]+1, alg.ranks[neighbor]; new > old {
alg.ranks[neighbor] = new
}
g.Remove(Edge[V]{vtx, neighbor})
if g.InDegree(neighbor) == 0 {
roots = append(roots, neighbor)
}
}
}
}

// TopologicalSort determines whether the directed graph is acyclic, and if so then finds a topological order.
// TopologicalOrder determines whether the directed graph is acyclic, and if so then
// finds a topological-order, or a linear order, of the vertices.
// Note that this function will modify the original graph.
//
// If there is an edge from vertex V to U, then V must happen before U and results in rank of V < rank of U.
// When there are ties (two vertices can be scheduled in parallel), the vertices are given the same rank.
// If the digraph contains a cycle, then an error is returned.
func TopologicalSort[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) {
// TODO(efekarakus): Implement me.
return nil, nil
//
// An example graph and their ranks is shown below to illustrate:
// .
//├── a rank: 0
//│ ├── c rank: 1
//│ │ └── f rank: 2
//│ └── d rank: 1
//└── b rank: 0
// └── e rank: 1
Comment thread
efekarakus marked this conversation as resolved.
func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) {
if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic {
return nil, &errCycle[V]{
vertices,
}
}

topo := &TopologicalSorter[V]{
ranks: make(map[V]int),
}
topo.traverse(digraph)
return topo, nil
}
Loading