celtic / akari

My attempt at bringing light to operating system development.

This URL has Read+Write access

akari / UserIPC.cpp
100644 188 lines (150 sloc) 5.209 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#include <UserCalls.hpp>
#include <Akari.hpp>
#include <POSIX.hpp>
#include <Symbol.hpp>
#include <debug.hpp>
#include <Console.hpp>
#include <Tasks.hpp>
#include <Syscall.hpp>
#include <BlockingCall.hpp>
 
namespace User {
namespace IPC {
bool registerName(const char *name) {
Symbol sName(name);
if (Akari->tasks->registeredTasks->hasKey(sName))
return false;
 
(*Akari->tasks->registeredTasks)[sName] = Akari->tasks->current;
Akari->tasks->current->registeredName = sName;
return true;
}
 
bool registerStream(const char *name) {
Symbol sNode(name);
if (!Akari->tasks->current->registeredName) {
// TODO: just kill the process, don't kill the system.
// TODO: is this correct behaviour? Or could we have registered nodes
// on no particular name? Why not?.. think about it.
AkariPanic("name not registered - cannot register node");
}
 
if (Akari->tasks->current->streamsByName->hasKey(name)) {
AkariPanic("node already registered - cannot register atop it");
}
 
Tasks::Task::Stream *target = new Tasks::Task::Stream();
 
(*Akari->tasks->current->streamsByName)[sNode] = target;
return true;
}
 
static inline Tasks::Task::Stream *getStream(const char *name, const char *node) {
Symbol sName(name), sNode(node);
 
if (!Akari->tasks->registeredTasks->hasKey(sName))
return 0;
 
Tasks::Task *task = (*Akari->tasks->registeredTasks)[sName];
if (!task->streamsByName->hasKey(sNode))
return 0;
 
return (*task->streamsByName)[sNode];
}
 
u32 obtainStreamWriter(const char *name, const char *node, bool exclusive) {
Tasks::Task::Stream *target = getStream(name, node);
if (!target) return -1;
return target->registerWriter(exclusive);
}
 
u32 obtainStreamListener(const char *name, const char *node) {
Tasks::Task::Stream *target = getStream(name, node);
if (!target) return -1;
return target->registerListener();
}
 
class ReadStreamCall : public BlockingCall {
public:
ReadStreamCall(const char *name, const char *node, u32 listener, char *buffer, u32 n):
_listener(&getStream(name, node)->getListener(listener)), _buffer(buffer), _n(n)
{ }
 
Tasks::Task::Stream::Listener *getListener() const {
return _listener;
}
 
u32 operator ()() {
if (_n == 0) {
_wontBlock();
return 0;
}
 
u32 len = _listener->length();
if (len == 0) {
_willBlock();
return 0;
}
 
if (len > _n) len = _n;
POSIX::memcpy(_buffer, _listener->view(), len);
_listener->cut(len);
 
_wontBlock();
return len;
}
 
protected:
Tasks::Task::Stream::Listener *_listener;
char *_buffer;
u32 _n;
};
 
// Stream reading calls (can block).
 
// Keeping in mind that `buffer''s data probably isn't asciz.
u32 readStream_impl(const char *name, const char *node, u32 listener, char *buffer, u32 n, bool block) {
ReadStreamCall c(name, node, listener, buffer, n);
u32 r = c();
if (!block || !c.shallBlock())
return r;
 
// block && r.shallBlock()
// Block until such time as some data is available.
Tasks::Task::Stream::Listener *l = c.getListener();
 
Akari->tasks->current->userWaiting = true;
Akari->tasks->current->userCall = new ReadStreamCall(c);
l->hook(Akari->tasks->current);
Akari->syscall->returnToNextTask();
return 0;
}
 
u32 readStream(const char *name, const char *node, u32 listener, char *buffer, u32 n) {
return readStream_impl(name, node, listener, buffer, n, true);
}
 
u32 readStreamUnblock(const char *name, const char *node, u32 listener, char *buffer, u32 n) {
return readStream_impl(name, node, listener, buffer, n, false);
}
u32 writeStream(const char *name, const char *node, u32 writer, const char *buffer, u32 n) {
Tasks::Task::Stream *target = getStream(name, node);
if (!target || !target->hasWriter(writer)) return -1;
 
// We do have a writer, so we can go ahead and write to all listeners.
target->writeAllListeners(buffer, n);
return n; // what else?!
}
 
bool registerQueue(const char *node) {
Symbol sNode(node);
if (!Akari->tasks->current->registeredName) {
// TODO
AkariPanic("name not registered - cannot register node");
}
 
if (Akari->tasks->current->streamsByName->hasKey(node)) {
AkariPanic("node already registered - cannot register atop it");
}
 
Tasks::Task::Queue *target = new Tasks::Task::Queue();
 
(*Akari->tasks->current->queuesByName)[sNode] = target;
return true;
}
 
static inline Tasks::Task::Queue *getQueue(Tasks::Task *task, const char *node) {
Symbol sNode(node);
 
if (!task || !task->queuesByName->hasKey(sNode))
return 0;
 
return (*task->queuesByName)[sNode];
}
 
class ReadQueueCall : public BlockingCall {
public:
// TODO: need to know the current task!
ReadQueueCall(Tasks::Task *task, const char *node) { }
};
 
u32 readQueue(const char *node) {
ReadQueueCall c(node);
u32 r = c();
if (!block || !c.shallBlock())
return r;
 
Akari->tasks->current->userWaiting = true;
Akari->tasks->current->userCall = new ReadQueueCall(c);
Akari->syscall->returnToNextTask();
return 0;
}
 
void sendQueue(const char *name, const char *node, u32 reply_to, u32 value) {
// TODO: a u32 value is pretty limited. We'll need to specify a buffer and a length soon.
}
}
}