-
Notifications
You must be signed in to change notification settings - Fork 0
/
BLQueue.c
167 lines (144 loc) · 5.42 KB
/
BLQueue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#include <malloc.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#include "BLQueue.h"
#include "HazardPointer.h"
// Forward declaration so the node type can refer to itself through a pointer.
struct BLNode;
typedef struct BLNode BLNode;
// Atomic pointer to a node; used for the queue's head and tail links.
typedef _Atomic(BLNode*) AtomicBLNodePtr;
// One node of the queue: a fixed-size buffer of values plus a link to the following node.
struct BLNode {
// Value slots. Every slot starts as EMPTY_VALUE; a push installs its item with a
// compare-exchange from EMPTY_VALUE, and a pop swaps the slot to TAKEN_VALUE.
_Atomic(Value) buffer[BUFFER_SIZE];
// Index of the next slot handed to a pusher (claimed with atomic_fetch_add).
_Atomic(int) push_idx;
// Index of the next slot handed to a popper (claimed with atomic_fetch_add).
_Atomic(int) pop_idx;
// Following node in the list; NULL until a pusher links a new node.
_Atomic(struct BLNode*) next;
};
// Allocate a node with every slot set to EMPTY_VALUE, both indices at zero and no successor.
// Returns NULL when the allocation fails.
static BLNode* BLNode_new(void) {
    BLNode* fresh = malloc(sizeof *fresh);
    if (fresh == NULL) {
        return NULL;
    }

    // The node is not visible to any other thread yet, so the order of these stores is irrelevant.
    atomic_store(&fresh->next, NULL);
    atomic_store(&fresh->pop_idx, 0);
    atomic_store(&fresh->push_idx, 0);

    int slot = BUFFER_SIZE;
    while (slot-- > 0) {
        atomic_store(&fresh->buffer[slot], EMPTY_VALUE);
    }

    return fresh;
}
// Queue state: a singly linked list of BLNodes plus the hazard-pointer bookkeeping.
struct BLQueue {
// Node that pops read from.
AtomicBLNodePtr head;
// Node that pushes write to.
AtomicBLNodePtr tail;
// Hazard-pointer state; allocated with malloc in BLQueue_new.
HazardPointer *hp;
};
BLQueue* BLQueue_new(void) {
BLQueue* queue = (BLQueue*)malloc(sizeof(BLQueue));
if (queue != NULL) {
BLNode* dummy = BLNode_new();
atomic_store(&queue->head, dummy);
atomic_store(&queue->tail, dummy);
}
queue->hp = malloc(sizeof(HazardPointer));
HazardPointer_initialize(queue->hp);
return queue;
}
// Destroy a queue: free every node still linked from head, finalize and free the
// hazard-pointer state, then free the queue itself.
//
// Must not run concurrently with any other operation on the same queue.
// Passing NULL is a no-op.
void BLQueue_delete(BLQueue* queue) {
    if (queue == NULL) {
        return;
    }

    BLNode* node = atomic_load(&queue->head);
    while (node != NULL) {
        BLNode* next = atomic_load(&node->next);
        free(node);
        node = next;
    }

    HazardPointer_finalize(queue->hp);
    // hp was obtained with malloc in BLQueue_new, so it has to be released here.
    free(queue->hp);
    free(queue);
}
// Append item to the queue.
//
// The pusher claims a slot index in the tail node with fetch-add and installs the item
// with a compare-exchange from EMPTY_VALUE. If the tail node is full, it links a new
// node that already carries the item, or helps move tail forward when another thread
// has linked one first.
//
// Aborts the process if a new node cannot be allocated: the function has no way to
// report failure, and continuing would publish a NULL tail.
void BLQueue_push(BLQueue* queue, Value item) {
    while (true) {
        // Work on the pointer returned by the protect call rather than on a separate
        // load of queue->tail, so the node we dereference is the one that is protected.
        // NOTE(review): assumes HazardPointer_protect returns the pointer it published -
        // confirm against HazardPointer.h.
        BLNode* tail = HazardPointer_protect(queue->hp, (const _Atomic(void*)*)&queue->tail);

        int idx = atomic_fetch_add(&tail->push_idx, 1);
        if (idx < BUFFER_SIZE) {
            Value expected = EMPTY_VALUE;
            if (atomic_compare_exchange_strong(&tail->buffer[idx], &expected, item)) {
                HazardPointer_clear(queue->hp);
                return;
            }
            // A pop already marked this slot; claim another one.
            continue;
        }

        // The tail node is full.
        BLNode* next = atomic_load(&tail->next);
        if (next == NULL) {
            BLNode* new_tail = BLNode_new();
            if (new_tail == NULL) {
                abort();
            }
            // Store the item before the node becomes reachable, so a successful link
            // completes the push without another pass through the loop.
            atomic_store(&new_tail->buffer[0], item);
            atomic_store(&new_tail->push_idx, 1);

            BLNode* expected_next = NULL;
            if (atomic_compare_exchange_strong(&tail->next, &expected_next, new_tail)) {
                // Move tail forward only if nobody has done so already; a compare-exchange
                // can never move tail backwards, unlike a plain store.
                BLNode* expected_tail = tail;
                atomic_compare_exchange_strong(&queue->tail, &expected_tail, new_tail);
                HazardPointer_clear(queue->hp);
                return;
            }

            // Another thread linked its node first; ours was never published.
            free(new_tail);
            next = expected_next;
        }

        // Help move tail to the node that is already linked, then retry there instead of
        // spinning until the linking thread gets around to it.
        BLNode* expected_tail = tail;
        atomic_compare_exchange_strong(&queue->tail, &expected_tail, next);
    }
}
// Remove and return one value from the queue.
//
// The popper claims a slot index in the head node with fetch-add and swaps the slot to
// TAKEN_VALUE. If the slot held a value, that value is returned; if it was still
// EMPTY_VALUE, the slot is given up and another one is tried. When the head node is
// exhausted, head moves to the following node and the old node is retired.
//
// Returns EMPTY_VALUE when the head node is exhausted and has no successor.
Value BLQueue_pop(BLQueue* queue) {
    while (true) {
        // Work on the pointer returned by the protect call rather than on a separate
        // load of queue->head, so the node we dereference is the one that is protected.
        // NOTE(review): assumes HazardPointer_protect returns the pointer it published -
        // confirm against HazardPointer.h.
        BLNode* head = HazardPointer_protect(queue->hp, (const _Atomic(void*)*)&queue->head);

        // Only bump pop_idx while the node can still have slots. Without this check,
        // repeated pops on an exhausted node keep incrementing the counter until it
        // wraps negative and indexes buffer[] out of bounds.
        int idx = atomic_load(&head->pop_idx);
        if (idx < BUFFER_SIZE) {
            idx = atomic_fetch_add(&head->pop_idx, 1);
        }

        if (idx < BUFFER_SIZE) {
            Value val = atomic_exchange(&head->buffer[idx], TAKEN_VALUE);
            if (val != EMPTY_VALUE) {
                HazardPointer_clear(queue->hp);
                return val;
            }
            // Nothing had been written to this slot; try the next one.
            continue;
        }

        // The head node is exhausted.
        BLNode* next = atomic_load(&head->next);
        if (next == NULL) {
            HazardPointer_clear(queue->hp);
            return EMPTY_VALUE;
        }

        BLNode* expected_head = head;
        if (atomic_compare_exchange_strong(&queue->head, &expected_head, next)) {
            // Drop our own protection first so it does not keep the retired node alive.
            // NOTE(review): the node is retired without checking whether queue->tail still
            // points at it; verify that tail is always advanced before this can happen.
            HazardPointer_clear(queue->hp);
            HazardPointer_retire(queue->hp, head);
        }
        // Whether or not this thread moved head, retry from the current head.
    }
}
// Report whether the queue currently holds no values.
//
// The head node has nothing left to pop when its pop index has reached either the end
// of the buffer or its push index. The queue is empty when that holds and the head node
// has no successor. A head node with a successor is reported as non-empty.
//
// The result is a snapshot and may be stale by the time the caller uses it if other
// threads are pushing or popping concurrently.
bool BLQueue_is_empty(BLQueue* queue) {
    // Work on the pointer returned by the protect call rather than on a separate load of
    // queue->head, so the node we dereference is the one that is protected.
    // NOTE(review): assumes HazardPointer_protect returns the pointer it published -
    // confirm against HazardPointer.h.
    BLNode* head = HazardPointer_protect(queue->hp, (const _Atomic(void*)*)&queue->head);

    // Read pop_idx before push_idx: push_idx only grows, so pop_idx >= push_idx observed
    // in this order means the node really had nothing to pop when pop_idx was read.
    int pop_idx = atomic_load(&head->pop_idx);
    int push_idx = atomic_load(&head->push_idx);
    BLNode* next = atomic_load(&head->next);

    HazardPointer_clear(queue->hp);

    bool head_drained = (pop_idx >= BUFFER_SIZE) || (pop_idx >= push_idx);
    return head_drained && next == NULL;
}