Skip to content

Commit

Permalink
Cleanup TCPConnection GC-safety mechanism for writev buffers
Browse files Browse the repository at this point in the history
Prior to this commit, the TCPConnection `pending_writev` buffer had to
have a secondary buffer called `_pending` to ensure that the data
wouldn't get GC'd too early by the runtime.

This commit removes that hack and replaces it with a slightly different
hack based on the strategy used in `File` in PR ponylang#2775. The hacky part
is that now we have two buffers for `writev` data. One for windows
(`_pending_windows`) and one for non-windows (`_pending_posix`) to
account for the difference in order of the struct between windows
and posix. This, however, seems less bad than the previous hack of
having the secondary buffer to ensure GC safety.

resolves ponylang#2782
resolves ponylang#2779
  • Loading branch information
dipinhora committed Jun 9, 2019
1 parent 302dccf commit 32d6b51
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 56 deletions.
6 changes: 6 additions & 0 deletions packages/builtin/pointer.pony
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ struct Pointer[A]
"""
compile_intrinsic

fun tag offset(n: USize): Pointer[A] tag =>
"""
Return a tag pointer to the n-th element.
"""
_unsafe()._offset(n)

fun tag _element_size(): USize =>
"""
Return the size of a single element in an array of type A.
Expand Down
106 changes: 50 additions & 56 deletions packages/net/tcp_connection.pony
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,9 @@ actor TCPConnection
var _shutdown: Bool = false
var _shutdown_peer: Bool = false
var _in_sent: Bool = false
// _pending is used to avoid GC prematurely reaping memory.
// See GitHub bug 2526 for more. It looks like a write-only
// data structure, but its use is vital to avoid GC races:
// _pending_writev's C pointers are invisible to ORCA.
embed _pending: Array[ByteSeq] = _pending.create()
embed _pending_writev: Array[USize] = _pending_writev.create()
embed _pending_writev_posix: Array[(Pointer[U8] tag, USize)] = _pending_writev_posix.create()
embed _pending_writev_windows: Array[(USize, Pointer[U8] tag)] = _pending_writev_windows.create()

var _pending_sent: USize = 0
var _pending_writev_total: USize = 0
var _read_buf: Array[U8] iso
Expand Down Expand Up @@ -365,18 +362,16 @@ actor TCPConnection
var num_to_send: I32 = 0
for bytes in _notify.sentv(this, data).values() do
// Add an IOCP write.
_pending_writev
.> push(bytes.size())
.> push(bytes.cpointer().usize())
_pending_writev_windows
.> push((bytes.size(), bytes.cpointer()))
_pending_writev_total = _pending_writev_total + bytes.size()
_pending.push(bytes)
num_to_send = num_to_send + 1
end

// Write as much data as possible.
var len =
@pony_os_writev[USize](_event,
_pending_writev.cpointer(_pending_sent * 2),
_pending_writev_windows.cpointer(_pending_sent),
num_to_send) ?

_pending_sent = _pending_sent + num_to_send.usize()
Expand All @@ -390,11 +385,9 @@ actor TCPConnection
end
else
for bytes in _notify.sentv(this, data).values() do
_pending_writev
.> push(bytes.cpointer().usize())
.> push(bytes.size())
_pending_writev_posix
.> push((bytes.cpointer(), bytes.size()))
_pending_writev_total = _pending_writev_total + bytes.size()
_pending.push(bytes)
end

_pending_writes()
Expand Down Expand Up @@ -582,12 +575,11 @@ actor TCPConnection
ifdef windows then
try
// Add an IOCP write.
_pending_writev .> push(data.size()) .> push(data.cpointer().usize())
_pending_writev_windows .> push((data.size(), data.cpointer()))
_pending_writev_total = _pending_writev_total + data.size()
_pending.push(data)

@pony_os_writev[USize](_event,
_pending_writev.cpointer(_pending_sent * 2), I32(1)) ?
_pending_writev_windows.cpointer(_pending_sent), I32(1)) ?

_pending_sent = _pending_sent + 1

Expand All @@ -599,9 +591,8 @@ actor TCPConnection
end
end
else
_pending_writev .> push(data.cpointer().usize()) .> push(data.size())
_pending_writev_posix .> push((data.cpointer(), data.size()))
_pending_writev_total = _pending_writev_total + data.size()
_pending.push(data)
_pending_writes()
end
end
Expand All @@ -620,7 +611,7 @@ actor TCPConnection

try
_manage_pending_buffer(len.usize(),
_pending_writev_total, _pending.size())?
_pending_writev_total, _pending_writev_windows.size())?
end

if _pending_sent < 16 then
Expand Down Expand Up @@ -656,22 +647,22 @@ actor TCPConnection
end
try
// Determine number of bytes and buffers to send.
if (_pending_writev.size() / 2) < writev_batch_size then
num_to_send = _pending_writev.size() / 2
if _pending_writev_posix.size() < writev_batch_size then
num_to_send = _pending_writev_posix.size()
bytes_to_send = _pending_writev_total
else
// Have more buffers than a single writev can handle.
// Iterate over buffers being sent to add up total.
num_to_send = writev_batch_size
bytes_to_send = 0
for d in Range[USize](1, num_to_send * 2, 2) do
bytes_to_send = bytes_to_send + _pending_writev(d)?
for d in Range[USize](0, num_to_send, 1) do
bytes_to_send = bytes_to_send + _pending_writev_posix(d)?._2
end
end

// Write as much data as possible.
var len = @pony_os_writev[USize](_event,
_pending_writev.cpointer(), num_to_send.i32()) ?
_pending_writev_posix.cpointer(), num_to_send.i32()) ?

if _manage_pending_buffer(len, bytes_to_send, num_to_send)? then
return true
Expand Down Expand Up @@ -699,61 +690,63 @@ actor TCPConnection
"""
var len = bytes_sent
if len < bytes_to_send then
var num_sent: USize = 0
while len > 0 do
let iov_p =
(let iov_p, let iov_s) =
ifdef windows then
_pending_writev(1)?
(let tmp_s, let tmp_p) = _pending_writev_windows(num_sent)?
(tmp_p, tmp_s)
else
_pending_writev(0)?
end
let iov_s =
ifdef windows then
_pending_writev(0)?
else
_pending_writev(1)?
_pending_writev_posix(num_sent)?
end
if iov_s <= len then
num_sent = num_sent + 1
len = len - iov_s
_pending_writev.shift()?
_pending_writev.shift()?
_pending.shift()?
ifdef windows then
_pending_sent = _pending_sent - 1
end
_pending_writev_total = _pending_writev_total - iov_s
else
ifdef windows then
_pending_writev.update(1, iov_p+len)?
_pending_writev.update(0, iov_s-len)?
_pending_writev_windows(num_sent)? = (iov_s-len, iov_p.offset(len))
else
_pending_writev.update(0, iov_p+len)?
_pending_writev.update(1, iov_s-len)?
_pending_writev_posix(num_sent)? = (iov_p.offset(len), iov_s-len)
end
_pending_writev_total = _pending_writev_total - len
len = 0
end
end

ifdef windows then
// do a trim in place instead of many shifts for efficiency
_pending_writev_windows.trim_in_place(num_sent)
_pending_sent = _pending_sent - num_sent
else
// do a trim in place instead of many shifts for efficiency
_pending_writev_posix.trim_in_place(num_sent)
end

ifdef not windows then
_apply_backpressure()
end
else
// sent all data we requested in this batch
_pending_writev_total = _pending_writev_total - bytes_to_send
if _pending_writev_total == 0 then
_pending_writev.clear()
_pending.clear()
ifdef windows then
// do a trim in place instead of a clear to free up memory
_pending_writev_windows.trim_in_place(_pending_writev_windows.size())
_pending_sent = 0
else
// do a trim in place instead of a clear to free up memory
_pending_writev_posix.trim_in_place(_pending_writev_posix.size())
end
return true
else
for d in Range[USize](0, num_to_send, 1) do
_pending_writev.shift()?
_pending_writev.shift()?
_pending.shift()?
ifdef windows then
_pending_sent = _pending_sent - 1
end
ifdef windows then
// do a trim in place instead of many shifts for efficiency
_pending_writev_windows.trim_in_place(num_to_send)
_pending_sent = _pending_sent - 1
else
// do a trim in place instead of many shifts for efficiency
_pending_writev_posix.trim_in_place(num_to_send)
end
end
end
Expand Down Expand Up @@ -972,11 +965,12 @@ actor TCPConnection
_shutdown = true
_shutdown_peer = true

_pending.clear()
_pending_writev.clear()
_pending_writev_total = 0
ifdef windows then
_pending_writev_windows.clear()
_pending_sent = 0
else
_pending_writev_posix.clear()
end

ifdef not windows then
Expand Down

0 comments on commit 32d6b51

Please sign in to comment.