Skip to content

Commit

Permalink
feat: add cypher matcher (#36)
Browse files Browse the repository at this point in the history
* feat: adding support for match algorithm with cypher

This is a WIP to write the query to do a full match. I am
still needing to write the last bit of logic that returns
the number of slots.

* feat: finishing up match algorithms

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed May 16, 2024
1 parent c7ab752 commit d9aebe2
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 95 deletions.
7 changes: 1 addition & 6 deletions docs/backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,7 @@ docker compose stop
docker compose rm
```

Note that this backend currently supports very basic (imperfect) slot matching, but custom algorithms have not been implemented. In other words, whatever algorithm you select for match will be ignored. The select is done separately and will
still be maintained. To account for match algorithms, we will have the interface generate a `GenerateCypher` function as follows:

```go
query, err := matcher.GenerateCypher(jobspec)
```
Note that this backend currently supports match algorithms for range and equality, and these are early in development and need further testing.


[home](/README.md#rainbow-scheduler)
2 changes: 1 addition & 1 deletion docs/examples/memgraph/rainbow-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ scheduler:
name: match
cluster:
name: keebler
secret: a38e8008-c5ef-4803-821f-4c930d366a3c
secret: 3a36e53d-ea00-467d-8d42-fa80cb9d58d8
graphdatabase:
name: memgraph
host: 127.0.0.1:50051
Expand Down
3 changes: 3 additions & 0 deletions pkg/graph/algorithm/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type MatchAlgorithm interface {

// A MatchAlgorithm needs to take a slot and determine if it matches
CheckSubsystemEdge(slotNeeds *types.MatchAlgorithmNeeds, edge *types.Edge)

// Graph backends that support cypher need a cypher query for the algorithm
GenerateCypher(matchNeeds *types.MatchAlgorithmNeeds) string
}

// List returns known algorithms
Expand Down
48 changes: 33 additions & 15 deletions pkg/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,49 @@ func ReadNodeJsonGraphString(nodes string) (graph.JsonGraph, error) {
return g, nil
}

type SlotCount struct {

// The number of slots required
Count int32
Name string

// The number of resource submembers needed per slot
Members int32

// The parent of the slot
Parent string
}

// ExtractResourceSlots flattens a jobspec into a lookup of slots
func ExtractResourceSlots(jobspec *v1.Jobspec) map[string]int32 {
func ExtractResourceSlots(jobspec *v1.Jobspec) []SlotCount {

totals := map[string]int32{}
totals := []SlotCount{}

// Go sets loops to an initial value at start,
// so we need a function to recurse into nested resources
var checkResource func(resource *v1.Resource)
checkResource = func(resource *v1.Resource) {
count, ok := totals[resource.Type]
if !ok {
count = 0
}
// Assume a slot is a count for 1 resource type
// If we find the slot, we go just below it
// We just need the total for the slot level
if resource.Replicas != 0 {
count += resource.Replicas
} else {
count += resource.Count
}
totals[resource.Type] = count

// This is the recursive bit
if resource.With != nil {
for _, with := range resource.With {
checkResource(&with)
// This is the recursive bit
if resource.With != nil {
for _, with := range resource.With {
newSlot := SlotCount{
Count: resource.Replicas,
Name: with.Type,
Members: with.Count,
Parent: resource.Type}
totals = append(totals, newSlot)
}
}
} else {
if resource.With != nil {
for _, with := range resource.With {
checkResource(&with)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/types/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
// many types.
// 2. A single resource type, in which case we define Type and
// ignore Found/Needed
//
// subsystem -> attribute -> isSatisfied
type MatchAlgorithmNeeds map[string]map[string]bool

// Serialize slot resource needs into a struct that is easier to parse
Expand Down
15 changes: 15 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,18 @@ func Copy(list []string) []string {
copy(copied, list)
return copied
}

// Diff returns the difference between two sets of strings
func Diff(one, two []string) []string {
var difference []string
lookup := make(map[string]struct{}, len(two))
for _, item := range two {
lookup[item] = struct{}{}
}
for _, item := range one {
if _, found := lookup[item]; !found {
difference = append(difference, item)
}
}
return difference
}
14 changes: 12 additions & 2 deletions plugins/algorithms/match/equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewMatchEqualRequest(value string) *MatchEqualRequest {
}

// MatchEqualityEdge looks for an exact match
func MatchEqualityEdge(k string, edge *types.Edge) bool {
req := NewMatchEqualRequest(k)
func MatchEqualityEdge(matchExpression string, edge *types.Edge) bool {
req := NewMatchEqualRequest(matchExpression)

// Get the field requested by the jobspec
toMatch, err := edge.Vertex.Metadata.GetStringElement(req.Field)
Expand All @@ -51,3 +51,13 @@ func MatchEqualityEdge(k string, edge *types.Edge) bool {
// matches the value provided in the slot request
return toMatch == req.Value
}

// MatchEqualityCypher writes the lines of cypher for a match
func MatchEqualityCypher(subsystem, matchExpression string) string {
req := NewMatchEqualRequest(matchExpression)

// req.Name => the subsystem
query := fmt.Sprintf("\n-[contains]-(%s:Node {subsystem: '%s'})", subsystem, subsystem)
query += fmt.Sprintf("\nWHERE %s.%s = '%s'", subsystem, req.Field, req.Value)
return query
}
22 changes: 22 additions & 0 deletions plugins/algorithms/match/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,28 @@ func (s MatchType) Init(options map[string]string) error {
return nil
}

// Generate cypher for the match algorithm for a specific slot
func (m MatchType) GenerateCypher(matchNeeds *types.MatchAlgorithmNeeds) string {

// This will be added as a piece in a query we are building
query := ""
for subsystemName, needs := range *matchNeeds {

// k is the string to parse, we can assume since we do one query
// that the boolean is always false
for matchExpression := range needs {
if strings.HasPrefix(matchExpression, "match") {
query += MatchEqualityCypher(subsystemName, matchExpression)

} else if strings.HasPrefix(matchExpression, "range") {
query += MatchRangeCypher(subsystemName, matchExpression)
}
}
}

return query
}

// Add the selection algorithm to be known to rainbow
func init() {
algo := MatchType{}
Expand Down
25 changes: 22 additions & 3 deletions plugins/algorithms/match/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (req *RangeRequest) Satisfies(value string) (bool, error) {
}

// MatchRangeEdge matches to a range
func MatchRangeEdge(k string, edge *types.Edge) bool {
rlog.Debugf(" => Found %s and inspecting edge metadata %v\n", k, edge.Vertex.Metadata.Elements)
req := NewRangeRequest(k)
func MatchRangeEdge(matchExpression string, edge *types.Edge) bool {
rlog.Debugf(" => Found %s and inspecting edge metadata %v\n", matchExpression, edge.Vertex.Metadata.Elements)
req := NewRangeRequest(matchExpression)

// Get the field requested by the jobspec
toMatch, err := edge.Vertex.Metadata.GetStringElement(req.Field)
Expand All @@ -129,3 +129,22 @@ func MatchRangeEdge(k string, edge *types.Edge) bool {
}
return satisfied
}

// MatchEqualityCypher writes the lines of cypher for a match
func MatchRangeCypher(subsystem, matchExpression string) string {
req := NewRangeRequest(matchExpression)

// req.Name => the subsystem
query := fmt.Sprintf("\n-[contains]-(%s:Node {subsystem: '%s'})", subsystem, subsystem)

// Need to assemble min/max, or both
queryPiece := "\nWHERE"
if req.Min != "" {
queryPiece += fmt.Sprintf("%s.%s >= %d", subsystem, req.Field, req.Min)
}
if req.Max != "" {
queryPiece += fmt.Sprintf("AND %s.%s <= %d", subsystem, req.Field, req.Max)
}
query += queryPiece
return query
}

0 comments on commit d9aebe2

Please sign in to comment.