Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Import of gore and making the specs work with the new code

  • Loading branch information...
commit 492632fdc8196ef191fd28a39ebd49e19d82fd8a 1 parent 7213c40
@nate nate authored
Showing with 920 additions and 450 deletions.
  1. +7 −7 benchmark/sieve.rb
  2. +5 −5 examples/agent-workers.rb
  3. +4 −3 examples/producer-consumer.rb
  4. +1 −26 lib/agent.rb
  5. +20 −0 lib/agent/all.rb
  6. +81 −57 lib/agent/channel.rb
  7. +15 −0 lib/agent/error.rb
  8. +3 −0  lib/agent/errors.rb
  9. +9 −0 lib/agent/go.rb
  10. +7 −0 lib/agent/kernel/channel.rb
  11. +7 −0 lib/agent/kernel/go.rb
  12. +7 −0 lib/agent/kernel/select.rb
  13. +31 −0 lib/agent/notifier.rb
  14. +34 −0 lib/agent/once.rb
  15. +61 −0 lib/agent/pop.rb
  16. +61 −0 lib/agent/push.rb
  17. +130 −0 lib/agent/queue.rb
  18. +20 −0 lib/agent/queues.rb
  19. +74 −56 lib/agent/selector.rb
  20. +0 −93 lib/agent/transport/queue.rb
  21. +12 −0 lib/agent/uuid.rb
  22. +1 −1  lib/agent/version.rb
  23. +30 −0 lib/agent/wait_group.rb
  24. +57 −63 spec/channel_spec.rb
  25. +15 −0 spec/error_spec.rb
  26. +13 −13 spec/examples/channel_of_channels_spec.rb
  27. +12 −11 spec/examples/producer_consumer_spec.rb
  28. +20 −20 spec/examples/sieve_spec.rb
  29. +91 −0 spec/once_spec.rb
  30. +24 −45 spec/queue_spec.rb
  31. +61 −50 spec/selector_spec.rb
  32. +7 −0 spec/uuid_spec.rb
View
14 benchmark/sieve.rb
@@ -2,16 +2,16 @@
require 'lib/agent'
def generate(num)
- ch = Agent::Channel.new(name: "generator_#{num}".to_sym, type: Integer)
- go { |i=1| loop { ch << i+= 1} }
+ ch = channel!(:type => Integer)
+ go! { |i=1| loop { ch << i+= 1} }
@igrigorik Owner

Hmm, any particular reason why you're going with the ! convention for channels and 'goroutines'?

@igrigorik Owner

To answer my own question (reading through the code), guessing to avoid the Kernel::select, etc, conflicts.

Maybe it's the novelty factor, but not yet sold on it.. although don't have a much better recommendation (yet).

@nate Collaborator
nate added a note

Yep, that's the issue I'm trying to solve with the new syntax, and since I was already doing it with select, I thought I'd make it consistent across all the Kernel extensions that Agent adds.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
return ch
end
def filter(in_channel, prime, num)
- out = Agent::Channel.new(name: "filter_#{prime}_#{num}".to_sym, type: Integer)
+ out = channel!(:type => Integer)
- go do
+ go! do
loop do
i = in_channel.receive
out << i if (i % prime) != 0
@@ -22,9 +22,9 @@ def filter(in_channel, prime, num)
end
def sieve(num)
- out = Agent::Channel.new(name: "sieve_#{num}".to_sym, type: Integer)
+ out = channel!(:type => Integer)
- go do
+ go! do
ch = generate(num)
loop do
prime = ch.receive
@@ -49,7 +49,7 @@ def sieve(num)
runners = []
concurrency.times do |n|
- runners << go do
+ runners << go! do
primes = sieve(n)
nth_prime.times { primes.receive }
end
View
10 examples/agent-workers.rb
@@ -6,7 +6,7 @@
# second.
Request = Struct.new(:args, :resultChan)
-clientRequests = Agent::Channel.new(name: :clientRequests, type: Request, size: 2)
+clientRequests = channel!(:type => Request, :size => 2)
# Now, we create a new worker block, which takes in a “reqs” object, calls receive on it
# (hint, req’s is a Channel!), sleeps for a bit, and then sends back a timestamped
@@ -22,15 +22,15 @@
end
# start two workers
-go(clientRequests, &worker)
-go(clientRequests, &worker)
+go!(clientRequests, &worker)
+go!(clientRequests, &worker)
# The rest is simple, we create two distinct requests, which carry a number and a reply
# channel, and pass them to our clientRequests pipe, on which our workers are waiting.
# Once dispatched, we simply call receive and wait for the results!
-req1 = Request.new(1, Agent::Channel.new(:name => :resultChan1, :type => String))
-req2 = Request.new(2, Agent::Channel.new(:name => :resultChan2, :type => String))
+req1 = Request.new(1, channel!(:type => String))
+req2 = Request.new(2, channel!(:type => String))
clientRequests << req1
clientRequests << req2
View
7 examples/producer-consumer.rb
@@ -1,10 +1,11 @@
require 'lib/agent'
-c = Agent::Channel.new(name: :incr, type: Integer)
+c = channel!(:type => Integer)
-go(c) do |c, i=0|
+go!(c) do |c|
+ i = 0
loop { c << i+= 1 }
end
p c.receive # => 1
-p c.receive # => 2
+p c.receive # => 2
View
27 lib/agent.rb
@@ -1,26 +1 @@
-require 'monitor'
-require 'thread'
-require 'securerandom'
-
-require 'agent/channel'
-require 'agent/selector'
-require 'agent/transport/queue'
-
-module Kernel
- def go(*args, &blk)
- Thread.new do
- begin
- blk.call(*args)
- rescue Exception => e
- p e
- p e.backtrace
- end
- end
- end
-
- def select(&blk)
- s = Agent::Selector.new
- yield s
- s.select
- end
-end
+require "agent/all"
View
20 lib/agent/all.rb
@@ -0,0 +1,20 @@
+require "thread"
+require "monitor"
+
+require "agent/version"
+require "agent/errors"
+require "agent/error"
+require "agent/once"
+require "agent/pop"
+require "agent/push"
+require "agent/notifier"
+require "agent/uuid"
+require "agent/wait_group"
+require "agent/go"
+require "agent/queues"
+require "agent/queue"
+require "agent/channel"
+require "agent/selector"
+require "agent/kernel/channel"
+require "agent/kernel/go"
+require "agent/kernel/select"
View
138 lib/agent/channel.rb
@@ -1,101 +1,125 @@
-# Channels combine communication—the exchange of a value—with synchronization—guaranteeing
-# that two calculations (goroutines) are in a known state.
-# - http://golang.org/doc/effective_go.html#channels
+require "agent/uuid"
+require "agent/push"
+require "agent/pop"
+require "agent/queues"
+require "agent/errors"
module Agent
+ def self.channel!(options)
+ Agent::Channel.new(options)
+ end
+
class Channel
- attr_reader :name, :transport, :chan
+ attr_reader :name, :chan, :queue
+
+ class InvalidDirection < Exception; end
+ class Untyped < Exception; end
+ class InvalidType < Exception; end
+ class ChannelClosed < Exception; end
def initialize(opts = {})
@nate Collaborator
nate added a note

What do you think about this change in syntax?

# You only need the type in the case of unbuffered channels
string_unbuffered_channel = channel!(String)
# But if you want to give it a name, that could work like this:
string_unbuffered_channel_with_opts = channel!(String, :name => "foo")

# Allow easier passing of the size of a channel (more go-like in its declaration)
string_buffered_channel = channel!(String, 1)
# But still allow the other options (name)
string_buffered_channel_with_opts = channel!(String, 1, :name => "foo"))

It could be implemented like this:

def initialize(*args)
  opts = args.pop if args.last.is_a?(Hash)
  type = args.shift
  size = args.shift || 0

  raise Untyped unless type

  # Module includes both classes and modules
  raise InvalidType unless type.is_a?(Module)

  @state      = :open
  @name       = opts[:name] || Agent::UUID.generate
  @max        = size
  @type       = type
  @direction  = opts[:direction] || :bidirectional

  @close_mutex = Mutex.new

  @queue = Queues.register(@name, @max)
end
@igrigorik Owner

works for me

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
- raise InvalidName if !opts[:name].is_a?(Symbol) || opts[:name].nil?
- raise Untyped if opts[:type].nil?
+ raise Untyped unless opts[:type]
+
+ # Module includes both classes and modules
+ raise InvalidType unless opts[:type].is_a?(Module)
- @state = :active
- @name = opts[:name]
+ @state = :open
+ @name = opts[:name] || Agent::UUID.generate
@max = opts[:size] || 1
@type = opts[:type]
@direction = opts[:direction] || :bidirectional
- @transport = opts[:transport] || Agent::Transport::Queue
- @rcb, @wcb = [], []
- @chan = @transport.new(@name, @max)
+ @queue = Queues.register(@name, @max)
end
- def marshal_load(ary)
- @state, @name, @type, @direction, @transport, @rcb, @wcb = *ary
- @chan = @transport.new(@name)
- self
- end
+ def queue
+ return @queue if @queue
- def register_callback(type, c)
- case type
- when :receive then @rcb << c
- when :send then @wcb << c
- end
+ raise ChannelClosed
end
- def remove_callback(type, name)
- case type
- when :receive then @rcb.delete_if {|c| c.chan.name == name }
- when :send then @wcb.delete_if {|c| c.chan.name == name }
- end
+
+ # Serialization methods
+
+ def marshal_load(ary)
+ @state, @name, @max, @type, @direction = *ary
+ @queue = Queues.queues[@name]
+ self
end
def marshal_dump
- [@state, @name, @type, @direction, @transport, @rcb, @wcb]
+ [@state, @name, @max, @type, @direction]
end
- def push?; @chan.push?; end
- alias :send? :push?
- def send(msg, nonblock = false)
+ # Sending methods
+
+ def send(object, options={})
check_direction(:send)
- check_type(msg)
+ check_type(object)
+
+ push = Push.new(object, options)
+ queue.push(push)
+
+ return push if options[:deferred]
- @chan.send(Marshal.dump(msg), nonblock)
- callback(:receive, @rcb.shift)
+ push.wait
end
alias :push :send
alias :<< :send
- def pop?; @chan.pop?; end
- alias :receive? :pop?
+ def push?; queue.push?; end
+ alias :send? :push?
+
+
+ # Receiving methods
- def receive(nonblock = false)
+ def receive(options={})
check_direction(:receive)
- msg = Marshal.load(@chan.receive(nonblock))
- check_type(msg)
- callback(:send, @wcb.shift)
+ pop = Pop.new(options)
+ queue.pop(pop)
- msg
+ return pop if options[:deferred]
+
+ ok = pop.wait
+ [pop.object, ok]
@nate Collaborator
nate added a note

I'm emulating go's channel behavior right here, but I'm not really sure if that's a good idea.

With go, if a channel gets closed while waiting to write to it, you get a panic. If some code is trying to 'receive' on a channel, and it gets closed, then you get a multi-return value like this.

I like it.

@igrigorik Owner

Since we're trying to emulate Go's behavior... we should probably defer to its mechanics as much as we can. In other words, lgtm.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
end
alias :pop :receive
- def closed?; @state == :closed; end
+ def pop?; queue.pop?; end
+ alias :receive? :pop?
+
+
+ # Closing methods
+
def close
- @chan.close
+ return if @state == :closed
@state = :closed
+ queue.close
+ Queues.remove(@name)
end
+ def closed?; @state == :closed; end
+ def open?; @state == :open; end
- private
+ def remove_operations(operations)
+ # ugly, but it overcomes the race condition without synchronization
+ # since instance variable access is atomic.
+ q = @queue
+ q.remove_operations(operations) if q
+ end
- def callback(type, c)
- c.send Agent::Notification.new(type, self) if c
- end
- def check_type(msg)
- raise InvalidType if !msg.is_a? @type
- end
+ private
- def check_direction(direction)
- return if @direction == :bidirectional
- raise InvalidDirection if @direction != direction
- end
+ def check_type(object)
+ raise InvalidType unless object.is_a?(@type)
+ end
+
+ def check_direction(direction)
+ return if @direction == :bidirectional
+ raise InvalidDirection if @direction != direction
+ end
- class InvalidDirection < Exception; end
- class InvalidName < Exception; end
- class Untyped < Exception; end
- class InvalidType < Exception; end
end
end
View
15 lib/agent/error.rb
@@ -0,0 +1,15 @@
+module Agent
+ class Error
+ def initialize(message)
+ @message = message
+ end
+
+ def to_s
+ @message
+ end
+
+ def message?(message)
+ @message == message
+ end
+ end
+end
View
3  lib/agent/errors.rb
@@ -0,0 +1,3 @@
+module Agent
+ class BlockMissing < Exception; end
+end
View
9 lib/agent/go.rb
@@ -0,0 +1,9 @@
+require "thread"
+require "agent/errors"
+
+module Agent
+ def self.go!(*args, &blk)
+ raise BlockMissing unless blk
+ Thread.new(*args, &blk)
+ end
+end
View
7 lib/agent/kernel/channel.rb
@@ -0,0 +1,7 @@
+require "agent/channel"
+
+module Kernel
+ def channel!(options)
+ Agent.channel!(options)
+ end
+end
View
7 lib/agent/kernel/go.rb
@@ -0,0 +1,7 @@
+require "agent/go"
+
+module Kernel
+ def go!(*args, &blk)
+ Agent.go!(*args, &blk)
+ end
+end
View
7 lib/agent/kernel/select.rb
@@ -0,0 +1,7 @@
+require "agent/selector"
+
+module Kernel
+ def select!(&blk)
+ Agent.select!(&blk)
+ end
+end
View
31 lib/agent/notifier.rb
@@ -0,0 +1,31 @@
+module Agent
+ class Notifier
+ attr_reader :payload
+
+ def initialize
+ @monitor = Monitor.new
+ @cvar = @monitor.new_cond
+ @notified = false
+ @payload = nil
+ end
+
+ def notified?
+ @notified
+ end
+
+ def wait
+ @monitor.synchronize do
+ @cvar.wait_while { !notified? }
+ end
+ end
+
+ def notify(payload)
+ @monitor.synchronize do
+ return @payload if notified?
+ @payload = payload
+ @notified = true
+ @cvar.signal
+ end
+ end
+ end
+end
View
34 lib/agent/once.rb
@@ -0,0 +1,34 @@
+module Agent
+ class Once
+ class AlreadyPerformedError < StandardError; end
+ def initialize
+ @mutex = Mutex.new
+ @performed = false
+ end
+
+ def perform
+ # optimium path
+ return nil, error if @performed
+
+ # slow path
+ @mutex.synchronize do
+ # Hold this mutex for the minimum amount of time possible, since mutexes are slow
+ return nil, error if @performed
+ @performed = true
+ end
+
+ return yield, nil
+ end
+
+ def performed?
+ @performed
+ end
+
+ protected
+
+ def error
+ @error ||= Agent::Error.new("already performed")
+ end
+
+ end
+end
View
61 lib/agent/pop.rb
@@ -0,0 +1,61 @@
+module Agent
+ class Pop
+ attr_reader :uuid, :once, :notifier, :object
+
+ def initialize(options={})
+ @object = nil
+ @uuid = options[:uuid] || Agent::UUID.generate
+ @once = options[:once]
+ @notifier = options[:notifier]
+ @monitor = Monitor.new
+ @cvar = @monitor.new_cond
+ @received = false
+ @closed = false
+ end
+
+ def received?
+ @received
+ end
+
+ def closed?
+ @closed
+ end
+
+ def runnable?
+ !@once || !@once.performed?
+ end
+
+ def wait
+ @monitor.synchronize do
+ @cvar.wait_while{ !received? && !closed? }
+ return received?
+ end
+ end
+
+ def send
+ if @once
+ value, error = @once.perform do
+ @object = Marshal.load(yield)
+ @received = true
+ @monitor.synchronize{ @cvar.signal }
+ @notifier.notify(self) if @notifier
+ end
+
+ return error
+ else
+ @object = Marshal.load(yield)
+ @received = true
+ @monitor.synchronize{ @cvar.signal }
+ @notifier.notify(self) if @notifier
+ end
+ end
+
+ def close
+ @monitor.synchronize do
+ @closed = true
+ @cvar.broadcast
+ end
+ end
+
+ end
+end
View
61 lib/agent/push.rb
@@ -0,0 +1,61 @@
+module Agent
+ class Push
@igrigorik Owner

Pop and Push are different, but share a lot of common code.. perhaps something that can be refactored?

@nate Collaborator
nate added a note

Yep. I was keeping them separate so I could move quickly until I figured out what exactly they needed to do. I figured I should shoot for correctness first, and whatever got me there sooner...

@igrigorik Owner

sounds good

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ attr_reader :object, :uuid, :once, :notifier
+
+ def initialize(object, options={})
+ @object = Marshal.dump(object)
+ @uuid = options[:uuid] || Agent::UUID.generate
+ @once = options[:once]
+ @notifier = options[:notifier]
+ @monitor = Monitor.new
+ @cvar = @monitor.new_cond
+ @sent = false
+ @closed = false
+ end
+
+ def sent?
+ @sent
+ end
+
+ def closed?
+ @closed
+ end
+
+ def runnable?
+ !@once || !@once.performed?
+ end
+
+ def wait
+ @monitor.synchronize do
+ @cvar.wait_while { !sent? && !closed? }
+ raise ChannelClosed if closed?
+ end
+ end
+
+ def receive
+ if @once
+ value, error = @once.perform do
+ yield @object
+ @sent = true
+ @monitor.synchronize{ @cvar.signal }
+ @notifier.notify(self) if @notifier
+ end
+
+ return error
+ else
+ yield @object
+ @sent = true
+ @monitor.synchronize{ @cvar.signal }
+ @notifier.notify(self) if @notifier
+ end
+ end
+
+ def close
+ @monitor.synchronize do
+ @closed = true
+ @cvar.broadcast
+ end
+ end
+
+ end
+end
View
130 lib/agent/queue.rb
@@ -0,0 +1,130 @@
+module Agent
+ class Queue
+ attr_reader :name, :max, :queue, :operations, :push_indexes, :pop_indexes, :monitor
+
+ def initialize(name, max = 1)
+ raise ArgumentError, "queue size must be at least 1" unless max > 0
+
+ @name = name
+ @max = max
+
+ @state = :open
+
+ @queue = []
+
+ @operations = []
+ @push_indexes = []
+ @pop_indexes = []
+ @monitor = Monitor.new
+ end
+
+ def close
+ monitor.synchronize do
+ @state = :closed
+ operations.each{|o| o.close }
+ end
+ end
+ def closed?; @state == :closed; end
+ def open?; @state == :open; end
+
+ def size; queue.size; end
+ def length; queue.size; end
+
+ def push(p)
+ monitor.synchronize do
+ raise ChannelClosed if closed?
+ operations << p
+ push_indexes << (operations.size - 1)
+ process
+ end
+ end
+ def push?; max > size; end
+
+ def pop(p)
+ monitor.synchronize do
+ raise ChannelClosed if closed?
+ operations << p
+ pop_indexes << (operations.size - 1)
+ process
+ end
+ end
+ def pop?; size > 0; end
+
+ def async?; @max > 1; end
+
+ def remove_operations(ops)
+ monitor.synchronize do
+ return if closed?
+
+ ops.each do |operation|
+ index = operations.index(operation)
+ next unless index
+ operations.delete_at(index)
+ if operation.is_a?(Push)
+ push_indexes.delete(index)
+ else
+ pop_indexes.delete(index)
+ end
+ end
+ end
+ end
+
+
+ protected
+
+ def process
@nate Collaborator
nate added a note

I realized last night that this method and this entire class only works on buffered queues. Setting the size to zero breaks absolutely everything. I need to re-think things. I'm wondering if I shouldn't split up the buffered and unbuffered functionality completely, since they behave so differently. I might just check out how go does it at this point...except they can cheat by controlling the scheduler....and I really don't want to use Thread.critical...ugh....

@igrigorik Owner

A separate implementation for for unbuffered is actually pretty reasonable.. I think.

I've long wanted to do - in fact, had some code half way there, but never finished - an unbuffered channel over 0MQ. Benefit being you could piece together multiple workers beautifully.. but it only works on unbuffered channels. :)

(not that unbuffered channel needs to run over 0mq..)

@nate Collaborator
nate added a note

Ah, right, I guessed that was the idea behind the making transports rather agnostic. Again, I was going for the lowest time to implement a correct representation of go's channels. Also, I'd like to wait and see what go replaces netchan with.

@igrigorik Owner

From what I can tell.. netchan was a fun experiment that's gone pretty much nowhere - unfortunately, exactly because its unbuffered. I'm surprised they're still keeping it in stdlib.

@nate Collaborator
nate added a note

Well, at least they're putting it under the "old" package.

@igrigorik Owner

s/old/dead/g :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ return if (push_indexes.empty? || !push?) && (pop_indexes.empty? && !pop?)
+
+ index = 0
+
+ loop do
+ if operations[index].is_a?(Push)
+ if push?
+ operations.delete_at(index).receive do |obj|
+ queue.push(obj)
+ end
+ push_indexes.shift
+ elsif pop? && index = pop_indexes[0]
+ next
+ else
+ break
+ end
+ else # Pop
+ if pop?
+ operations.delete_at(index).send do
+ queue.shift
+ end
+ pop_indexes.shift
+ elsif push? && index = push_indexes[0]
+ next
+ else
+ break
+ end
+ end
+
+ case operations[0]
+ when Push
+ if push?
+ index = 0
+ elsif pop? && index = pop_indexes[0]
+ next
+ else
+ break
+ end
+ when Pop
+ if pop?
+ index = 0
+ elsif push? && index = push_indexes[0]
+ next
+ else
+ break
+ end
+ else
+ break
+ end
+ end
+
+ end
+
+ end
+end
View
20 lib/agent/queues.rb
@@ -0,0 +1,20 @@
+module Agent
+ module Queues
+ LOCK = Monitor.new
+
+ class << self
+ attr_accessor :queues
+ end
+
+ self.queues = {}
+
+ def self.register(name, max)
+ return queues[name] if queues.has_key?(name)
+ LOCK.synchronize{ queues[name] ||= Agent::Queue.new(name, max) }
+ end
+
+ def self.remove(name)
+ LOCK.synchronize{ queues.delete(name) }
+ end
+ end
+end
View
130 lib/agent/selector.rb
@@ -1,83 +1,101 @@
+require "agent/uuid"
+require "agent/push"
+require "agent/pop"
+require "agent/channel"
+require "agent/notifier"
+require "agent/errors"
+
module Agent
- Notification = Struct.new(:type, :chan)
+ def self.select!
+ raise BlockMissing unless block_given?
+ selector = Agent::Selector.new
+ yield selector
+ selector.select
+ ensure
+ selector && selector.dequeue_unrunnable_operations
+ end
class Selector
attr_reader :cases
+ class DefaultCaseAlreadyDefinedError < Exception; end
+
+ Case = Struct.new(:uuid, :channel, :direction, :value, :blk)
+
def initialize
- @cases = {}
- @r, @w = [], []
- @immediate = nil
- @default = nil
+ @ordered_cases = []
+ @cases = {}
+ @operations = {}
+ @once = Once.new
+ @notifier = Notifier.new
end
- def default(&blk); @default = blk; end
+ def default(&blk)
+ if @default_case
+ @default_case.channel.close
+ raise DefaultCaseAlreadyDefinedError
+ else
+ @default_case = self.case(channel!(:type => TrueClass), :receive, &blk)
+ end
+ end
def timeout(t, &blk)
- s = Agent::Channel.new(name: uuid_channel, :type => TrueClass)
- go(s) { sleep t; s.send true; s.close }
+ s = channel!(:type => TrueClass)
+ go!{ sleep t; s.send(true); s.close }
self.case(s, :receive, &blk)
end
- def case(c, op, &blk)
- raise "invalid case, must be a channel" if !c.is_a? Agent::Channel
-
- condition = c.__send__("#{op}?")
- return unless blk
-
- case op
- when :send then @w.push c
- when :receive then @r.push c
- end
-
- @cases["#{c.name}-#{op}"] = blk
- @immediate ||= blk if condition
+ def case(chan, direction, value=nil, &blk)
+ raise "invalid case, must be a channel" unless chan.is_a?(Agent::Channel)
+ raise BlockMissing unless blk
+ uuid = Agent::UUID.generate
+ cse = Case.new(uuid, chan, direction, value, blk)
+ @ordered_cases << cse
+ @cases[uuid] = cse
+ @operations[chan] = []
+ cse
end
def select
- if @immediate
- @immediate.call
- elsif !@default.nil?
- @default.call
- else
-
- op, c = nil, nil
- if !@r.empty? || !@w.empty?
-
- s = Agent::Channel.new(name: uuid_channel, :type => Agent::Notification)
- @w.map {|c| c.register_callback(:send, s) }
- @r.map {|c| c.register_callback(:receive, s) }
-
- begin
- n = s.receive
-
- case n.type
- when :send then @w.map {|c| c.remove_callback(:send, n.chan.name)}
- when :receive then @r.map {|c| c.remove_callback(:receive, n.chan.name)}
- end
-
- op, c = @cases["#{n.chan.name}-#{n.type}"], n.chan
- rescue Exception => e
- if e.message =~ /deadlock/
- raise Exception.new("Selector deadlock: can't select on channel running in same goroutine")
- else
- raise e
- end
- ensure
- s.close
+ if !@ordered_cases.empty?
+ options = {:once => @once, :notifier => @notifier, :deferred => true}
+
+ @ordered_cases.each do |cse|
+ if cse.direction == :send
+ @operations[cse.channel] << cse.channel.send(cse.value, :uuid => cse.uuid,
+ :once => @once,
+ :notifier => @notifier,
+ :deferred => true)
+ else # :receive
+ @operations[cse.channel] << cse.channel.receive(:uuid => cse.uuid,
+ :once => @once,
+ :notifier => @notifier,
+ :deferred => true)
end
+ end
+ if @default_case
+ @default_case.channel.send(true, :uuid => @default_case.uuid, :once => @once, :notifier => @notifier, :deferred => true)
end
- op.call(c) if op
+ @notifier.wait
+ operation = @notifier.payload
+
+ if operation.is_a?(Push)
+ @cases[operation.uuid].blk.call
+ else # Pop
+ @cases[operation.uuid].blk.call(operation.object)
+ end
+
+ @default_case.channel.close if @default_case
end
end
- private
-
- def uuid_channel
- SecureRandom.uuid.gsub('-','_').to_sym
+ def dequeue_unrunnable_operations
+ @operations.each do |channel, operations|
+ channel.remove_operations(operations)
end
+ end
end
end
View
93 lib/agent/transport/queue.rb
@@ -1,93 +0,0 @@
-module Agent
- module Transport
-
- class MemoryQueue
- attr_accessor :que, :monitor, :cvar
- def initialize
- @que = []
- @monitor = Monitor.new
- @cvar = @monitor.new_cond
- end
- end
-
- class Queue
- attr_reader :name, :max
- LOCK = Monitor.new
-
- def self.register(name)
- eval <<-RUBY
- return @@__agent_queue_#{name}__ if defined? @@__agent_queue_#{name}__
- LOCK.synchronize { @@__agent_queue_#{name}__ ||= MemoryQueue.new }
- RUBY
- end
-
- def initialize(name, max = 1)
- raise ArgumentError, "queue size must be at least 1" unless max > 0
-
- @name = name
- @max = max
-
- Queue.register(name)
- end
-
- %w[que monitor cvar].each do |attr|
- define_method attr do
- begin
- Queue.send(:class_variable_get, :"@@__agent_queue_#{@name}__").send attr
- rescue NameError
- retry
- end
- end
- end
-
- def close
- Queue.send(:remove_class_variable, :"@@__agent_queue_#{@name}__")
- end
-
- def size; que.size; end
- def length; que.size; end
-
- def push?; max > size; end
- def push(obj)
- monitor.synchronize {
- cvar.wait_while{ que.length >= @max }
- que.push obj
- cvar.signal
- }
- end
- alias << push
- alias enq push
-
- def pop?; size > 0; end
- def pop(*args)
- monitor.synchronize {
- cvar.wait_while{ que.empty? }
-
- retval = que.shift
- cvar.signal
-
- retval
- }
- end
- alias shift pop
- alias deq pop
-
- def async?; @max > 1; end
-
- def send(msg, nonblock = false)
- monitor.synchronize {
- raise ThreadError, "buffer full" if nonblock && que.length >= @max
- push(msg)
- }
- end
-
- def receive(nonblock = false)
- monitor.synchronize {
- raise ThreadError, "buffer empty" if nonblock && que.empty?
- pop
- }
- end
-
- end
- end
-end
View
12 lib/agent/uuid.rb
@@ -0,0 +1,12 @@
+require 'securerandom'
+
+module Agent
+ module UUID
+ def self.generate
+ ary = SecureRandom.random_bytes(16).unpack("NnnnnN")
+ ary[2] = (ary[2] & 0x0fff) | 0x4000
+ ary[3] = (ary[3] & 0x3fff) | 0x8000
+ "%08x_%04x_%04x_%04x_%04x%08x" % ary
+ end
+ end
+end
View
2  lib/agent/version.rb
@@ -1,3 +1,3 @@
module Agent
- VERSION = "0.1.0"
+ VERSION = "0.0.1"
@igrigorik Owner

guessing you want to increment that up, not down? :)

@nate Collaborator
nate added a note

ha, yep! I have that fixed in my local branch, but haven't pushed it up yet. I thought that might be a bit hasty given that I completely missed unbuffered channels...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
end
View
30 lib/agent/wait_group.rb
@@ -0,0 +1,30 @@
+module Agent
+ class WaitGroup
+ def initialize
+ @count = 0
+ @monitor = Monitor.new
+ @cvar = @monitor.new_cond
+ end
+
+ def wait
+ @monitor.synchronize do
+ @cvar.wait_while{ @count > 0 }
+ end
+ end
+
+ def add(delta)
+ @monitor.synchronize do
+ @count += delta
+ @count = 0 if @count < 0
+ @cvar.signal if @count == 0
+ end
+ end
+
+ def done
+ @monitor.synchronize do
+ @count -= 1 if @count > 0
+ @cvar.signal if @count == 0
+ end
+ end
+ end
+end
View
120 spec/channel_spec.rb
@@ -1,44 +1,41 @@
require "spec_helper"
describe Agent::Channel do
- # http://golang.org/doc/go_spec.html#Agent::Channel_types
+ before do
+ @c = channel!(:type => String)
+ end
- include Agent
- let(:c) { Agent::Channel.new(:name => :spec, :type => String) }
+ after do
+ @c.close
+ end
- it "should have a name" do
- lambda { Agent::Channel.new(:type => String) }.should raise_error(Agent::Channel::InvalidName)
- c.name.should == :spec
+ it "should not require a name" do
@igrigorik Owner

Hmm, really?

@nate Collaborator
nate added a note

Channels in go don't require a name. Do you think they should? It seems to me that channel names are just an implementation detail, for the most part, unless you want to initialize two channels completely independently and have them use the same queue.

@igrigorik Owner

Hmm, now I'm trying to remember why I forced the name requirement to begin with..

@nate Collaborator
nate added a note

Maybe it was a requirement initially before you started doing the UUID stuff with selector?

@nate Collaborator
nate added a note

Sometimes first implementations become permanent implementations. :)

@igrigorik Owner

Still scratching my head. If it hits me, I'll let you know. :-)

In the meantime, let's go with no names.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ lambda { channel!(:type => String) }.should_not raise_error
end
- it "should have a symbol name" do
- lambda { Agent::Channel.new(:type => String, :name => 'a') }.should raise_error(Agent::Channel::InvalidName)
- lambda { Agent::Channel.new(:type => String, :name => 1) }.should raise_error(Agent::Channel::InvalidName)
+ it "allow the name to be set" do
+ channel!(:type => String, :name => "gibberish").name.should == "gibberish"
end
it "should respond to close" do
- lambda { c.close }.should_not raise_error
- c.closed?.should be_true
+ lambda { @c.close }.should_not raise_error
+ @c.closed?.should be_true
end
it "should respond to closed?" do
- c.closed?.should be_false
- c.close
- c.closed?.should be_true
+ @c.closed?.should be_false
+ @c.close
+ @c.closed?.should be_true
end
context "deadlock" do
- it "should deadlock on single thread", :vm => :ruby do
- c = Agent::Channel.new(:name => :deadlock, :type => String)
- lambda { c.receive }.should raise_error
- c.close
+ it "should deadlock on single thread" do
@igrigorik Owner

this works for you across all vms?

@nate Collaborator
nate added a note

Yes, but on accident, I believe, due to the default of buffered queues.

@nate Collaborator
nate added a note

Looks like it's broken on jruby. Heh...

@igrigorik Owner

Right, that's why I had the :vm => :ruby in there ;-)

@nate Collaborator
nate added a note

Fixed. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ lambda { @c.receive }.should raise_error
end
it "should not deadlock with multiple threads" do
- c = Agent::Channel.new(:name => :deadlock, :type => String)
- Thread.new { sleep(0.1); c.push "hi" }
- lambda { c.receive }.should_not raise_error
- c.close
+ Thread.new { sleep(0.1); @c.push "hi" }
+ lambda { @c.receive }.should_not raise_error
end
end
@@ -48,7 +45,7 @@
# type. The value of an uninitialized channel is nil.
it "should support send only" do
- c = Agent::Channel.new(:name => :spec, :direction => :send, :type => String, :size => 3)
+ c = channel!(:direction => :send, :type => String, :size => 3)
lambda { c << "hello" }.should_not raise_error
lambda { c.push "hello" }.should_not raise_error
@@ -61,41 +58,40 @@
end
it "should support receive only" do
- c = Agent::Channel.new(:name => :spec, :direction => :receive, :type => String)
+ c = channel!(:direction => :receive, :type => String)
lambda { c << "hello" }.should raise_error Agent::Channel::InvalidDirection
lambda { c.push "hello" }.should raise_error Agent::Channel::InvalidDirection
lambda { c.send "hello" }.should raise_error Agent::Channel::InvalidDirection
# timeout blocking receive calls
- lambda { Timeout::timeout(0.1) { c.pop } }.should raise_error(Timeout::Error)
- lambda { Timeout::timeout(0.1) { c.receive } }.should raise_error(Timeout::Error)
+ timed_out = false
+ select! do |s|
+ s.case(c, :receive){}
+ s.timeout(0.1){ timed_out = true }
+ end
+ timed_out.should == true
end
it "should default to bi-directional communication" do
- lambda { c.send "hello" }.should_not raise_error
- lambda { c.receive }.should_not raise_error
+ lambda { @c.send "hello" }.should_not raise_error
+ lambda { @c.receive }.should_not raise_error
end
end
context "typed" do
it "should create a typed channel" do
- lambda { Agent::Channel.new(:name => :spec) }.should raise_error Agent::Channel::Untyped
- lambda { Agent::Channel.new(:name => :spec, :type => Integer) }.should_not raise_error
+ lambda { channel!({}) }.should raise_error Agent::Channel::Untyped
+ lambda { channel!(:type => Integer) }.should_not raise_error
end
it "should reject messages of invalid type" do
- lambda { c.send 1 }.should raise_error(Agent::Channel::InvalidType)
- lambda { c.send "hello" }.should_not raise_error
- c.receive
+ lambda { @c.send 1 }.should raise_error(Agent::Channel::InvalidType)
+ lambda { @c.send "hello" }.should_not raise_error
end
end
- context "transport" do
- it "should default to memory transport" do
- c.transport.should == Agent::Transport::Queue
- end
-
+ context "queue" do
context "channels of channels" do
# One of the most important properties of Go is that a channel is a first-class
# value that can be allocated and passed around like any other. A common use of
@@ -103,15 +99,15 @@
# - http://golang.org/doc/effective_go.html#chan_of_chan
it "should be a first class, serializable value" do
- lambda { Marshal.dump(c) }.should_not raise_error
- lambda { Marshal.load(Marshal.dump(c)).is_a? Agent::Channel }.should_not raise_error
+ lambda { Marshal.dump(@c) }.should_not raise_error
+ lambda { Marshal.load(Marshal.dump(@c)).is_a?(Agent::Channel) }.should_not raise_error
end
it "should be able to pass as a value on a different channel" do
- c.send "hello"
+ @c.send "hello"
- cm = Marshal.load(Marshal.dump(c))
- cm.receive.should == "hello"
+ cm = Marshal.load(Marshal.dump(@c))
+ cm.receive[0].should == "hello"
end
end
@@ -122,35 +118,33 @@
# or absent, the communication succeeds only when both a sender and receiver are ready.
it "should default to synchronous communication" do
- c = Agent::Channel.new(:name => :buffered, :type => String)
-
- c.send "hello"
- c.receive.should == "hello"
- lambda { Timeout::timeout(0.1) { c.receive } }.should raise_error(Timeout::Error)
-
- c.close
+ @c.send "hello"
+ @c.receive[0].should == "hello"
+ select! do |s|
+ s.case(@c, :receive){}
+ s.timeout(0.1){}
+ end
end
it "should support asynchronous communication with buffered capacity" do
- c = Agent::Channel.new(:name => :buffered, :type => String, :size => 2)
+ c = channel!(:type => String, :size => 2)
c.send "hello 1"
c.send "hello 2"
- lambda { Timeout::timeout(0.1) { c.send "hello 3" } }.should raise_error(Timeout::Error)
- c.receive.should == "hello 1"
- c.receive.should == "hello 2"
- lambda { Timeout::timeout(0.1) { c.receive } }.should raise_error(Timeout::Error)
+ select! do |s|
+ s.case(@c, :send, "hello 3"){}
+ s.timeout(0.1){}
+ end
- c.close
- end
+ c.receive[0].should == "hello 1"
+ c.receive[0].should == "hello 2"
+ select! do |s|
+ s.case(@c, :receive){}
+ s.timeout(0.1){}
+ end
- it "should support nonblocking mode" do
- c = Agent::Channel.new(:name => :nonblocking, :type => String)
- c.send "hello 1", true
- lambda { c.send "hello 2", true }.should raise_error(ThreadError)
- c.receive(true).should == "hello 1"
- lambda { c.receive(true) }.should raise_error(ThreadError)
+ c.close
end
end
end
View
15 spec/error_spec.rb
@@ -0,0 +1,15 @@
+require "spec_helper"
+
+describe Agent::Error do
+ before do
+ @error = Agent::Error.new("msg")
+ end
+
+ it "should create an error" do
+ @error.to_s.should == "msg"
+ end
+
+ it "should match the error's message" do
+ @error.should be_message("msg")
+ end
+end
View
26 spec/examples/channel_of_channels_spec.rb
@@ -7,24 +7,24 @@
it "should be able to pass channels as first class citizens" do
server = Proc.new do |reqs|
2.times do |n|
- res = Request.new(n, Agent::Channel.new(:name => "resultChan_#{n}".to_sym, :type => Integer))
+ res = Request.new(n, channel!(:type => Integer))
reqs << res
- res.resultChan.receive.should == n+1
+ res.resultChan.receive[0].should == n+1
end
end
worker = Proc.new do |reqs|
loop do
- req = reqs.receive
+ req = reqs.receive[0]
req.resultChan << req.args+1
end
end
- clientRequests = Agent::Channel.new(:name => :clientRequests, :type => Request)
+ clientRequests = channel!(:type => Request)
- s = go(clientRequests, &server)
- c = go(clientRequests, &worker)
+ s = go!(clientRequests, &server)
+ c = go!(clientRequests, &worker)
s.join
clientRequests.close
@@ -33,24 +33,24 @@
it "should work with multiple workers" do
worker = Proc.new do |reqs|
loop do
- req = reqs.receive
+ req = reqs.receive[0]
req.resultChan << req.args+1
end
end
- clientRequests = Agent::Channel.new(:name => :clientRequests, :type => Request)
+ clientRequests = channel!(:type => Request)
# start multiple workers
- go(clientRequests, &worker)
- go(clientRequests, &worker)
+ go!(clientRequests, &worker)
+ go!(clientRequests, &worker)
# start server
- s = go clientRequests do |reqs|
+ s = go! clientRequests do |reqs|
2.times do |n|
- res = Request.new(n, Agent::Channel.new(:name => "resultChan_#{n}".to_sym, :type => Integer))
+ res = Request.new(n, channel!(:type => Integer))
reqs << res
- res.resultChan.receive.should == n+1
+ res.resultChan.receive[0].should == n+1
end
end
View
23 spec/examples/producer_consumer_spec.rb
@@ -49,14 +49,15 @@
s << "consumer finished"
end
- c = Agent::Channel.new(name: :c, type: Integer)
- s = Agent::Channel.new(name: :s, type: String)
+ c = channel!(:type => Integer)
+ s = channel!(:type => String)
- go(c, 3, s, &producer)
- go(c, 3, s, &consumer)
+ go!(c, 3, s, &producer)
+ sleep 0.1
+ go!(c, 3, s, &consumer)
- s.pop.should == "producer finished"
- s.pop.should == "consumer finished"
+ s.pop[0].should == "producer finished"
+ s.pop[0].should == "consumer finished"
c.close
s.close
@@ -68,13 +69,13 @@
end
Generator = Struct.new(:name, :pipe)
- c = Agent::Channel.new(name: :incr, type: Integer)
+ c = channel!(:type => Integer)
g = Generator.new(:incr, c)
- go(g, &producer)
+ go!(g, &producer)
- c.receive.should == 1
- c.receive.should == 2
- c.receive.should == 3
+ c.receive[0].should == 1
+ c.receive[0].should == 2
+ c.receive[0].should == 3
end
end
View
40 spec/examples/sieve_spec.rb
@@ -8,9 +8,9 @@
# send the sequence 2,3,4, ... to returned channel
def generate
- ch = Agent::Channel.new(name: :generator, type: Integer)
+ ch = channel!(:type => Integer)
- go do
+ go! do
i = 1
loop { ch << i+= 1 }
end
@@ -20,11 +20,11 @@ def generate
# filter out input values divisible by *prime*, send rest to returned channel
def filter(in_channel, prime)
- out = Agent::Channel.new(name: "filter_#{prime}".to_sym, type: Integer)
+ out = channel!(:type => Integer)
- go do
+ go! do
loop do
- i = in_channel.receive
+ i = in_channel.receive[0]
out << i if (i % prime) != 0
end
end
@@ -33,12 +33,12 @@ def filter(in_channel, prime)
end
def sieve
- out = Agent::Channel.new(name: :sieve, type: Integer)
+ out = channel!(:type => Integer)
- go do
+ go! do
ch = generate
loop do
- prime = ch.receive
+ prime = ch.receive[0]
out << prime
ch = filter(ch, prime)
end
@@ -56,10 +56,10 @@ def sieve
if nth
n.times { primes.receive }
- puts primes.receive
+ puts primes.receive[0]
else
loop do
- p = primes.receive
+ p = primes.receive[0]
if p <= n
result << p
@@ -76,9 +76,9 @@ def sieve
# send the sequence 2,3,4, ... to returned channel
generate = Proc.new do
- ch = Agent::Channel.new(name: :generator_block, type: Integer)
+ ch = channel!(:type => Integer)
- go do
+ go! do
i = 1
loop { ch << i+= 1 }
end
@@ -88,11 +88,11 @@ def sieve
# filter out input values divisible by *prime*, send rest to returned channel
filtr = Proc.new do |in_channel, prime|
- out = Agent::Channel.new(name: "filter_#{prime}_block".to_sym, type: Integer)
+ out = channel!(:type => Integer)
- go do
+ go! do
loop do
- i = in_channel.receive
+ i = in_channel.receive[0]
out << i if (i % prime) != 0
end
end
@@ -101,13 +101,13 @@ def sieve
end
sieve = Proc.new do
- out = Agent::Channel.new(name: :sieve_block, type: Integer)
+ out = channel!(:type => Integer)
- go do
+ go! do
ch = generate.call
loop do
- prime = ch.receive
+ prime = ch.receive[0]
out << prime
ch = filtr.call(ch, prime)
end
@@ -125,10 +125,10 @@ def sieve
if nth
n.times { primes.receive }
- puts primes.receive
+ puts primes.receive[0]
else
loop do
- p = primes.receive
+ p = primes.receive[0]
if p <= n
result << p
View
91 spec/once_spec.rb
@@ -0,0 +1,91 @@
+require "spec_helper"
+
+describe Agent::Once do
+
+ before do
+ @once = Agent::Once.new
+ end
+
+ it "should execute the block passed to it" do
+ r = []
+
+ @once.perform do
+ r << 1
+ end
+
+ r.size.should == 1
+ r.first.should == 1
+ end
+
+ it "should only execute the first block passed to it" do
+ r = []
+
+ @once.perform do
+ r << 1
+ end
+
+ @once.perform do
+ r << 2
+ end
+
+ r.size.should == 1
+ r.first.should == 1
+ end
+
+ it "should return the value returned from the block" do
+ value, error = @once.perform do
+ 1
+ end
+
+ value.should == 1
+ end
+
+ it "should return nil for value and an error if it has already been used" do
+ value, error = @once.perform{ 1 }
+ value.should == 1
+ error.should be_nil
+
+ value, error = @once.perform{ 2 }
+ value.should be_nil
+ error.should_not be_nil
+ error.should be_message("already performed")
+ end
+
+ it "should have minimal contention between threads when they contend for position" do
+ r, s = [], Time.now.to_f
+
+ # Using condition variables to maximize potential contention
+ monitor = Monitor.new
+ condition = monitor.new_cond
+
+ waiting_channel = channel!(:type => TrueClass, :size => 2)
+ finished_channel = channel!(:type => TrueClass, :size => 2)
+
+ go! do
+ monitor.synchronize{ waiting_channel.send(true); condition.wait }
+ @once.perform{ sleep 0.1; r << 1 }
+ finished_channel.send(true)
+ end
+
+ go! do
+ monitor.synchronize{ waiting_channel.send(true); condition.wait }
+ @once.perform{ sleep 0.1; r << 1 }
+ finished_channel.send(true)
+ end
+
+ # wait for both the goroutines to be waiting
+ 2.times{ waiting_channel.receive }
+
+ monitor.synchronize{ condition.broadcast }
+
+ # wait for the finished channel to be completed
+ 2.times{ finished_channel.receive }
+
+ r.size.should == 1
+ # Onlt the first sleep should be performed, so things should quickly
+ (Time.now.to_f - s).should be_within(0.05).of(0.15)
+
+ waiting_channel.close
+ finished_channel.close
+ end
+end
View
69 spec/queue_spec.rb
@@ -1,54 +1,33 @@
require "spec_helper"
-include Agent::Transport
-describe Agent::Transport::Queue do
- include Agent::Transport
-
- it "should support synchronous, unbuffered communication" do
- lambda { Agent::Transport::Queue.new("spec") }.should_not raise_error
-
- q = Agent::Transport::Queue.new("spec")
- q.max.should == 1
- q.async?.should be_false
-
- lambda { q.send("hello") }.should_not raise_error
- lambda { q.send("hello", true) }.should raise_error(ThreadError, "buffer full")
-
- q.receive.should == "hello"
- lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty")
+describe Agent::Queue do
+ before do
+ @queue = Agent::Queue.new("name", 2)
end
- it "should support asynchronous, buffered communication" do
- lambda { Agent::Transport::Queue.new("spec", 2) }.should_not raise_error
-
- q = Agent::Transport::Queue.new("spec", 2)
- q.max.should == 2
- q.async?.should be_true
-
- lambda { q.send("hello 1") }.should_not raise_error
- lambda { q.send("hello 2", true) }.should_not raise_error(ThreadError, "buffer full")
- lambda { q.send("hello 3", true) }.should raise_error(ThreadError, "buffer full")
-
- q.receive.should == "hello 1"
- q.receive.should == "hello 2"
- lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty")
+ it "should be able to be pushed to" do
+ lambda{ @queue.push(Agent::Push.new("1")) }.should_not raise_error
end
- it "should persist data between queue objects" do
- q = Agent::Transport::Queue.new("spec")
- q.send "hello"
-
- q = Agent::Transport::Queue.new("spec")
- q.receive.should == "hello"
+ it "should mark the push as sent" do
+ push = Agent::Push.new("1")
@igrigorik Owner

Yikes, do we really need to pass in and do an explicit wait for every operation?

@nate Collaborator
nate added a note

We need to make sure that the Push is enqueued in the buffered channel before releasing control back to the calling thread. We're doing it this way because we need to allow deferred channel operations for the selector to work, which means on non-deferred operations we need to block until the operation completes. It might seem slow, but it's actually not too bad. We achieved pretty good throughput on the sieve benchmark with this implementation with ruby 1.9.2, somewhere around 7500 messages per second (used a puts in Channel#receive and then used grep and wc -l the output).

@igrigorik Owner

I'm actually more worried about the explicit 3 line push/pop setup from an API standpoint, although the extra objects don't help either. :-)

Any reason why we can't "push" that code down into the library itself, instead of placing the burden on the user?

@nate Collaborator
nate added a note

Users don't directly interface with the queue, though, so this API is hidden in the channel and the selector. I could shove this off into the queue, I suppose?

@nate Collaborator
nate added a note

And users typically won't use deferred operations, so they'll never even see the push and pop objects. These operations are really only for internal use. I considered hiding them away in a subdirectory for the queue to imply that they're not for public consumption, but I felt it would be better to document what is supposed to be used (eventually).

@igrigorik Owner

facepalm .. it's been a long day. Nevermind, you're right.. the users wouldn't see this anyway - all good.

@nate Collaborator
nate added a note

No worries, I absolutely know the feeling. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ @queue.push(push)
+ push.wait
+ push.sent?.should == true
end
- it "should clear registry on close" do
- q = Agent::Transport::Queue.new("spec")
- q.send "hello"
- q.close
-
- q = Agent::Transport::Queue.new("spec")
- lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty")
- end
-
+ context "when there are elements in the queue" do
+ before do
+ push = Agent::Push.new("1")
+ @queue.push(push)
+ push.wait
+ end
+
+ it "should be able to be popped from" do
+ pop = Agent::Pop.new
+ @queue.pop(pop)
+ pop.wait
+ pop.object.should == "1"
+ end
+ end
end
View
111 spec/selector_spec.rb
@@ -6,42 +6,50 @@
# referring to communication operations.
# - http://golang.org/doc/go_spec.html#Select_statements
- let(:c) { Agent::Channel.new(:name => :selectable, :type => Integer, :size => 1) }
+ before do
+ @c = channel!(:type => Integer)
+ end
+
+ after do
+ @c.close
+ end
it "should yield Selector on select call" do
- select {|s| s.should be_kind_of Agent::Selector}
+ select! {|s| s.should be_kind_of Agent::Selector}
end
it "should evaluate select statements top to bottom" do
- select do |s|
- s.case(c, :send) {}
- s.case(c, :receive) {}
+ select! do |s|
+ s.case(@c, :send, 1) {}
+ s.case(@c, :receive) {}
s.cases.size.should == 2
end
end
- it "should evaluate but skip empty cases" do
- select do |s|
- s.case(c, :send)
- s.cases.size.should == 0
- end
+ it "should raise an error when a block is missing" do
+ lambda {
+ select! do |s|
+ s.case(@c, :send, 1)
@igrigorik Owner

Seems like for :send the block is potentially optional?

@nate Collaborator
nate added a note

Probably, yes.

@nate Collaborator
nate added a note

Fixed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ s.cases.size.should == 0
+ end
+ }.should raise_error(Agent::BlockMissing)
end
it "should return immediately on empty select block" do
s = Time.now.to_f
- select {}
+ select! {}
(Time.now.to_f - s).should be_within(0.05).of(0)
end
it "should scan all cases to identify available actions and execute first available one" do
r = []
- c.send 1
+ @c.send 1
- select do |s|
- s.case(c, :send) { r.push 1 }
- s.case(c, :receive) { r.push 2 }
- s.case(c, :receive) { r.push 3 }
+ select! do |s|
+ s.case(@c, :send, 1) { r.push 1 }
+ s.case(@c, :receive) { r.push 2 }
+ s.case(@c, :receive) { r.push 3 }
end
r.size.should == 1
@@ -50,8 +58,11 @@
it "should evaluate default case immediately if no other cases match" do
r = []
- select do |s|
- s.case(c, :send) { r.push 1 }
+
+ @c.send(1)
+
+ select! do |s|
+ s.case(@c, :send, 1) { r.push 1 }
s.default { r.push :default }
end
@@ -60,23 +71,23 @@
end
it "should timeout select statement" do
- r, s = [], Time.now.to_f
- select do |s|
+ r, now = [], Time.now.to_f
+ select! do |s|
s.timeout(0.1) { r.push :timeout }
end
r.first.should == :timeout
- (Time.now.to_f - s).should be_within(0.05).of(0.1)
+ (Time.now.to_f - now).should be_within(0.05).of(0.1)
end
context "select immediately available channel" do
it "should select read channel" do
- c = Agent::Channel.new(:name => :select_read, :type => Integer, :size => 1)
+ c = channel!(:type => Integer)
c.send 1
r = []
- select do |s|
- s.case(c, :send) { r.push :send }
+ select! do |s|
+ s.case(c, :send, 1) { r.push :send }
s.case(c, :receive) { r.push :receive }
s.default { r.push :empty }
end
@@ -87,11 +98,11 @@
end
it "should select write channel" do
- c = Agent::Channel.new(:name => :select_write, :type => Integer, :size => 1)
+ c = channel!(:type => Integer)
r = []
- select do |s|
- s.case(c, :send) { r.push :send }
+ select! do |s|
+ s.case(c, :send, 1) { r.push :send }
s.case(c, :receive) { r.push :receive }
s.default { r.push :empty }
end
@@ -104,58 +115,58 @@
context "select busy channel" do
it "should select busy read channel" do
- c = Agent::Channel.new(:name => :select_read, :type => Integer, :size => 1)
+ c = channel!(:type => Integer)
r = []
# brittle.. counting on select to execute within 0.5s
- s = Time.now.to_f
- go(c) { |r| sleep(0.2); r.send 1 }
+ now = Time.now.to_f
+ go!{ sleep(0.2); c.send 1 }
- select do |s|
- s.case(c, :receive) { r.push c.receive }
+ select! do |s|
+ s.case(c, :receive) {|value| r.push value }
end
r.size.should == 1
- (Time.now.to_f - s).should be_within(0.1).of(0.2)
+ (Time.now.to_f - now).should be_within(0.1).of(0.2)
c.close
end
it "should select busy write channel" do
- c = Agent::Channel.new(:name => :select_write, :type => Integer, :size => 1)
+ c = channel!(:type => Integer)
c.send 1
# brittle.. counting on select to execute within 0.5s
- s = Time.now.to_f
- go(c) { |r| sleep(0.2); r.receive }
+ now = Time.now.to_f
+ go!{sleep(0.2); c.receive }
- select do |s|
- s.case(c, :send) { c.send 2 }
+ select! do |s|
+ s.case(c, :send, 2) {}
end
- c.receive.should == 2
- (Time.now.to_f - s).should be_within(0.1).of(0.2)
+ c.receive[0].should == 2
+ (Time.now.to_f - now).should be_within(0.1).of(0.2)
c.close
end
it "should select first available channel" do
# create a "full" write channel, and "empty" read channel
- cw = Agent::Channel.new(:name => :select_write, :type => Integer, :size => 1)
- cr = Agent::Channel.new(:name => :select_read, :type => Integer, :size => 1)
+ cw = channel!(:type => Integer)
+ cr = channel!(:type => Integer)
cw.send 1
res = []
# empty read channel will wait for 1s before pushing a message into it
# full write channel will wait for 0.8s before consuming the message
- s = Time.now.to_f
- go(cr) { |r| sleep(0.5); r.send 2 }
- go(cw) { |w| sleep(0.2); res.push w.receive }
+ now = Time.now.to_f
+ go!{ sleep(0.5); cr.send 2 }
+ go!{ sleep(0.2); res.push cw.receive[0] }
# wait until one of the channels become available
# cw should fire first and push '3'
- select do |s|
- s.case(cr, :receive) { |c| res.push c.receive }
- s.case(cw, :send) { |c| c.send 3 }
+ select! do |s|
+ s.case(cr, :receive) {|value| res.push value }
+ s.case(cw, :send, 3) {}
end
# 0.8s goroutine should have consumed the message first
@@ -163,9 +174,9 @@
res.first.should == 1
# send case should have fired, and we should have a message
- cw.receive.should == 3
+ cw.receive[0].should == 3
- (Time.now.to_f - s).should be_within(0.1).of(0.2)
+ (Time.now.to_f - now).should be_within(0.1).of(0.2)
cw.close
cr.close
end
View
7 spec/uuid_spec.rb
@@ -0,0 +1,7 @@
+require "spec_helper"
+
+describe Agent::UUID do
+ it "should generate a uuid" do
+ Agent::UUID.generate.should match(/^[0-9a-f]{8}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{12}$/)
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.