Skip to content

Commit

Permalink
Refactor IOCP::OverlappedOperation internalize handle and `wait_f…
Browse files Browse the repository at this point in the history
…or_completion` (#14724)

* Add ivar `@handle`
* Internalize `schedule_overlapped` as `wait_for_completion`
* Add temporary special case for `overlapped_accept`
  • Loading branch information
straight-shoota committed Jun 25, 2024
1 parent a1db18c commit 7e33ecc
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 36 deletions.
70 changes: 39 additions & 31 deletions src/crystal/system/win32/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,19 @@ module Crystal::IOCP
property previous : OverlappedOperation?
@@canceled = Thread::LinkedList(OverlappedOperation).new

def initialize(@handle : LibC::HANDLE)
end

def initialize(handle : LibC::SOCKET)
@handle = LibC::HANDLE.new(handle)
end

def self.run(handle, &)
operation = OverlappedOperation.new
operation = OverlappedOperation.new(handle)
begin
yield operation
ensure
operation.done(handle)
operation.done
end
end

Expand All @@ -93,9 +100,12 @@ module Crystal::IOCP
pointerof(@overlapped)
end

def result(handle, &)
def wait_for_result(timeout, &)
wait_for_completion(timeout)

raise Exception.new("Invalid state #{@state}") unless @state.done? || @state.started?
result = LibC.GetOverlappedResult(handle, self, out bytes, 0)

result = LibC.GetOverlappedResult(@handle, self, out bytes, 0)
if result.zero?
error = WinError.value
yield error
Expand All @@ -106,10 +116,15 @@ module Crystal::IOCP
bytes
end

def wsa_result(socket, &)
def wait_for_wsa_result(timeout, &)
wait_for_completion(timeout)
wsa_result { |error| yield error }
end

def wsa_result(&)
raise Exception.new("Invalid state #{@state}") unless @state.done? || @state.started?
flags = 0_u32
result = LibC.WSAGetOverlappedResult(socket, self, out bytes, false, pointerof(flags))
result = LibC.WSAGetOverlappedResult(LibC::SOCKET.new(@handle.address), self, out bytes, false, pointerof(flags))
if result.zero?
error = WinError.wsa_value
yield error
Expand All @@ -132,15 +147,13 @@ module Crystal::IOCP
end
end

protected def done(handle)
protected def done
case @state
when .started?
handle = LibC::HANDLE.new(handle) if handle.is_a?(LibC::SOCKET)

# https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-cancelioex
# > The application must not free or reuse the OVERLAPPED structure
# associated with the canceled I/O operations until they have completed
if LibC.CancelIoEx(handle, self) != 0
if LibC.CancelIoEx(@handle, self) != 0
@state = :cancelled
@@canceled.push(self) # to increase lifetime
end
Expand All @@ -150,24 +163,23 @@ module Crystal::IOCP
def done!
@state = :done
end
end

# Returns `false` if the operation timed out.
def self.schedule_overlapped(timeout : Time::Span?, line = __LINE__) : Bool
if timeout
timeout_event = Crystal::IOCP::Event.new(Fiber.current)
timeout_event.add(timeout)
else
timeout_event = Crystal::IOCP::Event.new(Fiber.current, Time::Span::MAX)
end
# memoize event loop to make sure that we still target the same instance
# after wakeup (guaranteed by current MT model but let's be future proof)
event_loop = Crystal::EventLoop.current
event_loop.enqueue(timeout_event)
def wait_for_completion(timeout)
if timeout
timeout_event = Crystal::IOCP::Event.new(Fiber.current)
timeout_event.add(timeout)
else
timeout_event = Crystal::IOCP::Event.new(Fiber.current, Time::Span::MAX)
end
# memoize event loop to make sure that we still target the same instance
# after wakeup (guaranteed by current MT model but let's be future proof)
event_loop = Crystal::EventLoop.current
event_loop.enqueue(timeout_event)

Fiber.suspend
Fiber.suspend

event_loop.dequeue(timeout_event)
event_loop.dequeue(timeout_event)
end
end

def self.overlapped_operation(target, handle, method, timeout, *, writing = false, &)
Expand All @@ -192,9 +204,7 @@ module Crystal::IOCP
return value
end

schedule_overlapped(timeout)

operation.result(handle) do |error|
operation.wait_for_result(timeout) do |error|
case error
when .error_io_incomplete?
raise IO::TimeoutError.new("#{method} timed out")
Expand Down Expand Up @@ -224,9 +234,7 @@ module Crystal::IOCP
return value
end

schedule_overlapped(timeout)

operation.wsa_result(socket) do |error|
operation.wait_for_wsa_result(timeout) do |error|
case error
when .wsa_io_incomplete?
raise IO::TimeoutError.new("#{method} timed out")
Expand Down
8 changes: 3 additions & 5 deletions src/crystal/system/win32/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ module Crystal::System::Socket
return nil
end

IOCP.schedule_overlapped(read_timeout || 1.seconds)

operation.wsa_result(socket) do |error|
operation.wait_for_wsa_result(read_timeout || 1.seconds) do |error|
case error
when .wsa_io_incomplete?, .wsaeconnrefused?
return ::Socket::ConnectError.from_os_error(method, error)
Expand Down Expand Up @@ -210,11 +208,11 @@ module Crystal::System::Socket
return true
end

unless IOCP.schedule_overlapped(read_timeout)
unless operation.wait_for_completion(read_timeout)
raise IO::TimeoutError.new("#{method} timed out")
end

operation.wsa_result(socket) do |error|
operation.wsa_result do |error|
case error
when .wsa_io_incomplete?, .wsaenotsock?
return false
Expand Down

0 comments on commit 7e33ecc

Please sign in to comment.