Skip to content

Commit

Permalink
Decouple Chan::Counter from Chan::Bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
0x1eef committed May 10, 2024
1 parent 183210d commit dc43469
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 32 deletions.
8 changes: 0 additions & 8 deletions lib/xchan/bytes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ class Chan::Bytes
require "json"
require_relative "counter"

##
# @return [Chan::Counter]
attr_reader :counter

##
# @param [String] tmpdir
# Directory where temporary files are stored
Expand All @@ -22,7 +18,6 @@ class Chan::Bytes
def initialize(tmpdir)
@io = Chan.temporary_file("xchan.bytes", tmpdir:)
@io.sync = true
@counter = Chan::Counter.new(tmpdir)
write(@io, [])
end

Expand All @@ -38,7 +33,6 @@ def unshift(len)
bytes = read(@io)
bytes.unshift(len)
write(@io, bytes)
@counter.store(bytes_written: len)
len
end

Expand All @@ -54,7 +48,6 @@ def push(len)
bytes = read(@io)
bytes.push(len)
write(@io, bytes)
@counter.store(bytes_written: len)
len
end

Expand All @@ -68,7 +61,6 @@ def shift
return 0 if bytes.size.zero?
len = bytes.shift
write(@io, bytes)
@counter.store(bytes_read: len)
len
end

Expand Down
2 changes: 1 addition & 1 deletion lib/xchan/counter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def bytes_read
# @param [Hash] new_stat
# @return [void]
# @private
def store(new_stat)
def increment!(new_stat)
stat = read(@io)
new_stat.each { stat[_1.to_s] += _2 }
write(@io, stat)
Expand Down
49 changes: 26 additions & 23 deletions lib/xchan/unix_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,26 @@ def initialize(serializer, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir)
@serializer = Chan.serializers[serializer]&.call || serializer
@r, @w = ::UNIXSocket.pair(sock_type)
@bytes = Chan::Bytes.new(tmpdir)
@counter = Chan::Counter.new(tmpdir)
@lock = LockFile.new Chan.temporary_file("xchan.lock", tmpdir:)
end

##
# @return [<#dump, #load>]
# Returns the serializer used by the channel.
# Returns the serializer used by the channel
def serializer
@serializer
end

##
# @return [Boolean]
# Returns true when the channel is closed.
# Returns true when the channel is closed
def closed?
@r.closed? and @w.closed?
end

##
# Closes the channel.
# Closes the channel
#
# @raise [IOError]
# When the channel is closed.
Expand All @@ -79,13 +80,13 @@ def close
# Performs a blocking write
#
# @param [Object] object
# An object to write to the channel.
# An object
#
# @raise [IOError]
# When the channel is closed.
# When the channel is closed
#
# @return [Object]
# Returns the number of bytes written to the channel.
# Returns the number of bytes written to the channel
def send(object)
send_nonblock(object)
rescue Chan::WaitWritable, Chan::WaitLockable
Expand All @@ -97,24 +98,25 @@ def send(object)
# Performs a non-blocking write
#
# @param [Object] object
# An object to write to the channel.
# An object
#
# @raise [IOError]
# When the channel is closed.
# When the channel is closed
#
# @raise [Chan::WaitWritable]
# When a write to the underlying IO blocks.
# When a write to {#w} blocks
#
# @raise [Chan::WaitLockable]
# When a write blocks because of a lock held by another process.
# When a write blocks because of a lock held by another process
#
# @return [Integer, nil]
# Returns the number of bytes written to the channel.
# Returns the number of bytes written to the channel
def send_nonblock(object)
@lock.lock_nonblock
raise IOError, "channel closed" if closed?
len = @w.write_nonblock(serialize(object))
@bytes.push(len)
@counter.increment!(bytes_written: len)
len.tap { @lock.release }
rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
@lock.release
Expand All @@ -134,10 +136,10 @@ def send_nonblock(object)
# Performs a blocking read
#
# @raise [IOError]
# When the channel is closed.
# When the channel is closed
#
# @return [Object]
# Returns an object from the channel.
# Returns an object from the channel
def recv
recv_nonblock
rescue Chan::WaitReadable
Expand All @@ -152,21 +154,22 @@ def recv
# Performs a non-blocking read
#
# @raise [IOError]
# When the channel is closed.
# When the channel is closed
#
# @raise [Chan::WaitReadable]
# When a read from the underlying IO blocks.
# When a read from {#r} blocks
#
# @raise [Chan::WaitLockable]
# When a read blocks because of a lock held by another process.
# When a read blocks because of a lock held by another process
#
# @return [Object]
# Returns an object from the channel.
# Returns an object from the channel
def recv_nonblock
@lock.lock_nonblock
raise IOError, "closed channel" if closed?
len = @bytes.shift
obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
@counter.increment!(bytes_read: len)
obj.tap { @lock.release }
rescue IOError => ex
@lock.release
Expand All @@ -190,7 +193,7 @@ def recv_nonblock
# ch.to_a.last # => 4
#
# @return [Array<Object>]
# Returns the consumed contents of the channel.
# Returns the contents of the channel
def to_a
lock do
[].tap { _1.push(recv) until empty? }
Expand All @@ -199,7 +202,7 @@ def to_a

##
# @return [Boolean]
# Returns true when the channel is empty.
# Returns true when the channel is empty
def empty?
return true if closed?
lock { size.zero? }
Expand All @@ -210,17 +213,17 @@ def empty?

##
# @return [Integer]
# Returns the total number of bytes written to the channel.
# Returns the total number of bytes written to the channel
def bytes_sent
lock { @bytes.counter.bytes_written }
lock { @counter.bytes_written }
end
alias_method :bytes_written, :bytes_sent

##
# @return [Integer]
# Returns the total number of bytes read from the channel.
# Returns the total number of bytes read from the channel
def bytes_received
lock { @bytes.counter.bytes_read }
lock { @counter.bytes_read }
end
alias_method :bytes_read, :bytes_received

Expand Down

0 comments on commit dc43469

Please sign in to comment.