From 3fec163dcd66b075f9bb8cf9ce58d70777a29ab9 Mon Sep 17 00:00:00 2001 From: anferneeg Date: Wed, 14 Dec 2011 16:42:24 -0800 Subject: [PATCH] First pass for router v2 implementation. The init version of router v2 is done by Jun Xiao, polished by Anfernee with a couple of bug fixes and enhancement. router v2 decouples control and data path to some extent, it's a logic router including both nginx and uls (upstream location server) parts. uls (aka control plane) is based on legacy router: - all communications with nats are kept the same as before - client facing interface (actually connected to nginx) is simplified to handle the location query/stats update from nginx only. - the original app facing interface is totally removed. nginx (aka data plane) - package lua in nginx so most functionalities can be implemented with it. - for each request, nginx will generate a subrequest (to fit in nginx single thread event model) which carries location query/stats info to uls, the response of the subrequest will return backend addr and also an opaque tags (only known by uls), and the corresponding main request will also be awaked and routed to the backend server, when a response from backend is returned back, the counter associated with the particular opaque tags will be incremented accoring to the response code. The next incoming request will always help to carry not-synced stats to uls along with its location query. - sticky session and trace header are also implemented here, also their key related config are moved to nginx part. testing - "rake spec" will only test uls, while "rake test" will run both unit test and integration test with nginx - integration test requires nginx with lua module installed running Change-Id: I9b25b24c449b0421754e541ab5ba60a00c551da9 --- Rakefile | 6 +- lib/router.rb | 2 - lib/router/app_connection.rb | 200 --------------- lib/router/client_connection.rb | 232 +++++++---------- lib/router/const.rb | 31 ++- lib/router/router.rb | 34 +-- lib/router/utils.rb | 6 +- spec/Rakefile | 5 + spec/functional/router_spec.rb | 118 ++------- spec/functional/spec_helper.rb | 275 ++++++--------------- spec/integration/router_spec.rb | 241 ++++++++++++++++++ spec/integration/spec_helper.rb | 302 +++++++++++++++++++++++ spec/{functional => lib}/nats_timeout.rb | 0 spec/lib/spec_helper.rb | 130 ++++++++++ spec/unit/router_spec.rb | 34 --- 15 files changed, 908 insertions(+), 708 deletions(-) delete mode 100644 lib/router/app_connection.rb create mode 100644 spec/integration/router_spec.rb create mode 100644 spec/integration/spec_helper.rb rename spec/{functional => lib}/nats_timeout.rb (100%) create mode 100644 spec/lib/spec_helper.rb diff --git a/Rakefile b/Rakefile index a9625de..b2e3ed7 100644 --- a/Rakefile +++ b/Rakefile @@ -49,7 +49,11 @@ namespace "test" do sh("cd spec && rake spec") end - task "spec:rcov" do |t| + task "test" do |t| + sh("cd spec && rake test") + end + + task "spec:rcov" do |t| sh("cd spec && rake spec:rcov") end end diff --git a/lib/router.rb b/lib/router.rb index a2c594b..2251c7f 100644 --- a/lib/router.rb +++ b/lib/router.rb @@ -21,7 +21,6 @@ require 'router/const' require 'router/router' -require 'router/app_connection' require 'router/client_connection' require 'router/utils' @@ -183,4 +182,3 @@ end end - diff --git a/lib/router/app_connection.rb b/lib/router/app_connection.rb deleted file mode 100644 index d924e38..0000000 --- a/lib/router/app_connection.rb +++ /dev/null @@ -1,200 +0,0 @@ -# Copyright (c) 2009-2011 VMware, Inc. -module AppConnection - - attr_reader :outstanding_requests - - def initialize(client, request, droplet) - Router.log.debug "Creating AppConnection" - @client, @request, @droplet = client, request, droplet - @start_time = Time.now - @connected = false - @outstanding_requests = 1 - Router.outstanding_request_count += 1 - end - - def post_init - VCAP::Component.varz[:app_connections] = Router.app_connection_count += 1 - Router.log.debug "Completed AppConnection" - Router.log.debug Router.connection_stats - Router.log.debug "------------" - end - - def connection_completed - @connected = true - #proxy_incoming_to(@client) if @client - send_data(@request) if @client && @request - end - - # queue data until connection completed. - def deliver_data(data) - return send_data(data) if @connected - @request << data - end - - # We have the HTTP Headers complete from the client - def on_headers_complete(headers) - check_sticky_session = STICKY_SESSIONS =~ headers[SET_COOKIE_HEADER] - sent_session_cookie = false # Only send one in case of multiple hits - - header_lines = @headers.split("\r\n") - header_lines.each do |line| - @client.send_data(line) - @client.send_data(CR_LF) - if (check_sticky_session && !sent_session_cookie && STICKY_SESSIONS =~ line) - sid = Router.generate_session_cookie(@droplet) - scl = line.sub(/\S+\s*=\s*\w+/, "#{VCAP_SESSION_ID}=#{sid}") - sent_session_cookie = true - @client.send_data(scl) - @client.send_data(CR_LF) - end - end - # Trace if properly requested - if @client.trace - router_trace = "#{VCAP_ROUTER_HEADER}:#{Router.inet}#{CR_LF}" - be_trace = "#{VCAP_BACKEND_HEADER}:#{@droplet[:host]}:#{@droplet[:port]}#{CR_LF}" - @client.send_data(router_trace) - @client.send_data(be_trace) - end - # Ending CR_LF - @client.send_data(CR_LF) - end - - def process_response_body_chunk(data) - return unless data and data.bytesize > 0 - - # Let parser process as well to properly determine end of message. - # TODO: Once EM 1.0, add in optional bytsize proxy if Content-Length is present. - psize = @parser << data - if (psize == data.bytesize) - @client.send_data(data) - else - Router.log.info "Pipelined response detected!" - # We have a pipelined response, we need to hand over the new headers and only send the proper - # body segments to the backend - body = data.slice!(0, psize) - @client.send_data(body) - receive_data(data) - end - end - - def record_stats - return unless @parser - - latency = ((Time.now - @start_time) * 1000).to_i - response_code = @parser.status_code - response_code_metric = :responses_xxx - if (200..299).include?(response_code) - response_code_metric = :responses_2xx - elsif (300..399).include?(response_code) - response_code_metric = :responses_3xx - elsif (400..499).include?(response_code) - response_code_metric = :responses_4xx - elsif (500..599).include?(response_code) - response_code_metric = :responses_5xx - end - - VCAP::Component.varz[response_code_metric] += 1 - VCAP::Component.varz[:latency] << latency - - if @droplet[:tags] - @droplet[:tags].each do |key, value| - tag_metrics = VCAP::Component.varz[:tags][key][value] - tag_metrics[response_code_metric] += 1 - tag_metrics[:latency] << latency - end - end - end - - def on_message_complete - record_stats - @parser = nil - @outstanding_requests -= 1 - Router.outstanding_request_count -= 1 - :stop - end - - def cant_be_recycled? - error? || @parser != nil || @connected == false || @outstanding_requests > 0 - end - - def recycle - stop_proxying - @client = @request = @headers = nil - end - - def receive_data(data) - # Parser is created after headers have been received and processed. - # If it exists we are continuing the processing of body fragments. - # Allow the parser to process to signal proper end of message, e.g. chunked, etc - return process_response_body_chunk(data) if @parser - - # We are awaiting headers here. - # We buffer them if needed to determine the header/body boundary correctly. - @buf = @buf ? @buf << data : data - if hindex = @buf.index(HTTP_HEADERS_END) # all http headers received, figure out where to route to.. - @parser = Http::Parser.new(self) - - # split headers and rest of body out here. - @headers = @buf.slice!(0...hindex+HTTP_HEADERS_END_SIZE) - - # Process headers - @parser << @headers - - # Process left over body fragment if any - process_response_body_chunk(@buf) if @parser - @buf = @headers = nil - end - - rescue Http::Parser::Error => e - Router.log.debug "HTTP Parser error on response: #{e}" - close_connection - end - - def rebind(client, request) - @start_time = Time.now - @client = client - reuse(request) - end - - def reuse(new_request) - @request = new_request - @outstanding_requests += 1 - Router.outstanding_request_count += 1 - deliver_data(@request) - end - - def proxy_target_unbound - Router.log.debug "Proxy connection dropped" - #close_connection_after_writing - end - - def unbind - Router.outstanding_request_count -= @outstanding_requests - unless @connected - Router.log.info "Could not connect to backend for url:#{@droplet[:url]} @ #{@droplet[:host]}:#{@droplet[:port]}" - if @client - @client.send_data(Router.notfound_redirect || ERROR_404_RESPONSE) - @client.close_connection_after_writing - end - # TODO(dlc) fix - We will unregister bad backends here, should retry the request if possible. - Router.unregister_droplet(@droplet[:url], @droplet[:host], @droplet[:port]) - end - - VCAP::Component.varz[:app_connections] = Router.app_connection_count -= 1 - Router.log.debug 'Unbinding AppConnection' - Router.log.debug Router.connection_stats - Router.log.debug "------------" - - # Remove ourselves from the connection pool - @droplet[:connections].delete(self) - - @client.close_connection_after_writing if @client - end - - def terminate - stop_proxying - close_connection - on_message_complete if @outstanding_requests > 0 - end - -end diff --git a/lib/router/client_connection.rb b/lib/router/client_connection.rb index c406612..41376e7 100644 --- a/lib/router/client_connection.rb +++ b/lib/router/client_connection.rb @@ -1,13 +1,14 @@ # Copyright (c) 2009-2011 VMware, Inc. + module ClientConnection HTTP_11 ='11'.freeze - attr_reader :close_connection_after_request, :trace + attr_reader :close_connection_after_request def initialize(is_unix_socket) - Router.log.debug "Created Client Connection" - @droplet, @bound_app_conn = nil, nil, nil + Router.log.debug "Created uls Connection" + @droplet = nil # Default to be on the safe side @close_connection_after_request = true @is_unix_socket = is_unix_socket @@ -17,123 +18,119 @@ def post_init VCAP::Component.varz[:client_connections] = Router.client_connection_count += 1 Router.log.debug Router.connection_stats Router.log.debug "------------" - self.comm_inactivity_timeout = Router.client_inactivity_timeout + @parser = Http::Parser.new(self) + self.comm_inactivity_timeout = 60.0 end - def recycle_app_conn - return if (!@droplet || !@bound_app_conn) # Could we leak @bound_app_conn here? - - # Don't recycle if we are not supposed to.. - if @close_connection_after_request - Router.log.debug "NOT placing bound AppConnection back into free list because a close was requested.." - @bound_app_conn.close_connection_after_writing - return - end - - # Place any bound connections back into the droplets connection pool. - # This happens on client connection reuse, HTTP 1.1. - # Check for errors, overcommits, etc.. - - if (@bound_app_conn.cant_be_recycled?) - Router.log.debug "NOT placing AppConnection back into free list, can't be recycled" - @bound_app_conn.close_connection_after_writing - return - end - - if @droplet[:connections].index(@bound_app_conn) - Router.log.debug "NOT placing AppConnection back into free list, already exists in free list.." - return - end - - if @droplet[:connections].size >= MAX_POOL - Router.log.debug "NOT placing AppConnection back into free list, MAX_POOL connections already exist.." - @bound_app_conn.close_connection_after_writing - return - end - - Router.log.debug "Placing bound AppConnection back into free list.." - @bound_app_conn.recycle - @droplet[:connections].push(@bound_app_conn) - @bound_app_conn = nil + def connection_completed + Router.log.debug "uls connection completed" end - def terminate_app_conn - return unless @bound_app_conn - Router.log.debug "Terminating AppConnection" - @bound_app_conn.terminate - @bound_app_conn = nil + def receive_data(data) + @parser << data + rescue Http::Parser::Error => e + Router.log.debug "HTTP Parser error on request: #{e}" + close_connection end - def process_request_body_chunk(data) - return unless data - if (@droplet && @bound_app_conn && !@bound_app_conn.error?) - - # Let parser process as well to properly determine end of message. - psize = @parser << data - - if (psize != data.bytesize) - Router.log.info "Pipelined request detected!" - # We have a pipelined request, we need to hand over the new headers and only send the proper - # body segments to the backend - body = data.slice!(0, psize) - @bound_app_conn.deliver_data(body) - receive_data(data) - else - @bound_app_conn.deliver_data(data) - end - else # We do not have a backend droplet anymore - Router.log.info "Backend connection dropped" - terminate_app_conn - close_connection # Should we retry here? - end + def on_message_begin + @headers = nil + @body = '' end - # We have the HTTP Headers complete from the client def on_headers_complete(headers) - return close_connection unless headers and host = headers[HOST_HEADER] + @headers = @parser.headers + end + def on_body(chunk) + @body << chunk + end + + def on_message_complete # Support for HTTP/1.1 connection reuse and possible pipelining @close_connection_after_request = (@parser.http_version.to_s == HTTP_11) ? false : true # Support for Connection:Keep-Alive requests on HTTP/1.0, e.g. ApacheBench - @close_connection_after_request = false if (headers[CONNECTION_HEADER] == KEEP_ALIVE) - - @trace = (headers[VCAP_TRACE_HEADER] == Router.trace_key) + @close_connection_after_request = false if (@headers[CONNECTION_HEADER] == KEEP_ALIVE) # Update # of requests.. VCAP::Component.varz[:requests] += 1 - # Clear and recycle previous state - recycle_app_conn if @bound_app_conn - @droplet = @bound_app_conn = nil + @droplet = nil + + Router.log.debug "request body content:#{@body}" + uls_req = JSON.parse(@body, :symbolize_keys => true) + + # Update request stats carried with this uls query + if uls_req[ULS_STATS_UPDATE] + uls_req[ULS_STATS_UPDATE].each do |stat| + if stat[ULS_REQUEST_TAGS].length > 0 + tags = Marshal.load(Base64.decode64(stat[ULS_REQUEST_TAGS])) + end + + latency = stat[ULS_RESPONSE_LATENCY] + samples = stat[ULS_RESPONSE_SAMPLES] + + # We may find a better solution for latency + 1.upto samples do + VCAP::Component.varz[:latency] << latency + end + + stat[ULS_RESPONSE_STATUS].each_pair do |k, v| + response_code_metric = k.to_sym + VCAP::Component.varz[response_code_metric] += v + if not tags then next end + + tags.each do |key, value| + # In case some req tags of syncup state may be invalid at this time + if not VCAP::Component.varz[:tags][key] or + not VCAP::Component.varz[:tags][key][value] + next + end + + tag_metrics = VCAP::Component.varz[:tags][key][value] + tag_metrics[response_code_metric] += v + 1.upto samples do + tag_metrics[:latency] << latency + end + end + end + end + end + + url = uls_req[ULS_HOST_QUERY] + # In case a stats syncup only request + if not url then + if close_connection_after_request + close_connection_after_writing + end + return + end # Lookup a Droplet - unless droplets = Router.lookup_droplet(host) - Router.log.debug "No droplet registered for #{host}" + unless droplets = Router.lookup_droplet(url) + Router.log.debug "No droplet registered for #{url}" VCAP::Component.varz[:bad_requests] += 1 send_data(Router.notfound_redirect || ERROR_404_RESPONSE) close_connection_after_writing return end - Router.log.debug "#{droplets.size} servers available for #{host}" - # Check for session state - if VCAP_COOKIE =~ headers[COOKIE_HEADER] - url, host, port = Router.decrypt_session_cookie($1) - Router.log.debug "Client has __VCAP_ID__ for #{url}@#{host}:#{port}" + if uls_req[ULS_BACKEND_ADDR] + host, port = uls_req[ULS_BACKEND_ADDR].split(":") + Router.log.debug "request has __VCAP_ID__ cookie for #{host}:#{port}" # Check host? droplets.each { |droplet| - # If we already now about them just update the timestamp.. - if(droplet[:host] == host && droplet[:port] == port) + if(droplet[:host] == host && droplet[:port] == port.to_i) @droplet = droplet break; end } - Router.log.debug "Client's __VCAP_ID__ is stale" unless @droplet + Router.log.debug "request's __VCAP_ID__ is stale" unless @droplet end - # pick a random backend unless selected from above already + # Pick a random backend unless selected from above already @droplet = droplets[rand*droplets.size] unless @droplet if @droplet[:tags] @@ -141,64 +138,28 @@ def on_headers_complete(headers) tag_metrics = VCAP::Component.varz[:tags][key][value] tag_metrics[:requests] += 1 end + uls_req_tags = Base64.encode64(Marshal.dump(@droplet[:tags])).strip end @droplet[:requests] += 1 - # Client tracking, override with header if its set (nginx to unix domain socket) - _, client_ip = Socket.unpack_sockaddr_in(get_peername) unless @is_unix_socket - client_ip = headers[REAL_IP_HEADER] if headers[REAL_IP_HEADER] - - @droplet[:clients][client_ip] += 1 if client_ip + Router.log.debug "Routing #{@droplet[:url]} to #{@droplet[:host]}:#{@droplet[:port]}" - Router.log.debug "Routing on #{@droplet[:url]} to #{@droplet[:host]}:#{@droplet[:port]}" + # send upstream addr back to nginx + uls_response = { + ULS_BACKEND_ADDR => "#{@droplet[:host]}:#{@droplet[:port]}", + ULS_REQUEST_TAGS => "#{uls_req_tags}", + ULS_ROUTER_IP => "#{Router.inet}" + }.to_json + send_data(HTTP_200_RESPONSE + "#{uls_response}") - # Reuse an existing connection or create one. - # Proxy the rest of the traffic without interference. - Router.log.debug "Droplet has #{@droplet[:connections].size} pooled connections waiting.." - @bound_app_conn = @droplet[:connections].pop - if (@bound_app_conn && !@bound_app_conn.error?) - Router.log.debug "Reusing pooled AppConnection.." - @bound_app_conn.rebind(self, @headers) - else - host, port = @droplet[:host], @droplet[:port] - @bound_app_conn = EM.connect(host, port, AppConnection, self, @headers, @droplet) - end - - end - - def on_message_complete - @parser = nil - :stop - end - - def receive_data(data) - # Parser is created after headers have been received and processed. - # If it exists we are continuing the processing of body fragments. - # Allow the parser to process to signal proper end of message, e.g. chunked, etc - return process_request_body_chunk(data) if @parser - - # We are awaiting headers here. - # We buffer them if needed to determine the header/body boundary correctly. - @buf = @buf ? @buf << data : data - - if hindex = @buf.index(HTTP_HEADERS_END) # all http headers received, figure out where to route to.. - @parser = Http::Parser.new(self) - - # split headers and rest of body out here. - @headers = @buf.slice!(0...hindex+HTTP_HEADERS_END_SIZE) - - # Process headers - @parser << @headers - - # Process left over body fragment if any - process_request_body_chunk(@buf) if @parser - @buf = @headers = nil + if close_connection_after_request + close_connection_after_writing end - rescue Http::Parser::Error => e - Router.log.debug "HTTP Parser error on request: #{e}" - close_connection + rescue JSON::ParserError + send_data(HTTP_400_RESPONSE) + close_connection_after_writing end def unbind @@ -206,7 +167,6 @@ def unbind VCAP::Component.varz[:client_connections] = Router.client_connection_count -= 1 Router.log.debug Router.connection_stats Router.log.debug "------------" - @close_connection_after_request ? terminate_app_conn : recycle_app_conn end end diff --git a/lib/router/const.rb b/lib/router/const.rb index 6f2c4d9..2a17121 100644 --- a/lib/router/const.rb +++ b/lib/router/const.rb @@ -2,23 +2,23 @@ # HTTP Header processing HOST_HEADER = 'Host'.freeze CONNECTION_HEADER = 'Connection'.freeze -REAL_IP_HEADER = 'X-Real_IP'.freeze -HTTP_HEADERS_END = "\r\n\r\n".freeze -HTTP_HEADERS_END_SIZE = HTTP_HEADERS_END.bytesize KEEP_ALIVE = 'keep-alive'.freeze -SET_COOKIE_HEADER = 'Set-Cookie'.freeze -COOKIE_HEADER = 'Cookie'.freeze -CR_LF = "\r\n".freeze -STICKY_SESSIONS = /(JSESSIONID)/i - -VCAP_SESSION_ID = '__VCAP_ID__'.freeze -VCAP_COOKIE = /__VCAP_ID__=([^;]+)/ +#STICKY_SESSIONS = /(JSESSIONID)/i VCAP_BACKEND_HEADER = 'X-Vcap-Backend' VCAP_ROUTER_HEADER = 'X-Vcap-Router' VCAP_TRACE_HEADER = 'X-Vcap-Trace' +ULS_HOST_QUERY = :"host" +ULS_STATS_UPDATE = :"stats" +ULS_REQUEST_TAGS = :"request_tags" +ULS_RESPONSE_STATUS = :"response_codes" +ULS_RESPONSE_SAMPLES = :"response_samples" +ULS_RESPONSE_LATENCY = :"response_latency" +ULS_BACKEND_ADDR = :"backend_addr" +ULS_ROUTER_IP = :"router_ip" + # Max Connections to Pool MAX_POOL = 32 @@ -29,7 +29,12 @@ CHECK_SWEEPER = 30 # Check time for watching health of registered droplet MAX_AGE_STALE = 120 # Max stale age, unregistered if older then 2 minutes -# 404 Response -ERROR_404_RESPONSE="HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n" + - "VCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n".freeze +# 200 Response +HTTP_200_RESPONSE = "HTTP/1.1 200 OK\r\n\r\n".freeze +# 400 Response +ERROR_400_RESPONSE = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n".freeze + +# 404 Response +ERROR_404_RESPONSE = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n" + + "VCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n".freeze diff --git a/lib/router/router.rb b/lib/router/router.rb index 847082d..2364068 100644 --- a/lib/router/router.rb +++ b/lib/router/router.rb @@ -4,9 +4,9 @@ class Router VERSION = 0.98 class << self - attr_reader :log, :notfound_redirect, :session_key, :trace_key, :client_inactivity_timeout + attr_reader :log, :notfound_redirect, :client_inactivity_timeout attr_accessor :server, :local_server, :timestamp, :shutting_down - attr_accessor :client_connection_count, :app_connection_count, :outstanding_request_count + attr_accessor :client_connection_count attr_accessor :inet, :port alias :shutting_down? :shutting_down @@ -17,7 +17,7 @@ def version def config(config) @droplets = {} - @client_connection_count = @app_connection_count = @outstanding_request_count = 0 + @client_connection_count = 0 VCAP::Logging.setup_from_config(config['logging'] || {}) @log = VCAP::Logging.logger('router') if config['404_redirect'] @@ -25,8 +25,6 @@ def config(config) log.info "Registered 404 redirect at #{config['404_redirect']}" end - @session_key = config['session_key'] || '14fbc303b76bacd1e0a3ab641c11d11400341c5d' - @trace_key = config['trace_key'] || '22' @client_inactivity_timeout = config['client_inactivity_timeout'] || 60 @expose_all_apps = config['status']['expose_all_apps'] if config['status'] end @@ -120,40 +118,16 @@ def check_registered_urls def connection_stats tc = EM.connection_count - ac = Router.app_connection_count cc = Router.client_connection_count - "Connections: [Clients: #{cc}, Apps: #{ac}, Total: #{tc}]" + "Connections: [uls Clients: #{cc}, Total: #{tc}]" end def log_connection_stats tc = EM.connection_count - ac = Router.app_connection_count cc = Router.client_connection_count log.info connection_stats end - def generate_session_cookie(droplet) - token = [ droplet[:url], droplet[:host], droplet[:port] ] - c = OpenSSL::Cipher::Cipher.new('blowfish') - c.encrypt - c.key = @session_key - e = c.update(Marshal.dump(token)) - e << c.final - [e].pack('m0').gsub("\n",'') - end - - def decrypt_session_cookie(key) - e = key.unpack('m*')[0] - d = OpenSSL::Cipher::Cipher.new('blowfish') - d.decrypt - d.key = @session_key - p = d.update(e) - p << d.final - Marshal.load(p) - rescue - nil - end - def lookup_droplet(url) @droplets[url] end diff --git a/lib/router/utils.rb b/lib/router/utils.rb index 03724c4..0b0b782 100644 --- a/lib/router/utils.rb +++ b/lib/router/utils.rb @@ -17,10 +17,11 @@ def stop(pidfile) Router.log.info 'waiting for pending requests to complete.' EM.stop_server(Router.server) if Router.server EM.stop_server(Router.local_server) if Router.local_server - if Router.outstanding_request_count <= 0 + + if Router.client_connection_count <= 0 exit_router(pidfile) else - EM.add_periodic_timer(0.25) { exit_router(pidfile) if (Router.outstanding_request_count <= 0) } + EM.add_periodic_timer(0.25) { exit_router(pidfile) if (Router.client_connection_count <= 0) } EM.add_timer(10) { exit_router(pidfile) } # Wait at most 10 secs end @@ -32,4 +33,3 @@ def exit_router(pidfile) FileUtils.rm_f(pidfile) exit end - diff --git a/spec/Rakefile b/spec/Rakefile index e4d921d..5d1f8a5 100644 --- a/spec/Rakefile +++ b/spec/Rakefile @@ -18,6 +18,11 @@ desc "Run specs using RCov" task "spec:rcov" => ["ci:setup:rspec", "spec:rcov_internal", "convert_rcov_to_clover"] RSpec::Core::RakeTask.new do |t| + t.pattern = "{functional,unit}/*_spec.rb" + t.rspec_opts = ["--format", "documentation", "--colour"] +end + +RSpec::Core::RakeTask.new("test") do |t| t.pattern = "**/*_spec.rb" t.rspec_opts = ["--format", "documentation", "--colour"] end diff --git a/spec/functional/router_spec.rb b/spec/functional/router_spec.rb index 6f5fc8d..673a819 100644 --- a/spec/functional/router_spec.rb +++ b/spec/functional/router_spec.rb @@ -1,7 +1,10 @@ # Copyright (c) 2009-2011 VMware, Inc. require File.dirname(__FILE__) + '/spec_helper' +require "base64" describe 'Router Functional Tests' do + include Functional + before :each do @nats_server = NatsServer.new @nats_server.start_server @@ -50,10 +53,7 @@ healthz_resp = Net::HTTP.new(host, port).start { |http| http.request(healthz_req) } healthz_resp.body.should =~ /ok/i - varz_req = Net::HTTP::Get.new("/varz") - varz_req.basic_auth *credentials - varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) } - varz = JSON.parse(varz_resp.body, :symbolize_keys => true) + varz = get_varz() varz[:requests].should be_a_kind_of(Integer) varz[:bad_requests].should be_a_kind_of(Integer) varz[:client_connections].should be_a_kind_of(Integer) @@ -65,8 +65,7 @@ app = TestApp.new('router_test.vcap.me') dea = DummyDea.new(@nats_server.uri, '1234') dea.register_app(app) - app.verify_registered('127.0.0.1', RouterServer.port) - app.stop + app.verify_registered end it 'should properly unregister an application endpoint' do @@ -74,91 +73,9 @@ app = TestApp.new('router_test.vcap.me') dea = DummyDea.new(@nats_server.uri, '1234') dea.register_app(app) - app.verify_registered('127.0.0.1', RouterServer.port) + app.verify_registered dea.unregister_app(app) - # We should be unregistered here.. - # Send out simple request and check request and response - req = simple_http_request('router_test.cap.me', '/') - verify_vcap_404(req, '127.0.0.1', RouterServer.port) - app.stop - end - - it 'should properly distibute messages between multiple backends' do - num_apps = 10 - num_requests = 100 - dea = DummyDea.new(@nats_server.uri, '1234') - - apps = [] - for ii in (0...num_apps) - app = TestApp.new('lb_test.vcap.me') - dea.register_app(app) - apps << app - end - - req = simple_http_request('lb_test.vcap.me', '/') - for ii in (0...num_requests) - TCPSocket.open('127.0.0.1', RouterServer.port) {|rs| rs.send(req, 0) } - end - sleep(0.25) # Wait here for requests to trip accept state - - app_sockets = apps.collect { |a| a.socket } - ready = IO.select(app_sockets, nil, nil, 1) - ready[0].should have(num_apps).items - apps.each {|a| a.stop } - end - - it 'should properly do sticky sessions' do - num_apps = 10 - dea = DummyDea.new(@nats_server.uri, '1234') - - apps = [] - for ii in (0...num_apps) - app = TestApp.new('sticky.vcap.me') - dea.register_app(app) - apps << app - end - - vcap_id = app_socket = nil - app_sockets = apps.collect { |a| a.socket } - - TCPSocket.open('127.0.0.1', RouterServer.port) do |rs| - rs.send(STICKY_REQUEST, 0) - ready = IO.select(app_sockets, nil, nil, 1) - ready[0].should have(1).items - app_socket = ready[0].first - ss = app_socket.accept_nonblock - req_received = ss.recv(STICKY_REQUEST.bytesize) - req_received.should == STICKY_REQUEST - # Send a response back.. This will set the sticky session - ss.send(STICKY_RESPONSE, 0) - response = rs.read(STICKY_RESPONSE.bytesize) - # Make sure the __VCAP_ID__ has been set - response =~ /Set-Cookie:\s*__VCAP_ID__=([^;]+);/ - (vcap_id = $1).should be - end - - cookie = "__VCAP_ID__=#{vcap_id}" - sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', cookie) - - # Now fire off requests, all should go to same socket as first - (0...5).each do - TCPSocket.open('127.0.0.1', RouterServer.port) do |rs| - rs.send(sticky_request, 0) - end - end - - ready = IO.select(app_sockets, nil, nil, 1) - ready[0].should have(1).items - app_socket.should == ready[0].first - - for app in apps - dea.unregister_app(app) - end - - # Check that is is gone - verify_vcap_404(STICKY_REQUEST, '127.0.0.1', RouterServer.port) - - apps.each {|a| a.stop } + app.verify_unregistered end it 'should properly exit when NATS fails to reconnect' do @@ -194,11 +111,20 @@ def json_request(uri, subj, data=nil, timeout=1) reply end - def verify_vcap_404(req, router_host, router_port) - TCPSocket.open(router_host, router_port) do |rs| - rs.send(req, 0) - response = rs.read(VCAP_NOT_FOUND.bytesize) - response.should == VCAP_NOT_FOUND - end + def get_varz + reply = json_request(@nats_server.uri, 'vcap.component.discover') + reply.should_not be_nil + + credentials = reply[:credentials] + credentials.should_not be_nil + + host, port = reply[:host].split(":") + + varz_req = Net::HTTP::Get.new("/varz") + varz_req.basic_auth *credentials + varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) } + varz = JSON.parse(varz_resp.body, :symbolize_keys => true) + varz end + end diff --git a/spec/functional/spec_helper.rb b/spec/functional/spec_helper.rb index 4ffbfe2..ed3d4b7 100644 --- a/spec/functional/spec_helper.rb +++ b/spec/functional/spec_helper.rb @@ -1,229 +1,118 @@ # Copyright (c) 2009-2011 VMware, Inc. -require File.dirname(__FILE__) + '/../spec_helper' +require File.dirname(__FILE__) + '/../lib/spec_helper' -require 'fileutils' -require 'nats/client' -require 'yajl/json_gem' -require 'vcap/common' -require 'openssl' -require 'net/http' -require 'uri' +module Functional -require 'pp' + class TestApp + class UsageError < StandardError; end -class NatsServer + attr_reader :uris, :droplet - TEST_PORT = 4228 - - def initialize(uri="nats://localhost:#{TEST_PORT}", pid_file='/tmp/nats-router-tests.pid') - @uri = URI.parse(uri) - @pid_file = pid_file - end - - def uri - @uri.to_s - end - - def server_pid - @pid ||= File.read(@pid_file).chomp.to_i - end - - def start_server - return if NATS.server_running? @uri - %x[ruby -S bundle exec nats-server -p #{@uri.port} -P #{@pid_file} -d 2> /dev/null] - NATS.wait_for_server(@uri) # New version takes opt_timeout - end - - def is_running? - NATS.server_running? @uri - end - - def kill_server - if File.exists? @pid_file - Process.kill('KILL', server_pid) - FileUtils.rm_f(@pid_file) + def initialize(*uris) + @uris = uris end - end -end - -class RouterServer - - PID_FILE = '/tmp/router-test.pid' - CONFIG_FILE = '/tmp/router-test.yml' - LOG_FILE = '/tmp/router-test.log' - PORT = 2228 - def initialize(nats_uri) - port = "port: #{PORT}" - mbus = "mbus: #{nats_uri}" - log_info = "log_level: DEBUG\nlog_file: #{LOG_FILE}" - @config = %Q{#{port}\ninet: 127.0.0.1\n#{mbus}\n#{log_info}\npid: #{PID_FILE}} - end - - def self.port - PORT - end - - def server_pid - File.read(PID_FILE).chomp.to_i - end - - def start_server - return if is_running? - - # Write the config - File.open(CONFIG_FILE, 'w') { |f| f.puts "#{@config}" } - - # Wipe old log file, but truncate so running tail works - if (File.exists? LOG_FILE) - File.truncate(LOG_FILE, 0) - # %x[rm #{LOG_FILE}] if File.exists? LOG_FILE + def bind_droplet(droplet) + @droplet = droplet end - server = File.expand_path(File.join(__FILE__, '../../../bin/router')) - nats_timeout = File.expand_path(File.join(File.dirname(__FILE__), 'nats_timeout')) - # pid = Process.fork { %x[#{server} -c #{CONFIG_FILE} 2> /dev/null] } - pid = Process.fork { %x[ruby -r#{nats_timeout} #{server} -c #{CONFIG_FILE} 2> /dev/null] } - Process.detach(pid) - - wait_for_server - end - - def is_running? - require 'socket' - s = TCPSocket.new('localhost', PORT) - s.close - return true - rescue - return false - end - - def wait_for_server(max_wait = 5) - start = Time.now - while (Time.now - start < max_wait) # Wait max_wait seconds max - break if is_running? - sleep(0.2) + def unbind_droplet + @droplet = nil end - end - def kill_server - if File.exists? PID_FILE - %x[kill -9 #{server_pid} 2> /dev/null] - %x[rm #{PID_FILE}] + def port + @droplet.port if @droplet end - %x[rm #{CONFIG_FILE}] if File.exists? CONFIG_FILE - end -end - - -# HTTP REQUESTS / RESPONSES - -FOO_HTTP_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 53\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n

Hello from the Clouds!

" - -VCAP_NOT_FOUND = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\nVCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n" - -STICKY_REQUEST = "GET /sticky HTTP/1.1\r\nHost: sticky.vcap.me\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n" -STICKY_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 242\r\nSet-Cookie: _session_id=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nSet-Cookie: JSESSIONID=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n

Hello from the Cookie Monster! via: 10.0.1.222:35267

session = be009e56c7be0e855d951a3b49e288c98aa36ede

Cookies set: _session_id, JSESSIONID

Note: Trigger new sticky session cookie name via ?ss=NAME appended to URL

" - - -def simple_http_request(host, path, http_version='1.1') - "GET #{path} HTTP/#{http_version}\r\nUser-Agent: curl/7.19.7 (i486-pc-linux-gnu) libcurl/7.19.7 OpenSSL/0.9.8k zlib/1.2.3.3 libidn/1.15\r\nHost: #{host}\r\nAccept: */*\r\n\r\n" -end - -def simple_sticky_request(host, path, cookie, http_version='1.1') - "GET #{path} HTTP/#{http_version}\r\nHost: #{host}\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\nCookie: #{cookie}\r\n\r\n" -end + def verify_registered + for uri in @uris + droplet.host_port.should == query_uls(uri) + end + end -def new_app_socket - app_socket = TCPServer.new('127.0.0.1', 0) - app_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true) - Socket.do_not_reverse_lookup = true - app_port = app_socket.addr[1] - [app_socket, app_port] -end + def verify_unregistered + for uri in @uris + query_should_fail(uri) + end + end + private -class TestApp - class UsageError < StandardError; end + def query_should_fail(uri) + req = simple_uls_request(uri) + res = nil + UNIXSocket.open(RouterServer.sock) do |socket| + socket.send(req, 0) + buf = socket.read + buf.should == ERROR_404_RESPONSE + end + end - attr_reader :host, :port, :uris, :socket + def query_uls(uri) + req = simple_uls_request(uri) + res = nil + UNIXSocket.open(RouterServer.sock) do |socket| + socket.send(req, 0) + buf = socket.read + body = buf.split("\r\n\r\n")[1] + res = Yajl::Parser.parse(body)["backend_addr"] + socket.close + end + res + end - def initialize(*uris) - @uris = uris - @port = nil - @socket = nil - start - end + def simple_uls_request(host) + body = { :host => host }.to_json + "GET / HTTP/1.0\r\nConnection: Keep-alive\r\nHost: localhost\r\nContent-Length: #{body.length}\r\nContent-Type: application/json\r\nX-Vcap-Service-Token: changemysqltoken\r\nUser-Agent: EventMachine HttpClient\r\n\r\n#{body}" + end - def start - raise UsageError, "Already started" if @socket - sock, port = new_app_socket - @socket = sock - @port = port end - def stop - raise UsageError, "Already stopped" if !@socket - @socket.close - @socket = nil - @port = nil - end + class Droplet + attr_reader :host, :port - # Simple check that the app can be queried via the router - def verify_registered(router_host, router_port) - for uri in @uris - verify_path_registered(uri, '/', router_host, router_port) + def initialize(host) + @host = host + @port = Random.rand(100_000) end - end - private - - def verify_path_registered(host, path, router_host, router_port) - req = simple_http_request(host, path) - # Send out simple request and check request and response - TCPSocket.open(router_host, router_port) do |rs| - rs.send(req, 0) - IO.select([@socket], nil, nil, 2) # 2 secs timeout - ss = @socket.accept_nonblock - req_received = ss.recv(req.bytesize) - req_received.should == req - # Send a response back.. - ss.send(FOO_HTTP_RESPONSE, 0) - response = rs.read(FOO_HTTP_RESPONSE.bytesize) - response.should == FOO_HTTP_RESPONSE - ss.close + def host_port + "#{host}:#{port}" end end -end -class DummyDea + class DummyDea - attr_reader :nats_uri, :dea_id + attr_reader :nats_uri, :dea_id - def initialize(nats_uri, dea_id, host='127.0.0.1') - @nats_uri = nats_uri - @dea_id = dea_id - @host = host - end + def initialize(nats_uri, dea_id, host='127.0.0.1') + @nats_uri = nats_uri + @dea_id = dea_id + @host = host + end - def reg_hash_for_app(app) - { :dea => @dea_id, - :host => @host, - :port => app.port, - :uris => app.uris - } - end + def reg_hash_for_app(app, tags = {}) + { :dea => @dea_id, + :host => @host, + :port => app.port, + :uris => app.uris, + :tags => tags + } + end - def register_app(app) - NATS.start(:uri => @nats_uri) do - NATS.publish('router.register', reg_hash_for_app(app).to_json) { NATS.stop } + def register_app(app, tags = {}) + droplet = Droplet.new(@host) + app.bind_droplet(droplet) + NATS.start(:uri => @nats_uri) do + NATS.publish('router.register', reg_hash_for_app(app, tags).to_json) { NATS.stop } + end end - end - def unregister_app(app) - NATS.start(:uri => @nats_uri) do - NATS.publish('router.unregister', reg_hash_for_app(app).to_json) { NATS.stop } + def unregister_app(app) + NATS.start(:uri => @nats_uri) do + NATS.publish('router.unregister', reg_hash_for_app(app).to_json) { NATS.stop } + end end end + end diff --git a/spec/integration/router_spec.rb b/spec/integration/router_spec.rb new file mode 100644 index 0000000..4caa454 --- /dev/null +++ b/spec/integration/router_spec.rb @@ -0,0 +1,241 @@ +# Copyright (c) 2009-2011 VMware, Inc. +require File.dirname(__FILE__) + '/spec_helper' +require "base64" + +describe 'Router Integration Tests (require nginx running)' do + include Integration + + before :each do + @nats_server = NatsServer.new + @nats_server.start_server + @nats_server.is_running?.should be_true + + @router = RouterServer.new(@nats_server.uri) + # The router will only announce itself after it has subscribed to 'vcap.component.discover'. + NATS.start(:uri => @nats_server.uri) do + NATS.subscribe('vcap.component.announce') { NATS.stop } + # Ensure that NATS has processed our subscribe from above before we start the router + NATS.publish('xxx') { @router.start_server } + EM.add_timer(5) { NATS.stop } + end + @router.is_running?.should be_true + end + + after :each do + @router.kill_server + @router.is_running?.should be_false + + @nats_server.kill_server + @nats_server.is_running?.should be_false + end + + it 'should get health status via nginx' do + body = get_healthz() + body.should =~ /ok/i + end + + it 'should properly register an application endpoint' do + # setup the "app" + app = TestApp.new('router_test.vcap.me') + dea = DummyDea.new(@nats_server.uri, '1234') + dea.register_app(app) + app.verify_registered('127.0.0.1', RouterServer.port) + app.stop + end + + it 'should properly unregister an application endpoint' do + # setup the "app" + app = TestApp.new('router_test.vcap.me') + dea = DummyDea.new(@nats_server.uri, '1234') + dea.register_app(app) + app.verify_registered('127.0.0.1', RouterServer.port) + dea.unregister_app(app) + # We should be unregistered here.. + # Send out simple request and check request and response + req = simple_http_request('router_test.cap.me', '/') + verify_vcap_404(req, '127.0.0.1', RouterServer.port) + app.stop + end + + it 'should properly distibute messages between multiple backends' do + num_apps = 10 + num_requests = 100 + dea = DummyDea.new(@nats_server.uri, '1234') + + apps = [] + for ii in (0...num_apps) + app = TestApp.new('lb_test.vcap.me') + dea.register_app(app) + apps << app + end + + req = simple_http_request('lb_test.vcap.me', '/') + app_sockets = apps.collect { |a| a.socket } + + results = send_requests_to_apps("127.0.0.1", RouterServer.port, + req, num_requests, app_sockets, + FOO_HTTP_RESPONSE) + results.should have(num_apps).items + recv_requests = 0 + results.each { |entry| + recv_requests += entry[:counter] + } + recv_requests.should == num_requests + apps.each {|a| a.stop } + + end + + it 'should get correct statistics' do + num_apps = 10 + num_requests = 1000 + dea = DummyDea.new(@nats_server.uri, '1234') + + apps = [] + for ii in (0...num_apps) + app = TestApp.new('lb_test.vcap.me') + dea.register_app(app, {"component" => "test#{ii}", "runtime" => "ruby"}) + apps << app + end + + # Before we count the statistics, we should restart nginx worker + # to cleanup unsynced stats. + # TODO: It's hard to tell which nginx process belongs to us since it was + # started by vcap_dev_setup, we may figure out an elegant way to do this + # and get notification when it's ready instead of sleep 2 seconds. + %x[ps -ef|grep "nginx: master process"|grep -v grep|awk '{print $2}'|xargs sudo kill -HUP 2> /dev/null] + sleep(2) + + req = simple_http_request('lb_test.vcap.me', '/') + app_sockets = apps.collect { |a| a.socket } + + results = send_requests_to_apps("127.0.0.1", RouterServer.port, + req, num_requests, app_sockets, + FOO_HTTP_RESPONSE) + # Verify all apps get request and the totally number is correct + results.should have(num_apps).items + recv_requests = 0 + results.each { |entry| + recv_requests += entry[:counter] + } + recv_requests.should == num_requests + for app in apps + dea.unregister_app(app) + end + + apps.each {|a| a.stop } + + varz = get_varz() + varz[:requests].should be_a_kind_of(Integer) + varz[:client_connections].should be_a_kind_of(Integer) + varz[:type].should =~ /router/i + + # Requests are collected exactly the same number as we received + # since each of them triggers a location query to uls + varz[:requests].should == num_requests + + # send_requests_to_apps is sequentially sending out num_requests requests, + # so each response of a outstanding request updates its previous one + # and we are sure the status of the last request is still in nginx + varz[:responses_2xx].should == (num_requests - 1) + + comp_reqs = comp_resps_2xx = 0 + # Verify the statistics for each type of request tags + for ii in (0...num_apps) + comp_reqs += varz[:tags][:component]["test#{ii}".to_sym][:requests] + comp_resps_2xx += varz[:tags][:component]["test#{ii}".to_sym][:responses_2xx] + end + comp_reqs.should == num_requests + comp_resps_2xx.should == num_requests - 1 + varz[:tags][:runtime][:ruby][:requests].should == num_requests + varz[:tags][:runtime][:ruby][:responses_2xx].should == num_requests - 1 + + # Send an monitor request to nginx to syncup the left stats + body = get_healthz() + body.should =~ /ok/i + + varz = get_varz() + comp_resps_2xx = 0 + # Verify the statistics for each type of request tags + for ii in (0...num_apps) + comp_resps_2xx += varz[:tags][:component]["test#{ii}".to_sym][:responses_2xx] + end + comp_resps_2xx.should == num_requests + varz[:tags][:runtime][:ruby][:responses_2xx].should == num_requests + end + + it 'should properly do sticky sessions' do + num_apps = 10 + num_requests = 100 + dea = DummyDea.new(@nats_server.uri, '1234') + + apps = [] + for ii in (0...num_apps) + app = TestApp.new('sticky.vcap.me') + dea.register_app(app) + apps << app + end + + vcap_id = app_socket = nil + app_sockets = apps.collect { |a| a.socket } + + TCPSocket.open('127.0.0.1', RouterServer.port) do |rs| + rs.send(STICKY_REQUEST, 0) + ready = IO.select(app_sockets, nil, nil, 1) + ready[0].should have(1).items + app_socket = ready[0].first + ss = app_socket.accept_nonblock + + smsg, sbody = parse_http_msg_from_buf(STICKY_REQUEST) + rmsg, rbody = parse_http_msg_from_socket(ss) + validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true + + ss.send(STICKY_RESPONSE, 0) + smsg, sbody = parse_http_msg_from_buf(STICKY_RESPONSE) + rmsg, rbody = parse_http_msg_from_socket(rs) + validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true + rmsg.headers["Set-Cookie"] =~ /\s*__VCAP_ID__=([^,;]+)/ + (vcap_id = $1).should be + end + + cookie = "__VCAP_ID__=#{vcap_id}" + sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', cookie) + + results = send_requests_to_apps("127.0.0.1", RouterServer.port, + sticky_request, num_requests, app_sockets, + FOO_HTTP_RESPONSE) + # Now fire off requests, all should go to same socket as first + results.should have(1).items + results[0][:app_socket].should == app_socket + recv_requests = 0 + results.each { |entry| + recv_requests += entry[:counter] + } + recv_requests.should == num_requests + + for app in apps + dea.unregister_app(app) + end + + # Check that it is gone + verify_vcap_404(STICKY_REQUEST, '127.0.0.1', RouterServer.port) + + apps.each {|a| a.stop } + end + + it 'should add vcap trace headers' do + app = TestApp.new('trace.vcap.me') + dea = DummyDea.new(@nats_server.uri, '1234') + dea.register_app(app, {"component" => "trace", "runtime" => "ruby"}) + + resp = app.get_trace_header("127.0.0.1", RouterServer.port) + resp.headers["X-Vcap-Backend"].should_not be_nil + h, p = resp.headers["X-Vcap-Backend"].split(":") + p.to_i.should == app.port.to_i + + dea.unregister_app(app) + + app.stop + end + + +end diff --git a/spec/integration/spec_helper.rb b/spec/integration/spec_helper.rb new file mode 100644 index 0000000..74497cc --- /dev/null +++ b/spec/integration/spec_helper.rb @@ -0,0 +1,302 @@ +# Copyright (c) 2009-2011 VMware, Inc. +require File.dirname(__FILE__) + '/../lib/spec_helper' + +# HTTP REQUESTS / RESPONSES +FOO_HTTP_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 31\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n

Hello from the Clouds!

" + +VCAP_NOT_FOUND = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\nVCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n" + +STICKY_REQUEST = "GET /sticky HTTP/1.1\r\nHost: sticky.vcap.me\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n" + +STICKY_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 242\r\nSet-Cookie: _session_id=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nSet-Cookie: JSESSIONID=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n

Hello from the Cookie Monster! via: 10.0.1.222:35267

session = be009e56c7be0e855d951a3b49e288c98aa36ede

Cookies set: _session_id, JSESSIONID

Note: Trigger new sticky session cookie name via ?ss=NAME appended to URL

" + +def simple_http_request(host, path, http_version='1.1') + "GET #{path} HTTP/#{http_version}\r\nUser-Agent: curl/7.19.7 (i486-pc-linux-gnu) libcurl/7.19.7 OpenSSL/0.9.8k zlib/1.2.3.3 libidn/1.15\r\nHost: #{host}\r\nAccept: */*\r\nContent-Length: 11\r\n\r\nhello world" +end + +def parse_http_msg_from_socket(socket) + parser = Http::Parser.new + complete = false + body = '' + + parser.on_body = proc do |chunk| + body << chunk + end + + parser.on_message_complete = proc do + complete = true + :stop + end + + while not complete + raw_data = socket.recv(1024) + parser << raw_data + end + + return parser, body +end + +def parse_http_msg_from_buf(buf) + parser = Http::Parser.new + body = '' + + parser.on_body = proc do |chunk| + body << chunk + end + parser.on_message_complete = proc do + :stop + end + + parser << buf + + return parser, body +end + +def validate_recv_msg_against_send(send_msg, send_body, recv_msg, recv_body) + recv_body.should == send_body + + recv_msg.http_method.should == send_msg.http_method + recv_msg.request_url.should == send_msg.request_url + recv_msg.status_code.should == send_msg.status_code + + # Verify most of the headers are preserved when traversing the "router" + send_msg.headers.each do |hdr, val| + # Skip the headers nginx will rewrite + if (hdr == "Server" or hdr == "Date" or hdr == "Connection") then next end + + if hdr == "Set-Cookie" + # Http Parser concatenates all Set-Cookie headers together + val.split(',').each do |cookie| + (recv_msg.headers["Set-Cookie"].include? cookie).should == true + end + else + val.should == recv_msg.headers[hdr] + end + end + + return true +end + + +# Encodes _data_ as json, decodes reply as json +def json_request(uri, subj, data=nil, timeout=1) + reply = nil + data_enc = data ? Yajl::Encoder.encode(data) : nil + NATS.start(:uri => uri) do + NATS.request(subj, data_enc) do |msg| + reply = JSON.parse(msg, :symbolize_keys => true) + NATS.stop + end + EM.add_timer(timeout) { NATS.stop } + end + + reply +end + +def verify_vcap_404(req, router_host, router_port) + TCPSocket.open(router_host, router_port) do |rs| + rs.send(req, 0) + rmsg, rbody = parse_http_msg_from_socket(rs) + rmsg.status_code.should == 404 + end +end + +def get_varz + reply = json_request(@nats_server.uri, 'vcap.component.discover') + reply.should_not be_nil + + credentials = reply[:credentials] + credentials.should_not be_nil + + host, port = reply[:host].split(":") + + varz_req = Net::HTTP::Get.new("/varz") + varz_req.basic_auth *credentials + varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) } + varz = JSON.parse(varz_resp.body, :symbolize_keys => true) + varz +end + +def get_healthz + reply = json_request(@nats_server.uri, 'vcap.component.discover') + reply.should_not be_nil + + credentials = reply[:credentials] + credentials.should_not be_nil + + rbody = nil + TCPSocket.open("127.0.0.1", RouterServer.port) {|rs| + rs.send(healthz_request(Base64.encode64(credentials*':').strip), 0) + + resp, rbody = parse_http_msg_from_socket(rs) + resp.status_code.should == 200 + } + rbody +end + +def simple_sticky_request(host, path, cookie, http_version='1.1') + "GET #{path} HTTP/#{http_version}\r\nHost: #{host}\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\nCookie: #{cookie}\r\n\r\n" +end + +def healthz_request(auth) + "GET / HTTP/1.0\r\nAuthorization: Basic #{auth}\r\nUser-Agent: HTTP-Monitor/1.1\r\n\r\n" +end + +def trace_request(trace_key) + "GET /trace HTTP/1.1\r\nHost: trace.vcap.me\r\nConnection: keep-alive\r\nX-Vcap-Trace: #{trace_key}\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n" +end + +def new_app_socket + app_socket = TCPServer.new('127.0.0.1', 0) + app_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true) + Socket.do_not_reverse_lookup = true + app_port = app_socket.addr[1] + [app_socket, app_port] +end + + +def send_requests_to_apps(ip, port, req, num_requests, app_sockets, resp) + results = [] + for i in (0...num_requests) + TCPSocket.open(ip, port) {|rs| + rs.send(req, 0) + ready = IO.select(app_sockets, nil, nil, 1) + ready[0].should have(1).items + app_socket = ready[0].first + ss = app_socket.accept_nonblock + # Drain the socket + parse_http_msg_from_socket(ss) + # Send a response back to client to emulate a full req/resp cycle + # to avoid nginx 499 error + ss.send(resp, 0) + + found = false + results.each { |entry| + if (entry[:app_socket] == app_socket) + entry[:counter] += 1 + found = true + break + end + } + if not found + entry = { + :app_socket => app_socket, + :counter => 1 + } + results << entry + end + } + end + results +end + +module Integration + + class TestApp + class UsageError < StandardError; end + + attr_reader :host, :port, :uris, :socket + + def initialize(*uris) + @uris = uris + @port = nil + @socket = nil + start + end + + def start + raise UsageError, "Already started" if @socket + sock, port = new_app_socket + @socket = sock + @port = port + end + + def stop + raise UsageError, "Already stopped" if !@socket + @socket.close + @socket = nil + @port = nil + end + + # Simple check that the app can be queried via the router + def verify_registered(router_host, router_port) + for uri in @uris + verify_path_registered(uri, '/', router_host, router_port) + end + end + + def get_trace_header(router_host, router_port) + req = trace_request("222") # Make sure consistent with deployment config + # Send out simple request and check request and response + TCPSocket.open(router_host, router_port) do |rs| + rs.send(req, 0) + IO.select([@socket], nil, nil, 2) # 2 secs timeout + ss = @socket.accept_nonblock + + # Send a response back.. + ss.send(FOO_HTTP_RESPONSE, 0) + rmsg, rbody = parse_http_msg_from_socket(rs) + ss.close + return rmsg + end + end + + private + + def verify_path_registered(host, path, router_host, router_port) + req = simple_http_request(host, path) + # Send out simple request and check request and response + TCPSocket.open(router_host, router_port) do |rs| + rs.send(req, 0) + IO.select([@socket], nil, nil, 2) # 2 secs timeout + ss = @socket.accept_nonblock + + smsg, sbody = parse_http_msg_from_buf(req) + rmsg, rbody = parse_http_msg_from_socket(ss) + validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true + + # Send a response back.. + ss.send(FOO_HTTP_RESPONSE, 0) + smsg, sbody = parse_http_msg_from_buf(FOO_HTTP_RESPONSE) + rmsg, rbody = parse_http_msg_from_socket(rs) + validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true + + ss.close + end + end + + + end + + class DummyDea + + attr_reader :nats_uri, :dea_id + + def initialize(nats_uri, dea_id, host='127.0.0.1') + @nats_uri = nats_uri + @dea_id = dea_id + @host = host + end + + def reg_hash_for_app(app, tags = {}) + { :dea => @dea_id, + :host => @host, + :port => app.port, + :uris => app.uris, + :tags => tags + } + end + + def register_app(app, tags = {}) + NATS.start(:uri => @nats_uri) do + NATS.publish('router.register', reg_hash_for_app(app, tags).to_json) { NATS.stop } + end + end + + def unregister_app(app) + NATS.start(:uri => @nats_uri) do + NATS.publish('router.unregister', reg_hash_for_app(app).to_json) { NATS.stop } + end + end + end +end diff --git a/spec/functional/nats_timeout.rb b/spec/lib/nats_timeout.rb similarity index 100% rename from spec/functional/nats_timeout.rb rename to spec/lib/nats_timeout.rb diff --git a/spec/lib/spec_helper.rb b/spec/lib/spec_helper.rb new file mode 100644 index 0000000..1413228 --- /dev/null +++ b/spec/lib/spec_helper.rb @@ -0,0 +1,130 @@ +# Copyright (c) 2009-2011 VMware, Inc. +require File.dirname(__FILE__) + '/../spec_helper' + +require 'fileutils' +require 'nats/client' +require 'yajl/json_gem' +require 'vcap/common' +require 'openssl' +require 'net/http' +require 'uri' +require "http/parser" +require "router/const" + +require 'pp' + +class NatsServer + + TEST_PORT = 4228 + + def initialize(uri="nats://localhost:#{TEST_PORT}", pid_file='/tmp/nats-router-tests.pid') + @uri = URI.parse(uri) + @pid_file = pid_file + end + + def uri + @uri.to_s + end + + def server_pid + @pid ||= File.read(@pid_file).chomp.to_i + end + + def start_server + return if NATS.server_running? @uri + %x[ruby -S bundle exec nats-server -p #{@uri.port} -P #{@pid_file} -d 2> /dev/null] + NATS.wait_for_server(@uri) # New version takes opt_timeout + end + + def is_running? + NATS.server_running? @uri + end + + def kill_server + if File.exists? @pid_file + Process.kill('KILL', server_pid) + FileUtils.rm_f(@pid_file) + end + sleep(0.5) + end +end + +class RouterServer + + PID_FILE = '/tmp/router-test.pid' + CONFIG_FILE = '/tmp/router-test.yml' + LOG_FILE = '/tmp/router-test.log' + UNIX_SOCK = '/tmp/router.sock' # unix socket between nginx and uls + PORT = 80 # nginx listening port + STATUS_PORT = 8081 # must be consistent with nginx config + STATUS_USER = "admin" + STATUS_PASSWD = "password" + + # We verify functionalities for the whole "router" (i.e. nginx + uls). + # In all tests, when a client like to send a request to an test app, + # it has to send to the port which nginx is listening. + def initialize(nats_uri) + mbus = "mbus: #{nats_uri}" + log_info = "logging:\n level: debug\n file: #{LOG_FILE}" + @config = %Q{sock: #{UNIX_SOCK}\n#{mbus}\n#{log_info}\npid: #{PID_FILE}\nlocal_route: 127.0.0.1\nstatus:\n port: #{STATUS_PORT}\n user: #{STATUS_USER}\n password: #{STATUS_PASSWD}} + end + + def self.port + PORT + end + + def self.sock + UNIX_SOCK + end + + def server_pid + File.read(PID_FILE).chomp.to_i + end + + def start_server + return if is_running? + + # Write the config + File.open(CONFIG_FILE, 'w') { |f| f.puts "#{@config}" } + + # Wipe old log file, but truncate so running tail works + if (File.exists? LOG_FILE) + File.truncate(LOG_FILE, 0) + # %x[rm #{LOG_FILE}] if File.exists? LOG_FILE + end + + server = File.expand_path(File.join(__FILE__, '../../../bin/router')) + nats_timeout = File.expand_path(File.join(File.dirname(__FILE__), 'nats_timeout')) + #pid = Process.fork { %x[#{server} -c #{CONFIG_FILE} 2> /dev/null] } + pid = Process.fork { %x[ruby -r#{nats_timeout} #{server} -c #{CONFIG_FILE} 2> /dev/null] } + Process.detach(pid) + + wait_for_server + end + + def is_running? + require 'socket' + s = UNIXSocket.new(UNIX_SOCK) + s.close + return true + rescue + return false + end + + def wait_for_server(max_wait = 5) + start = Time.now + while (Time.now - start < max_wait) # Wait max_wait seconds max + break if is_running? + sleep(0.2) + end + end + + def kill_server + if File.exists? PID_FILE + %x[kill -9 #{server_pid} 2> /dev/null] + %x[rm #{PID_FILE}] + end + %x[rm #{CONFIG_FILE}] if File.exists? CONFIG_FILE + sleep(0.2) + end +end diff --git a/spec/unit/router_spec.rb b/spec/unit/router_spec.rb index 9b686ac..888b752 100644 --- a/spec/unit/router_spec.rb +++ b/spec/unit/router_spec.rb @@ -23,19 +23,6 @@ def setup before :each do clear_router end - - it 'should set up a session key' do - Router.session_key.should be - end - - it 'should set a default client inactivity timeout' do - Router.client_inactivity_timeout.should be - end - - it 'should respect a client_inactivity_timeout key when supplied' do - Router.config('client_inactivity_timeout' => 30) - Router.client_inactivity_timeout.should == 30 - end end describe 'Router.register_droplet' do @@ -139,27 +126,6 @@ def setup end end - describe 'Router.session_keys' do - before :each do - clear_router - end - - it 'should properly encrypt and decrypt session keys' do - Router.register_droplet('foo.vcap.me', '10.0.1.22', 2222, {}) - droplets = Router.lookup_droplet('foo.vcap.me') - droplets.should have(1).items - droplet = droplets.first - key = Router.generate_session_cookie(droplet) - key.should be - droplet_array = Router.decrypt_session_cookie(key) - droplet_array.should be_instance_of Array - droplet_array.should have(3).items - droplet_array[0].should == droplet[:url] - droplet_array[1].should == droplet[:host] - droplet_array[2].should == droplet[:port] - end - end - def clear_router Router.config({}) Router.instance_variable_set(:@log, double(:black_hole).as_null_object)