-
Notifications
You must be signed in to change notification settings - Fork 13
/
manager.hpp
194 lines (171 loc) · 6.39 KB
/
manager.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// Copyright 2021 Xilinx Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
* @file
* @brief Defines how the shared mutable state of Proteus is managed as Proteus
* runs
*/
#ifndef GUARD_PROTEUS_CORE_MANAGER
#define GUARD_PROTEUS_CORE_MANAGER
#include <exception> // for exception_ptr
#include <map> // for map
#include <memory> // for allocator, unique_ptr
#include <string> // for string
#include <thread> // for thread
#include <unordered_map> // for unordered_map
#include <utility> // for move, pair
#include "proteus/build_options.hpp" // for PROTEUS_ENABLE_LOGGING
#include "proteus/core/predict_api.hpp" // for RequestParameters
#include "proteus/helpers/queue.hpp" // for BlockingConcurrentQueue
#include "proteus/observation/logging.hpp" // for LoggerPtr
namespace proteus {
class WorkerInfo;
}
// IWYU pragma: no_forward_declare proteus::RequestParameters
namespace proteus {
/**
* @brief IDs used to specify commands to update the Proteus Manager
*
*/
enum class UpdateCommandType {
Shutdown,
Allocate,
Add,
Delete,
};
/**
* @brief Commands sent to update the Proteus Manager consist of an ID, a key
* value (string), an integer, and a pointer to an exception so if the update
* fails for some reason, this information is communicated back to the requester
*/
struct UpdateCommand {
/// Constructor for UpdateCommand
explicit UpdateCommand(UpdateCommandType cmd_, std::string key_ = "",
void* object_ = nullptr, void* retval_ = nullptr)
: cmd(cmd_),
key(std::move(key_)),
object(object_),
retval(retval_),
eptr(nullptr) {}
/// the command ID
UpdateCommandType cmd;
/// a string key that a command can make use of. Usually identifies the worker
std::string key;
/// pointer to an abitrary object
void* object;
/// pointer to a caller-allocated variable to hold the return value
void* retval;
/**
* @brief The caller making a request through the update mechanism should
* catch this exception which is thrown if the requested update fails so the
* caller is not waiting endlessly.
*/
std::exception_ptr eptr;
};
using UpdateCommandQueue = BlockingQueue<std::shared_ptr<UpdateCommand>>;
/**
* @brief The Proteus Manager holds all the state information about a running
* Proteus server. Read access to the state is thread-safe but all modifications
* are handled through a separate update thread to preserve consistency. It is
* a singleton instance and the base code is taken from
* https://stackoverflow.com/a/1008289.
*/
class Manager {
public:
/// Get the singleton Manager instance
static Manager& getInstance() {
// Guaranteed to be destroyed. Instantiated on first use.
static Manager instance;
return instance;
}
Manager(Manager const&) = delete; ///< Copy constructor
Manager& operator=(const Manager&) = delete; ///< Copy assignment constructor
Manager(Manager&& other) = delete; ///< Move constructor
Manager& operator=(Manager&& other) =
delete; ///< Move assignment constructor
std::string loadWorker(std::string const& key, RequestParameters parameters);
void unloadWorker(std::string const& key);
/**
* @brief Get the WorkerInfo object associated with the given key. If the
* worker does not exist, throws an exception.
*
* @param key name of the worker
* @return WorkerInfo*
*/
WorkerInfo* getWorker(std::string const& key);
bool workerReady(std::string const& key);
ModelMetadata getWorkerMetadata(std::string const& key);
/**
* @brief Request that a worker support a request with num inputs. This means
* that the worker must allocate enough buffers to have at least num buffers.
*
* @param key name of the worker to make the request to
* @param num the minimum number of buffers the worker should have after
* allocation
*/
void workerAllocate(std::string const& key, int num);
/**
* @brief Stop the Manager. This should be called prior to ending Proteus.
*
*/
void shutdown();
private:
/// Construct a new Manager object
Manager();
/// Destroy the Manager object
~Manager() = default;
/**
* @brief The Endpoints class is a helper class to bundle up all the worker
* state data structures and operations within the Manager. Its methods should
* be called from the update_manager.
*/
class Endpoints {
public:
std::string load(const std::string& worker, RequestParameters* parameters);
void unload(const std::string& endpoint);
bool exists(const std::string& endpoint);
WorkerInfo* get(const std::string& endpoint);
std::string add(const std::string& worker, RequestParameters parameters);
void shutdown();
private:
// worker -> map[parameters -> endpoint]
std::unordered_map<std::string, std::map<RequestParameters, std::string>>
worker_endpoints_;
// worker -> index
std::unordered_map<std::string, int> worker_indices_;
// endpoint -> parameters
std::unordered_map<std::string, RequestParameters> worker_parameters_;
// endpoint -> Worker_Info*
std::unordered_map<std::string, std::unique_ptr<WorkerInfo>> workers_;
};
/// instantiation of the Endpoints class for maintaining state
Endpoints endpoints_;
/// A queue used to sequentially order changes to the Proteus Manager state
std::unique_ptr<UpdateCommandQueue> update_queue_;
std::thread update_thread_;
#ifdef PROTEUS_ENABLE_LOGGING
LoggerPtr logger_;
#endif
/**
* @brief This method is started as a separate thread when the Manager is
* constructed. It monitors a queue which contains commands that modify the
* shared state of Proteus. This queue serializes these requests and ensures
* consistency
*
* @param input_queue queue where update requests will arrive
*/
void update_manager(UpdateCommandQueue* input_queue);
};
} // namespace proteus
#endif // GUARD_PROTEUS_CORE_MANAGER