feat: add rough prototype of dfs!
I do not understand the per_slot/total/per_resource semantics well enough
(mostly in the context of non-core things), but I have at least
a quasi-functional depth first search. I have not done that before,
let alone in Go, so despite the jank I am proud :)

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Mar 2, 2024
1 parent 85db70d commit 1823f59
Showing 2 changed files with 220 additions and 22 deletions.
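
To make the new traversal easier to follow outside the diff, here is a minimal, self-contained sketch of the depth first idea this commit prototypes. The `Vertex` fields, the topology, and the helper names are illustrative stand-ins, not the repository's actual types; the point is only the recursion down "contains" edges, totaling vertices of a requested type.

```go
package main

import "fmt"

// Vertex is an illustrative stand-in for an in-memory graph vertex:
// a typed resource of some size with "contains" children.
type Vertex struct {
	Type     string
	Size     int32
	Contains []*Vertex
}

// countType walks down "contains" edges and totals up vertices of the
// requested type, stopping a branch as soon as the type matches.
func countType(v *Vertex, typ string) int32 {
	if v.Type == typ {
		return v.Size
	}
	var found int32
	for _, child := range v.Contains {
		found += countType(child, typ)
	}
	return found
}

func main() {
	// cluster -> 2 nodes -> 8 cores each
	node := func() *Vertex {
		return &Vertex{Type: "node", Size: 1, Contains: []*Vertex{{Type: "core", Size: 8}}}
	}
	cluster := &Vertex{Type: "cluster", Size: 1, Contains: []*Vertex{node(), node()}}

	fmt.Println("nodes:", countType(cluster, "node")) // 2
	fmt.Println("cores:", countType(cluster, "core")) // 16
}
```
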
25 changes: 12 additions & 13 deletions backends/memory/cluster.go
@@ -151,7 +151,7 @@ func (g *ClusterGraph) Satisfies(payload string) (*service.SatisfyResponse, erro
fmt.Printf("\n🍇️ Satisfy request to Graph 🍇️\n")
fmt.Printf(" jobspec: %s\n", payload)

// Do depth first search to determine if there is a match.
// Do depth first search to determine if there is a match.
// Right now this is a boolean because I don't know what it should look like
matches, err := ss.DFSForMatch(&jobspec)
if err != nil {
@@ -233,18 +233,8 @@ func (g *ClusterGraph) LoadClusterNodes(
// Create an empty resource counter for the cluster
ss.Metrics.NewResource(name)

// Add a cluster root to it, and connect to the top root. We can add metadata/weight here too
clusterRoot := ss.AddNode("", name, "cluster", 1, "")
err := ss.AddEdge(root, clusterRoot, 0, "")
if err != nil {
return err
}

// Now loop through the nodes and add them, keeping a temporary lookup
lookup := map[string]int{"root": root, name: clusterRoot}

// This is pretty dumb because we don't add metadata yet, oh well
// we will!
lookup := map[string]int{"root": root}
for nid, node := range nodes.Graph.Nodes {

// Currently we are saving the type, size, and unit
@@ -253,7 +243,15 @@
// levelName (cluster)
// name for lookup/cache (if we want to keep it there)
// resource type, size, and unit
id := ss.AddNode(name, "", resource.Type, resource.Size, resource.Unit)
var id int
if resource.Type == "cluster" {

// If it's the cluster, we save the named identifier for it
id = ss.AddNode("", name, resource.Type, resource.Size, resource.Unit)
lookup[name] = id
} else {
id = ss.AddNode(name, "", resource.Type, resource.Size, resource.Unit)
}
lookup[nid] = id
}

@@ -269,6 +267,7 @@ func (g *ClusterGraph) LoadClusterNodes(
if !ok {
return fmt.Errorf("destination %s is defined as an edge, but missing as node in graph", edge.Label)
}
fmt.Printf("Adding edge from %s -%s-> %s\n", ss.Vertices[src].Type, edge.Relation, ss.Vertices[dest].Type)
err := ss.AddEdge(src, dest, 0, edge.Relation)
if err != nil {
return err
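
For context on the loading change above, here is a small sketch of the two-pass pattern LoadClusterNodes follows: create a vertex per incoming node (special-casing the cluster node so it is also reachable by its human-readable name), then wire edges through the id lookup. The jsonNode/jsonEdge types, field names, and the cluster name below are assumptions for illustration, not the actual JSON graph types the backend parses.

```go
package main

import "fmt"

// jsonNode and jsonEdge approximate the node/edge payload that the loader
// consumes; the names and fields here are illustrative assumptions.
type jsonNode struct {
	Type string
	Size int32
}

type jsonEdge struct {
	Source   string
	Target   string
	Relation string
}

func main() {
	clusterName := "example-cluster"
	nodes := map[string]jsonNode{
		"n0": {Type: "cluster", Size: 1},
		"n1": {Type: "node", Size: 1},
	}
	edges := []jsonEdge{{Source: "n0", Target: "n1", Relation: "contains"}}

	// Pass 1: one vertex id per node; the cluster node is special cased so
	// it can also be found later under its human-readable name.
	lookup := map[string]int{}
	nextID := 0
	for nid, node := range nodes {
		id := nextID
		nextID++
		if node.Type == "cluster" {
			lookup[clusterName] = id
		}
		lookup[nid] = id
	}

	// Pass 2: wire edges through the lookup; an edge that references an
	// unknown node is an error, mirroring the checks in the diff above.
	for _, edge := range edges {
		src, ok := lookup[edge.Source]
		if !ok {
			fmt.Printf("source %s is missing from the node list\n", edge.Source)
			continue
		}
		dest, ok := lookup[edge.Target]
		if !ok {
			fmt.Printf("target %s is missing from the node list\n", edge.Target)
			continue
		}
		fmt.Printf("edge %d -%s-> %d\n", src, edge.Relation, dest)
	}
}
```
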
217 changes: 208 additions & 9 deletions backends/memory/subsystem.go
@@ -23,9 +23,9 @@ func NewSubsystem() *Subsystem {
return &s
}

// DFSForMatch WILL be a depth first search for matches
// Right now it's looking at total cluster resources on the top level,
// which is kind of terrible, but it's a start :)
// DFSForMatch is a depth first search for matches
// It starts by looking at total cluster resources on the top level,
// and then traverses into those that match the first check
func (s *Subsystem) DFSForMatch(jobspec *js.Jobspec) ([]string, error) {

// Return a list of matching clusters
@@ -53,6 +53,7 @@ func (s *Subsystem) DFSForMatch(jobspec *js.Jobspec) ([]string, error) {
}
}

// Make a call on each of the top level resources
for _, resource := range jobspec.Resources {
checkResource(&resource)
}
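
The doc comment above describes a cheap first pass over total cluster resources before any deep traversal. A hedged sketch of that idea follows; rolling nested counts up by multiplying down the tree is my assumption here (it is exactly the per_slot/total question the commit message flags), and the real prototype appears to track per-cluster totals in its Metrics counters rather than the toy map used below.

```go
package main

import "fmt"

// Resource is a toy stand-in for one nested jobspec resource entry.
type Resource struct {
	Type  string
	Count int32
	With  []*Resource
}

// flatten rolls a nested request up into per-type totals, multiplying
// counts down the tree (2 nodes "with" 4 cores each becomes 8 cores).
func flatten(r *Resource, multiplier int32, totals map[string]int32) {
	count := r.Count * multiplier
	totals[r.Type] += count
	for _, w := range r.With {
		flatten(w, count, totals)
	}
}

func main() {
	request := &Resource{Type: "node", Count: 2, With: []*Resource{{Type: "core", Count: 4}}}

	// pretend totals for one cluster; the prototype would consult its counters
	clusterTotals := map[string]int32{"node": 4, "core": 32}

	needed := map[string]int32{}
	flatten(request, 1, needed)

	ok := true
	for typ, need := range needed {
		if clusterTotals[typ] < need {
			ok = false
		}
	}
	fmt.Println("needed:", needed, "quick check passes:", ok)
}
```
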
@@ -94,12 +95,209 @@ func (s *Subsystem) DFSForMatch(jobspec *js.Jobspec) ([]string, error) {
}

// No matches, womp womp.
if len(matches) == 0 {
fmt.Println(" match: 😥️ no clusters could satisfy this request. We are sad")
if len(matches) != 0 {
// Now that we got through the quicker check, do a deeper search
return s.depthFirstSearch(matches, jobspec)
}

fmt.Println(" match: 😥️ no clusters could satisfy this request. We are sad")
return matches, nil
}

// depthFirstSearch fully searches the graph, finding the list of matches for a jobspec
func (s *Subsystem) depthFirstSearch(matches []string, jobspec *js.Jobspec) ([]string, error) {

// Prepare a lookup of tasks for slots
slots := map[string]*v1.Tasks{}
for _, task := range jobspec.Tasks {
task := task // copy so the map does not keep a pointer to the loop variable
slots[task.Slot] = &task
}

// Keep a list of final matches
finalMatches := []string{}

// Look through our potential matching clusters
for _, cluster := range matches {
fmt.Printf("\n 🔍️ Exploring cluster %s deeper with depth first search\n", cluster)

// This is the root vertex of the cluster, and we start the search with it
root := s.lookup[cluster]
vertex := s.Vertices[root]

// Assume this is a match to start
isMatch := true

// Recursive function to recurse into slot resource and find count
// of matches for the slot. This returns a count of the matching
// slots under a parent level, recursing into child vertices until
// we find the right type (and take a count) or keep exploring
var findSlots func(vtx *Vertex, slot *v1.Resource) int32
findSlots = func(vtx *Vertex, resource *v1.Resource) int32 {

// Is the current vertex the resource type we need? If yes, assess if it can satisfy
slotsFound := int32(0)
if vtx.Type == resource.Type {

// I don't know if resource.Count can be zero, but be prepared...
if resource.Count == 0 {
return slotsFound
}
// How many full slots can we satisfy at this vertex?
// TODO how to handle the slot per/total thing?
return vtx.Size

} else {

// Otherwise, we haven't found the right level of the graph, keep going
for _, child := range vtx.Edges {

// Only interested in children. That sounds weird.
if child.Relation == "contains" {
slotsFound += findSlots(child.Vertex, resource)
}
}
}
return slotsFound
}

// Recursive function to determine if a vertex satisfies a resource
// Given a resource and a vertex root, it returns the count of vertices under
// the root that satisfy the request.
var satisfies func(vtx *Vertex, resource *v1.Resource, found int32) int32
satisfies = func(vtx *Vertex, resource *v1.Resource, found int32) int32 {

// All my life, searchin' for a vertex like youuu <3
lookingAt := fmt.Sprintf("vertex '%s' (count=%d)", vtx.Type, vtx.Size)
lookingFor := fmt.Sprintf("for '%s' (need=%d)", resource.Type, resource.Count)
fmt.Printf(" => Checking %s %s\n", lookingAt, lookingFor)

// A slot needs deeper exploration, and we need to add per_slot/total logic
if resource.Type == "slot" {

// Keep going until we have all the slots, or we run out of places to look
return findSlots(vtx, resource)
}

// Wrong resource type, womp womp
if vtx.Type != resource.Type {
for _, child := range vtx.Edges {

// Update our found count to include recursing all children
if child.Relation == "contains" {
found += satisfies(child.Vertex, resource, found)

// Stop when we have enough
if found >= resource.Count {
return found
}
}
}
}

// this resource type is satisfied, keep going and add to count
if vtx.Type == resource.Type {
return found + vtx.Size
}

// I'm not sure we'd ever get here, might want to check
return found
}

// traverseResource is the main function that handles traversing a cluster vertex
var traverseResource = func(resource *v1.Resource) bool {
fmt.Printf("\n 👀️ Looking for '%s' in cluster %s\n", resource.Type, cluster)

// Case 1: A slot needs to be explored to determine if we can satisfy
// the count under it of some resource type
if resource.Type == "slot" {

// TODO: how does the slot Count (under tasks) fit in?
// I don't understand what these counts are, because they seem like MPI tasks
// but a slot can be defined at any level. So I'm going to ignore for now
// Suggestion - this needs to be more clear in jobspec v2.
// https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_14.html

// These are logical groups of "stuff" that need to be scheduled together
slotsNeeded := resource.Count

// Keep going until we have all the slots, or we run out of places to look
slotsFound := int32(0)

// This assumes that the slot value is defined in the next resource block
if resource.With != nil {
for _, subresource := range resource.With {
slotsFound += findSlots(vertex, &subresource)

// The slot is satisfied and we can continue searching resources
if slotsFound >= slotsNeeded {
return true
}
}
}
// The slot is satisfied and we can continue searching resources
if slotsFound >= slotsNeeded {
return true
}
return false

} else {

// Keep traversing vertices, start at the graph root
foundMatches := satisfies(vertex, resource, int32(0))

// We don't have a match, abort.
if foundMatches < resource.Count {
reason := fmt.Sprintf("%d of %s found and %d needed", foundMatches, resource.Type, resource.Count)
fmt.Printf(" ❌️ %s not a match, %s\n", cluster, reason)
return false
} else {
reason := fmt.Sprintf("%d of needed %s satisfied", foundMatches, resource.Type)
fmt.Printf(" ⏳️ %s still contender, %s\n", cluster, reason)
}
}
// We get here if we assess a resource and vertex that isn't a slot, and foundMatches >= resource count
return true
}

// Go through jobspec resources and determine satisfiability
// This currently treats each item under resources separately
// as opposed to one unit of work, and I'm not sure if that is
// right. I haven't seen jobspecs in the wild with two entries
// under resources.
for _, resource := range jobspec.Resources {

// Break out early if we can't satisfy a resource group
isMatch = traverseResource(&resource)
if !isMatch {
fmt.Printf("Resource %s is not a match for cluster %s\n", resource.Label, cluster)
break
}

// This is the recursive bit
if resource.With != nil {
for _, with := range resource.With {
isMatch = traverseResource(&with)
if !isMatch {
break
}
}
}
}
if isMatch {
finalMatches = append(finalMatches, cluster)
}

}

if len(finalMatches) == 0 {
fmt.Println(" 😥️ dfs: no clusters could satisfy this request. We are sad")
} else {
fmt.Printf(" 🎯️ dfs: we found %d clusters to satisfy the request\n", len(finalMatches))
}
return finalMatches, nil
}

// AddNode (a physical node) as a vertex, return the vertex id
func (s *Subsystem) AddNode(
clusterName, name, typ string,
@@ -147,18 +345,19 @@ func (s *Subsystem) AddEdge(src, dest int, weight int, relation string) error {

// We shouldn't be adding identifiers that don't exist...
// TODO: do we want to count edges?
_, ok := s.Vertices[src]
srcVertex, ok := s.Vertices[src]
if !ok {
return fmt.Errorf("vertex with identifier %d does not exist", src)
}
_, ok = s.Vertices[dest]
destVertex, ok := s.Vertices[dest]
if !ok {
return fmt.Errorf("vertex with identifier %d does not exist", dest)
}

// add edge src --> dest
newEdge := Edge{Weight: weight, Vertex: s.Vertices[dest], Relation: relation}
s.Vertices[src].Edges[dest] = &newEdge
newEdge := Edge{Weight: weight, Vertex: destVertex, Relation: relation}
srcVertex.Edges[dest] = &newEdge
s.Vertices[src] = srcVertex
return nil
}

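
The open TODOs above ("how many full slots can we satisfy at this vertex?") are the heart of the per_slot/total confusion called out in the commit message. Below is one possible reading, sketched with toy types rather than the backend's: a slot pinned to a resource type hosts as many complete slots as its subtree can supply, and counts from sibling subtrees add up. This is a suggestion under stated assumptions, not what the committed code does.

```go
package main

import "fmt"

// Vertex is a toy stand-in (not the backend's type) for a typed resource
// of some size with "contains" children.
type Vertex struct {
	Type     string
	Size     int32
	Contains []*Vertex
}

// countUnder totals vertices of a given type at or below v.
func countUnder(v *Vertex, typ string) int32 {
	var total int32
	if v.Type == typ {
		total += v.Size
	}
	for _, c := range v.Contains {
		total += countUnder(c, typ)
	}
	return total
}

// slotsAt counts complete slots: walk down to vertices of slotType, and at
// each one ask how many groups of `need` x `needType` fit underneath it.
func slotsAt(v *Vertex, slotType, needType string, need int32) int32 {
	if v.Type == slotType {
		if need == 0 {
			return 0
		}
		return countUnder(v, needType) / need
	}
	var found int32
	for _, c := range v.Contains {
		found += slotsAt(c, slotType, needType, need)
	}
	return found
}

func main() {
	// two nodes with 8 cores each; a slot at node level needing 4 cores
	node := func() *Vertex {
		return &Vertex{Type: "node", Size: 1, Contains: []*Vertex{{Type: "core", Size: 8}}}
	}
	cluster := &Vertex{Type: "cluster", Size: 1, Contains: []*Vertex{node(), node()}}

	fmt.Println("full slots:", slotsAt(cluster, "node", "core", 4)) // 2 per node, 4 total
}
```
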

