-
Notifications
You must be signed in to change notification settings - Fork 1
/
memory.go
153 lines (129 loc) · 3.95 KB
/
memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package memory
// The rainbow memory backend - vanilla / prototype
import (
"context"
"encoding/json"
"log"
js "github.com/compspec/jobspec-go/pkg/jobspec/experimental"
jgf "github.com/converged-computing/jsongraph-go/jsongraph/v2/graph"
"github.com/converged-computing/rainbow/pkg/graph/algorithm"
"github.com/converged-computing/rainbow/pkg/graph/backend"
"github.com/converged-computing/rainbow/pkg/types"
"github.com/converged-computing/rainbow/plugins/backends/memory/service"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// memoryHost is the address dialed to reach the memory graph service.
// Init replaces it when a "host" option is supplied.
var memoryHost = ":50051"

// graphClient is the global, in-memory graph handle. Within this file it is
// only assigned by RegisterService.
var graphClient *Graph
// MemoryGraph is the in-memory graph backend for rainbow. The type has no
// fields; its methods work against package-level state.
type MemoryGraph struct{}

// Identifying strings reported by the backend.
var (
	memoryName  = "memory"
	description = "in-memory vanilla graph database for rainbow"
)

// Name returns the identifier of this backend.
func (MemoryGraph) Name() string { return memoryName }

// Description returns a short, human-readable summary of this backend.
func (MemoryGraph) Description() string { return description }
// AddCluster loads a JGF graph of new nodes for the named cluster by
// delegating to LoadClusterNodes on the shared graph handle.
//
// A client can interact with the database (read only), but because this
// backend runs directly inside the rainbow cluster the graph functions are
// called directly. The cluster added here is the dominant subsystem; a
// "subsystem" added through AddSubsystem is considered supplementary to it.
func (MemoryGraph) AddCluster(name string, nodes *jgf.JsonGraph, subsystem string) error {
	return graphClient.LoadClusterNodes(name, nodes, subsystem)
}
// UpdateState decodes a JSON payload into a types.ClusterState and hands it
// to the graph for the named cluster. A payload that fails to decode is
// returned as an error and the graph is not called.
// NOTE(review): cluster state appears to be what selection algorithms
// consult — confirm against the graph implementation.
func (MemoryGraph) UpdateState(name string, payload string) error {
	// Start from an explicit empty value rather than the zero value so the
	// decoded state is initialized the same way regardless of the payload.
	state := types.ClusterState{}
	if err := json.Unmarshal([]byte(payload), &state); err != nil {
		return err
	}
	return graphClient.UpdateState(name, &state)
}
// GetStates returns the state for each cluster in names, keyed by cluster
// name, as reported by the shared graph handle.
func (MemoryGraph) GetStates(names []string) (map[string]types.ClusterState, error) {
	states, err := graphClient.GetStates(names)
	return states, err
}
// AddSubsystem adds a new subsystem to the graph: it loads a JGF graph of
// subsystem nodes for the named cluster by delegating to LoadSubsystemNodes
// on the shared graph handle.
func (MemoryGraph) AddSubsystem(name string, nodes *jgf.JsonGraph, subsystem string) error {
	return graphClient.LoadSubsystemNodes(name, nodes, subsystem)
}
// RegisterService attaches the memory graph gRPC service to s. It acts as
// the backend's initializer: a new graph is created and stored in the
// package-level handle, then the service is registered on the server.
// It always returns nil.
func (MemoryGraph) RegisterService(s *grpc.Server) error {
	log.Printf("🧠️ Registering memory graph database...\n")

	// The service lives in the same module as this backend, so the gRPC
	// functions can reach the handle assigned here.
	graphClient = NewGraph()
	service.RegisterMemoryGraphServer(s, MemoryServer{})
	return nil
}
// Satisfies determines which clusters satisfy a jobspec request.
//
// It is invoked from the client function, so it technically runs on the
// client rather than the server: it dials memoryHost without transport
// security, sends the jobspec serialized as JSON together with the name of
// the matching algorithm, and returns the clusters from the response.
// On any failure it returns an empty (non-nil) slice and the error.
func (MemoryGraph) Satisfies(
	jobspec *js.Jobspec,
	matcher algorithm.MatchAlgorithm,
) ([]string, error) {
	none := []string{}

	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.Dial(memoryHost, creds)
	if err != nil {
		return none, err
	}
	defer conn.Close()
	client := service.NewMemoryGraphClient(conn)

	// The request carries the jobspec as a string, so serialize it first.
	payload, err := json.Marshal(jobspec)
	if err != nil {
		return none, err
	}

	// Include the graph algorithm by name alongside the payload.
	request := &service.SatisfyRequest{
		Payload: string(payload),
		Matcher: matcher.Name(),
	}
	response, err := client.Satisfy(context.Background(), request)
	if err != nil {
		return none, err
	}
	return response.Clusters, nil
}
// Init provides extra initialization functionality for the memory backend.
//
// Recognized options:
//   - "backupFile": backup file for the in-memory database, stored on the
//     shared graph handle.
//   - "host": address used to reach the graph service, replacing the
//     package default.
//
// Within this file the graph handle is only assigned by RegisterService. If
// a "backupFile" option arrives while the handle is still nil, the option is
// logged and skipped instead of dereferencing a nil pointer. The "host"
// option is applied either way. Init always returns nil.
func (MemoryGraph) Init(
	options map[string]string,
) error {
	if backupFile, ok := options["backupFile"]; ok {
		if graphClient == nil {
			log.Printf("memory graph is not registered yet, ignoring backupFile option %q\n", backupFile)
		} else {
			graphClient.backupFile = backupFile
		}
	}

	// Warning: this assumes one client running with one graph host
	if host, ok := options["host"]; ok {
		memoryHost = host
	}
	return nil
}
// Add the backend to be known to rainbow
func init() {
graph := MemoryGraph{}
backend.Register(graph)
}