Skip to content

Commit

Permalink
Merge pull request #650 from sodabrew/cleanup_machine
Browse files Browse the repository at this point in the history
New method cleanup_machine factors out cleanup code from EM.run
  • Loading branch information
sodabrew committed Oct 28, 2015
2 parents eca70fd + f470ae8 commit 92ad729
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 53 deletions.
52 changes: 24 additions & 28 deletions lib/eventmachine.rb
Expand Up @@ -161,6 +161,7 @@ def self.run blk=nil, tail=nil, &block
# Clean up reactor state so a new reactor boots up in this child.
stop_event_loop
release_machine
cleanup_machine
@reactor_running = false
end

Expand Down Expand Up @@ -198,30 +199,8 @@ def self.run blk=nil, tail=nil, &block
@tails.pop.call
end

begin
release_machine
ensure
if @threadpool
@threadpool.each { |t| t.exit }
@threadpool.each do |t|
next unless t.alive?
begin
# Thread#kill! does not exist on 1.9 or rbx, and raises
# NotImplemented on jruby
t.kill!
rescue NoMethodError, NotImplementedError
t.kill
# XXX t.join here?
end
end
@threadqueue = nil
@resultqueue = nil
@threadpool = nil
@all_threads_spawned = false
end

@next_tick_queue = []
end
release_machine
cleanup_machine
@reactor_running = false
@reactor_thread = nil
end
Expand Down Expand Up @@ -266,13 +245,30 @@ def self.fork_reactor &block
# Original patch by Aman Gupta.
#
Kernel.fork do
if self.reactor_running?
self.stop_event_loop
self.release_machine
if reactor_running?
stop_event_loop
release_machine
cleanup_machine
@reactor_running = false
@reactor_thread = nil
end
self.run block
run block
end
end

# Clean up Ruby space following a release_machine
def self.cleanup_machine
if @threadpool && !@threadpool.empty?
# Tell the threads to stop
@threadpool.each { |t| t.exit }
# Join the threads or bump the stragglers one more time
@threadpool.each { |t| t.join 0.01 || t.exit }
end
@threadpool = nil
@threadqueue = nil
@resultqueue = nil
@all_threads_spawned = false
@next_tick_queue = []
end

# Adds a block to call as the reactor is shutting down.
Expand Down
25 changes: 0 additions & 25 deletions tests/test_basic.rb
Expand Up @@ -246,31 +246,6 @@ def c.unbind
assert_equal 1, num_close_scheduled
end

def test_fork_safe
omit_if(jruby?)
omit_if(windows?)
omit_if(rbx?, 'Omitting test on Rubinius because it hangs for unknown reasons')

read, write = IO.pipe
EM.run do
fork do
write.puts "forked"
EM.run do
EM.next_tick do
write.puts "EM ran"
EM.stop
end
end
end
EM.stop
end
assert_equal "forked\n", read.readline
assert_equal "EM ran\n", read.readline
ensure
read.close rescue nil
write.close rescue nil
end

def test_error_handler_idempotent # issue 185
errors = []
ticks = []
Expand Down
75 changes: 75 additions & 0 deletions tests/test_fork.rb
@@ -0,0 +1,75 @@
require 'em_test_helper'

class TestFork < Test::Unit::TestCase

def test_fork_safe
omit_if(jruby?)
omit_if(windows?)

fork_pid = nil
read, write = IO.pipe
EM.run do
fork_pid = fork do
write.puts "forked"
EM.run do
EM.next_tick do
write.puts "EM ran"
EM.stop
end
end
end
EM.stop
end

sleep 0.1
begin
Timeout::timeout 1 do
assert_equal "forked\n", read.readline
assert_equal "EM ran\n", read.readline
end
rescue Timeout::Error
Process.kill 'TERM', fork_pid
flunk "Timeout waiting for next_tick in new fork reactor"
end
ensure
read.close rescue nil
write.close rescue nil
end

def test_fork_reactor
omit_if(jruby?)
omit_if(windows?)

fork_pid = nil
read, write = IO.pipe
EM.run do
EM.defer do
write.puts Process.pid
EM.defer do
EM.stop
end
end
fork_pid = EM.fork_reactor do
EM.defer do
write.puts Process.pid
EM.stop
end
end
end

sleep 0.1
begin
Timeout::timeout 1 do
assert_equal Process.pid.to_s, read.readline.chomp
assert_equal fork_pid.to_s, read.readline.chomp
end
rescue Timeout::Error
Process.kill 'TERM', fork_pid
flunk "Timeout waiting for deferred block in fork_reactor"
end
ensure
read.close rescue nil
write.close rescue nil
end

end

0 comments on commit 92ad729

Please sign in to comment.