Skip to content

Commit

Permalink
Various clarifications and minor improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Bennett committed Mar 11, 2015
1 parent 07e6946 commit b1863a4
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 157 deletions.
25 changes: 12 additions & 13 deletions lib/dnsruby/packet_sender.rb
Expand Up @@ -195,7 +195,7 @@ def initialize(*args)
@recurse = true
@tcp_pipelining = false
@tcp_pipelining_max_queries = :infinite
@use_count = {}
@use_counts = {}

if (arg==nil)
# Get default config
Expand Down Expand Up @@ -377,10 +377,10 @@ def tcp_pipeline_socket(src_port)
reuse_pipeline_socket = -> do
begin
max = @tcp_pipelining_max_queries
use = @use_count[@pipeline_socket]
if use && max != :infinite && use >= max
use_count = @use_counts[@pipeline_socket]
if use_count && max != :infinite && use_count >= max
#we can't reuse the socket since max is reached
@use_count.delete(@pipeline_socket)
@use_counts.delete(@pipeline_socket)
@pipeline_socket = nil
Dnsruby.log.debug("Max queries per connection attained - creating new socket")
else
Expand All @@ -395,23 +395,22 @@ def tcp_pipeline_socket(src_port)
end

create_pipeline_socket = -> do
pipeline = Socket.new( AF_INET, SOCK_STREAM, 0 )

src_address = @ipv6 ? @src_address6 : @src_address

pipeline.bind(Addrinfo.tcp(src_address, src_port))

@tcp_pipeline_local_port = src_port
@pipeline_socket = pipeline
src_address = @ipv6 ? @src_address6 : @src_address

@pipeline_socket = Socket.new(AF_INET, SOCK_STREAM, 0)
@pipeline_socket.bind(Addrinfo.tcp(src_address, src_port))
@pipeline_socket.connect(sockaddr)
# NOTE: Moved this here from ||= 0 a few lines below.
@use_counts[@pipeline_socket] = 0
end

# Don't combine the following 2 statements; the reuse lambda can set the
# socket to nil and if so we'd want to call the create lambda to recreate it.
reuse_pipeline_socket.() if @pipeline_socket
create_pipeline_socket.() unless @pipeline_socket

@use_count[@pipeline_socket] ||= 0
@use_count[@pipeline_socket] += 1
@use_counts[@pipeline_socket] += 1

@pipeline_socket
end
Expand Down
57 changes: 24 additions & 33 deletions lib/dnsruby/select_thread.rb
Expand Up @@ -67,9 +67,8 @@ def initialize
BasicSocket.do_not_reverse_lookup = true
# end
# Now start the select thread
@@select_thread = Thread.new {
do_select
}
@@select_thread = Thread.new { do_select }

# # Start the validator thread
# @@validator = ValidatorThread.instance
}
Expand Down Expand Up @@ -199,10 +198,7 @@ def do_select
rescue IOError => e
# print "IO Error =: #{e}\n"
exceptions = clean_up_closed_sockets

exceptions.each do |exception|
send_exception_to_client(*exception)
end
exceptions.each { |exception| send_exception_to_client(*exception) }

next
end
Expand All @@ -218,13 +214,13 @@ def do_select
end
end
if (ready == nil)
# proces the timeouts
# process the timeouts
process_timeouts
unused_loop_count+=1
else
process_ready(ready)
unused_loop_count=0
# process_error(errors)
# process_error(errors)
end
@@mutex.synchronize do
if (unused_loop_count > 10 && @@query_hash.empty? && @@observers.empty?)
Expand All @@ -250,6 +246,8 @@ def do_select
end
end

# Removes closed sockets from @@sockets, and returns an array containing 1
# exception for each closed socket contained in @@socket_hash.
def clean_up_closed_sockets
exceptions = @@mutex.synchronize do
closed_sockets_in_hash = @@sockets.select(&:closed?).select { |s| @@socket_hash[s] }
Expand Down Expand Up @@ -289,17 +287,15 @@ def process_ready(ready)

persistent_sockets.each do |socket|
msg, bytes = get_incoming_data(socket, 0)

process_message(msg, bytes, socket) if msg

ready.delete(socket)
end
end

def process_message(msg, bytes, socket)
@@mutex.synchronize do
ids = get_active_ids(@@query_hash, msg.header.id)
return if ids.empty? #should be only one
return if ids.empty? # should be only one
query_settings = @@query_hash[ids[0]].clone
end

Expand Down Expand Up @@ -374,8 +370,10 @@ def persistent?(socket)
end

def remove_id(id)
socket=nil
close_socket = true
# NOTE: I removed the following, which were never used:
# socket=nil
# close_socket = true

@@mutex.synchronize do
socket = @@query_hash[id].socket
@@timeouts.delete(id)
Expand All @@ -401,22 +399,17 @@ def decrement_remaining_queries(socket)

def max_attained?(socket)
remaining = @@socket_remaining_queries[socket]
if persistent?(socket) && remaining && remaining <= 0
Dnsruby.log.debug("Max queries per conn attained")
true
else
false
end
attained = persistent?(socket) && remaining && remaining <= 0
Dnsruby.log.debug("Max queries per conn attained") if attained
attained
end

def process_timeouts
# NOTE: It's @@timeouts we need to protect; after the clone we're ok
timeouts = @@mutex.synchronize { @@timeouts.clone }
time_now = Time.now
timeouts={}
@@mutex.synchronize {
timeouts = @@timeouts.clone
}
timeouts.each do |client_id, timeout|
if (timeout < time_now)
if timeout < time_now
send_exception_to_client(ResolvTimeout.new("Query timed out"), nil, client_id)
end
end
Expand All @@ -442,19 +435,17 @@ def tcp_read(socket)
if (input=="")
Dnsruby.log.debug("EOF from server - no bytes read - closing socket")
socket.close #EOF closed by server, if we were interrupted we need to resend
exceptions = []
@@mutex.synchronize {

exceptions = @@mutex.synchronize do
@@sockets.delete(socket) #remove ourselves from select, app will have to retry
#maybe fire an event
@@socket_hash[socket].each do | client_id |
exception = [SocketEofResolvError.new("TCP socket closed before all answers received"), socket, client_id]
exceptions << exception
@@socket_hash[socket].map do | client_id |
[SocketEofResolvError.new("TCP socket closed before all answers received"), socket, client_id]
end
}
exceptions.each do |exception|
send_exception_to_client(*exception)
end

exceptions.each { |exception| send_exception_to_client(*exception) }

return false
end
buf += input
Expand Down
1 change: 1 addition & 0 deletions test/spec_helper.rb
Expand Up @@ -11,6 +11,7 @@
add_filter 'test/'
end
end

require 'minitest'
require 'minitest/autorun'

Expand Down

0 comments on commit b1863a4

Please sign in to comment.