Skip to content

Commit

Permalink
Channel can send successfully without raise
Browse files Browse the repository at this point in the history
  • Loading branch information
firejox committed Oct 8, 2019
1 parent cdafa0e commit b76b1fb
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
42 changes: 42 additions & 0 deletions spec/std/channel_spec.cr
Expand Up @@ -186,6 +186,26 @@ describe "unbuffered" do

closed.should be_true
end

it "can send suceesfully without raise" do
ch = Channel(Int32).new
raise_flag = false

sender = Fiber.new do
ch.send 1
rescue ex
raise_flag = true
end

yield_to(sender)

ch.receive.should eq(1)
ch.close

Fiber.yield

raise_flag.should be_false
end
end

describe "buffered" do
Expand Down Expand Up @@ -291,6 +311,28 @@ describe "buffered" do
ch.receive?.should eq(123)
end

it "can send sucessfully without raise" do
ch = Channel(Int32).new(1)
raise_flag = false

sender = Fiber.new do
ch.send 1
ch.send 2
rescue ex
raise_flag = true
end

yield_to(sender)

ch.receive.should eq(1)
ch.receive.should eq(2)
ch.close

Fiber.yield

raise_flag.should be_false
end

it "does inspect on unbuffered channel" do
ch = Channel(Int32).new
ch.inspect.should eq("#<Channel(Int32):0x#{ch.object_id.to_s(16)}>")
Expand Down
29 changes: 22 additions & 7 deletions src/channel.cr
Expand Up @@ -79,7 +79,7 @@ class Channel(T)
Closed
end

private record Sender(T), fiber : Fiber, value : T, select_context : SelectContext(Nil)?
private record Sender(T), fiber : Fiber, value : T, state_ptr : DeliveryState*, select_context : SelectContext(Nil)?
private record Receiver(T), fiber : Fiber, value_ptr : T*, state_ptr : DeliveryState*, select_context : SelectContext(T)?

def initialize(@capacity = 0)
Expand All @@ -94,7 +94,10 @@ class Channel(T)
def close
@closed = true

@senders.each &.fiber.enqueue
@senders.each do |sender|
sender.state_ptr.value = DeliveryState::Closed
sender.fiber.enqueue
end

@receivers.each do |receiver|
receiver.state_ptr.value = DeliveryState::Closed
Expand All @@ -115,11 +118,20 @@ class Channel(T)
raise_if_closed

send_internal(value) do
@senders << Sender(T).new(Fiber.current, value, select_context: nil)
state = DeliveryState::None
@senders << Sender(T).new(Fiber.current, value, pointerof(state), select_context: nil)
@lock.unsync do
Crystal::Scheduler.reschedule
end
raise_if_closed

case state
when DeliveryState::Delivered
# ignore
when DeliveryState::Closed
raise ClosedError.new
else
raise "BUG: Fiber was awaken without channel delivery state set"
end
end

self
Expand Down Expand Up @@ -188,11 +200,13 @@ class Channel(T)
if (queue = @queue) && !queue.empty?
deque_value = queue.shift
if sender = dequeue_sender
sender.state_ptr.value = DeliveryState::Delivered
sender.fiber.enqueue
queue << sender.value
end
deque_value
elsif sender = dequeue_sender
sender.state_ptr.value = DeliveryState::Delivered
sender.fiber.enqueue
sender.value
else
Expand Down Expand Up @@ -240,8 +254,8 @@ class Channel(T)
@receivers.delete_if { |receiver| receiver.fiber == Fiber.current }
end

protected def wait_for_send(value, select_context)
@senders << Sender(T).new(Fiber.current, value, select_context)
protected def wait_for_send(value, state_ptr, select_context)
@senders << Sender(T).new(Fiber.current, value, state_ptr, select_context)
end

protected def unwait_for_send
Expand Down Expand Up @@ -379,6 +393,7 @@ class Channel(T)
include SelectAction(Nil)

def initialize(@channel : Channel(T), @value : T)
@state = DeliveryState::None
end

def execute : Channel::NotReady?
Expand All @@ -391,7 +406,7 @@ class Channel(T)
end

def wait(context : SelectContext(Nil))
@channel.wait_for_send(@value, context)
@channel.wait_for_send(@value, pointerof(@state), context)
end

def unwait
Expand Down

0 comments on commit b76b1fb

Please sign in to comment.