-
Notifications
You must be signed in to change notification settings - Fork 0
/
uthread.cpp
543 lines (438 loc) · 12.5 KB
/
uthread.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
// From http://www-users.cselabs.umn.edu/classes/Fall-2017/csci5103/PROJECT/PROJECT1/sigjmp-demo.c.
#include <aio.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/time.h>
#include <deque>
#include <algorithm>
#include "uthread.h"
// Size, in bytes, of the stack that uthread_create allocates for each thread.
#define STACK_SIZE 10000
/**
* Table of thread control blocks, indexed by tid.
*/
TCB threads[MAX_THREADS];
/**
* Jump buffer saved by start(); finish() jumps to it to leave the scheduler
* and return to whoever called start().
*/
sigjmp_buf main_env;
/**
* Total number of created threads. uthread_create also uses the current value
* as the tid of the next thread it creates.
*/
int num_threads = 0;
/**
* Scheduler queues holding tids:
*   ready_list     - threads that may be picked to run next,
*   waiting_list   - threads parked by thread_switch (join or self-suspend),
*   suspended_list - threads parked by uthread_suspend.
* garbage holds stacks of completed threads; set_complete frees the stacks
* queued by earlier calls before queuing the newly completed thread's stack.
*/
std::deque<int> ready_list;
std::deque<int> waiting_list;
std::deque<int> suspended_list;
std::deque<char*> garbage;
/**
* TID of the currently executing thread; -1 until start() picks the first thread.
*/
int current_thread_id = -1;
/**
* Signal action (filled in by setupinterrupt) and interval timer settings
* (filled in by setupitimer / uthread_init) for SIGVTALRM.
*/
struct sigaction sa;
struct itimerval timer;
// Forward declarations. test_uthread_suspend and test_timing are not defined
// in the visible part of this file - presumably test hooks defined elsewhere.
void uthread_yield();
void test_uthread_suspend();
void test_timing();
/**
 * Leave the scheduler: jump to the context stored in main_env, making the
 * sigsetjmp call that filled it return 1. This function does not return.
 */
void finish() {
    const int resume_code = 1;
    siglongjmp(main_env, resume_code);
}
/**
 * Check whether tid names a thread that has been created.
 * @param tid - The tid to check.
 * @return true when 0 <= tid < num_threads, else false.
 */
bool valid_tid(int tid) {
    if (tid < 0) {
        return false;
    }
    return tid < num_threads;
}
/**
 * Erase the first occurrence of a value from a queue.
 * @param items - The queue to search.
 * @param num - The value (thread id) to erase.
 * @return true if an element was erased, false if num was not present.
 */
bool remove(std::deque<int> &items, int num) {
    for (auto pos = items.begin(); pos != items.end(); ++pos) {
        if (*pos == num) {
            // Returning right away means the invalidated iterator is never reused.
            items.erase(pos);
            return true;
        }
    }
    return false;
}
/**
 * Append an id to the back of a queue. If the id is already queued, its first
 * occurrence is erased beforehand, so re-adding an id moves it to the back.
 * @param items - The queue to modify.
 * @param num - The id to append.
 */
void add_unique(std::deque<int> &items, int num) {
    const bool was_present = remove(items, num);
    (void) was_present;  // either way the id is appended below
    items.push_back(num);
}
/**
 * Wake every thread on the waiting list that is waiting for thread tid:
 * clear its waiting_for_tid, take it off the waiting list and put it on the
 * ready list.
 * @param tid - the tid that threads no longer have to wait for
 */
void free_waiting_threads(int tid) {
    auto it = waiting_list.begin();
    // end() is re-evaluated every iteration because erase() invalidates
    // previously obtained deque iterators.
    while (it != waiting_list.end()) {
        const int waiter = *it;
        if (threads[waiter].waiting_for_tid == tid) {
            threads[waiter].waiting_for_tid = -1;
            // erase() returns the iterator to the element after the erased
            // one, so do not advance again in this branch.
            it = waiting_list.erase(it);
            add_unique(ready_list, waiter);
        } else {
            ++it;
        }
    }
}
/**
 * Mark a thread as complete and release its resources.
 * Interrupts should be disabled when this function is called.
 *
 * The thread's stack is not freed here: it is only queued on the garbage list
 * and freed by a later call. NOTE(review): presumably this is because the
 * completing thread may still be executing on that stack - confirm.
 *
 * Calling this again for a thread that is already complete is a no-op, so a
 * stack can never be queued (and later freed) twice.
 * @param tid The tid of the thread.
 */
void set_complete(int tid) {
    TCB &tcb = threads[tid];
    if (tcb.complete) {
        return;
    }
    tcb.complete = true;
    // Free the stacks queued by earlier completions.
    while (!garbage.empty()) {
        free(garbage.front());
        garbage.pop_front();
    }
    // Queue this thread's stack; ownership moves to the garbage list.
    garbage.push_back(tcb.stack);
    tcb.stack = nullptr;
    free_waiting_threads(tid);
}
/* Stack-pointer and program-counter values that uthread_create writes directly
into a jmp_buf are first passed through translate_address.
NOTE(review): the "xor with a thread-local word, then rotate left" sequence
presumably mirrors the pointer mangling the C library applies to registers
saved in a jmp_buf - confirm against the target libc.
Use this as a black box in your code. */
#ifdef __x86_64__
/* 64 bit Intel arch */
typedef unsigned long address_t;
/* Indices into __jmpbuf used by uthread_create for the stack pointer and
program counter. */
#define JB_SP 6
#define JB_PC 7
/* XORs addr with the word at %fs:0x30, rotates the result left by 0x11 bits
and returns it. */
address_t translate_address(address_t addr) {
address_t ret;
asm volatile("xor %%fs:0x30,%0\n"
"rol $0x11,%0\n"
: "=g" (ret)
: "0" (addr));
return ret;
}
#else
/* 32 bit Intel arch */
typedef unsigned int address_t;
/* Indices into __jmpbuf; JB_BP is not referenced in the visible code. */
#define JB_BP 3
#define JB_SP 4
#define JB_PC 5
/* XORs addr with the word at %gs:0x18, rotates the result left by 0x9 bits
and returns it. */
address_t translate_address(address_t addr){
address_t ret;
asm volatile("xor %%gs:0x18,%0\n"
"rol $0x9,%0\n"
: "=g" (ret)
: "0" (addr));
return ret;
}
#endif
/**
 * Add SIGVTALRM to the calling context's blocked-signal mask.
 * Prints a message and exits with status 1 if the signal set cannot be built
 * or the mask cannot be changed.
 */
void disable_interrupts() {
    sigset_t vtalrm_only;
    const bool set_ready = sigemptyset(&vtalrm_only) != -1
                           && sigaddset(&vtalrm_only, SIGVTALRM) != -1;
    if (!set_ready) {
        perror("Failed to initialize signal set");
        exit(1);
    }
    if (sigprocmask(SIG_BLOCK, &vtalrm_only, nullptr) == -1) {
        perror("Failed to block interrupt");
        exit(1);
    }
}
/**
 * Remove SIGVTALRM from the calling context's blocked-signal mask.
 * Prints a message and exits with status 1 if the signal set cannot be built
 * or the mask cannot be changed.
 */
void enable_interrupts() {
    sigset_t vtalrm_only;
    const bool set_ready = sigemptyset(&vtalrm_only) != -1
                           && sigaddset(&vtalrm_only, SIGVTALRM) != -1;
    if (!set_ready) {
        perror("Failed to initialize signal set");
        exit(1);
    }
    const int rc = sigprocmask(SIG_UNBLOCK, &vtalrm_only, nullptr);
    if (rc == -1) {
        perror("Failed to unblock interrupt");
        exit(1);
    }
}
/**
 * Finish the calling thread: mark it complete, release its resources and
 * switch to the next runnable thread. If no runnable thread is left, control
 * goes back to the caller of start() through finish().
 */
void thread_complete() {
    disable_interrupts();
    // NOTE(review): the context saved here does not appear to be resumed by
    // any visible code path (a completed thread is skipped below); kept for
    // parity with the other switch routines.
    if (sigsetjmp(threads[current_thread_id].env, 1) == 1) {
        enable_interrupts();
        return;
    }
    set_complete(current_thread_id);
    // Drop entries for threads that have already completed, without ever
    // popping from an empty queue.
    while (!ready_list.empty() && threads[ready_list.front()].complete) {
        ready_list.pop_front();
    }
    // Interrupts deliberately stay blocked across both jumps below. Every
    // context in this file is saved with savemask = 1, so siglongjmp installs
    // the target's own signal mask. Unblocking SIGVTALRM first would let
    // timer_handler -> uthread_yield run in between and save this dying
    // context over the next thread's.
    if (ready_list.empty()) {
        // All threads have completed execution.
        finish();
    }
    current_thread_id = ready_list.front();
    ready_list.pop_front();
    siglongjmp(threads[current_thread_id].env, 1);
}
/**
 * Block the calling thread: save its context, put it on the waiting list and
 * switch to the thread at the front of the ready list. If the ready list is
 * empty (deadlock), control goes back to the caller of start() through
 * finish().
 *
 * When the calling thread is eventually resumed, this function returns with
 * SIGVTALRM unblocked.
 */
void thread_switch() {
    disable_interrupts();
    if (sigsetjmp(threads[current_thread_id].env, 1) == 1) {
        // Resumed by another thread's siglongjmp. The saved mask had
        // SIGVTALRM blocked, so unblock it again.
        enable_interrupts();
        return;
    }
    // Place calling thread on waiting list
    add_unique(waiting_list, current_thread_id);
    // Interrupts deliberately stay blocked across both jumps below. siglongjmp
    // installs the target's saved signal mask. Unblocking SIGVTALRM first
    // would let timer_handler -> uthread_yield run after current_thread_id
    // has changed and overwrite the next thread's saved context.
    if (ready_list.empty()) {
        // Deadlock
        finish();
    }
    // Take the top thread off the ready list
    current_thread_id = ready_list.front();
    ready_list.pop_front();
    siglongjmp(threads[current_thread_id].env, 1);
}
/**
* thread_wrapper is the function initially called when a thread starts.
* Calls the thread function with its argument, marks the thread as complete, and yields.
* @param arg not used
*/
void thread_wrapper(void *arg) {
TCB *tcb = &threads[current_thread_id];
tcb->result = tcb->function(tcb->arg);
thread_complete();
}
/*
* Create a new uthread.
* @param start_routine - The function that should be executed in the thread.
* @param arg - An argument to pass to the function.
* @return The tid of the created thread, or -1 if there were too many threads.
*/
int uthread_create(void *(start_routine)(void *), void *arg) {
disable_interrupts();
// We support a limited number of threads.
if (num_threads >= MAX_THREADS - 1) {
enable_interrupts();
return -1;
}
int tid = num_threads;
TCB *tcb = &threads[tid];
// Store the arg and function in a global variable.
tcb->function = start_routine;
tcb->arg = arg;
// Allocate the stack.
tcb->stack = (char *) malloc(STACK_SIZE);
// sp starts out at the top of the stack, pc at the wrapper function.
auto sp = (address_t) tcb->stack + STACK_SIZE - 10 * sizeof(void *);
auto pc = (address_t) thread_wrapper;
// Modify the env_buf with the thread context.
sigsetjmp(tcb->env, 1);
(tcb->env->__jmpbuf)[JB_SP] = translate_address(sp);
(tcb->env->__jmpbuf)[JB_PC] = translate_address(pc);
sigemptyset(&tcb->env->__saved_mask);
// Add thread to ready list
add_unique(ready_list, tid);
// We now have one more thread!
num_threads++;
enable_interrupts();
return tid;
}
void uthread_yield() {
disable_interrupts();
// Save execution state.
int ret_val = sigsetjmp(threads[current_thread_id].env, 1);
if (ret_val == 1) {
enable_interrupts();
return;
}
// Choose the next thread to execute.
add_unique(ready_list, current_thread_id);
current_thread_id = ready_list.front();
ready_list.pop_front();
enable_interrupts();
siglongjmp(threads[current_thread_id].env, 1);
}
/**
 * Report which thread is running.
 * @return The value of current_thread_id (initialised to -1 at file scope).
 */
int uthread_self() {
    const int self_tid = current_thread_id;
    return self_tid;
}
/**
* Join against another thread.
* @param tid The tid of the thread to join on.
* @param retval A pointer which will be set to the return value of the thread.
* @return 0 if join was successful, false if tid did not represent a valid thread.
*/
int uthread_join(int tid, void **retval) {
if (!valid_tid(tid)) return -1;
if (tid == current_thread_id || threads[tid].complete) return 0;
threads[current_thread_id].waiting_for_tid = tid;
thread_switch();
*retval = threads[tid].result;
return 0;
}
/**
* Resume a suspended thread. Returns -1 on if tid is invalid or tid was not suspended
* @param tid - The tid needed to be resumed
*/
int uthread_resume(int tid) {
if (!valid_tid(tid)) return -1;
disable_interrupts();
// Verify that tid is currently suspended
if (remove(suspended_list, tid)) {
if (threads[tid].waiting_for_tid == -1){
add_unique(ready_list, tid);
} else {
add_unique(waiting_list, tid);
}
enable_interrupts();
return 0;
}
enable_interrupts();
// If it wasn't suspended, return an error.
return -1;
}
/**
 * Attempt to read nbytes bytes from the file for file descriptor fildes into
 * the buffer pointed to by buf, yielding to other threads while the read is
 * in progress. The request's offset is left at its zero-initialised value.
 * @param fildes The file descriptor to read from.
 * @param buf The buffer to read into.
 * @param nbytes Number of bytes to read.
 * @return The value of aio_return for the finished request, or -1 if the
 *         request could not be queued or was cancelled (errno is set to
 *         ECANCELED in the latter case).
 */
ssize_t async_read(int fildes, void *buf, size_t nbytes) {
    struct aiocb params{};
    params.aio_fildes = fildes;
    params.aio_buf = buf;
    params.aio_nbytes = nbytes;
    if (aio_read(&params) != 0) return -1;
    for (;;) {
        const int status = aio_error(&params);
        if (status == EINPROGRESS) {
            // Not finished yet: let another thread run, then poll again.
            uthread_yield();
            continue;
        }
        if (status == ECANCELED) {
            errno = ECANCELED;
            return -1;
        }
        // Finished, successfully or not: hand back the request's result.
        return aio_return(&params);
    }
}
/**
 * Suspend a thread by adding it to the suspended list. A thread that suspends
 * itself is blocked until it is resumed.
 * @param tid - The tid of the thread needed to be suspended
 * @return 0, if thread was successfully suspended, else -1 (tid is invalid,
 *         complete or already suspended).
 */
int uthread_suspend(int tid) {
    if (!valid_tid(tid)) return -1;
    disable_interrupts();
    // Check the suspended list explicitly. A thread that suspended itself is
    // also on the waiting list, so probing only ready/waiting would wrongly
    // accept a second suspend.
    const bool already_suspended =
        std::find(suspended_list.begin(), suspended_list.end(), tid) != suspended_list.end();
    // A complete thread can't be suspended either.
    if (threads[tid].complete || already_suspended) {
        enable_interrupts();
        return -1;
    }
    // If a thread tries to suspend itself
    if (tid == current_thread_id) {
        add_unique(suspended_list, tid);
        // Returns, with interrupts enabled again, once this thread is resumed.
        thread_switch();
        return 0;
    }
    // The thread must currently be on the ready list or the waiting list.
    const bool dequeued = remove(ready_list, tid) || remove(waiting_list, tid);
    if (dequeued) {
        add_unique(suspended_list, tid);
    }
    enable_interrupts();
    return dequeued ? 0 : -1;
}
/**
* Terminate a thread by setting it to complete
* @param tid - the tid of the thread needing to be terminated
* @return 0 if termination was successful, -1 if tid was not valid.
*/
int uthread_terminate(int tid) {
if (!valid_tid(tid)) return -1;
disable_interrupts();
if (tid == current_thread_id) {
// We could enable interrupts here, but it is safer to not,
// and thread_complete will disable interrupts immediatly anyway.
thread_complete();
return 0; // Unreachable code.
}
remove(ready_list, tid) || remove(waiting_list, tid) || remove(suspended_list, tid);
set_complete(tid);
enable_interrupts();
return 0;
}
/**
 * Sets the time slice for how long each thread runs.
 *
 * Only the timer's interval is changed; its it_value field is passed to
 * setitimer as it currently stands in the global timer settings.
 * @param time_slice - the new time slice for each thread in microseconds
 * @return The result of setitimer (0 on success, -1 on failure), or -1 with
 *         errno set to EINVAL if time_slice is negative.
 */
int uthread_init(int time_slice) {
    if (time_slice < 0) {
        errno = EINVAL;
        return -1;
    }
    // Split into seconds and microseconds so that slices of one second or
    // more do not leave an out-of-range value in tv_usec.
    const int usec_per_sec = 1000000;
    timer.it_interval.tv_sec = time_slice / usec_per_sec;
    timer.it_interval.tv_usec = time_slice % usec_per_sec;
    return setitimer(ITIMER_VIRTUAL, &timer, nullptr);
}
/**
 * SIGVTALRM handler (installed by setupinterrupt): yields the CPU every time
 * the timer expires.
 * @param signum - the delivered signal number (not used)
 */
void timer_handler(int signum) {
    (void) signum;
    uthread_yield();
}
/**
 * Arm the virtual interval timer.
 *
 * If an interval has already been stored in the global timer settings (for
 * example by uthread_init) it is kept; otherwise a default of 1000
 * microseconds is used. The first expiry is set to the same length as the
 * interval.
 * @return The result of setitimer (0 on success, -1 on failure).
 */
int setupitimer(void) {
    const bool interval_configured =
        timer.it_interval.tv_sec != 0 || timer.it_interval.tv_usec != 0;
    if (!interval_configured) {
        timer.it_interval.tv_sec = 0;
        timer.it_interval.tv_usec = 1000;
    }
    timer.it_value = timer.it_interval;
    return setitimer(ITIMER_VIRTUAL, &timer, nullptr);
}
/**
 * Install timer_handler as the handler for SIGVTALRM, with no flags and an
 * empty handler mask.
 * @return 0 on success, -1 on failure (so that callers comparing against -1
 *         actually see the failure).
 */
int setupinterrupt(void) {
    sa.sa_handler = timer_handler;
    sa.sa_flags = 0;
    if (sigemptyset(&sa.sa_mask) == -1) {
        return -1;
    }
    return sigaction(SIGVTALRM, &sa, nullptr);
}
/**
* Start the threading library.
*/
void start() {
if (sigsetjmp(main_env, 1) != 0) {
return;
}
if (setupinterrupt() == -1) {
perror("Failed to set up handler");
}
if (setupitimer() == -1) {
perror("Failed to set up timer");
}
current_thread_id = ready_list.front();
ready_list.pop_front();
siglongjmp(threads[current_thread_id].env, 1);
}