From 6b42816f4afc2e64c1fcbcd7d60d81ab8a98bc4b Mon Sep 17 00:00:00 2001 From: Aaron Pfeifer Date: Mon, 7 Oct 2013 12:43:27 -0400 Subject: [PATCH] Allow multiple pipes to be read from in the PipeChannelManager by utilizing threads for each ready pipe. This also prevents "Too many open files" errors from occurring --- lib/new_relic/agent/pipe_channel_manager.rb | 44 ++++++++++++++------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/lib/new_relic/agent/pipe_channel_manager.rb b/lib/new_relic/agent/pipe_channel_manager.rb index 5b19c38971..bfa6bff9b1 100644 --- a/lib/new_relic/agent/pipe_channel_manager.rb +++ b/lib/new_relic/agent/pipe_channel_manager.rb @@ -60,9 +60,10 @@ class Pipe NUM_LENGTH_BYTES = 4 attr_accessor :in, :out - attr_reader :last_read, :parent_pid + attr_reader :id, :last_read, :parent_pid - def initialize + def initialize(id = nil) + @id = id @out, @in = IO.pipe if defined?(::Encoding::ASCII_8BIT) @in.set_encoding(::Encoding::ASCII_8BIT) @@ -152,7 +153,7 @@ def wakeup def register_pipe(id) @pipes_lock.synchronize do - @pipes[id] = Pipe.new + @pipes[id] = Pipe.new(id) end wakeup @@ -225,20 +226,35 @@ def started? def merge_data_from_pipe(pipe_handle) pipe = find_pipe_for_handle(pipe_handle) - raw_payload = pipe.read - if raw_payload && !raw_payload.empty? - if raw_payload == Pipe::READY_MARKER - pipe.after_fork_in_parent - else - payload = unmarshal(raw_payload) - if payload - endpoint, items = payload - NewRelic::Agent.agent.merge_data_for_endpoint(endpoint, items) + return unless pipe + + # Remove the pipe since it's going to be read from and shouldn't + # get processed until that completes + @pipes.delete(pipe.id) + + # Start a new thread in order to allow multiple pipes to be read at once + Thread.new do + begin + raw_payload = pipe.read + + if raw_payload && !raw_payload.empty? + if raw_payload == Pipe::READY_MARKER + pipe.after_fork_in_parent + else + payload = unmarshal(raw_payload) + if payload + endpoint, items = payload + NewRelic::Agent.agent.merge_data_for_endpoint(endpoint, items) + end + end end + + pipe.close if pipe.eof? + ensure + # Always add the pipe back so that it can be cleaned up + @pipes[pipe.id] = pipe end end - - pipe.close if pipe.eof? end def unmarshal(data)