Permalink
Browse files

make exceptions more user friendly

  • Loading branch information...
1 parent 35932cb commit 5fd90bacdea526fc84663116ab5b67e961dfed79 @gnufied committed Jul 10, 2008
View
@@ -1,3 +1,5 @@
* Some issues that I am aware of *
- Creation of job queue may fail on other that mysql dbs
- Database doesn't get created when someone runs the migrations again
+- Delete doesn't seem to be working test it.
+
@@ -104,6 +104,7 @@ def delete
middle_man.backend_connections.each do |connection|
connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key))
end
+ return worker_key
end
def choose_connection host_info
@@ -42,6 +42,7 @@ def receive_data p_data
when :delete_worker: delete_drb_worker(t_data)
when :worker_info: pass_worker_info(t_data)
when :all_worker_info: all_worker_info(t_data)
+ else; debug_logger.info("Invalid request")
end
end
end
@@ -73,6 +74,8 @@ def delete_drb_worker(t_data)
begin
worker_instance = reactor.live_workers[worker_name_key]
Process.kill('TERM',worker_instance.pid)
+ # Warning: Change is temporary, may break things
+ reactor.live_workers.delete(worker_name_key)
rescue Packet::DisconnectError => sock_error
reactor.remove_worker(sock_error)
rescue
@@ -86,16 +89,18 @@ def start_worker_request(p_data)
end
def async_method_invoke(t_data)
+ puts "calling crap thing here"
worker_name = t_data[:worker]
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
begin
ask_worker(worker_name_key,:data => t_data, :type => :request, :result => false)
rescue Packet::DisconnectError => sock_error
+ puts "fuck man"
reactor.live_workers.delete(worker_name_key)
rescue
- debug_logger.info($!.to_s)
+ debug_logger.info($!.message)
debug_logger.info($!.backtrace.join("\n"))
return
end
@@ -127,7 +132,7 @@ def method_invoke(t_data)
rescue Packet::DisconnectError => sock_error
reactor.live_workers.delete(worker_name_key)
rescue
- debug_logger.info($!.to_s)
+ debug_logger.info($!.message)
debug_logger.info($!.backtrace.join("\n"))
return
end
@@ -257,7 +257,7 @@ def check_for_enqueued_tasks
end
return unless task
if self.respond_to? task.worker_method
- Thread.current[:persistent_job_id] = task.id
+ Thread.current[:persistent_job_id] = task[:id]
called_method_arity = self.method(task.worker_method).arity
args = load_data(task.args)
if called_method_arity != 0
@@ -2,16 +2,85 @@
context "Master Worker in general should" do
- specify "read data according to binary protocol and recreate objects" do
+ def dump_object data
+ t = Marshal.dump(data)
+ t.length.to_s.rjust(9,'0') + t
+ end
+ setup do
+ @master_worker = BackgrounDRb::MasterWorker.new
+ @master_worker.post_init
+ class << @master_worker
+ attr_accessor :outgoing_data
+ attr_accessor :key
+ def packet_classify(original_string)
+ word_parts = original_string.split('_')
+ return word_parts.map { |x| x.capitalize}.join
+ end
+
+ def gen_worker_key(worker_name,worker_key = nil)
+ return worker_name if worker_key.nil?
+ return "#{worker_name}_#{worker_key}".to_sym
+ end
+ def ask_worker key,data
+ @key = key
+ @outgoing_data = data
+ end
+ def start_worker data
+ @outgoing_data = data
+ end
+ end
+
+ class DummyLogger
+ def method_missing method_id,*args; error = args.first; puts error; end
+ end
+ logger = DummyLogger.new
+ @master_worker.debug_logger = logger
+ end
+
+ specify "read data according to binary protocol and recreate objects" do
+ sync_request = {
+ :type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"
+ }
+ @master_worker.expects(:method_invoke).with(sync_request).returns(nil)
+ @master_worker.receive_data(dump_object(sync_request))
end
specify "ignore errors while recreating object" do
+ sync_request = {
+ :type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"
+ }
+ foo = dump_object(sync_request)
+ foo[0] = 'h'
+ @master_worker.expects(:method_invoke).never
+ @master_worker.receive_data(foo)
+ end
+
+ specify "should route async requests" do
+ b = {
+ :type=>:async_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"
+ }
+ @master_worker.receive_data(dump_object(b))
+ @master_worker.outgoing_data.should == {:type=>:request, :data=>{:worker_method=>"barbar", :arg=>"boy"}, :result=>false}
+ @master_worker.key.should == :foo_worker
+ end
+ specify "should route sync requests and return results" do
+ a = {:type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"}
+ @master_worker.receive_data(dump_object(a))
+ @master_worker.outgoing_data.should == {:type=>:request, :data=>{:worker_method=>"barbar", :arg=>"boy"}, :result=>true}
+ @master_worker.key.should == :foo_worker
end
- specify "extract worker and method and pass the request to appropriate worker" do
+ specify "should route start worker requests" do
+ d = {:worker_key=>"boy", :type=>:start_worker, :worker=>:foo_worker}
+ @master_worker.receive_data(dump_object(d))
+ @master_worker.outgoing_data.should == {:type=>:start_worker, :worker_key=>"boy", :worker=>:foo_worker}
+ end
+ specify "should run delete worker requests itself" do
+ e = {:worker_key=>"boy", :type=>:delete_worker, :worker=>:foo_worker}
+ @master_worker.receive_data(dump_object(e))
end
specify "ignore errors if sending request to worker failed" do
@@ -33,9 +102,6 @@
specify "should load proper environment from config file" do
end
- specify "reload workers which are to be loaded at proper interval" do
- end
-
specify "log all the errors to the log file" do
end
@@ -49,3 +115,23 @@
end
end
+a = {:type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"}
+b = {:type=>:async_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"}
+c = {:type=>:get_result, :worker=>:foo_worker, :job_key=>:start_message}
+# for new worker
+d = {:worker_key=>"boy", :type=>:start_worker, :worker=>:foo_worker}
+
+# for delete worker
+e = {:worker_key=>"boy", :type=>:delete_worker, :worker=>:foo_worker}
+
+# for all worker info
+f = {:type=>:all_worker_info}
+
+# worker info with a worker key
+g = {:worker_key=>"boy", :type=>:worker_info, :worker=>:foo_worker}
+
+# worker info without a key
+h = {:status=>:running, :worker=>:foo_worker, :worker_key=>nil}
+
+
+
@@ -247,6 +247,7 @@ def ivar(var)
mocked_task = mock()
mocked_task.expects(:worker_method).returns(:barbar).times(3)
mocked_task.expects(:args).returns(Marshal.dump("hello"))
+ mocked_task.expects(:[]).returns(1)
@meta_worker.expects(:barbar).with("hello").returns(true)
BdrbJobQueue.expects(:find_next).with("queue_worker").returns(mocked_task)
@meta_worker.check_for_enqueued_tasks
@@ -255,6 +256,7 @@ def ivar(var)
specify "should run enqueued tasks without arguments if they are there in the queue" do
@meta_worker = QueueWorker.start_worker
mocked_task = mock()
+ mocked_task.expects(:[]).returns(1)
mocked_task.expects(:worker_method).returns(:barbar).times(3)
mocked_task.expects(:args).returns(nil)
@meta_worker.expects(:barbar)

0 comments on commit 5fd90ba

Please sign in to comment.