-
Notifications
You must be signed in to change notification settings - Fork 1
/
ManyProducerManyConsumerStack.cs
131 lines (109 loc) · 3.57 KB
/
ManyProducerManyConsumerStack.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Nucs.Collections;
/// <summary>
///     A LIFO (last-in, first-out) stack that multiple threads can push to and pop from concurrently.
/// </summary>
/// <remarks>
///     The stack is a singly linked list whose head reference is swapped with
///     <c>Interlocked.CompareExchange</c> retry loops; no locks are taken.
///     Elements are not de-duplicated: the same value may be stored any number of times.
/// </remarks>
/// <typeparam name="T">Type of the stored elements.</typeparam>
public class ManyProducerManyConsumerStack<T> : IDisposable {
    /// <summary>
    ///     A single link of the list. <see cref="Next"/> is only written before the node
    ///     is published through the head reference, and never afterwards.
    /// </summary>
    private sealed class Node {
        public readonly T Value;
        public Node? Next;

        public Node(ref T value) {
            Value = value;
        }

        public Node(T value) {
            Value = value;
        }
    }

    private int _count;
    private volatile Node? _head;

    /// <summary>
    ///     <c>true</c> when the stack currently holds no elements.
    /// </summary>
    /// <remarks>
    ///     Derived directly from the head reference, so it is correct for a new instance and
    ///     after <see cref="Clear"/> / <see cref="Dispose"/>, and cannot drift out of sync
    ///     the way a separately maintained flag can.
    /// </remarks>
    public bool IsEmpty => _head is null;

    /// <summary>
    ///     Number of elements in the stack.
    /// </summary>
    /// <remarks>
    ///     The counter is updated after the head swap, so while other threads are pushing or
    ///     popping the value may briefly lag behind the actual content.
    /// </remarks>
    public int Count => Volatile.Read(ref _count);

    /// <summary>
    ///     Creates an empty stack.
    /// </summary>
    public ManyProducerManyConsumerStack() { }

    /// <summary>
    ///     Creates a stack pre-filled with <paramref name="items"/>; see
    ///     <see cref="EnqueueRange{TEnumerable}"/> for the resulting order.
    /// </summary>
    /// <param name="items">Elements to add.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is <c>null</c>.</exception>
    public ManyProducerManyConsumerStack(IEnumerable<T> items) {
        EnqueueRange(items);
    }

    /// <summary>
    ///     Pushes <paramref name="item"/> on top of the stack.
    /// </summary>
    /// <param name="item">Element to push.</param>
    public void Enqueue(T item) {
        var node = new Node(ref item);
        Push(node, node, 1);
    }

    /// <summary>
    ///     Pushes <paramref name="item"/> on top of the stack, taking it by reference
    ///     to avoid an extra copy at the call site.
    /// </summary>
    /// <param name="item">Element to push; it is copied into the stack and not modified.</param>
    public void Enqueue(ref T item) {
        var node = new Node(ref item);
        Push(node, node, 1);
    }

    /// <summary>
    ///     Pushes all <paramref name="items"/> as one unit, with a single head swap.
    /// </summary>
    /// <remarks>
    ///     The items are linked in enumeration order and placed above the existing content,
    ///     so the first enumerated item becomes the new top and is dequeued first.
    ///     An empty sequence leaves the stack untouched.
    /// </remarks>
    /// <typeparam name="TEnumerable">Concrete sequence type.</typeparam>
    /// <param name="items">Elements to push.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is <c>null</c>.</exception>
    public void EnqueueRange<TEnumerable>(TEnumerable items) where TEnumerable : IEnumerable<T> {
        if (items == null) {
            throw new ArgumentNullException(nameof(items));
        }

        //build the private chain first; nothing is visible to other threads yet
        Node? first = null;
        Node? last = null;
        int added = 0;
        foreach (var item in items) {
            var node = new Node(item);
            if (last is null) {
                first = node;
            } else {
                last.Next = node;
            }

            last = node;
            added++;
        }

        if (first is null || last is null) {
            return;
        }

        Push(first, last, added);
    }

    /// <summary>
    ///     Removes the top element, if there is one.
    /// </summary>
    /// <param name="item">The removed element, or <c>default</c> when the stack was empty.</param>
    /// <returns><c>true</c> when an element was removed; <c>false</c> when the stack was empty.</returns>
    public bool TryDequeue(out T? item) {
        Node? curr, next;
        do {
            curr = _head; //snapshot of the head this attempt is based on
            if (curr == null) {
                item = default;
                return false;
            }

            next = curr.Next;
            //retry when another thread changed the head since the snapshot
        } while (!ReferenceEquals(Interlocked.CompareExchange(ref _head, next, curr), curr));

        Interlocked.Decrement(ref _count);
        item = curr.Value;
        return true;
    }

    /// <summary>
    ///     Drops all elements and resets the counter to zero.
    /// </summary>
    /// <remarks>
    ///     No disposed state is tracked; the instance accepts new elements afterwards.
    /// </remarks>
    public void Dispose() {
        _head = null;
        Volatile.Write(ref _count, 0);
    }

    /// <summary>
    ///     Removes every element currently in the stack.
    /// </summary>
    /// <remarks>
    ///     The whole chain is detached with one atomic swap and the counter is reduced by the
    ///     number of nodes actually detached. Subtracting a separately read snapshot of the
    ///     counter would miscount if another thread pushed between the swap and the read.
    /// </remarks>
    public void Clear() {
        var detached = Interlocked.Exchange(ref _head, null);

        //published nodes never have Next rewritten, so the detached chain is safe to walk
        int removed = 0;
        for (var node = detached; node is not null; node = node.Next) {
            removed++;
        }

        if (removed != 0) {
            Interlocked.Add(ref _count, -removed);
        }
    }

    /// <summary>
    ///     Publishes the chain <paramref name="first"/>..<paramref name="last"/> above the
    ///     current head and adds <paramref name="added"/> to the counter.
    /// </summary>
    private void Push(Node first, Node last, int added) {
        Node? head;
        do {
            head = _head; //snapshot of the head this attempt is based on
            last.Next = head;
            //retry when another thread changed the head since the snapshot
        } while (!ReferenceEquals(Interlocked.CompareExchange(ref _head, first, head), head));

        Interlocked.Add(ref _count, added);
    }
}