Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Swapping out httpq for Redis

  • Loading branch information...
commit 4ab92d077e68c25e30704378a0b3121fca437d65 1 parent f0902bf
@brynary authored
View
10 bin/httpq
@@ -1,10 +0,0 @@
-#!/usr/bin/env ruby
-
-require "rubygems"
-require "rack"
-require "webrick"
-
-require File.expand_path(File.dirname(__FILE__) + "/../lib/testjour")
-require "testjour/http_queue"
-
-Testjour::HttpQueue.run_on(Rack::Handler::WEBrick)
View
115 lib/testjour/commands/run.rb
@@ -11,78 +11,75 @@
module Testjour
module Commands
-
+
class Run < Command
-
+
def execute
configuration.unshift_args(testjour_yml_args)
configuration.parse!
-
- HttpQueue.with_queue_server do
- configuration.setup
-
- if configuration.feature_files.any?
- queue_features
-
- @started_slaves = 0
- start_slaves
-
- puts "Requested build from #{@started_slaves} slaves... (Waiting for #{step_counter.count} results)"
- puts
-
- print_results
- else
- Testjour.logger.info("No feature files. Quitting.")
- end
+ configuration.setup
+
+ if configuration.feature_files.any?
+ RedisQueue.reset_all
+ queue_features
+
+ @started_slaves = 0
+ start_slaves
+
+ puts "Requested build from #{@started_slaves} slaves... (Waiting for #{step_counter.count} results)"
+ puts
+
+ print_results
+ else
+ Testjour.logger.info("No feature files. Quitting.")
end
end
-
+
def queue_features
Testjour.logger.info("Queuing features...")
-
- HttpQueue.with_queue(queue_uri) do |queue|
- configuration.feature_files.each do |feature_file|
- queue.push(:feature_files, feature_file)
- Testjour.logger.info "Queued: #{feature_file}"
- end
+ queue = RedisQueue.new
+
+ configuration.feature_files.each do |feature_file|
+ queue.push(:feature_files, feature_file)
+ Testjour.logger.info "Queued: #{feature_file}"
end
end
-
+
def start_slaves
start_local_slaves
start_remote_slaves
end
-
+
def start_local_slaves
configuration.local_slave_count.times do
@started_slaves += 1
start_slave
end
end
-
+
def start_remote_slaves
configuration.remote_slaves.each do |remote_slave|
@started_slaves += 1
start_remote_slave(remote_slave)
end
end
-
+
def start_remote_slave(remote_slave)
uri = URI.parse(remote_slave)
cmd = remote_slave_run_command(uri.host, uri.path)
Testjour.logger.info "Starting remote slave: #{cmd}"
detached_exec(cmd)
end
-
+
def remote_slave_run_command(host, path)
"ssh #{host} testjour run:remote --in=#{path} #{configuration.run_slave_args.join(' ')} #{testjour_uri}".squeeze(" ")
end
-
+
def start_slave
Testjour.logger.info "Starting slave: #{local_run_command}"
detached_exec(local_run_command)
end
-
+
def testjour_yml_args
@testjour_yml_args ||= begin
if File.exist?("testjour.yml")
@@ -92,62 +89,52 @@ def testjour_yml_args
end
end
end
-
+
def print_results
results_formatter = ResultsFormatter.new(step_counter, configuration.options)
-
- HttpQueue.with_queue(queue_uri) do |queue|
- step_counter.count.times do
- results_formatter.result(queue.pop(:results))
+ queue = RedisQueue.new
+ results_count = 0
+
+ Timeout.timeout(180) do
+ while results_count < step_counter.count
+ if (result = queue.pop(:results))
+ results_formatter.result(result)
+ results_count += 1
+ else
+ sleep 0.1
+ end
end
end
-
+
results_formatter.finish
-
return results_formatter.failed? ? 1 : 0
- rescue => ex
- if ex.message =~ /result overdue/
- $stderr.puts
- $stderr.puts "Missing steps:"
- $stderr.puts
- results_formatter.missing_backtrace_lines.each do |line|
- $stderr.puts " #{line}"
- end
- $stderr.puts
- end
-
- raise
end
-
+
def step_counter
return @step_counter if @step_counter
-
+
features = load_plain_text_features(configuration.feature_files)
@step_counter = Testjour::StepCounter.new(step_mother)
@step_counter.options = configuration.cucumber_configuration.options
@step_counter.visit_features(features)
return @step_counter
end
-
+
def local_run_command
"testjour run:slave #{configuration.run_slave_args.join(' ')} #{testjour_uri}".squeeze(" ")
end
-
- def queue_uri
- "http://localhost:#{Testjour::HttpQueue.port}/"
- end
-
+
def testjour_uri
user = `whoami`.strip
host = Socket.gethostname
- "http://#{user}@#{host}:#{Testjour::HttpQueue.port}" + File.expand_path(".")
+ "http://#{user}@#{host}" + File.expand_path(".")
end
-
+
def testjour_path
File.expand_path(File.dirname(__FILE__) + "/../../../bin/testjour")
end
-
+
end
-
+
end
end
View
35 lib/testjour/commands/run_slave.rb
@@ -31,31 +31,30 @@ def execute
end
def work
- HttpQueue.with_queue(configuration.queue_uri) do |queue|
- feature_file = true
+ queue = RedisQueue.new
+ feature_file = true
- while feature_file
- begin
- feature_file = queue.pop(:feature_files)
- rescue Curl::Err::ConnectionFailedError
- feature_file = false
- end
+ while feature_file
+ begin
+ feature_file = queue.pop(:feature_files)
+ rescue Curl::Err::ConnectionFailedError
+ feature_file = false
+ end
- if feature_file
- Testjour.logger.info "Running: #{feature_file}"
- features = load_plain_text_features(feature_file)
- Testjour.logger.info "Loaded: #{feature_file}"
- execute_features(features)
- Testjour.logger.info "Finished running: #{feature_file}"
- else
- Testjour.logger.info "No feature file found. Finished"
- end
+ if feature_file
+ Testjour.logger.info "Running: #{feature_file}"
+ features = load_plain_text_features(feature_file)
+ Testjour.logger.info "Loaded: #{feature_file}"
+ execute_features(features)
+ Testjour.logger.info "Finished running: #{feature_file}"
+ else
+ Testjour.logger.info "No feature file found. Finished"
end
end
end
def execute_features(features)
- visitor = Testjour::HttpFormatter.new(step_mother, StringIO.new, configuration.queue_uri)
+ visitor = Testjour::HttpFormatter.new(step_mother)
visitor.options = configuration.cucumber_configuration.options
Testjour.logger.info "Visiting..."
visitor.visit_features(features)
View
10 lib/testjour/cucumber_extensions/http_formatter.rb
@@ -7,11 +7,6 @@ module Testjour
class HttpFormatter < ::Cucumber::Ast::Visitor
- def initialize(step_mother, io, queue_uri)
- super(step_mother)
- @queue_uri = queue_uri
- end
-
def visit_multiline_arg(multiline_arg)
@multiline_arg = true
super
@@ -36,9 +31,8 @@ def visit_table_cell_value(value, status)
private
def progress(time, step_invocation, status = nil)
- HttpQueue.with_queue(@queue_uri) do |queue|
- queue.push(:results, Result.new(time, step_invocation, status))
- end
+ queue = RedisQueue.new
+ queue.push(:results, Result.new(time, step_invocation, status))
end
end
View
201 lib/testjour/http_queue.rb
@@ -1,191 +1,32 @@
require "testjour/core_extensions/wait_for_service"
-require "curb"
+require "redis"
module Testjour
- class HttpQueue
- class ResultOverdueError < StandardError; end
-
- class QueueProxy
-
- def initialize(uri = nil)
- @uri = uri
- end
-
- def uri
- @uri || "http://0.0.0.0:#{Testjour::HttpQueue.port}/"
- end
-
- def push(queue_name, data)
- c = Curl::Easy.http_post(uri + queue_name.to_s,
- Curl::PostField.content("data", Marshal.dump(data)))
-
- c.response_code == 200
- end
-
- def pop(queue_name)
- c = Curl::Easy.new(uri + queue_name.to_s)
- c.perform
-
- if c.response_code == 200
- return Marshal.load(c.body_str)
- elsif c.response_code == 404
- return nil
- else
- raise "Bad response: #{c.body_str}"
- end
- end
-
- protected
-
- def self.with_net_http(&block)
- Net::HTTP.start("0.0.0.0", HttpQueue.port, &block)
- end
-
- end
-
- def self.timeout_in_seconds
- 180
- end
-
- def self.run_on(handler)
- handler.run self, :Port => port
- end
-
- def self.wait_for_service
- TCPSocket.wait_for_service :host => "0.0.0.0", :port => port
- end
-
- def self.wait_for_no_service
- TCPSocket.wait_for_no_service :host => "0.0.0.0", :port => port
- end
-
- def self.with_queue(uri = nil, &block)
- yield QueueProxy.new(uri)
- end
-
- def self.with_queue_server
- existing_pid = `ps | grep httpq | grep -v grep`.strip.split.first
-
- if existing_pid
- Testjour.logger.info "Killing running httpq PID #{existing_pid}..."
- Process.kill(9, existing_pid.to_i)
- HttpQueue.wait_for_no_service
- end
-
- Testjour.logger.info "Starting httpq..."
- pid = detached_exec(File.expand_path(File.dirname(__FILE__) + "/../../bin/httpq"))
- kill_at_exit(pid)
- wait_for_service
-
- Testjour.logger.info "Started httpq."
-
- yield
- end
-
- def self.kill_at_exit(pid)
- at_exit do
- Process.kill("INT", pid)
- end
- end
-
- def self.port
- 15434
- end
-
- def self.queues
- @queues ||= {
- :feature_files => Queue.new,
- :results => Queue.new
- }
- end
- def self.call(env)
- new(env).call
- end
-
- def initialize(env)
- @request = Rack::Request.new(env)
- end
-
- def call
- if request.post?
- handle_post
- else
- handle_get
- end
- end
-
- protected
-
- def request
- @request
- end
-
- def handle_get
- case request.path_info
- when "/reset" then reset
- when "/feature_files" then pop(:feature_files)
- when "/results" then pop(:results, false)
- else error("unknown path: #{request.path_info}")
- end
- end
-
- def handle_post
- case request.path_info
- when "/feature_files" then push(:feature_files)
- when "/results" then push(:results)
- else error("unknown path: #{request.path_info}")
- end
- end
-
- def reset
- self.class.queues.each do |name, queue|
- queue.clear
- end
-
- ok
- end
-
- def pop(queue_name, non_block = true)
- data = nil
-
- begin
- data = Timeout.timeout(self.class.timeout_in_seconds, ResultOverdueError) do
- queue(queue_name).pop(non_block)
- end
- rescue ResultOverdueError
- return error("result overdue")
- end
-
- [200, { "Content-Type" => "text/plain" }, data]
- rescue ThreadError => ex
- if ex.message =~ /queue empty/
- missing
- else
- error("uncaught exception")
- end
- end
-
- def push(queue_name)
- queue(queue_name).push(request.POST["data"])
- ok
+ class RedisQueue
+
+ def self.reset_all
+ redis.del "testjour:feature_files"
+ redis.del "testjour:results"
end
-
- def queue(name)
- self.class.queues[name]
+
+ def self.redis
+ @redis ||= Redis.new
end
-
- def ok
- [200, { "Content-Type" => "text/plain" }, "OK"]
+
+ def redis
+ self.class.redis
end
-
- def missing
- [404, { "Content-Type" => "text/plain" }, "Not Found"]
+
+ def push(queue_name, data)
+ redis.lpush("testjour:#{queue_name}", Marshal.dump(data))
end
-
- def error(message = nil)
- [500, { "Content-Type" => "text/plain" }, "Server error: #{message}"]
+
+ def pop(queue_name)
+ result = redis.rpop("testjour:#{queue_name}")
+ result ? Marshal.load(result) : nil
end
-
+
end
+
end
View
98 spec/httpq_spec.rb
@@ -1,98 +0,0 @@
-require File.dirname(__FILE__) + "/spec_helper"
-
-describe "httpq" do
- before :suite do
- start_queue
- end
-
- after :suite do
- shutdown_queue
- end
-
- before :each do
- @http_queue = Testjour::HttpQueue::QueueProxy.new
- get "/reset"
- end
-
- describe "feature files queue" do
- it "is empty by default (returns 404)" do
- @http_queue.pop(:feature_files).should == nil
- end
-
- it "accepts work" do
- @http_queue.push(:feature_files, "features/test.feature").should == true
- end
-
- it "returns work from the queue" do
- @http_queue.push(:feature_files, "features/test.feature")
- @http_queue.pop(:feature_files).should == "features/test.feature"
- end
-
- it "is empty after all work is returned" do
- @http_queue.push(:feature_files, "features/test.feature")
- @http_queue.pop(:feature_files)
- @http_queue.pop(:feature_files).should be_nil
- end
- end
-
- describe "results queue" do
- it "is empty by default (returns 404)" do
- lambda {
- Timeout.timeout(1) do
- @http_queue.pop(:results)
- end
- }.should raise_error(Timeout::Error)
- end
-
- it "accepts work" do
- @http_queue.push(:results, "1").should == true
- end
-
- it "returns work from the queue" do
- @http_queue.push(:results, "result")
- @http_queue.pop(:results).should == "result"
- end
-
- it "is empty after all work is returned" do
- @http_queue.push(:results, "result")
- @http_queue.pop(:results)
-
- lambda {
- Timeout.timeout(1) do
- @http_queue.pop(:results)
- end
- }.should raise_error(Timeout::Error)
- end
- end
-
- describe "reset" do
- it "should reset the feature files" do
- @http_queue.push(:feature_files, "features/test.feature")
- get "/reset"
- @response_code.should == 200
- @http_queue.pop(:feature_files).should be_nil
- end
-
- it "should reset the results" do
- @http_queue.push(:results, "result")
- get "/reset"
- @response_code.should == 200
-
- lambda {
- Timeout.timeout(1) do
- @http_queue.pop(:results)
- end
- }.should raise_error(Timeout::Error)
- end
- end
-
- it "raises errors for unknown GETs" do
- get "/unknown"
- @response_code.should == 500
- end
-
- it "raises errors for unknown POSTs" do
- post "/unknown"
- @response_code.should == 500
- end
-end
View
50 spec/spec_helper.rb
@@ -1,50 +0,0 @@
-require "rubygems"
-require "spec"
-require "fileutils"
-require "net/http"
-
-require File.expand_path(File.dirname(__FILE__) + "/../lib/testjour")
-require "testjour/http_queue"
-
-Spec::Runner.configure do |config|
-
- def start_queue
- $tjqueue_pid = fork do
- Dir.chdir(File.join(File.dirname(__FILE__), "..", "bin"))
-
- silence_stream(STDOUT) do
- silence_stream(STDERR) do
- exec "ruby httpq"
- end
- end
- end
-
- Testjour::HttpQueue.wait_for_service
- end
-
- def shutdown_queue
- Process.kill 9, $tjqueue_pid
- Process.wait($tjqueue_pid)
- end
-
- def get(path)
- require "curb"
-
- @response = Curl::Easy.new("http://0.0.0.0:#{Testjour::HttpQueue.port}" + path)
- @response.perform
- @response_code = @response.response_code
- end
-
- def post(path)
- require "curb"
-
- @response = Curl::Easy.http_post("http://0.0.0.0:#{Testjour::HttpQueue.port}" + path)
- @response.perform
- @response_code = @response.response_code
- end
-
- def response
- @response
- end
-
-end
Please sign in to comment.
Something went wrong with that request. Please try again.