# concurrent_stack.rb
# 3 thread-safe stack implementations
# Written by Alex Dowad
# Usage:
# stack.push(1)
# stack.peek => 1 (1 is not removed from stack)
# stack.pop => 1 (now 1 is removed from stack)
require 'rubygems' # for compatibility with MRI 1.8, JRuby
require 'thread'
require 'atomic' # atomic gem must be installed
# The easy one first: a plain Array whose every access is wrapped in a
# single Mutex, so only one thread touches the array at a time.
class ThreadSafeStack
  def initialize
    @items = []
    @lock = Mutex.new
  end

  # Places +value+ on top of the stack.
  # Returns the backing array (the result of appending to it).
  def push(value)
    @lock.synchronize do
      @items << value
    end
  end

  # Removes and returns the top value, or nil when the stack is empty.
  def pop
    @lock.synchronize do
      @items.pop
    end
  end

  # Returns the top value without removing it, or nil when the stack is empty.
  def peek
    @lock.synchronize do
      @items[-1]
    end
  end
end
# A non-locking version: the stack is a singly linked list of Nodes whose
# head lives in an Atomic reference. Every mutation goes through
# Atomic#update, so the head is replaced by compare-and-swap rather than
# under a lock.
class ConcurrentStack
  Node = Struct.new(:value, :next)

  def initialize
    @top = Atomic.new(nil)
  end

  # Links a fresh node holding +value+ in front of the current head.
  # Returns whatever Atomic#update returns.
  def push(value)
    fresh = Node.new(value, nil)
    @top.update do |head|
      fresh.next = head
      fresh
    end
  end

  # Unlinks the head node and returns its value; returns nil when the
  # stack is empty.
  def pop
    taken = nil
    @top.update do |head|
      # `return` inside the block leaves #pop itself, so an empty stack
      # is reported as nil without touching the head reference.
      return if head.nil?
      taken = head
      head.next
    end
    taken.value
  end

  # Returns the value at the head without unlinking it, or nil when the
  # stack is empty.
  def peek
    head = @top.value
    head.nil? ? nil : head.value
  end
end
# Same as ConcurrentStack, but additionally recycles popped nodes
# (to reduce load on the GC).
# A global free list is used, and it is also updated using CAS,
# in exactly the same way as the stacks themselves.
#
# NOTE(review): because nodes are reused, the compare-and-swap operations
# here appear to be exposed to the ABA problem -- a node observed as the
# head may be popped, recycled and re-linked with a different +next+ before
# the observing thread's CAS runs, and that CAS would still succeed. Confirm
# before relying on this class under heavy contention.
class RecyclingConcurrentStack
  Node = Struct.new(:value, :next)

  # Head of the free list of recycled nodes, shared by every instance.
  FREE_LIST = Atomic.new(nil)

  def initialize
    @top = Atomic.new(nil)
  end

  # Pushes +value+, reusing a node from the free list when one can be taken.
  # Returns whatever Atomic#update returns.
  def push(value)
    node = get_node(value)
    @top.update { |current| node.next = current; node }
  end

  # Removes and returns the top value, or nil when the stack is empty.
  # The unlinked node is handed to the free list for later reuse.
  def pop
    node = nil
    @top.update do |current|
      # `return` inside the block leaves #pop itself with nil.
      return if (node = current).nil?
      node.next
    end
    # Read the value before the node is published on the free list, where
    # another thread may immediately claim and overwrite it.
    result = node.value
    FREE_LIST.update do |current|
      node.next = current
      node
    end
    result
  end

  # Returns the top value without removing it, or nil when the stack is empty.
  def peek
    node = @top.value
    return if node.nil?
    node.value
  end

  private

  # Returns a node holding +val+: the head of the free list if it can be
  # claimed with a single CAS, otherwise a newly allocated node.
  def get_node(val)
    # if contention causes the CAS to fail, just allocate a new node
    node = FREE_LIST.value
    if node && FREE_LIST.compare_and_swap(node, node.next)
      node.value = val
      node
    else
      Node.new(val, nil)
    end
  end
end
# Test driver: when this file is run directly, benchmark each stack
# implementation with 1, 5 and 25 threads hammering a single shared stack.
if __FILE__ == $0
  require 'benchmark'

  ITERATIONS_PER_TEST = 1000000
  QUEUE, MUTEX = ConditionVariable.new, Mutex.new
  # Start-gate state, guarded by MUTEX. A flag is needed in addition to the
  # condition variable so that a worker which reaches the gate after the
  # broadcast still sees that it is open instead of waiting forever.
  GATE = { :open => false }

  # Closes the start gate ahead of spawning a new batch of workers.
  def reset_signal
    MUTEX.synchronize { GATE[:open] = false }
  end

  # Blocks the calling thread until send_signal has opened the gate.
  def wait_for_signal
    MUTEX.synchronize do
      # ConditionVariable#wait needs the mutex it should release while
      # sleeping; the loop also guards against spurious wakeups.
      QUEUE.wait(MUTEX) until GATE[:open]
    end
  end

  # Opens the gate and wakes every worker waiting on it.
  def send_signal
    MUTEX.synchronize do
      GATE[:open] = true
      QUEUE.broadcast
    end
  end

  # Runs the benchmark for +klass+ at each thread count.
  def test(klass)
    test_with_threads(klass,1)
    test_with_threads(klass,5)
    test_with_threads(klass,25)
  end

  # Benchmarks one instance of +klass+ shared by +n_threads+ workers. The
  # total number of push/peek/pop rounds is split evenly between workers.
  def test_with_threads(klass,n_threads)
    iterations = ITERATIONS_PER_TEST / n_threads
    stack = klass.new
    puts "Testing #{klass} with #{n_threads} thread#{'s' if n_threads>1}, iterating #{iterations}x each"
    reset_signal
    threads = (1..n_threads).map do
      Thread.new do
        wait_for_signal
        iterations.times do
          stack.push(rand(100))
          stack.peek
          stack.pop
        end
      end
    end
    n_gc = GC.count
    # Give the workers a moment to reach the gate so the measurement covers
    # stack operations rather than thread start-up.
    sleep(0.001)
    # Assign first: with `puts Benchmark.measure do ... end` the block would
    # bind to puts and measure would be called without a block.
    timing = Benchmark.measure do
      send_signal
      threads.each { |t| t.join }
    end
    puts timing
    puts "Garbage collector ran #{GC.count - n_gc} times"
  end

  test(ThreadSafeStack)
  test(ConcurrentStack)
  test(RecyclingConcurrentStack)
end