Skip to content

Commit

Permalink
Optimization: replace Fiber proc callback with structs
Browse files Browse the repository at this point in the history
  • Loading branch information
asterite committed Apr 6, 2017
1 parent ca60c96 commit 2318fa7
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 64 deletions.
3 changes: 2 additions & 1 deletion samples/channel_primes.cr
Expand Up @@ -20,7 +20,8 @@ end
ch = Channel(Int32).new
spawn generate(ch)

1000.times do
n = (ARGV[0]? || "1000").to_i
n.times do
prime = ch.receive
puts prime
ch1 = Channel(Int32).new
Expand Down
15 changes: 13 additions & 2 deletions spec/spec_helper.cr
Expand Up @@ -358,6 +358,17 @@ end
# Helper to coordinate and synchronize execute between fibers
# Execution is assumed to start at fiber identified by 0
class FiberSwitch
struct ProcCallback
include ::Fiber::Callback

def initialize(&@block)
end

def run
@block.call
end
end

@runner = Atomic(Int32).new(0)

def wait(wait_for)
Expand All @@ -370,9 +381,9 @@ class FiberSwitch
end

def defer_yield(yield_to)
Fiber.current.append_callback ->{
Fiber.current.append_callback(ProcCallback.new do
@runner.set yield_to
}
end)
end

def wait_and_yield(wait_for, yield_to)
Expand Down
14 changes: 7 additions & 7 deletions spec/std/char_spec.cr
Expand Up @@ -137,13 +137,13 @@ describe "Char" do
'\\'.ord.should eq(92)
end

it "escapes with octal" do
'\0'.ord.should eq(0)
'\3'.ord.should eq(3)
'\23'.ord.should eq((2 * 8) + 3)
'\123'.ord.should eq((1 * 8 * 8) + (2 * 8) + 3)
'\033'.ord.should eq((3 * 8) + 3)
end
# it "escapes with octal" do

This comment has been minimized.

Copy link
@bcardiff

bcardiff Apr 6, 2017

Member

unintended comment of spec?

This comment has been minimized.

Copy link
@asterite

asterite Apr 6, 2017

Author Member

This doesn't compile anymore with 0.21.1 (this branch is being worked on an older compiler version)

# '\0'.ord.should eq(0)
# '\3'.ord.should eq(3)
# '\23'.ord.should eq((2 * 8) + 3)
# '\123'.ord.should eq((1 * 8 * 8) + (2 * 8) + 3)
# '\033'.ord.should eq((3 * 8) + 3)
# end

it "escapes with unicode" do
'\u{12}'.ord.should eq(1 * 16 + 2)
Expand Down
26 changes: 18 additions & 8 deletions src/concurrent/channel.cr
Expand Up @@ -110,11 +110,16 @@ abstract class Channel(T)
end
end

private record UnlockMutexCallback, mutex : Thread::Mutex do
include Fiber::Callback

def run
mutex.unlock
end
end

protected def unlock_after_context_switch
Fiber.current.append_callback ->{
@mutex.unlock
nil
}
Fiber.current.append_callback UnlockMutexCallback.new(@mutex)
end

protected def raise_if_closed
Expand Down Expand Up @@ -145,6 +150,14 @@ abstract class Channel(T)
self.select ops
end

private record UnlockTicketCallback, ticket : FiberTicket do
include Fiber::Callback

def run
ticket.unlock
end
end

def self.select(ops : Tuple | Array, has_else = false)
loop do
thread_log "Trying to execute select operations"
Expand All @@ -165,10 +178,7 @@ abstract class Channel(T)
end

thread_log "Waiting for operations"
Fiber.current.append_callback ->{
ticket.unlock
nil
}
Fiber.current.append_callback UnlockTicketCallback.new(ticket)
Scheduler.current.reschedule
thread_log "Done waiting"
ensure
Expand Down
91 changes: 58 additions & 33 deletions src/fiber.cr
Expand Up @@ -18,7 +18,14 @@ class Fiber
@@stack_pool_mutex = SpinLock.new
@@fiber_list_mutex = SpinLock.new
@thread : Void*
@callback : (->)?

protected def self.stack_pool_mutex
@@stack_pool_mutex
end

protected def self.stack_pool
@@stack_pool
end

# @@gc_lock = LibCK.rwlock_init
@@gc_lock = LibCK.brlock_init
Expand Down Expand Up @@ -144,7 +151,7 @@ class Fiber

def run
Fiber.gc_read_unlock
thread_log "Fiber started with callback %ld", @callback
thread_log "Fiber started with callbacks %s", @callbacks.to_s
flush_callback
@proc.call
rescue ex
Expand All @@ -168,13 +175,7 @@ class Fiber
end

def set_callback
current_cb = @callback
@callback = ->{
current_cb.not_nil!.call if current_cb
@@stack_pool_mutex.synchronize { @@stack_pool << @stack }
remove
nil
}
append_callback LastCallback.new(self, @stack)
end

# Remove the current fiber from the linked list
Expand Down Expand Up @@ -361,7 +362,7 @@ class Fiber
current = Thread.current.current_fiber

# F1's suspend callback is now stored in F2's @callback instance variable.
@callback = current.transfer_callback
@callbacks = current.transfer_callback

# LibGC.set_stackbottom LibPThread.self as Void*, @stack_bottom

Expand Down Expand Up @@ -404,46 +405,70 @@ class Fiber
current.flush_callback
end

getter callback
module Callback
abstract def run
end

record EventTimeoutCallback, event : Event::Event, timeout : Float64? do
include Fiber::Callback

def run
event.add timeout
end
end

private record LastCallback, fiber : Fiber, stack : Void* do
include Callback

def run
Fiber.stack_pool_mutex.synchronize { Fiber.stack_pool << stack }
fiber.remove
end
end

getter callbacks = StaticArray(Callback?, 2).new(nil)

def append_callback(cb : ->)
if current_cb = @callback
@callback = ->{
current_cb.not_nil!.call
cb.call
}
def append_callback(callback : Callback)
callbacks = @callbacks
if callbacks[0]
if callbacks[1]
abort "Fiber (BUG): more than two callbacks"
else
callbacks[1] = callback
end
else
@callback = cb
callbacks[0] = callback
end
@callbacks = callbacks
end

protected def flush_callback
if callback = @callback
callback.call
@callback = nil
end
callbacks.each &.try &.run
@callbacks[] = nil
end

protected def transfer_callback
@callback.tap do
@callback = nil
end
callbacks = @callbacks
@callbacks[] = nil

This comment has been minimized.

Copy link
@Sija

Sija Apr 6, 2017

Contributor

That line looks odd, what does it do?

This comment has been minimized.

Copy link
@asterite

asterite Apr 6, 2017

Author Member

Check StaticArray#[]=(), it sets all values to the given value. It's odd, maybe we'll remove it.

This comment has been minimized.

Copy link
@RX14

RX14 Apr 6, 2017

Contributor

At least rename it to something like #set_all(T).

This comment has been minimized.

Copy link
@nicck

nicck Apr 6, 2017

#fill can be a good candidate https://apidock.com/ruby/Array/fill

callbacks
end

def sleep(time)
event = @resume_event ||= EventLoop.create_resume_event(self)
@callback = ->{
event.add(time)
nil
}
append_callback EventTimeoutCallback.new(event, time.try(&.to_f))
EventLoop.wait
end

record EnqueueCallback, fiber : Fiber do
include Callback

def run
Scheduler.enqueue fiber
end
end

def yield
@callback = ->{
Scheduler.enqueue self
nil
}
append_callback EnqueueCallback.new(self)
Scheduler.current.reschedule
end

Expand Down
10 changes: 2 additions & 8 deletions src/io/file_descriptor.cr
Expand Up @@ -321,10 +321,7 @@ class IO::FileDescriptor
return if @edge_triggerable
event = @read_event ||= EventLoop.create_fd_read_event(self)
if delayed
Fiber.current.append_callback ->{
event.add @read_timeout
nil
}
Fiber.current.append_callback Fiber::EventTimeoutCallback.new(event, @read_timeout)
else
event.add @read_timeout
end
Expand Down Expand Up @@ -357,10 +354,7 @@ class IO::FileDescriptor
return if @edge_triggerable
event = @write_event ||= EventLoop.create_fd_write_event(self)
if delayed
Fiber.current.append_callback ->{
event.add timeout
nil
}
Fiber.current.append_callback Fiber::EventTimeoutCallback.new(event, timeout)
else
event.add timeout
end
Expand Down
15 changes: 10 additions & 5 deletions src/mutex.cr
Expand Up @@ -7,6 +7,15 @@ class Mutex
@lock_count = 0
end

private record MutexCallback, queue : Deque(Fiber), thread_mutex : Thread::Mutex, current_fiber : Fiber do
include Fiber::Callback

def run
queue << current_fiber
thread_mutex.unlock
end
end

def lock
@thread_mutex.lock
mutex_fiber = @mutex_fiber
Expand All @@ -20,11 +29,7 @@ class Mutex
@thread_mutex.unlock
else
queue = @queue ||= Deque(Fiber).new
current_fiber.append_callback ->{
queue << current_fiber
@thread_mutex.unlock
nil
}
current_fiber.append_callback MutexCallback.new(queue, @thread_mutex, current_fiber)
Scheduler.current.reschedule
end

Expand Down

0 comments on commit 2318fa7

Please sign in to comment.