diff --git a/Rakefile b/Rakefile index a9625de..b2e3ed7 100644 --- a/Rakefile +++ b/Rakefile @@ -49,7 +49,11 @@ namespace "test" do sh("cd spec && rake spec") end - task "spec:rcov" do |t| + task "test" do |t| + sh("cd spec && rake test") + end + + task "spec:rcov" do |t| sh("cd spec && rake spec:rcov") end end diff --git a/lib/router.rb b/lib/router.rb index a2c594b..2251c7f 100644 --- a/lib/router.rb +++ b/lib/router.rb @@ -21,7 +21,6 @@ require 'router/const' require 'router/router' -require 'router/app_connection' require 'router/client_connection' require 'router/utils' @@ -183,4 +182,3 @@ end end - diff --git a/lib/router/app_connection.rb b/lib/router/app_connection.rb deleted file mode 100644 index d924e38..0000000 --- a/lib/router/app_connection.rb +++ /dev/null @@ -1,200 +0,0 @@ -# Copyright (c) 2009-2011 VMware, Inc. -module AppConnection - - attr_reader :outstanding_requests - - def initialize(client, request, droplet) - Router.log.debug "Creating AppConnection" - @client, @request, @droplet = client, request, droplet - @start_time = Time.now - @connected = false - @outstanding_requests = 1 - Router.outstanding_request_count += 1 - end - - def post_init - VCAP::Component.varz[:app_connections] = Router.app_connection_count += 1 - Router.log.debug "Completed AppConnection" - Router.log.debug Router.connection_stats - Router.log.debug "------------" - end - - def connection_completed - @connected = true - #proxy_incoming_to(@client) if @client - send_data(@request) if @client && @request - end - - # queue data until connection completed. - def deliver_data(data) - return send_data(data) if @connected - @request << data - end - - # We have the HTTP Headers complete from the client - def on_headers_complete(headers) - check_sticky_session = STICKY_SESSIONS =~ headers[SET_COOKIE_HEADER] - sent_session_cookie = false # Only send one in case of multiple hits - - header_lines = @headers.split("\r\n") - header_lines.each do |line| - @client.send_data(line) - @client.send_data(CR_LF) - if (check_sticky_session && !sent_session_cookie && STICKY_SESSIONS =~ line) - sid = Router.generate_session_cookie(@droplet) - scl = line.sub(/\S+\s*=\s*\w+/, "#{VCAP_SESSION_ID}=#{sid}") - sent_session_cookie = true - @client.send_data(scl) - @client.send_data(CR_LF) - end - end - # Trace if properly requested - if @client.trace - router_trace = "#{VCAP_ROUTER_HEADER}:#{Router.inet}#{CR_LF}" - be_trace = "#{VCAP_BACKEND_HEADER}:#{@droplet[:host]}:#{@droplet[:port]}#{CR_LF}" - @client.send_data(router_trace) - @client.send_data(be_trace) - end - # Ending CR_LF - @client.send_data(CR_LF) - end - - def process_response_body_chunk(data) - return unless data and data.bytesize > 0 - - # Let parser process as well to properly determine end of message. - # TODO: Once EM 1.0, add in optional bytsize proxy if Content-Length is present. - psize = @parser << data - if (psize == data.bytesize) - @client.send_data(data) - else - Router.log.info "Pipelined response detected!" - # We have a pipelined response, we need to hand over the new headers and only send the proper - # body segments to the backend - body = data.slice!(0, psize) - @client.send_data(body) - receive_data(data) - end - end - - def record_stats - return unless @parser - - latency = ((Time.now - @start_time) * 1000).to_i - response_code = @parser.status_code - response_code_metric = :responses_xxx - if (200..299).include?(response_code) - response_code_metric = :responses_2xx - elsif (300..399).include?(response_code) - response_code_metric = :responses_3xx - elsif (400..499).include?(response_code) - response_code_metric = :responses_4xx - elsif (500..599).include?(response_code) - response_code_metric = :responses_5xx - end - - VCAP::Component.varz[response_code_metric] += 1 - VCAP::Component.varz[:latency] << latency - - if @droplet[:tags] - @droplet[:tags].each do |key, value| - tag_metrics = VCAP::Component.varz[:tags][key][value] - tag_metrics[response_code_metric] += 1 - tag_metrics[:latency] << latency - end - end - end - - def on_message_complete - record_stats - @parser = nil - @outstanding_requests -= 1 - Router.outstanding_request_count -= 1 - :stop - end - - def cant_be_recycled? - error? || @parser != nil || @connected == false || @outstanding_requests > 0 - end - - def recycle - stop_proxying - @client = @request = @headers = nil - end - - def receive_data(data) - # Parser is created after headers have been received and processed. - # If it exists we are continuing the processing of body fragments. - # Allow the parser to process to signal proper end of message, e.g. chunked, etc - return process_response_body_chunk(data) if @parser - - # We are awaiting headers here. - # We buffer them if needed to determine the header/body boundary correctly. - @buf = @buf ? @buf << data : data - if hindex = @buf.index(HTTP_HEADERS_END) # all http headers received, figure out where to route to.. - @parser = Http::Parser.new(self) - - # split headers and rest of body out here. - @headers = @buf.slice!(0...hindex+HTTP_HEADERS_END_SIZE) - - # Process headers - @parser << @headers - - # Process left over body fragment if any - process_response_body_chunk(@buf) if @parser - @buf = @headers = nil - end - - rescue Http::Parser::Error => e - Router.log.debug "HTTP Parser error on response: #{e}" - close_connection - end - - def rebind(client, request) - @start_time = Time.now - @client = client - reuse(request) - end - - def reuse(new_request) - @request = new_request - @outstanding_requests += 1 - Router.outstanding_request_count += 1 - deliver_data(@request) - end - - def proxy_target_unbound - Router.log.debug "Proxy connection dropped" - #close_connection_after_writing - end - - def unbind - Router.outstanding_request_count -= @outstanding_requests - unless @connected - Router.log.info "Could not connect to backend for url:#{@droplet[:url]} @ #{@droplet[:host]}:#{@droplet[:port]}" - if @client - @client.send_data(Router.notfound_redirect || ERROR_404_RESPONSE) - @client.close_connection_after_writing - end - # TODO(dlc) fix - We will unregister bad backends here, should retry the request if possible. - Router.unregister_droplet(@droplet[:url], @droplet[:host], @droplet[:port]) - end - - VCAP::Component.varz[:app_connections] = Router.app_connection_count -= 1 - Router.log.debug 'Unbinding AppConnection' - Router.log.debug Router.connection_stats - Router.log.debug "------------" - - # Remove ourselves from the connection pool - @droplet[:connections].delete(self) - - @client.close_connection_after_writing if @client - end - - def terminate - stop_proxying - close_connection - on_message_complete if @outstanding_requests > 0 - end - -end diff --git a/lib/router/client_connection.rb b/lib/router/client_connection.rb index c406612..41376e7 100644 --- a/lib/router/client_connection.rb +++ b/lib/router/client_connection.rb @@ -1,13 +1,14 @@ # Copyright (c) 2009-2011 VMware, Inc. + module ClientConnection HTTP_11 ='11'.freeze - attr_reader :close_connection_after_request, :trace + attr_reader :close_connection_after_request def initialize(is_unix_socket) - Router.log.debug "Created Client Connection" - @droplet, @bound_app_conn = nil, nil, nil + Router.log.debug "Created uls Connection" + @droplet = nil # Default to be on the safe side @close_connection_after_request = true @is_unix_socket = is_unix_socket @@ -17,123 +18,119 @@ def post_init VCAP::Component.varz[:client_connections] = Router.client_connection_count += 1 Router.log.debug Router.connection_stats Router.log.debug "------------" - self.comm_inactivity_timeout = Router.client_inactivity_timeout + @parser = Http::Parser.new(self) + self.comm_inactivity_timeout = 60.0 end - def recycle_app_conn - return if (!@droplet || !@bound_app_conn) # Could we leak @bound_app_conn here? - - # Don't recycle if we are not supposed to.. - if @close_connection_after_request - Router.log.debug "NOT placing bound AppConnection back into free list because a close was requested.." - @bound_app_conn.close_connection_after_writing - return - end - - # Place any bound connections back into the droplets connection pool. - # This happens on client connection reuse, HTTP 1.1. - # Check for errors, overcommits, etc.. - - if (@bound_app_conn.cant_be_recycled?) - Router.log.debug "NOT placing AppConnection back into free list, can't be recycled" - @bound_app_conn.close_connection_after_writing - return - end - - if @droplet[:connections].index(@bound_app_conn) - Router.log.debug "NOT placing AppConnection back into free list, already exists in free list.." - return - end - - if @droplet[:connections].size >= MAX_POOL - Router.log.debug "NOT placing AppConnection back into free list, MAX_POOL connections already exist.." - @bound_app_conn.close_connection_after_writing - return - end - - Router.log.debug "Placing bound AppConnection back into free list.." - @bound_app_conn.recycle - @droplet[:connections].push(@bound_app_conn) - @bound_app_conn = nil + def connection_completed + Router.log.debug "uls connection completed" end - def terminate_app_conn - return unless @bound_app_conn - Router.log.debug "Terminating AppConnection" - @bound_app_conn.terminate - @bound_app_conn = nil + def receive_data(data) + @parser << data + rescue Http::Parser::Error => e + Router.log.debug "HTTP Parser error on request: #{e}" + close_connection end - def process_request_body_chunk(data) - return unless data - if (@droplet && @bound_app_conn && !@bound_app_conn.error?) - - # Let parser process as well to properly determine end of message. - psize = @parser << data - - if (psize != data.bytesize) - Router.log.info "Pipelined request detected!" - # We have a pipelined request, we need to hand over the new headers and only send the proper - # body segments to the backend - body = data.slice!(0, psize) - @bound_app_conn.deliver_data(body) - receive_data(data) - else - @bound_app_conn.deliver_data(data) - end - else # We do not have a backend droplet anymore - Router.log.info "Backend connection dropped" - terminate_app_conn - close_connection # Should we retry here? - end + def on_message_begin + @headers = nil + @body = '' end - # We have the HTTP Headers complete from the client def on_headers_complete(headers) - return close_connection unless headers and host = headers[HOST_HEADER] + @headers = @parser.headers + end + def on_body(chunk) + @body << chunk + end + + def on_message_complete # Support for HTTP/1.1 connection reuse and possible pipelining @close_connection_after_request = (@parser.http_version.to_s == HTTP_11) ? false : true # Support for Connection:Keep-Alive requests on HTTP/1.0, e.g. ApacheBench - @close_connection_after_request = false if (headers[CONNECTION_HEADER] == KEEP_ALIVE) - - @trace = (headers[VCAP_TRACE_HEADER] == Router.trace_key) + @close_connection_after_request = false if (@headers[CONNECTION_HEADER] == KEEP_ALIVE) # Update # of requests.. VCAP::Component.varz[:requests] += 1 - # Clear and recycle previous state - recycle_app_conn if @bound_app_conn - @droplet = @bound_app_conn = nil + @droplet = nil + + Router.log.debug "request body content:#{@body}" + uls_req = JSON.parse(@body, :symbolize_keys => true) + + # Update request stats carried with this uls query + if uls_req[ULS_STATS_UPDATE] + uls_req[ULS_STATS_UPDATE].each do |stat| + if stat[ULS_REQUEST_TAGS].length > 0 + tags = Marshal.load(Base64.decode64(stat[ULS_REQUEST_TAGS])) + end + + latency = stat[ULS_RESPONSE_LATENCY] + samples = stat[ULS_RESPONSE_SAMPLES] + + # We may find a better solution for latency + 1.upto samples do + VCAP::Component.varz[:latency] << latency + end + + stat[ULS_RESPONSE_STATUS].each_pair do |k, v| + response_code_metric = k.to_sym + VCAP::Component.varz[response_code_metric] += v + if not tags then next end + + tags.each do |key, value| + # In case some req tags of syncup state may be invalid at this time + if not VCAP::Component.varz[:tags][key] or + not VCAP::Component.varz[:tags][key][value] + next + end + + tag_metrics = VCAP::Component.varz[:tags][key][value] + tag_metrics[response_code_metric] += v + 1.upto samples do + tag_metrics[:latency] << latency + end + end + end + end + end + + url = uls_req[ULS_HOST_QUERY] + # In case a stats syncup only request + if not url then + if close_connection_after_request + close_connection_after_writing + end + return + end # Lookup a Droplet - unless droplets = Router.lookup_droplet(host) - Router.log.debug "No droplet registered for #{host}" + unless droplets = Router.lookup_droplet(url) + Router.log.debug "No droplet registered for #{url}" VCAP::Component.varz[:bad_requests] += 1 send_data(Router.notfound_redirect || ERROR_404_RESPONSE) close_connection_after_writing return end - Router.log.debug "#{droplets.size} servers available for #{host}" - # Check for session state - if VCAP_COOKIE =~ headers[COOKIE_HEADER] - url, host, port = Router.decrypt_session_cookie($1) - Router.log.debug "Client has __VCAP_ID__ for #{url}@#{host}:#{port}" + if uls_req[ULS_BACKEND_ADDR] + host, port = uls_req[ULS_BACKEND_ADDR].split(":") + Router.log.debug "request has __VCAP_ID__ cookie for #{host}:#{port}" # Check host? droplets.each { |droplet| - # If we already now about them just update the timestamp.. - if(droplet[:host] == host && droplet[:port] == port) + if(droplet[:host] == host && droplet[:port] == port.to_i) @droplet = droplet break; end } - Router.log.debug "Client's __VCAP_ID__ is stale" unless @droplet + Router.log.debug "request's __VCAP_ID__ is stale" unless @droplet end - # pick a random backend unless selected from above already + # Pick a random backend unless selected from above already @droplet = droplets[rand*droplets.size] unless @droplet if @droplet[:tags] @@ -141,64 +138,28 @@ def on_headers_complete(headers) tag_metrics = VCAP::Component.varz[:tags][key][value] tag_metrics[:requests] += 1 end + uls_req_tags = Base64.encode64(Marshal.dump(@droplet[:tags])).strip end @droplet[:requests] += 1 - # Client tracking, override with header if its set (nginx to unix domain socket) - _, client_ip = Socket.unpack_sockaddr_in(get_peername) unless @is_unix_socket - client_ip = headers[REAL_IP_HEADER] if headers[REAL_IP_HEADER] - - @droplet[:clients][client_ip] += 1 if client_ip + Router.log.debug "Routing #{@droplet[:url]} to #{@droplet[:host]}:#{@droplet[:port]}" - Router.log.debug "Routing on #{@droplet[:url]} to #{@droplet[:host]}:#{@droplet[:port]}" + # send upstream addr back to nginx + uls_response = { + ULS_BACKEND_ADDR => "#{@droplet[:host]}:#{@droplet[:port]}", + ULS_REQUEST_TAGS => "#{uls_req_tags}", + ULS_ROUTER_IP => "#{Router.inet}" + }.to_json + send_data(HTTP_200_RESPONSE + "#{uls_response}") - # Reuse an existing connection or create one. - # Proxy the rest of the traffic without interference. - Router.log.debug "Droplet has #{@droplet[:connections].size} pooled connections waiting.." - @bound_app_conn = @droplet[:connections].pop - if (@bound_app_conn && !@bound_app_conn.error?) - Router.log.debug "Reusing pooled AppConnection.." - @bound_app_conn.rebind(self, @headers) - else - host, port = @droplet[:host], @droplet[:port] - @bound_app_conn = EM.connect(host, port, AppConnection, self, @headers, @droplet) - end - - end - - def on_message_complete - @parser = nil - :stop - end - - def receive_data(data) - # Parser is created after headers have been received and processed. - # If it exists we are continuing the processing of body fragments. - # Allow the parser to process to signal proper end of message, e.g. chunked, etc - return process_request_body_chunk(data) if @parser - - # We are awaiting headers here. - # We buffer them if needed to determine the header/body boundary correctly. - @buf = @buf ? @buf << data : data - - if hindex = @buf.index(HTTP_HEADERS_END) # all http headers received, figure out where to route to.. - @parser = Http::Parser.new(self) - - # split headers and rest of body out here. - @headers = @buf.slice!(0...hindex+HTTP_HEADERS_END_SIZE) - - # Process headers - @parser << @headers - - # Process left over body fragment if any - process_request_body_chunk(@buf) if @parser - @buf = @headers = nil + if close_connection_after_request + close_connection_after_writing end - rescue Http::Parser::Error => e - Router.log.debug "HTTP Parser error on request: #{e}" - close_connection + rescue JSON::ParserError + send_data(HTTP_400_RESPONSE) + close_connection_after_writing end def unbind @@ -206,7 +167,6 @@ def unbind VCAP::Component.varz[:client_connections] = Router.client_connection_count -= 1 Router.log.debug Router.connection_stats Router.log.debug "------------" - @close_connection_after_request ? terminate_app_conn : recycle_app_conn end end diff --git a/lib/router/const.rb b/lib/router/const.rb index 6f2c4d9..2a17121 100644 --- a/lib/router/const.rb +++ b/lib/router/const.rb @@ -2,23 +2,23 @@ # HTTP Header processing HOST_HEADER = 'Host'.freeze CONNECTION_HEADER = 'Connection'.freeze -REAL_IP_HEADER = 'X-Real_IP'.freeze -HTTP_HEADERS_END = "\r\n\r\n".freeze -HTTP_HEADERS_END_SIZE = HTTP_HEADERS_END.bytesize KEEP_ALIVE = 'keep-alive'.freeze -SET_COOKIE_HEADER = 'Set-Cookie'.freeze -COOKIE_HEADER = 'Cookie'.freeze -CR_LF = "\r\n".freeze -STICKY_SESSIONS = /(JSESSIONID)/i - -VCAP_SESSION_ID = '__VCAP_ID__'.freeze -VCAP_COOKIE = /__VCAP_ID__=([^;]+)/ +#STICKY_SESSIONS = /(JSESSIONID)/i VCAP_BACKEND_HEADER = 'X-Vcap-Backend' VCAP_ROUTER_HEADER = 'X-Vcap-Router' VCAP_TRACE_HEADER = 'X-Vcap-Trace' +ULS_HOST_QUERY = :"host" +ULS_STATS_UPDATE = :"stats" +ULS_REQUEST_TAGS = :"request_tags" +ULS_RESPONSE_STATUS = :"response_codes" +ULS_RESPONSE_SAMPLES = :"response_samples" +ULS_RESPONSE_LATENCY = :"response_latency" +ULS_BACKEND_ADDR = :"backend_addr" +ULS_ROUTER_IP = :"router_ip" + # Max Connections to Pool MAX_POOL = 32 @@ -29,7 +29,12 @@ CHECK_SWEEPER = 30 # Check time for watching health of registered droplet MAX_AGE_STALE = 120 # Max stale age, unregistered if older then 2 minutes -# 404 Response -ERROR_404_RESPONSE="HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n" + - "VCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n".freeze +# 200 Response +HTTP_200_RESPONSE = "HTTP/1.1 200 OK\r\n\r\n".freeze +# 400 Response +ERROR_400_RESPONSE = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n".freeze + +# 404 Response +ERROR_404_RESPONSE = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n" + + "VCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n".freeze diff --git a/lib/router/router.rb b/lib/router/router.rb index 847082d..2364068 100644 --- a/lib/router/router.rb +++ b/lib/router/router.rb @@ -4,9 +4,9 @@ class Router VERSION = 0.98 class << self - attr_reader :log, :notfound_redirect, :session_key, :trace_key, :client_inactivity_timeout + attr_reader :log, :notfound_redirect, :client_inactivity_timeout attr_accessor :server, :local_server, :timestamp, :shutting_down - attr_accessor :client_connection_count, :app_connection_count, :outstanding_request_count + attr_accessor :client_connection_count attr_accessor :inet, :port alias :shutting_down? :shutting_down @@ -17,7 +17,7 @@ def version def config(config) @droplets = {} - @client_connection_count = @app_connection_count = @outstanding_request_count = 0 + @client_connection_count = 0 VCAP::Logging.setup_from_config(config['logging'] || {}) @log = VCAP::Logging.logger('router') if config['404_redirect'] @@ -25,8 +25,6 @@ def config(config) log.info "Registered 404 redirect at #{config['404_redirect']}" end - @session_key = config['session_key'] || '14fbc303b76bacd1e0a3ab641c11d11400341c5d' - @trace_key = config['trace_key'] || '22' @client_inactivity_timeout = config['client_inactivity_timeout'] || 60 @expose_all_apps = config['status']['expose_all_apps'] if config['status'] end @@ -120,40 +118,16 @@ def check_registered_urls def connection_stats tc = EM.connection_count - ac = Router.app_connection_count cc = Router.client_connection_count - "Connections: [Clients: #{cc}, Apps: #{ac}, Total: #{tc}]" + "Connections: [uls Clients: #{cc}, Total: #{tc}]" end def log_connection_stats tc = EM.connection_count - ac = Router.app_connection_count cc = Router.client_connection_count log.info connection_stats end - def generate_session_cookie(droplet) - token = [ droplet[:url], droplet[:host], droplet[:port] ] - c = OpenSSL::Cipher::Cipher.new('blowfish') - c.encrypt - c.key = @session_key - e = c.update(Marshal.dump(token)) - e << c.final - [e].pack('m0').gsub("\n",'') - end - - def decrypt_session_cookie(key) - e = key.unpack('m*')[0] - d = OpenSSL::Cipher::Cipher.new('blowfish') - d.decrypt - d.key = @session_key - p = d.update(e) - p << d.final - Marshal.load(p) - rescue - nil - end - def lookup_droplet(url) @droplets[url] end diff --git a/lib/router/utils.rb b/lib/router/utils.rb index 03724c4..0b0b782 100644 --- a/lib/router/utils.rb +++ b/lib/router/utils.rb @@ -17,10 +17,11 @@ def stop(pidfile) Router.log.info 'waiting for pending requests to complete.' EM.stop_server(Router.server) if Router.server EM.stop_server(Router.local_server) if Router.local_server - if Router.outstanding_request_count <= 0 + + if Router.client_connection_count <= 0 exit_router(pidfile) else - EM.add_periodic_timer(0.25) { exit_router(pidfile) if (Router.outstanding_request_count <= 0) } + EM.add_periodic_timer(0.25) { exit_router(pidfile) if (Router.client_connection_count <= 0) } EM.add_timer(10) { exit_router(pidfile) } # Wait at most 10 secs end @@ -32,4 +33,3 @@ def exit_router(pidfile) FileUtils.rm_f(pidfile) exit end - diff --git a/spec/Rakefile b/spec/Rakefile index e4d921d..5d1f8a5 100644 --- a/spec/Rakefile +++ b/spec/Rakefile @@ -18,6 +18,11 @@ desc "Run specs using RCov" task "spec:rcov" => ["ci:setup:rspec", "spec:rcov_internal", "convert_rcov_to_clover"] RSpec::Core::RakeTask.new do |t| + t.pattern = "{functional,unit}/*_spec.rb" + t.rspec_opts = ["--format", "documentation", "--colour"] +end + +RSpec::Core::RakeTask.new("test") do |t| t.pattern = "**/*_spec.rb" t.rspec_opts = ["--format", "documentation", "--colour"] end diff --git a/spec/functional/router_spec.rb b/spec/functional/router_spec.rb index 6f5fc8d..673a819 100644 --- a/spec/functional/router_spec.rb +++ b/spec/functional/router_spec.rb @@ -1,7 +1,10 @@ # Copyright (c) 2009-2011 VMware, Inc. require File.dirname(__FILE__) + '/spec_helper' +require "base64" describe 'Router Functional Tests' do + include Functional + before :each do @nats_server = NatsServer.new @nats_server.start_server @@ -50,10 +53,7 @@ healthz_resp = Net::HTTP.new(host, port).start { |http| http.request(healthz_req) } healthz_resp.body.should =~ /ok/i - varz_req = Net::HTTP::Get.new("/varz") - varz_req.basic_auth *credentials - varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) } - varz = JSON.parse(varz_resp.body, :symbolize_keys => true) + varz = get_varz() varz[:requests].should be_a_kind_of(Integer) varz[:bad_requests].should be_a_kind_of(Integer) varz[:client_connections].should be_a_kind_of(Integer) @@ -65,8 +65,7 @@ app = TestApp.new('router_test.vcap.me') dea = DummyDea.new(@nats_server.uri, '1234') dea.register_app(app) - app.verify_registered('127.0.0.1', RouterServer.port) - app.stop + app.verify_registered end it 'should properly unregister an application endpoint' do @@ -74,91 +73,9 @@ app = TestApp.new('router_test.vcap.me') dea = DummyDea.new(@nats_server.uri, '1234') dea.register_app(app) - app.verify_registered('127.0.0.1', RouterServer.port) + app.verify_registered dea.unregister_app(app) - # We should be unregistered here.. - # Send out simple request and check request and response - req = simple_http_request('router_test.cap.me', '/') - verify_vcap_404(req, '127.0.0.1', RouterServer.port) - app.stop - end - - it 'should properly distibute messages between multiple backends' do - num_apps = 10 - num_requests = 100 - dea = DummyDea.new(@nats_server.uri, '1234') - - apps = [] - for ii in (0...num_apps) - app = TestApp.new('lb_test.vcap.me') - dea.register_app(app) - apps << app - end - - req = simple_http_request('lb_test.vcap.me', '/') - for ii in (0...num_requests) - TCPSocket.open('127.0.0.1', RouterServer.port) {|rs| rs.send(req, 0) } - end - sleep(0.25) # Wait here for requests to trip accept state - - app_sockets = apps.collect { |a| a.socket } - ready = IO.select(app_sockets, nil, nil, 1) - ready[0].should have(num_apps).items - apps.each {|a| a.stop } - end - - it 'should properly do sticky sessions' do - num_apps = 10 - dea = DummyDea.new(@nats_server.uri, '1234') - - apps = [] - for ii in (0...num_apps) - app = TestApp.new('sticky.vcap.me') - dea.register_app(app) - apps << app - end - - vcap_id = app_socket = nil - app_sockets = apps.collect { |a| a.socket } - - TCPSocket.open('127.0.0.1', RouterServer.port) do |rs| - rs.send(STICKY_REQUEST, 0) - ready = IO.select(app_sockets, nil, nil, 1) - ready[0].should have(1).items - app_socket = ready[0].first - ss = app_socket.accept_nonblock - req_received = ss.recv(STICKY_REQUEST.bytesize) - req_received.should == STICKY_REQUEST - # Send a response back.. This will set the sticky session - ss.send(STICKY_RESPONSE, 0) - response = rs.read(STICKY_RESPONSE.bytesize) - # Make sure the __VCAP_ID__ has been set - response =~ /Set-Cookie:\s*__VCAP_ID__=([^;]+);/ - (vcap_id = $1).should be - end - - cookie = "__VCAP_ID__=#{vcap_id}" - sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', cookie) - - # Now fire off requests, all should go to same socket as first - (0...5).each do - TCPSocket.open('127.0.0.1', RouterServer.port) do |rs| - rs.send(sticky_request, 0) - end - end - - ready = IO.select(app_sockets, nil, nil, 1) - ready[0].should have(1).items - app_socket.should == ready[0].first - - for app in apps - dea.unregister_app(app) - end - - # Check that is is gone - verify_vcap_404(STICKY_REQUEST, '127.0.0.1', RouterServer.port) - - apps.each {|a| a.stop } + app.verify_unregistered end it 'should properly exit when NATS fails to reconnect' do @@ -194,11 +111,20 @@ def json_request(uri, subj, data=nil, timeout=1) reply end - def verify_vcap_404(req, router_host, router_port) - TCPSocket.open(router_host, router_port) do |rs| - rs.send(req, 0) - response = rs.read(VCAP_NOT_FOUND.bytesize) - response.should == VCAP_NOT_FOUND - end + def get_varz + reply = json_request(@nats_server.uri, 'vcap.component.discover') + reply.should_not be_nil + + credentials = reply[:credentials] + credentials.should_not be_nil + + host, port = reply[:host].split(":") + + varz_req = Net::HTTP::Get.new("/varz") + varz_req.basic_auth *credentials + varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) } + varz = JSON.parse(varz_resp.body, :symbolize_keys => true) + varz end + end diff --git a/spec/functional/spec_helper.rb b/spec/functional/spec_helper.rb index 4ffbfe2..ed3d4b7 100644 --- a/spec/functional/spec_helper.rb +++ b/spec/functional/spec_helper.rb @@ -1,229 +1,118 @@ # Copyright (c) 2009-2011 VMware, Inc. -require File.dirname(__FILE__) + '/../spec_helper' +require File.dirname(__FILE__) + '/../lib/spec_helper' -require 'fileutils' -require 'nats/client' -require 'yajl/json_gem' -require 'vcap/common' -require 'openssl' -require 'net/http' -require 'uri' +module Functional -require 'pp' + class TestApp + class UsageError < StandardError; end -class NatsServer + attr_reader :uris, :droplet - TEST_PORT = 4228 - - def initialize(uri="nats://localhost:#{TEST_PORT}", pid_file='/tmp/nats-router-tests.pid') - @uri = URI.parse(uri) - @pid_file = pid_file - end - - def uri - @uri.to_s - end - - def server_pid - @pid ||= File.read(@pid_file).chomp.to_i - end - - def start_server - return if NATS.server_running? @uri - %x[ruby -S bundle exec nats-server -p #{@uri.port} -P #{@pid_file} -d 2> /dev/null] - NATS.wait_for_server(@uri) # New version takes opt_timeout - end - - def is_running? - NATS.server_running? @uri - end - - def kill_server - if File.exists? @pid_file - Process.kill('KILL', server_pid) - FileUtils.rm_f(@pid_file) + def initialize(*uris) + @uris = uris end - end -end - -class RouterServer - - PID_FILE = '/tmp/router-test.pid' - CONFIG_FILE = '/tmp/router-test.yml' - LOG_FILE = '/tmp/router-test.log' - PORT = 2228 - def initialize(nats_uri) - port = "port: #{PORT}" - mbus = "mbus: #{nats_uri}" - log_info = "log_level: DEBUG\nlog_file: #{LOG_FILE}" - @config = %Q{#{port}\ninet: 127.0.0.1\n#{mbus}\n#{log_info}\npid: #{PID_FILE}} - end - - def self.port - PORT - end - - def server_pid - File.read(PID_FILE).chomp.to_i - end - - def start_server - return if is_running? - - # Write the config - File.open(CONFIG_FILE, 'w') { |f| f.puts "#{@config}" } - - # Wipe old log file, but truncate so running tail works - if (File.exists? LOG_FILE) - File.truncate(LOG_FILE, 0) - # %x[rm #{LOG_FILE}] if File.exists? LOG_FILE + def bind_droplet(droplet) + @droplet = droplet end - server = File.expand_path(File.join(__FILE__, '../../../bin/router')) - nats_timeout = File.expand_path(File.join(File.dirname(__FILE__), 'nats_timeout')) - # pid = Process.fork { %x[#{server} -c #{CONFIG_FILE} 2> /dev/null] } - pid = Process.fork { %x[ruby -r#{nats_timeout} #{server} -c #{CONFIG_FILE} 2> /dev/null] } - Process.detach(pid) - - wait_for_server - end - - def is_running? - require 'socket' - s = TCPSocket.new('localhost', PORT) - s.close - return true - rescue - return false - end - - def wait_for_server(max_wait = 5) - start = Time.now - while (Time.now - start < max_wait) # Wait max_wait seconds max - break if is_running? - sleep(0.2) + def unbind_droplet + @droplet = nil end - end - def kill_server - if File.exists? PID_FILE - %x[kill -9 #{server_pid} 2> /dev/null] - %x[rm #{PID_FILE}] + def port + @droplet.port if @droplet end - %x[rm #{CONFIG_FILE}] if File.exists? CONFIG_FILE - end -end - - -# HTTP REQUESTS / RESPONSES - -FOO_HTTP_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 53\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n