/
message_handler.h
284 lines (222 loc) · 9.02 KB
/
message_handler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#ifndef RUNTIME_VM_MESSAGE_HANDLER_H_
#define RUNTIME_VM_MESSAGE_HANDLER_H_
#include <memory>
#include "vm/isolate.h"
#include "vm/lockers.h"
#include "vm/message.h"
#include "vm/os_thread.h"
#include "vm/port_set.h"
#include "vm/thread_pool.h"
namespace dart {
// A MessageHandler is an entity capable of accepting messages.
class MessageHandler {
protected:
MessageHandler();
public:
enum MessageStatus {
kOK, // We successfully handled a message.
kError, // We encountered an error handling a message.
kShutdown, // The VM is shutting down.
};
static const char* MessageStatusString(MessageStatus status);
virtual ~MessageHandler();
// Allow subclasses to provide a handler name.
virtual const char* name() const;
typedef uword CallbackData;
typedef MessageStatus (*StartCallback)(CallbackData data);
typedef void (*EndCallback)(CallbackData data);
// Runs this message handler on the thread pool.
//
// Before processing messages, the optional StartFunction is run.
//
// A message handler will run until it terminates either normally or
// abnormally. Normal termination occurs when the message handler
// no longer has any live ports. Abnormal termination occurs when
// HandleMessage() indicates that an error has occurred during
// message processing.
// Returns false if the handler terminated abnormally, otherwise it
// returns true.
bool Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data);
// Handles the next message for this message handler. Should only
// be used when not running the handler on the thread pool (via Run
// or RunBlocking).
//
// Returns true on success.
MessageStatus HandleNextMessage();
// Handles any OOB messages for this message handler. Can be used
// even if the message handler is running on the thread pool.
//
// Returns true on success.
MessageStatus HandleOOBMessages();
// Blocks the thread on a condition variable until a message arrives, and then
// handles all messages.
MessageStatus PauseAndHandleAllMessages(int64_t timeout_millis);
// Returns true if there are pending OOB messages for this message
// handler.
bool HasOOBMessages();
// Returns true if there are pending normal messages for this message
// handler.
bool HasMessages();
// A message handler tracks how many live ports it has.
bool HasLivePorts() const { return live_ports_ > 0; }
intptr_t live_ports() const { return live_ports_; }
bool paused() const { return paused_ > 0; }
void increment_paused() { paused_++; }
void decrement_paused() {
ASSERT(paused_ > 0);
paused_--;
}
#if !defined(PRODUCT)
void DebugDump();
bool should_pause_on_start() const { return should_pause_on_start_; }
void set_should_pause_on_start(bool should_pause_on_start) {
should_pause_on_start_ = should_pause_on_start;
}
bool is_paused_on_start() const { return is_paused_on_start_; }
bool should_pause_on_exit() const { return should_pause_on_exit_; }
void set_should_pause_on_exit(bool should_pause_on_exit) {
should_pause_on_exit_ = should_pause_on_exit;
}
bool is_paused_on_exit() const { return is_paused_on_exit_; }
// Timestamp of the paused on start or paused on exit.
int64_t paused_timestamp() const { return paused_timestamp_; }
bool ShouldPauseOnStart(MessageStatus status) const;
bool ShouldPauseOnExit(MessageStatus status) const;
void PausedOnStart(bool paused);
void PausedOnExit(bool paused);
#endif
// Gives temporary ownership of |queue| and |oob_queue|. Using this object
// has the side effect that no OOB messages will be handled if a stack
// overflow interrupt is delivered.
class AcquiredQueues : public ValueObject {
public:
explicit AcquiredQueues(MessageHandler* handler);
~AcquiredQueues();
MessageQueue* queue() {
if (handler_ == NULL) {
return NULL;
}
return handler_->queue_;
}
MessageQueue* oob_queue() {
if (handler_ == NULL) {
return NULL;
}
return handler_->oob_queue_;
}
private:
MessageHandler* handler_;
SafepointMonitorLocker ml_;
friend class MessageHandler;
};
#if defined(DEBUG)
// Check that it is safe to access this message handler.
//
// For example, if this MessageHandler is an isolate, then it is
// only safe to access it when the MessageHandler is the current
// isolate.
virtual void CheckAccess();
#endif
protected:
// ------------ START PortMap API ------------
// These functions should only be called from the PortMap.
// Does this message handler correspond to the current isolate?
virtual bool IsCurrentIsolate() const { return false; }
// Return Isolate to which this message handler corresponds to.
virtual Isolate* isolate() const { return NULL; }
// Posts a message on this handler's message queue.
// If before_events is true, then the message is enqueued before any pending
// events, but after any pending isolate library events.
void PostMessage(std::unique_ptr<Message> message,
bool before_events = false);
// Notifies this handler that a port is being closed.
void ClosePort(Dart_Port port);
// Notifies this handler that all ports are being closed.
void CloseAllPorts();
// Returns true if the handler is owned by the PortMap.
//
// This is used to delete handlers when their last live port is closed.
virtual bool OwnedByPortMap() const { return false; }
// Requests deletion of this message handler when the next task
// completes.
void RequestDeletion();
void increment_live_ports();
void decrement_live_ports();
// ------------ END PortMap API ------------
// Custom message notification. Optionally provided by subclass.
virtual void MessageNotify(Message::Priority priority);
// Handles a single message. Provided by subclass.
//
// Returns true on success.
virtual MessageStatus HandleMessage(std::unique_ptr<Message> message) = 0;
virtual void NotifyPauseOnStart() {}
virtual void NotifyPauseOnExit() {}
// TODO(iposva): Set a local field before entering MessageHandler methods.
Thread* thread() const { return Thread::Current(); }
private:
template <typename GCVisitorType>
friend void MournFinalized(GCVisitorType* visitor);
friend class PortMap;
friend class MessageHandlerTestPeer;
friend class MessageHandlerTask;
struct PortSetEntry : public PortSet<PortSetEntry>::Entry {};
// Called by MessageHandlerTask to process our task queue.
void TaskCallback();
// Checks if we have a slot for idle task execution, if we have a slot
// for idle task execution it is scheduled immediately or we wait for
// idle expiration and then attempt to schedule the idle task.
// Returns true if their is scope for idle task execution so that we
// can loop back to handle more messages or false if idle tasks are not
// scheduled.
bool CheckIfIdleLocked(MonitorLocker* ml);
// Triggers a run of the idle task.
void RunIdleTaskLocked(MonitorLocker* ml);
// NOTE: These two functions release and reacquire the monitor, you may
// need to call HandleMessages to ensure all pending messages are handled.
void PausedOnStartLocked(MonitorLocker* ml, bool paused);
void PausedOnExitLocked(MonitorLocker* ml, bool paused);
// Dequeue the next message. Prefer messages from the oob_queue_ to
// messages from the queue_.
std::unique_ptr<Message> DequeueMessage(Message::Priority min_priority);
void ClearOOBQueue();
// Handles any pending messages.
MessageStatus HandleMessages(MonitorLocker* ml,
bool allow_normal_messages,
bool allow_multiple_normal_messages);
Monitor monitor_; // Protects all fields in MessageHandler.
MessageQueue* queue_;
MessageQueue* oob_queue_;
// This flag is not thread safe and can only reliably be accessed on a single
// thread.
bool oob_message_handling_allowed_;
bool paused_for_messages_;
PortSet<PortSetEntry>
ports_; // Only accessed by [PortMap], protected by [PortMap]s lock.
intptr_t live_ports_; // The number of open ports, including control ports.
intptr_t paused_; // The number of pause messages received.
#if !defined(PRODUCT)
bool should_pause_on_start_;
bool should_pause_on_exit_;
bool is_paused_on_start_;
bool is_paused_on_exit_;
// When isolate gets paused, remember the status of the message being
// processed so that we can resume correctly(into potentially not-OK status).
MessageStatus remembered_paused_on_exit_status_;
int64_t paused_timestamp_;
#endif
bool task_running_;
bool delete_me_;
ThreadPool* pool_;
StartCallback start_callback_;
EndCallback end_callback_;
CallbackData callback_data_;
DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};
} // namespace dart
#endif // RUNTIME_VM_MESSAGE_HANDLER_H_