Skip to content

Commit

Permalink
[feature] Remove ThreadSafeArray
Browse files Browse the repository at this point in the history
Convert Adhearsion::Process to an actor as a dependency of removing ThreadSafeArray
  • Loading branch information
bklang authored and benlangfeld committed Aug 17, 2013
1 parent 5bf39a0 commit 08509ed
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 71 deletions.
7 changes: 6 additions & 1 deletion lib/adhearsion.rb
Expand Up @@ -99,6 +99,11 @@ def active_calls
Celluloid::Actor[:active_calls]
end

def process
Celluloid::Actor[:process] || Process.supervise_as(:process)
Celluloid::Actor[:process]
end

#
# @return [Adhearsion::Statistics] a statistics aggregator object capable of producing stats dumps
def statistics
Expand All @@ -110,7 +115,7 @@ def statistics
end

def status
Adhearsion::Process.state_name
process.state_name
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/adhearsion/console.rb
Expand Up @@ -58,7 +58,7 @@ def log_level(level = nil)
end

def shutdown!
Process.shutdown!
Adhearsion.process.shutdown!
end

def calls
Expand Down
12 changes: 0 additions & 12 deletions lib/adhearsion/foundation/thread_safety.rb
Expand Up @@ -9,15 +9,3 @@ def synchronize(&block)
end
end

class ThreadSafeArray < BasicObject
def initialize
@mutex = ::Mutex.new
@array = []
end

def method_missing(method, *args, &block)
@mutex.synchronize do
@array.send method, *args, &block
end
end
end
14 changes: 7 additions & 7 deletions lib/adhearsion/initializer.rb
Expand Up @@ -63,7 +63,7 @@ def start
run_plugins
trigger_after_initialized_hooks

Adhearsion::Process.booted if Adhearsion.status == :booting
Adhearsion.process.booted if Adhearsion.status == :booting

logger.info "Adhearsion v#{Adhearsion::VERSION} initialized in \"#{Adhearsion.config.platform.environment}\"!" if Adhearsion.status == :running
end
Expand Down Expand Up @@ -158,7 +158,7 @@ def handle_signal(signal)
case signal
when 'INT', 'TERM'
logger.info "Received SIG#{signal}. Shutting down."
Adhearsion::Process.shutdown
Adhearsion.process.shutdown
when 'HUP'
logger.debug "Received SIGHUP. Reopening logfiles."
Adhearsion::Logging.reopen_logs
Expand All @@ -167,7 +167,7 @@ def handle_signal(signal)
Adhearsion::Logging.toggle_trace!
when 'ABRT'
logger.info "Received ABRT signal. Forcing stop."
Adhearsion::Process.force_stop
Adhearsion.process.force_stop
end
end

Expand Down Expand Up @@ -247,7 +247,7 @@ def daemonize!
end

def launch_console
Adhearsion::Process.important_threads << Thread.new do
Adhearsion.process.important_threads << Thread.new do
catching_standard_errors do
Adhearsion::Console.run
end
Expand Down Expand Up @@ -307,16 +307,16 @@ def trigger_after_initialized_hooks
end

##
# This method will block Thread.main() until calling join() has returned for all Threads in Adhearsion::Process.important_threads.
# This method will block Thread.main() until calling join() has returned for all Threads in Adhearsion.process.important_threads.
# Note: important_threads won't always contain Thread instances. It simply requires the objects respond to join().
#
def join_important_threads
# Note: we're using this ugly accumulator to ensure that all threads have ended since IMPORTANT_THREADS will almost
# certainly change sizes after this method is called.
index = 0
until index == Adhearsion::Process.important_threads.size
until index == Adhearsion.process.important_threads.size
begin
Adhearsion::Process.important_threads[index].join
Adhearsion.process.important_threads[index].join
rescue => e
logger.error "Error after joining Thread #{Thread.inspect}. #{e.message}"
ensure
Expand Down
4 changes: 2 additions & 2 deletions lib/adhearsion/process.rb
Expand Up @@ -6,7 +6,7 @@

module Adhearsion
class Process
include Singleton
include Celluloid

state_machine :initial => :booting do
before_transition :log_state_change
Expand Down Expand Up @@ -58,7 +58,7 @@ class Process
attr_accessor :important_threads

def initialize
@important_threads = ThreadSafeArray.new
@important_threads = []
super
end

Expand Down
16 changes: 8 additions & 8 deletions lib/adhearsion/punchblock_plugin/initializer.rb
Expand Up @@ -80,12 +80,12 @@ def run
end

def connect
return unless Process.state_name == :booting
return unless Adhearsion.process.state_name == :booting
m = Mutex.new
blocker = ConditionVariable.new

Events.punchblock Punchblock::Connection::Connected do
Adhearsion::Process.booted
Adhearsion.process.booted
m.synchronize { blocker.broadcast }
end

Expand All @@ -94,7 +94,7 @@ def connect
m.synchronize { blocker.broadcast }
end

Adhearsion::Process.important_threads << Thread.new do
Adhearsion.process.important_threads << Thread.new do
catching_standard_errors { connect_to_server }
end

Expand All @@ -109,15 +109,15 @@ def connect_to_server
client.run
rescue Punchblock::DisconnectedError => e
# We only care about disconnects if the process is up or booting
return unless [:booting, :running].include? Adhearsion::Process.state_name
return unless [:booting, :running].include? Adhearsion.process.state_name

Adhearsion::Process.reset unless Adhearsion::Process.state_name == :booting
Adhearsion.process.reset unless Adhearsion.process.state_name == :booting

self.attempts += 1

if self.attempts >= self.config.reconnect_attempts
logger.fatal "Connection lost. Connection retry attempts exceeded."
Adhearsion::Process.stop!
Adhearsion.process.stop!
return
end

Expand All @@ -132,7 +132,7 @@ def connect_to_server
def dispatch_offer(offer)
catching_standard_errors do
call = Adhearsion.active_calls.from_offer offer
case Adhearsion::Process.state_name
case Adhearsion.process.state_name
when :booting, :rejecting
logger.info "Declining call because the process is not yet running."
call.reject :decline
Expand Down Expand Up @@ -161,7 +161,7 @@ def handle_event(event)
end

def resource
[Adhearsion::Process.fqdn, ::Process.pid].join '-'
[Adhearsion.process.fqdn, ::Process.pid].join '-'
end

def connection
Expand Down
2 changes: 1 addition & 1 deletion spec/adhearsion/console_spec.rb
Expand Up @@ -61,7 +61,7 @@ module Adhearsion

describe "#shutdown!" do
it "should tell the process to shutdown" do
Adhearsion::Process.should_receive(:shutdown!).once
Adhearsion.process.should_receive(:shutdown!).once
Console.shutdown!
end
end
Expand Down
50 changes: 25 additions & 25 deletions spec/adhearsion/process_spec.rb
Expand Up @@ -3,47 +3,47 @@
require 'spec_helper'

module Adhearsion
describe Adhearsion::Process do
describe Adhearsion.process do
before :all do
Adhearsion.active_calls.clear
end

before :each do
Adhearsion::Process.reset
Adhearsion.process.reset
end

it 'should trigger :stop_requested events on #shutdown' do
Events.should_receive(:trigger_immediately).once.with(:stop_requested).ordered
Events.should_receive(:trigger_immediately).once.with(:shutdown).ordered
Adhearsion::Process.booted
Adhearsion::Process.shutdown
Adhearsion.process.booted
Adhearsion.process.shutdown
sleep 0.2
end

it '#stop_when_zero_calls should wait until the list of active calls reaches 0' do
pending
calls = ThreadSafeArray.new
calls = []
3.times do
fake_call = Object.new
fake_call.should_receive(:hangup).once
calls << fake_call
end
Adhearsion.should_receive(:active_calls).and_return calls
Adhearsion::Process.instance.should_receive(:final_shutdown).once
calls = []
Adhearsion.process.instance.should_receive(:final_shutdown).once
blocking_threads = []
3.times do
calls << Thread.new do
blocking_threads << Thread.new do
sleep 1
calls.pop
end
end
Adhearsion::Process.stop_when_zero_calls
calls.each { |thread| thread.join }
Adhearsion.process.stop_when_zero_calls
blocking_threads.each { |thread| thread.join }
end

it 'should terminate the process immediately on #force_stop' do
::Process.should_receive(:exit).with(1).once.and_return true
Adhearsion::Process.force_stop
Adhearsion.process.force_stop
end

describe "#final_shutdown" do
Expand All @@ -55,7 +55,7 @@ module Adhearsion
Adhearsion.active_calls << fake_call
end

Adhearsion::Process.final_shutdown
Adhearsion.process.final_shutdown

Adhearsion.active_calls.clear
end
Expand All @@ -71,32 +71,32 @@ module Adhearsion
Events.shutdown { sleep 1; foo[:b] }
Events.shutdown { foo[:c] }

Adhearsion::Process.final_shutdown
Adhearsion.process.final_shutdown
end

it "should stop the console" do
Console.should_receive(:stop).once
Adhearsion::Process.final_shutdown
Adhearsion.process.final_shutdown
end
end

it 'should handle subsequent :shutdown events in the correct order' do
Adhearsion::Process.booted
Adhearsion::Process.state_name.should be :running
Adhearsion::Process.shutdown
Adhearsion::Process.state_name.should be :stopping
Adhearsion::Process.shutdown
Adhearsion::Process.state_name.should be :rejecting
Adhearsion::Process.shutdown
Adhearsion::Process.state_name.should be :stopped
Adhearsion::Process.instance.should_receive(:die_now!).once
Adhearsion::Process.shutdown
Adhearsion.process.booted
Adhearsion.process.state_name.should be :running
Adhearsion.process.shutdown
Adhearsion.process.state_name.should be :stopping
Adhearsion.process.shutdown
Adhearsion.process.state_name.should be :rejecting
Adhearsion.process.shutdown
Adhearsion.process.state_name.should be :stopped
Adhearsion.process.should_receive(:die_now!).once
Adhearsion.process.shutdown
sleep 0.2
end

it 'should forcibly kill the Adhearsion process on :force_stop' do
::Process.should_receive(:exit).once.with(1)
Adhearsion::Process.force_stop
Adhearsion.process.force_stop
end
end
end
26 changes: 13 additions & 13 deletions spec/adhearsion/punchblock_plugin/initializer_spec.rb
Expand Up @@ -59,7 +59,7 @@ def initialize_punchblock(options = {})
mock_client.as_null_object
mock_client.stub :event_handler= => true
Events.refresh!
Adhearsion::Process.stub :fqdn => 'hostname'
Adhearsion.process.stub :fqdn => 'hostname'
::Process.stub :pid => 1234
end

Expand Down Expand Up @@ -144,22 +144,22 @@ def initialize_punchblock(options = {})

describe '#connect_to_server' do
before :each do
Adhearsion::Process.reset
Adhearsion.process.reset
Initializer.config = reset_default_config
Initializer.config.reconnect_attempts = 1
Adhearsion::Logging.get_logger(Initializer).should_receive(:fatal).at_most(:once)
Initializer.stub(:client).and_return mock_client
end

after :each do
Adhearsion::Process.reset
Adhearsion.process.reset
end

it 'should reset the Adhearsion process state to "booting"' do
Adhearsion::Process.booted
Adhearsion::Process.state_name.should be == :running
Adhearsion.process.booted
Adhearsion.process.state_name.should be == :running
mock_client.stub(:run).and_raise Punchblock::DisconnectedError
Adhearsion::Process.should_receive(:reset).at_least(:once)
Adhearsion.process.should_receive(:reset).at_least(:once)
Initializer.connect_to_server
end

Expand All @@ -176,8 +176,8 @@ def initialize_punchblock(options = {})
end

it 'should not attempt to reconnect if Adhearsion is shutting down' do
Adhearsion::Process.booted
Adhearsion::Process.shutdown
Adhearsion.process.booted
Adhearsion.process.shutdown
mock_client.stub(:run).and_raise Punchblock::DisconnectedError
Initializer.should_not raise_error Punchblock::DisconnectedError
end
Expand Down Expand Up @@ -210,11 +210,11 @@ def initialize_punchblock(options = {})
describe "dispatching an offer" do
before do
initialize_punchblock
Adhearsion::Process.should_receive(:state_name).once.and_return process_state
Adhearsion.process.should_receive(:state_name).once.and_return process_state
Adhearsion.active_calls.should_receive(:from_offer).once.and_return mock_call
end

context "when the Adhearsion::Process is :booting" do
context "when the Adhearsion.process is :booting" do
let(:process_state) { :booting }

it 'should reject a call with cause :declined' do
Expand All @@ -223,7 +223,7 @@ def initialize_punchblock(options = {})
end

[ :running, :stopping ].each do |state|
context "when when Adhearsion::Process is in :#{state}" do
context "when when Adhearsion.process is in :#{state}" do
let(:process_state) { state }

it "should dispatch via the router" do
Expand All @@ -235,15 +235,15 @@ def initialize_punchblock(options = {})
end
end

context "when when Adhearsion::Process is in :rejecting" do
context "when when Adhearsion.process is in :rejecting" do
let(:process_state) { :rejecting }

it 'should reject a call with cause :declined' do
mock_call.should_receive(:reject).once.with(:decline)
end
end

context "when when Adhearsion::Process is not :running, :stopping or :rejecting" do
context "when when Adhearsion.process is not :running, :stopping or :rejecting" do
let(:process_state) { :foobar }

it 'should reject a call with cause :error' do
Expand Down

0 comments on commit 08509ed

Please sign in to comment.