Commit backgroundrb to git

commit 5849e939924abe2f12e52b275cac11ac087f8ee8 0 parents
gnufied authored
Showing with 3,282 additions and 0 deletions.
  1. +4 −0 LICENSE
  2. +20 −0 MIT-LICENSE
  3. +438 −0 README
  4. +63 −0 Rakefile
  5. +4 −0 TODO.org
  6. +11 −0 config/backgroundrb.yml
  7. +44 −0 examples/foo_controller.rb
  8. +7 −0 examples/god_worker.rb
  9. +8 −0 examples/worker_tests/god_worker_test.rb
  10. +17 −0 examples/workers/error_worker.rb
  11. +38 −0 examples/workers/foo_worker.rb
  12. +7 −0 examples/workers/god_worker.rb
  13. +13 −0 examples/workers/model_worker.rb
  14. +11 −0 examples/workers/renewal_worker.rb
  15. +24 −0 examples/workers/rss_worker.rb
  16. +31 −0 examples/workers/server_worker.rb
  17. +12 −0 examples/workers/world_worker.rb
  18. +7 −0 examples/workers/xmpp_worker.rb
  19. +31 −0 framework/packet.rb
  20. +69 −0 framework/packet/bin_parser.rb
  21. +14 −0 framework/packet/callback.rb
  22. +74 −0 framework/packet/class_helpers.rb
  23. +33 −0 framework/packet/connection.rb
  24. +275 −0 framework/packet/core.rb
  25. +19 −0 framework/packet/cpu_worker.rb
  26. +9 −0 framework/packet/disconnect_error.rb
  27. +30 −0 framework/packet/double_keyed_hash.rb
  28. +25 −0 framework/packet/event.rb
  29. +6 −0 framework/packet/io_worker.rb
  30. +78 −0 framework/packet/meta_pimp.rb
  31. +77 −0 framework/packet/nbio.rb
  32. +16 −0 framework/packet/packet_guid.rb
  33. +163 −0 framework/packet/packet_master.rb
  34. +27 −0 framework/packet/periodic_event.rb
  35. +32 −0 framework/packet/pimp.rb
  36. +94 −0 framework/packet/worker.rb
  37. +16 −0 generators/worker/USAGE
  38. +12 −0 generators/worker/templates/unit_test.rb
  39. +7 −0 generators/worker/templates/worker.rb
  40. +16 −0 generators/worker/worker_generator.rb
  41. +2 −0  init.rb
  42. +1 −0  install.rb
  43. +131 −0 lib/backgroundrb.rb
  44. +8 −0 lib/bdrb_conn_error.rb
  45. +61 −0 script/backgroundrb
  46. +61 −0 script/bdrb_test_helper.rb
  47. +235 −0 server/cron_trigger.rb
  48. +4 −0 server/invalid_dump_error.rb
  49. +25 −0 server/log_worker.rb
  50. +213 −0 server/master_worker.rb
  51. +384 −0 server/meta_worker.rb
  52. +34 −0 server/trigger.rb
  53. +60 −0 tasks/backgroundrb_tasks.rake
  54. +12 −0 test/backgroundrb_test.rb
  55. +26 −0 test/bdrb_test_helper.rb
  56. +53 −0 test/cron_trigger_test.rb
  57. +15 −0 test/master_worker_test.rb
  58. +74 −0 test/meta_worker_test.rb
  59. +1 −0  uninstall.rb
4 LICENSE
@@ -0,0 +1,4 @@
+Copyright (c) 2007 Hemant Kumar ( mail [at] gnufied [dot] org )
+Copyright (c) 2006 Ezra Zygmuntowicz and skaar[at]waste[dot]org
+
+The Library is dual licensed under terms of Ruby License or MIT License.
20 MIT-LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2007 Hemant Kumar ( mail [at] gnufied [dot] org )
+Copyright (c) 2006 Ezra Zygmuntowicz and skaar[at]waste[dot]org
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
438 README
@@ -0,0 +1,438 @@
+= BackgrounDRb
+
+BackgrounDRb is a Ruby job server and scheduler. Its main intent is to be
+used with Ruby on Rails applications for offloading long-running tasks.
+Since a Rails application blocks while serving a request, it is best to
+move long-running tasks off into a background process that is divorced
+from the HTTP request/response cycle.
+
+This new release of BackgrounDRb is also modular and no longer depends on Rails, so any Ruby program or framework can use it.
+
+Copyright (c) 2006 Ezra Zygmuntowicz,skaar[at]waste[dot]org,
+
+Copyright (c) 2007 Hemant Kumar (gethemant [at] gmail.com )
+
+
+== Usage
+
+=== Installation
+Getting the code:
+ svn co http://svn.devjavu.com/backgroundrb/trunk
+
+Installation with svn externals:
+ svn propedit svn:externals vendor/plugins
+ [add the following line:]
+ backgroundrb http://svn.devjavu.com/backgroundrb/trunk
+ [exit editor]
+
+ svn ci -m 'updating svn:external svn property for backgroundrb' vendor/plugins
+ svn up vendor/plugins
+ rake backgroundrb:setup
+
+Installation with piston:
+ piston import http://svn.devjavu.com/backgroundrb/trunk/ backgroundrb
+
+=== Configuration
+Use the rake task (rake backgroundrb:setup) for initial configuration:
+
+* Cron style scheduling and config
+
+ | :backgroundrb:
+ | :ip: 0.0.0.0
+ | :port: 11006
+ | :environment: production
+ |
+ | :schedules:
+ | :foo_worker:
+ | :foobar:
+ | :trigger_args: */5 * * * * * *
+ | :data: Hello World
+ | :barbar:
+ | :trigger_args: */10 * * * * * *
+
+The above sample configuration schedules the worker methods 'foobar' and 'barbar' from FooWorker to be executed at different trigger periods.
+It also loads the production Rails environment. If you omit the :environment option, the development environment is loaded by default.
+
+NOTE: Because of this feature the format of backgroundrb.yml has changed slightly, and you must update your config file to match the new format.
+
+
+* Normal Unix scheduler
+
+ | :backgroundrb:
+ | :ip: 0.0.0.0
+ | :port: 11006
+ | :schedules:
+ | :foo_worker:
+ | :foobar:
+ | :trigger_args:
+ | :start: <%= Time.now + 5.seconds %>
+ | :end: <%= Time.now + 10.minutes %>
+ | :repeat_interval: <%= 1.minute %>
+
+* Plain config
+
+ | :backgroundrb:
+ | :ip: 0.0.0.0
+ | :port: 11006
+
+
+=== Scheduling
+
+There are three schemes for periodic execution and scheduling.
+
+- Cron Scheduling
+
+ You can use the configuration file for cron-style scheduling of workers. The method specified in the configuration
+ file is called periodically. Make sure the interval between periodic invocations of a method is longer than
+ the time the method actually takes to execute; if a method takes longer than the specified time window, your
+ method invocations will lag perpetually.
+
+
+- Normal Scheduler
+ You can use the second form of scheduling, as shown in the configuration examples above.
+
+- add_periodic_timer method
+ A third, very basic form of scheduling is the "add_periodic_timer" method. You can call this
+ method from anywhere in your worker.
+
+ def create
+ add_periodic_timer(5) { say_hello }
+ end
+The above snippet registers the block for periodic execution every 5 seconds.
+
+=== A Word about Cron Scheduler
+
+ Note that the initial field in the BackgrounDRb cron trigger specifies
+ seconds, not minutes as with Unix-cron.
+
+ The fields (which can be an asterisk, meaning all valid patterns) are:
+
+ sec[0,59], min[0,59], hour[0,23], day[1,31], month[1,12], weekday[0,6], year
+
+ The syntax pretty much follows Unix-cron. The following will trigger
+ on the first hour and the thirtieth minute every day:
+
+ 0 30 1 * * * *
+
+ The following will trigger the specified method every 10 seconds:
+
+ */10 * * * * * *
+
+ The following will trigger the specified method every 1 hour:
+
+ 0 0 * * * * *
+
+ For each field you can use a comma-separated list. The following would
+ trigger on the 5th, 16th and 23rd minute every hour:
+
+ 0 5,16,23 * * * * *
+
+ Fields also support ranges, using a dash between values. The following
+ triggers from 8th through the 17th hour, at five past the hour:
+
+ 0 5 8-17 * * * *
+
+ Finally, fields support repeat interval syntax. The following triggers
+ every five minutes, every other hour starting from the sixth hour:
+
+ 0 */5 6/2 * * * *
+
+ Here is a more complex example: seconds every 5th second counting from the 28th
+ second, plus the 59th second; minutes 1,2,3,4,6,20; every hour; every day;
+ months 0,2,4,5,6,8,10,12:
+
+
+ 28/5,59 1-4,6,20 */1 * 5,0/2 * *
+
+ Note that if you specify an asterisk in the first field (seconds),
+ the trigger will fire every second for any time matching the remaining fields.
+
+
+=== Writing Workers
+
+* Generate a Worker
+
+ Install the plugin and run the setup task (rake backgroundrb:setup). Now create a worker using the worker generator.
+
+ ./script/generate worker bar
+
+ This will create a bar_worker.rb in your RAILS_ROOT/lib/workers/ (called WORKER_ROOT henceforth). The generated code will look like this:
+
+ class BarWorker < BackgrounDRb::MetaWorker
+ set_worker_name :bar_worker
+ def create
+ # this method is called, when worker is loaded for the first time
+ puts "starting a bar worker"
+ end
+ # define other methods, that you will invoke from rails.
+ end
+
+ All the workers inside the WORKER_ROOT directory are automatically loaded, each forked into a separate process.
+ If you don't want a particular worker to start automatically, use the set_no_auto_load class method to disable that behaviour:
+
+
+ class DynamicWorker < BackgrounDRb::MetaWorker
+ set_worker_name :dynamic_worker
+ set_no_auto_load true
+ end
+
+ The 'create' method gets called when a worker is loaded and created. Each worker runs in its
+ own process, and you can use 'create' to initialize worker-specific state.
+
+ The following code snippet asks BDRB to execute the method 'add_values' in 'foo_worker' with
+ the argument '10+10' and return the result.
+
+ MiddleMan.send_request(:worker => :foo_worker, :worker_method => :add_values,:data => "10+10")
+
+ When you use the 'send_request' method, you expect a result back. As such, the above code
+ will block until your worker sends a response. The worker code for handling the above
+ request would look like:
+
+ class FooWorker < BackgrounDRb::MetaWorker
+ set_worker_name :foo_worker
+ def create(args = nil)
+ #register_status("Running")
+ end
+
+ def add_values(args = nil)
+ evaluated_result = eval(args)
+ return evaluated_result
+ end
+ end
+
+ However, when you want one-shot execution of a worker method without worrying about the result, you
+ can use:
+
+ MiddleMan.ask_work(:worker => :foo_worker, :worker_method => :add_values, :data => "10+10")
+
+ You can also use register_status, as shown in the following snippet, to register the status of
+ your worker with the master, which can be queried directly from Rails:
+
+ register_status(some_status_data)
+
+ From Rails, you can query the status of your worker using the following code:
+
+ MiddleMan.ask_status(:worker => :foo_worker)
+
+ The above code returns the status object of 'foo_worker'. When you call register_status
+ from a worker, it replaces the worker's previous status held by the master. Since the master process
+ stores the status of each worker, all status queries are served by the master itself. It can be
+ used to store result hashes and similar data.
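+
+ For example, a minimal controller action (the action name here is only illustrative)
+ that renders whatever the worker last registered:
+
+   def foo_status
+     status = MiddleMan.ask_status(:worker => :foo_worker)
+     render :text => status.to_s
+   end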
+
+* Starting and stopping a worker from Rails :
+
+ All workers can be started and stopped dynamically from Rails. You can also use separate job_keys
+ to run more than one copy of a worker at a time.
+
+ For example, the following code in a Rails controller will start "error_worker" and schedule it to run according to the arguments associated with trigger_args.
+
+ MiddleMan.new_worker(:worker => :error_worker, :job_key => :hello_world,:data => "wow_man",:schedule => { :hello_world => { :trigger_args => "*/5 * * * * * *",:data => "hello_world" }})
+
+ NOTE: The first :data argument is passed to the create method inside your worker; the
+ one specified under the :schedule key is passed to the scheduled worker method when its trigger fires.
+
+
+ To stop a worker, you can use:
+ MiddleMan.delete_worker(:worker => :error_worker, :job_key => :hello_world)
+
+ If no job_key is specified, the worker name itself becomes the job_key.
+ Choose job_keys with care so that no two running instances of one worker class ever share the same key.
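+
+ For example, the following (illustrative) calls start two copies of the same worker under
+ different job keys, so they can be addressed and deleted independently:
+
+   MiddleMan.new_worker(:worker => :error_worker, :job_key => :report_for_march, :data => "march")
+   MiddleMan.new_worker(:worker => :error_worker, :job_key => :report_for_april, :data => "april")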
+
+
+* Starting and stopping from CLI :
+
+ To start:
+ ./script/backgroundrb start
+ To stop:
+ ./script/backgroundrb stop
+
+
+* Query Status/Result of a worker :
+
+ All workers can log their results with the master using the 'register_status' method.
+ This status can be queried from Rails using ask_status. For example:
+
+
+ class ProgressWorker < BackgrounDRb::MetaWorker
+ set_worker_name :progress_worker
+ def create
+ @counter = 0
+ add_periodic_timer(2) { increment_counter }
+ end
+ def increment_counter
+ @counter += 1
+ register_status(@counter)
+ end
+ end
+
+ Using the MiddleMan proxy, you can keep querying the status of your progress bar:
+
+ MiddleMan.ask_status(:worker => :progress_worker)
+
+* Query status of All workers :
+
+ You can also query the status of all currently running workers in one shot.
+
+ def ask_status
+ t_response = MiddleMan.query_all_workers
+ running_workers = t_response.map { |key,value| "#{key} = #{value}"}.join(',')
+ render :text => running_workers
+ end
+
+ Currently, when a worker is deleted or exits, its result/status is gone too (i.e. you can't
+ query the status of a worker that is not running). This behaviour is expected to change in future releases.
+
+
+* Important difference between MiddleMan.ask_work and MiddleMan.send_request :
+
+ As noted previously, ask_work is used when you want one-shot execution of a worker method
+ without waiting for a result in Rails, so an explicit return value is not required.
+ But when you use MiddleMan.send_request, you are asking BDRB to execute the method
+ on the worker and wait until it returns; hence, in this case, you must return
+ the value you want to get back in Rails.
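+
+ A minimal sketch of the difference (worker and method names are only illustrative):
+
+   class SumWorker < BackgrounDRb::MetaWorker
+     set_worker_name :sum_worker
+
+     # fine for MiddleMan.ask_work -- nothing waits for a value
+     def warm_cache(args = nil)
+       logger.info "cache warmed"
+     end
+
+     # meant for MiddleMan.send_request -- the returned value travels back to Rails
+     def add_values(args = nil)
+       args.split("+").map { |x| x.to_i }.inject(0) { |sum,x| sum + x }
+     end
+   end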
+
+ Not all objects can be marshalled in Ruby. If you try to send an object that
+ can't be dumped, error messages will be logged in your log file and you will get an error string in your
+ controller, too.
+
+
+ For example, let's say you are invoking the method "hello_world" from 'foo_controller' like this:
+
+ worker_response = MiddleMan.send_request(:worker => :foo_worker, :worker_method => :hello_world)
+
+ And the 'hello_world' method inside 'foo_worker' looks like this:
+
+ def hello_world
+ a = lambda { "Hello world" }
+ return a
+ end
+
+ Since a lambda can't be dumped, the worker_response that you receive in your controller will be
+ 'invalid_result_dump_check_log', and an appropriate error will also be logged in the backgroundrb.log file.
+ Such an error could potentially abort the BDRB worker, so make sure that you
+ avoid such cases.
+
+* Running BackgrounDRb clusters and storing results in a memcache cluster
+
+ The new version allows access to worker status objects even after a worker has died or exited. By default,
+ this data is held in the master process's memory. If you run BackgrounDRb in
+ a cluster, with a BackgrounDRb server on each node, and would rather have results
+ stored in memcache, you can use the following option:
+
+ # backgroundrb.yml
+
+ | :backgroundrb:
+ | :port: 11006
+ | :ip: 0.0.0.0
+ | :log: foreground
+ | :result_storage:
+ | :memcache: "10.10.10.2:11211,10.10.10.6:11211"
+
+
+* Using Threads inside BackgrounDRb :
+
+ Remember that BackgrounDRb follows the evented model of network programming, but the sad truth of life is that
+ not all networking libraries follow this model, and hence they make use of blocking IO and threads.
+ BackgrounDRb allows you to run all such tasks concurrently in threads
+ which are internally managed by the BackgrounDRb thread pool.
+
+ Each worker has access to the +thread_pool+ object, which can be used to run a task concurrently in a thread.
+
+ thread_pool.defer(wiki_scrap_url) { |wiki_url| scrap_wikipedia(wiki_url) }
+
+ So whatever task you perform inside +scrap_wikipedia+ is going to run concurrently.
+
+ WARNING: You shouldn't call +register_status+ from within the block supplied to +defer+, because doing so
+ can corrupt the result hashes. If you really need to, wrap your status hash (or whatever data type you
+ are going to store as a status) in a mutex and then call +register_status+; that ensures that only one thread
+ registers status at a time.
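+
+ A minimal sketch of that pattern, assuming a @status_lock mutex created in +create+ and a
+ hypothetical +fetch_page+ helper doing the blocking IO:
+
+   def create(args = nil)
+     @status_lock = Mutex.new
+   end
+
+   def fetch_in_background(url)
+     thread_pool.defer(url) do |t_url|
+       result = fetch_page(t_url)   # hypothetical blocking call
+       @status_lock.synchronize { register_status(result) }
+     end
+   end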
+
+* Stopping/deleting a worker :
+
+ For a dynamically started worker, you will probably want to delete it once it is done processing your request.
+ You can delete a worker from Rails or from the worker itself. To exit a worker from inside the worker, simply call +exit+;
+ the worker will be deleted.
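+
+ For example (method and helper names are only illustrative):
+
+   def process_order(data)
+     do_the_work(data)   # hypothetical helper
+     exit                # the worker deletes itself once the job is done
+   end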
+
+ To delete the worker from Rails, you can use:
+
+ MiddleMan.delete_worker(:worker => :foo_worker, :job_key => :hello_world)
+
+
+
+
+* Internal Server and Unhandled Exception Logging on console :
+
+ Sometimes you may want all the internal error messages and unhandled exceptions to appear on the console.
+ For that, you can start backgroundrb with the following config option :
+
+
+ # backgroundrb.yml
+
+ | :backgroundrb:
+ | :port: 11006
+ | :ip: 0.0.0.0
+ | :log: foreground
+
+ When you use this config option, make sure that you start backgroundrb in foreground mode
+ using:
+
+ ./script/backgroundrb # don't use the start argument if you have :log: foreground set
+
+=== Testing
+
+* Where would you be without test cases, Phaedrus? This new version comes with a baked-in mechanism for writing test cases.
+ First make sure that you have bdrb_test_helper.rb in the test directory of your Rails app (run
+ rake backgroundrb:setup if you don't have one).
+
+ Then put your worker test cases in the test/unit directory of your Rails application and require the helper.
+ Now you should be good to go.
+
+ require File.join(File.dirname(__FILE__) + "/../bdrb_test_helper")
+ require "god_worker"
+
+ context "When god worker starts" do
+ setup do
+ god_worker = GodWorker.new
+ end
+ end
+
+ All the above helper file does is stub out the relevant worker methods
+ that really need network IO. Methods may be added that aren't
+ stubbed; for all such methods you are encouraged to stub them yourself and send
+ a patch to the backgroundrb mailing list.
+
+
+
+=== Legacy and deprecated stuff
+
+ Although you need to wrap your head around the "evented" model of network programming,
+ it gets easier once you get the hang of it. Much of the older stuff is deprecated. Here is a brief list:
+
+
+ - ACL : gone; trust thy firewalls.
+
+=== Exciting new stuff
+ * Rock solid stable (or will be, after a few bug reports)
+ * Each worker comes with an Event loop of its own and can potentially do lots of fancy stuff. Two noteworthy methods are:
+
+ connect(ip,port,Handler)
+ start_server(ip,port,Handler)
+
+ If you are familiar with the EventMachine or Twisted style of network programming, the above methods let you
+ start TCP servers inside your workers or connect to external TCP servers. For each accepted client or
+ connected socket, an instance of the Handler class is created and integrated with the main event loop.
+ This can be used for worker-to-worker communication between backgroundrb servers running on two machines.
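+
+ A minimal sketch, modelled on examples/workers/server_worker.rb from this commit: the worker starts a
+ TCP server, and every accepted connection gets its own TimeServer handler instance.
+
+   class TimeServer
+     def post_init
+       add_periodic_timer(2) { send_data("Hello World\n") }
+     end
+   end
+
+   class ServerWorker < BackgrounDRb::MetaWorker
+     set_worker_name :server_worker
+     def create(args = nil)
+       start_server("0.0.0.0",11009,TimeServer)
+     end
+   end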
+
+ You are encouraged to look into the framework directory and read the code that implements all this. The guts of
+ this new version of bdrb are based on the packet library (http://packet.googlecode.com).
+
+
+== Online Resources
+- http://svn.devjavu.com/backgroundrb/trunk
+- http://backgroundrb.devjavu.com (trac)
+- http://backgroundrb.rubyforge.org (rdoc)
+
+
63 Rakefile
@@ -0,0 +1,63 @@
+require 'rake'
+require 'rubygems'
+require 'rake/testtask'
+require 'rake/rdoctask'
+require 'spec/rake/spectask'
+require 'rake/contrib/sshpublisher'
+
+desc 'Default: run unit tests.'
+task :default => :test
+
+desc 'Test the backgroundrb plugin.'
+Rake::TestTask.new(:test) do |t|
+ t.libs << 'lib'
+ t.pattern = 'test/**/*_test.rb'
+ t.verbose = true
+end
+
+desc "Run all specs"
+Spec::Rake::SpecTask.new('specs') do |t|
+ t.spec_opts = ["--format", "specdoc"]
+ t.libs = ['lib', 'server/lib' ]
+ t.spec_files = FileList['specs/**/*_spec.rb']
+end
+
+desc "RCov"
+Spec::Rake::SpecTask.new('rcov') do |t|
+ t.spec_files = FileList['specs/**/*_spec.rb']
+ t.libs = ['lib', 'server/lib' ]
+ t.rcov = true
+end
+
+desc 'Generate documentation for the backgroundrb plugin.'
+Rake::RDocTask.new(:rdoc) do |rdoc|
+ rdoc.rdoc_dir = 'rdoc'
+ rdoc.title = 'Backgroundrb'
+ rdoc.options << '--line-numbers' << '--inline-source'
+ rdoc.rdoc_files.include('README')
+ rdoc.rdoc_files.include('LICENSE')
+ rdoc.rdoc_files.include('lib/*.rb')
+ rdoc.rdoc_files.include('framework/*.rb')
+ rdoc.rdoc_files.include('server/*.rb')
+ rdoc.template = 'jamis'
+end
+
+module Rake
+ class BackgrounDRbPublisher < SshDirPublisher
+ attr_reader :project, :proj_id, :user
+ def initialize(projname, user)
+ super(
+ "#{user}@rubyforge.org",
+ "/var/www/gforge-projects/backgroundrb",
+ "rdoc")
+ end
+ end
+end
+
+desc "Publish documentation to Rubyforge"
+task :publish_rdoc => [:rdoc] do
+ user = ENV['RUBYFORGE_USER']
+ publisher = Rake::BackgrounDRbPublisher.new('backgroundrb', user)
+ publisher.upload
+end
+
4 TODO.org
@@ -0,0 +1,4 @@
+* Implement a way to share code between workers.
+* Implement a way to test workers.
+* Implement cron and normal UNIX schedulers.
+
11 config/backgroundrb.yml
@@ -0,0 +1,11 @@
+## YAML Template.
+:backgroundrb:
+ :ip: 0.0.0.0
+ :port: 11006
+
+:schedules:
+ :foo_worker:
+ :worker_method: foobar
+ :trigger_args: */5 * * * * * *
+
+
44 examples/foo_controller.rb
@@ -0,0 +1,44 @@
+class FooController < ApplicationController
+ layout :choose_layout
+ def index
+ end
+
+ def mobile_action
+ #render :layout => "mobile"
+ end
+
+ def start_worker
+ MiddleMan.new_worker(:worker => :error_worker, :job_key => :hello_world,:data => "wow_man",:schedule => { :hello_world => { :trigger_args => "*/5 * * * * * *",:data => "hello_world" }})
+ render :text => "worker starterd"
+ end
+
+ def stop_worker
+ MiddleMan.delete_worker(:worker => :error_worker, :job_key => :hello_world)
+ render :text => "worker deleted"
+ end
+
+ def invoke_worker_method
+ worker_response = MiddleMan.send_request(:worker => :world_worker, :worker_method => :hello_world)
+ render :text => worker_response
+ end
+
+ def renew
+ MiddleMan.ask_work(:worker => :renewal_worker, :worker_method => :load_policies)
+ render :text => "method invoked"
+ end
+
+ def ask_status
+ t_response = MiddleMan.query_all_workers
+ running_workers = t_response.map { |key,value| "#{key} = #{value}"}.join(',')
+ render :text => running_workers
+ end
+
+ private
+ def choose_layout
+ if action_name == 'mobile_action'
+ "mobile"
+ else
+ "foo"
+ end
+ end
+end
7 examples/god_worker.rb
@@ -0,0 +1,7 @@
+class GodWorker < BackgrounDRb::MetaWorker
+ set_worker_name :god_worker
+ def create(args = nil)
+ logger.info "hello world"
+ end
+end
+
8 examples/worker_tests/god_worker_test.rb
@@ -0,0 +1,8 @@
+require File.join(File.dirname(__FILE__) + "/../bdrb_test_helper")
+require "god_worker"
+
+context "When god worker starts" do
+ setup do
+ god_worker = GodWorker.new
+ end
+end
17 examples/workers/error_worker.rb
@@ -0,0 +1,17 @@
+# Put the code that runs your task inside the do_work method; it will be
+# run automatically in a thread. You have access to all of your Rails
+# models. You also get logger and results methods inside this class
+# by default.
+class ErrorWorker < BackgrounDRb::MetaWorker
+ set_worker_name :error_worker
+ set_no_auto_load(true)
+
+ def create(args = nil)
+ logger.info "creating error worker"
+ end
+
+ def hello_world(data)
+ logger.info "invoking #{worker_name} hello world #{data} #{Time.now}"
+ end
+end
+
38 examples/workers/foo_worker.rb
@@ -0,0 +1,38 @@
+# Put the code that runs your task inside the do_work method; it will be
+# run automatically in a thread. You have access to all of your Rails
+# models. You also get logger and results methods inside this class
+# by default.
+
+class TimeClient
+ def receive_data(p_data)
+ worker.get_external_data(p_data)
+ end
+
+ def post_init
+ p "***************** : connection completed"
+ end
+end
+
+class FooWorker < BackgrounDRb::MetaWorker
+ set_worker_name :foo_worker
+ def create(args = nil)
+ #register_status("Running")
+ add_periodic_timer(10) { foobar }
+ external_connection = nil
+ connect("localhost",11009,TimeClient) { |conn| external_connection = conn }
+ end
+
+ def get_external_data(p_data)
+ register_status(p_data)
+ end
+
+ def foobar
+ register_status("Hello #{Time.now}")
+ end
+
+ def barbar(data)
+ logger.info "invoking babrbar on #{Time.now} #{data}"
+ end
+
+end
+
7 examples/workers/god_worker.rb
@@ -0,0 +1,7 @@
+class GodWorker < BackgrounDRb::MetaWorker
+ set_worker_name :god_worker
+ def create(args = nil)
+ logger.info "hello world"
+ end
+end
+
13 examples/workers/model_worker.rb
@@ -0,0 +1,13 @@
+class ModelWorker < BackgrounDRb::MetaWorker
+ set_worker_name :model_worker
+ def create(args = nil)
+ #add_periodic_timer(2) { add_new_user }
+ end
+
+ def add_new_user
+ login,age = "Hemant: #{Time.now}",rand(24)
+ logger.info "creating user #{login} with age #{age}"
+ User.create(:login => login, :age => age)
+ end
+end
+
11 examples/workers/renewal_worker.rb
@@ -0,0 +1,11 @@
+class RenewalWorker < BackgrounDRb::MetaWorker
+ set_worker_name :renewal_worker
+ def create(args = nil)
+
+ end
+ def load_policies(data = nil)
+ logger.info "Loading policies done on #{data}"
+ return "done"
+ end
+end
+
24 examples/workers/rss_worker.rb
@@ -0,0 +1,24 @@
+# this worker would test thread issues that were discussed before
+require "net/http"
+class RssWorker < BackgrounDRb::MetaWorker
+ set_worker_name :rss_worker
+ def create(args = nil)
+ # this method is called, when worker is loaded for the first time
+ end
+
+ # method would fetch supplied urls in a thread
+ def fetch_url(url)
+ puts "fetching url #{url}"
+ thread_pool.defer(url) do |url|
+ begin
+ data = Net::HTTP.get('www.example.com','/')
+ File.open("#{RAILS_ROOT}/log/pages.txt","w") do |fl|
+ fl.puts(data)
+ end
+ rescue
+ logger.info "Error downloading page"
+ end
+ end
+ end
+end
+
31 examples/workers/server_worker.rb
@@ -0,0 +1,31 @@
+class TimeServer
+ # data sent by the client is received here
+ def receive_data(p_data)
+ end
+
+ # would be called when someone connects to the server for the
+ # first time
+ def post_init
+ add_periodic_timer(2) { say_hello_world }
+ end
+
+ # would be called when client connection is complete.
+ def connection_completed
+ end
+
+ def say_hello_world
+ send_data("Hello World\n")
+ end
+end
+
+# this worker is going to act like server.
+class ServerWorker < BackgrounDRb::MetaWorker
+ set_worker_name :server_worker
+ def create(args = nil)
+ # start the server when worker starts
+ start_server("0.0.0.0",11009,TimeServer) do |client_connection|
+ client_connection.say_hello_world
+ end
+ end
+end
+
12 examples/workers/world_worker.rb
@@ -0,0 +1,12 @@
+class WorldWorker < BackgrounDRb::MetaWorker
+ set_worker_name :world_worker
+ def create(args = nil)
+ #logger.info "starting world worker"
+ end
+
+ def hello_world
+ a = lambda { "Hello world" }
+ return a
+ end
+end
+
7 examples/workers/xmpp_worker.rb
@@ -0,0 +1,7 @@
+class XmppWorker < BackgrounDRb::MetaWorker
+ set_worker_name :xmpp_worker
+ def create(args = nil)
+ # this method is called, when worker is loaded for the first time
+ end
+end
+
31 framework/packet.rb
@@ -0,0 +1,31 @@
+require "socket"
+require "yaml"
+require "forwardable"
+require "ostruct"
+require "thread"
+
+require "packet/bin_parser"
+require "packet/packet_guid"
+require "packet/class_helpers"
+require "packet/double_keyed_hash"
+require "packet/event"
+require "packet/periodic_event"
+require "packet/disconnect_error"
+require "packet/callback"
+require "packet/nbio"
+require "packet/pimp"
+require "packet/meta_pimp"
+require "packet/core"
+require "packet/packet_master"
+require "packet/connection"
+require "packet/worker"
+
+
+# This file is just a runner and hence does basic initialization of the things required for running
+# the application.
+
+PACKET_APP = File.expand_path'../' unless defined?(PACKET_APP)
+
+module Packet
+ VERSION='0.1.3'
+end
69 framework/packet/bin_parser.rb
@@ -0,0 +1,69 @@
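+# Incrementally parses a length-prefixed stream: each message is a 9-character,
+# zero-padded ASCII length followed by that many bytes of payload (see
+# NbioHelper#dump_object). Complete payloads are yielded to the block passed to #extract.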
+class BinParser
+ def initialize
+ @size = 0
+ @data = []
+ # 0 => reading length
+ # 1 => reading actual data
+ @parser_state = 0
+ @length_string = ""
+ @numeric_length = 0
+ end
+
+ def extract new_data, &block
+ extracter_block = block
+ if @parser_state == 0
+ length_to_read = 9 - @length_string.length
+ len_str,remaining = new_data.unpack("a#{length_to_read}a*")
+ if len_str.length < length_to_read
+ @length_string << len_str
+ return
+ else
+ @length_string << len_str
+ @numeric_length = @length_string.to_i
+ @parser_state = 1
+ if remaining.length < @numeric_length
+ @data << remaining
+ @numeric_length = @numeric_length - remaining.length
+ elsif remaining.length == @numeric_length
+ @data << remaining
+ extracter_block.call(@data.join)
+ @data = []
+ @parser_state = 0
+ @length_string = ""
+ @numeric_length = 0
+ else
+ pack_data,remaining = remaining.unpack("a#{@numeric_length}a*")
+ @data << pack_data
+ extracter_block.call(@data.join)
+ @data = []
+ @parser_state = 0
+ @length_string = ""
+ @numeric_length = 0
+ extract(remaining,&extracter_block)
+ end
+ end
+ elsif @parser_state == 1
+ pack_data,remaining = new_data.unpack("a#{@numeric_length}a*")
+ if pack_data.length < @numeric_length
+ @data << pack_data
+ @numeric_length = @numeric_length - pack_data.length
+ elsif pack_data.length == @numeric_length
+ @data << pack_data
+ extracter_block.call(@data.join)
+ @data = []
+ @parser_state = 0
+ @length_string = ""
+ @numeric_length = 0
+ else
+ @data << pack_data
+ extracter_block.call(@data.join)
+ @data = []
+ @parser_state = 0
+ @length_string = ""
+ @numeric_length = 0
+ extract(remaining,&extracter_block)
+ end
+ end
+ end
+end
+
14 framework/packet/callback.rb
@@ -0,0 +1,14 @@
+# class implements a simple callback mechanism for invoking callbacks
+module Packet
+ class Callback
+ attr_accessor :signature,:stored_proc
+ def initialize(&block)
+ @signature = Guid.hexdigest
+ @stored_proc = block
+ end
+
+ def invoke(*args)
+ @stored_proc.call(*args)
+ end
+ end
+end
74 framework/packet/class_helpers.rb
@@ -0,0 +1,74 @@
+module Packet
+ module ClassHelpers
+ def metaclass; class << self; self; end; end
+
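+ # defines class-level accessors (plus a set_<attr> writer shortcut) together with
+ # instance-level readers/writers that delegate to the class-level value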
+ def iattr_accessor *args
+ metaclass.instance_eval do
+ attr_accessor *args
+ args.each do |attr|
+ define_method("set_#{attr}") do |b_value|
+ self.send("#{attr}=",b_value)
+ end
+ end
+ end
+
+ args.each do |attr|
+ class_eval do
+ define_method(attr) do
+ self.class.send(attr)
+ end
+ define_method("#{attr}=") do |b_value|
+ self.class.send("#{attr}=",b_value)
+ end
+ end
+ end
+ end # end of method iattr_accessor
+
+ def cattr_reader(*syms)
+ syms.flatten.each do |sym|
+ next if sym.is_a?(Hash)
+ class_eval(<<-EOS, __FILE__, __LINE__)
+ unless defined? @@#{sym}
+ @@#{sym} = nil
+ end
+
+ def self.#{sym}
+ @@#{sym}
+ end
+
+ def #{sym}
+ @@#{sym}
+ end
+ EOS
+ end
+ end
+
+ def cattr_writer(*syms)
+ options = syms.last.is_a?(Hash) ? syms.pop : {}
+ syms.flatten.each do |sym|
+ class_eval(<<-EOS, __FILE__, __LINE__)
+ unless defined? @@#{sym}
+ @@#{sym} = nil
+ end
+
+ def self.#{sym}=(obj)
+ @@#{sym} = obj
+ end
+
+ #{"
+ def #{sym}=(obj)
+ @@#{sym} = obj
+ end
+ " unless options[:instance_writer] == false }
+ EOS
+ end
+ end
+
+ def cattr_accessor(*syms)
+ cattr_reader(*syms)
+ cattr_writer(*syms)
+ end
+ module_function :metaclass,:iattr_accessor, :cattr_writer, :cattr_reader, :cattr_accessor
+ end # end of module ClassHelpers
+end
+
33 framework/packet/connection.rb
@@ -0,0 +1,33 @@
+# FIXME: the following class must modify the fd_watchlist that's being monitored by
+# the main event loop.
+
+module Packet
+ module Connection
+ def send_data p_data
+ begin
+ write_data(p_data,connection)
+ rescue DisconnectError => sock_error
+ close_connection
+ end
+ end
+
+ def invoke_init
+ @initialized = true
+ post_init if respond_to?(:post_init)
+ end
+
+ def close_connection
+ unbind if respond_to?(:unbind)
+ reactor.remove_connection(connection)
+ end
+
+ def close_connection_after_writing
+ connection.flush
+ close_connection
+ end
+
+ def send_object p_object
+ dump_object(p_object,connection)
+ end
+ end # end of class Connection
+end # end of module Packet
275 framework/packet/core.rb
@@ -0,0 +1,275 @@
+# FIXME: timer implementation can be optimized
+module Packet
+ module Core
+ def self.included(base_klass)
+ base_klass.extend(ClassMethods)
+ base_klass.instance_eval do
+ # iattr_accessor :connection_callbacks
+ @@connection_callbacks ||= {}
+ cattr_accessor :connection_callbacks
+ attr_accessor :read_ios, :write_ios, :listen_sockets
+ attr_accessor :connection_completion_awaited
+ attr_accessor :connections
+ include CommonMethods
+ end
+ end
+
+ module ClassMethods
+ include Packet::ClassHelpers
+ def after_connection p_method
+ connection_callbacks[:after_connection] ||= []
+ connection_callbacks[:after_connection] << p_method
+ end
+
+ def after_unbind p_method
+ connection_callbacks[:after_unbind] ||= []
+ connection_callbacks[:after_unbind] << p_method
+ end
+
+ def before_unbind p_method
+ connection_callbacks[:before_unbind] ||= []
+ connection_callbacks[:before_unbind] << p_method
+ end
+ end # end of module#ClassMethods
+
+ module CommonMethods
+ include NbioHelper
+ # method
+ def connect(ip,port,t_module,&block)
+ t_socket = Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)
+ t_sock_addr = Socket.sockaddr_in(port,ip)
+ t_socket.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
+
+ connection_completion_awaited[t_socket.fileno] =
+ { :sock_addr => t_sock_addr, :module => t_module,:block => block }
+ begin
+ t_socket.connect_nonblock(t_sock_addr)
+ immediate_complete(t_socket,t_sock_addr,t_module,&block)
+ rescue Errno::EINPROGRESS
+ write_ios << t_socket
+ end
+ end
+
+ def reconnect(server,port,handler)
+ raise "invalid handler" unless handler.respond_to?(:connection_completed)
+ return handler if connections.keys.include?(handler.connection.fileno)
+ connect(server,port,handler)
+ end
+
+ def immediate_complete(t_socket,sock_addr,t_module,&block)
+ read_ios << t_socket
+ write_ios.delete(t_socket)
+ decorate_handler(t_socket,true,sock_addr,t_module,&block)
+ connection_completion_awaited.delete(t_socket.fileno)
+ end
+
+ def accept_connection(sock_opts)
+ sock_io = sock_opts[:socket]
+ begin
+ client_socket,client_sockaddr = sock_io.accept_nonblock
+ client_socket.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
+ rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR
+ puts "not ready yet"
+ return
+ end
+ read_ios << client_socket
+ decorate_handler(client_socket,true,client_sockaddr,sock_opts[:module],&sock_opts[:block])
+ end
+
+ def complete_connection(t_sock,sock_opts)
+ actually_connected = true
+ begin
+ t_sock.connect_nonblock(sock_opts[:sock_addr])
+ rescue Errno::EISCONN
+ puts "Socket already connected"
+ rescue Errno::ECONNREFUSED
+ actually_connected = false
+ end
+
+ read_ios << t_sock if actually_connected
+ write_ios.delete(t_sock)
+ decorate_handler(t_sock,actually_connected,sock_opts[:sock_addr],\
+ sock_opts[:module],&sock_opts[:block])
+ connection_completion_awaited.delete(t_sock.fileno)
+ end
+
+ # method removes the connection and closes the socket
+ def remove_connection(t_sock)
+ @read_ios.delete(t_sock)
+ @write_ios.delete(t_sock)
+ begin
+ connections.delete(t_sock.fileno)
+ t_sock.close
+ rescue
+ end
+ end
+
+ def socket_really_connected?(t_sock)
+ begin
+ t_data = read_data(t_sock)
+ return true
+ rescue DisconnectError
+ return false
+ end
+ end
+
+ def configure_socket_options
+ case RUBY_PLATFORM
+ when /linux/
+ # 9 is currently TCP_DEFER_ACCEPT
+ @tcp_defer_accept_opts = [Socket::SOL_TCP, 9, 1]
+ @tcp_cork_opts = [Socket::SOL_TCP, 3, 1]
+ when /freebsd(([1-4]\..{1,2})|5\.[0-4])/
+ # Do nothing, just closing a bug when freebsd <= 5.4
+ when /freebsd/
+ # Use the HTTP accept filter if available.
+ # The struct made by pack() is defined in /usr/include/sys/socket.h as accept_filter_arg
+ unless `/sbin/sysctl -nq net.inet.accf.http`.empty?
+ @tcp_defer_accept_opts = [Socket::SOL_SOCKET, Socket::SO_ACCEPTFILTER, ['httpready', nil].pack('a16a240')]
+ end
+ end
+ end
+
+ # method opens a socket for listening
+ def start_server(ip,port,t_module,&block)
+ BasicSocket.do_not_reverse_lookup = true
+ # configure_socket_options
+ t_socket = TCPServer.new(ip,port.to_i)
+ # t_socket.setsockopt(*@tcp_defer_accept_opts) rescue nil
+ listen_sockets[t_socket.fileno] = { :socket => t_socket,:block => block,:module => t_module }
+ @read_ios << t_socket
+ end
+
+ # method starts event loop in the process
+ def start_reactor
+ Signal.trap("TERM") { terminate_me }
+ Signal.trap("INT") { shutdown }
+ loop do
+ check_for_timer_events
+ user_thread_window #=> let user level threads run for a while
+ ready_fds = select(@read_ios,@write_ios,nil,0.005)
+ #next if ready_fds.blank?
+
+ next if !ready_fds or ready_fds.empty?
+
+ ready_fds = ready_fds.flatten.compact
+ ready_fds.each do |t_sock|
+ if t_sock.is_a? UNIXSocket
+ handle_internal_messages(t_sock)
+ else
+ handle_external_messages(t_sock)
+ end
+ end
+ end
+ end
+
+ def user_thread_window
+ run_user_threads if respond_to?(:run_user_threads)
+ end
+
+ def terminate_me
+ # FIXME: close the open sockets
+ exit
+ end
+
+ def shutdown
+ # FIXME: close the open sockets
+ exit
+ end
+
+ def handle_internal_messages(t_sock)
+ raise "Method should be implemented by concerned classes"
+ end
+
+ def handle_external_messages(t_sock)
+ sock_fd = t_sock.fileno
+ if sock_opts = listen_sockets[sock_fd]
+ accept_connection(sock_opts)
+ elsif extern_opts = connection_completion_awaited[sock_fd]
+ complete_connection(t_sock,extern_opts)
+ else
+ read_external_socket(t_sock)
+ end
+ end
+
+ def read_external_socket(t_sock)
+ handler_instance = connections[t_sock.fileno].instance
+ begin
+ t_data = read_data(t_sock)
+ handler_instance.receive_data(t_data) if handler_instance.respond_to?(:receive_data)
+ rescue DisconnectError => sock_error
+ handler_instance.receive_data(sock_error.data) if handler_instance.respond_to?(:receive_data)
+ handler_instance.unbind if handler_instance.respond_to?(:unbind)
+ connections.delete(t_sock.fileno)
+ read_ios.delete(t_sock)
+ end
+ end
+
+ def add_periodic_timer(interval,&block)
+ t_timer = PeriodicEvent.new(interval,&block)
+ @timer_hash[t_timer.timer_signature] = t_timer
+ return t_timer
+ end
+
+ def add_timer(elapsed_time,&block)
+ t_timer = Event.new(elapsed_time,&block)
+ # @timer_hash.store(timer)
+ @timer_hash[t_timer.timer_signature] = t_timer
+ return t_timer
+ end
+
+ def cancel_timer(t_timer)
+ @timer_hash.delete(t_timer.timer_signature)
+ end
+
+ def initialize
+ @read_ios ||= []
+ @write_ios ||= []
+ @connection_completion_awaited ||= {}
+ @connections ||= {}
+ @listen_sockets ||= {}
+
+ # @timer_hash = Packet::TimerStore
+ @timer_hash ||= {}
+ end
+
+ def check_for_timer_events
+ @timer_hash.each do |key,timer|
+ @timer_hash.delete(key) if timer.cancel_flag
+ if timer.run_now?
+ timer.run
+ @timer_hash.delete(key) if !timer.respond_to?(:interval)
+ end
+ end
+ end
+
+ def initialize_handler(p_module)
+ return p_module if(!p_module.is_a?(Class) and !p_module.is_a?(Module))
+ handler =
+ if(p_module and p_module.is_a?(Class))
+ p_module
+ else
+ Class.new(Connection) { p_module and include p_module }
+ end
+ return handler.new
+ end
+
+ def decorate_handler(t_socket,actually_connected,sock_addr,t_module,&block)
+ handler_instance = initialize_handler(t_module)
+ connection_callbacks[:after_connection].each { |t_callback| self.send(t_callback,handler_instance,t_socket)}
+ handler_instance.invoke_init unless handler_instance.initialized
+ unless actually_connected
+ handler_instance.unbind if handler_instance.respond_to?(:unbind)
+ return
+ end
+ t_signature = Guid.hexdigest
+ handler_instance.signature = t_signature
+ connections[t_socket.fileno] =
+ OpenStruct.new(:socket => t_socket, :instance => handler_instance, :signature => t_signature,:sock_addr => sock_addr)
+ block.call(handler_instance) if block
+ handler_instance.connection_completed if handler_instance.respond_to?(:connection_completed)
+ end
+
+ end # end of module#CommonMethods
+ end #end of module#Core
+end #end of module#Packet
19 framework/packet/cpu_worker.rb
@@ -0,0 +1,19 @@
+module Packet
+ class CPUWorker < Packet::Worker
+ @@worker_type = 'cpu'
+ cattr_accessor :worker_type
+ # this is the place where all the worker-specific initialization has to be done
+ def worker_init
+ @worker_started = true
+ end
+
+ def receive_data p_data
+ p p_data
+ end
+
+ def receive_internal_data p_data
+ p p_data
+ end
+
+ end
+end
9 framework/packet/disconnect_error.rb
@@ -0,0 +1,9 @@
+module Packet
+ class DisconnectError < RuntimeError
+ attr_accessor :disconnected_socket,:data
+ def initialize(t_sock,data = nil)
+ @disconnected_socket = t_sock
+ @data = data
+ end
+ end
+end
30 framework/packet/double_keyed_hash.rb
@@ -0,0 +1,30 @@
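+# Hash-like container whose entries can be looked up and deleted via either of two keys;
+# the reactor stores each worker proxy under both its worker key and its socket fileno.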
+class DoubleKeyedHash
+ attr_accessor :internal_hash
+ def initialize
+ @keys1 = {}
+ @internal_hash = {}
+ end
+
+ def []=(key1,key2,value)
+ @keys1[key2] = key1
+ @internal_hash[key1] = value
+ end
+
+ def [] key
+ @internal_hash[key] || @internal_hash[@keys1[key]]
+ end
+
+ def delete(key)
+ worker_key = @keys1[key]
+ @keys1.delete(key)
+ if worker_key
+ @internal_hash.delete(worker_key)
+ else
+ @internal_hash.delete(key)
+ end
+ end
+
+ def each
+ @internal_hash.each { |key,value| yield(key,value)}
+ end
+end
25 framework/packet/event.rb
@@ -0,0 +1,25 @@
+module Packet
+ class Event
+ attr_accessor :timer_signature, :block, :cancel_flag
+ def initialize(elapsed_time,&block)
+ @cancel_flag = false
+ @timer_signature = Guid.hexdigest
+ @block = block
+ @scheduled_time = Time.now + elapsed_time
+ end
+
+ def run_now?
+ return true if @scheduled_time <= Time.now
+ return false
+ end
+
+ def cancel
+ @cancel_flag = true
+ end
+
+ def run
+ @block.call
+ end
+ end
+end
+# WOW
6 framework/packet/io_worker.rb
@@ -0,0 +1,6 @@
+module Packet
+ class IOWorker < Packet::Worker
+ @@worker_type = :io
+ cattr_accessor :worker_type
+ end
+end
78 framework/packet/meta_pimp.rb
@@ -0,0 +1,78 @@
+# Class acts as a pimp for workers which don't have a manually created pimp.
+# The idea behind a manually added pimp is to let the client handle low-level messaging
+# between workers. A meta pimp does it for you.
+class Packet::MetaPimp < Packet::Pimp
+ # initializer of pimp
+ attr_accessor :callback_hash
+ attr_accessor :worker_status, :worker_key,:worker_name
+ def pimp_init
+ @callback_hash ||= {}
+ @worker_status = nil
+ @worker_result = nil
+ @worker_key = nil
+ @tokenizer = BinParser.new
+ end
+
+ # will be invoked whenever there is a response from the worker
+ def receive_data p_data
+ @tokenizer.extract(p_data) do |b_data|
+ t_data = Marshal.load(b_data)
+ handle_object(t_data)
+ end
+ end
+
+ def handle_object data_options = {}
+ case data_options[:type]
+ when :request
+ process_request(data_options)
+ when :response
+ process_response(data_options)
+ when :status
+ save_worker_status(data_options)
+ when :result
+ save_worker_result(data_options)
+ end
+ end
+
+ def save_worker_result(data_options = { })
+ @worker_result = data_options[:data]
+ end
+
+ def save_worker_status(data_options = { })
+ # @worker_status = data_options[:data]
+ reactor.update_result(worker_key,data_options[:data])
+ end
+
+ def process_request(data_options = {})
+ if requested_worker = data_options[:requested_worker]
+ reactor.live_workers[requested_worker].send_request(data_options)
+ #workers[requested_worker].send_request(data_options)
+ end
+ end
+
+ def process_response(data_options = {})
+ if callback_signature = data_options[:callback_signature]
+ callback = callback_hash[callback_signature]
+ callback.invoke(data_options)
+ elsif client_signature = data_options[:client_signature]
+ # method writes to the tcp master connection loop
+ begin
+ reactor.connections[client_signature].instance.worker_receive(data_options)
+ rescue
+ end
+ end
+ end
+
+ # can be used to send a request to the corresponding worker
+ def send_request(data_options = { })
+ if callback = data_options[:callback]
+ callback_hash[callback.signature] = callback
+ data_options.delete(:callback)
+ data_options[:callback_signature] = callback.signature
+ send_data(data_options)
+ else
+ send_data(data_options)
+ end
+ end
+end
+
77 framework/packet/nbio.rb
@@ -0,0 +1,77 @@
+module Packet
+ module NbioHelper
+ def packet_classify(original_string)
+ word_parts = original_string.split('_')
+ return word_parts.map { |x| x.capitalize}.join
+ end
+
+ def gen_worker_key(worker_name,job_key = nil)
+ return worker_name if job_key.nil?
+ return "#{worker_name}_#{job_key}".to_sym
+ end
+
+ def read_data(t_sock)
+ sock_data = ""
+ begin
+ while(t_data = t_sock.recv_nonblock(1023))
+ raise DisconnectError.new(t_sock,sock_data) if t_data.empty?
+ sock_data << t_data
+ end
+ rescue Errno::EAGAIN
+ return sock_data
+ rescue
+ raise DisconnectError.new(t_sock,sock_data)
+ end
+ end
+
+ def write_data(p_data,p_sock)
+ return unless p_data
+ if p_data.is_a? Fixnum
+ t_data = p_data.to_s
+ else
+ t_data = p_data.dup.to_s
+ end
+ t_length = t_data.length
+ begin
+ loop do
+ break if t_length <= 0
+ written_length = p_sock.write_nonblock(t_data)
+ p_sock.flush
+ t_data = t_data[written_length..-1]
+ t_length = t_data.length
+ end
+ rescue Errno::EAGAIN
+ puts "oho"
+ return
+ rescue Errno::EPIPE
+ raise DisconnectError.new(p_sock)
+ rescue
+ raise DisconnectError.new(p_sock)
+ end
+ end
+
+ # method dumps the object in a protocol format which can easily be picked up by a recursive descent parser
+ def dump_object(p_data,p_sock)
+ object_dump = Marshal.dump(p_data)
+ dump_length = object_dump.length.to_s
+ length_str = dump_length.rjust(9,'0')
+ final_data = length_str + object_dump
+ write_data(final_data,p_sock)
+
+# final_data_length = final_data.length
+# begin
+# p_sock.write_nonblock(final_data)
+# write_da
+# rescue Errno::EAGAIN
+# puts "EAGAIN Error while writing socket"
+# return
+# rescue Errno::EINTR
+# puts "Interrupt error"
+# return
+# rescue Errno::EPIPE
+# puts "Pipe error"
+# raise DisconnectError.new(p_sock)
+# end
+ end
+ end
+end
16 framework/packet/packet_guid.rb
@@ -0,0 +1,16 @@
+module Packet
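+ # generates random hexadecimal identifiers, used as signatures for timers, callbacks and connections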
+ class Guid
+ def self.hexdigest
+ values = [
+ rand(0x0010000),
+ rand(0x0010000),
+ rand(0x0010000),
+ rand(0x0010000),
+ rand(0x0010000),
+ rand(0x1000000),
+ rand(0x1000000),
+ ]
+ "%04x%04x%04x%04x%04x%06x%06x" % values
+ end
+ end
+end
163 framework/packet/packet_master.rb
@@ -0,0 +1,163 @@
+module Packet
+ class Reactor
+ include Core
+ attr_accessor :fd_writers, :msg_writers,:msg_reader
+ attr_accessor :result_hash
+
+ attr_accessor :live_workers
+ after_connection :provide_workers
+
+ def self.server_logger= (log_file_name)
+ @@server_logger = log_file_name
+ end
+
+ def self.run
+ master_reactor_instance = new
+ master_reactor_instance.result_hash = {}
+ master_reactor_instance.live_workers = DoubleKeyedHash.new
+ yield(master_reactor_instance)
+ master_reactor_instance.load_workers
+ master_reactor_instance.start_reactor
+ end # end of run method
+
+ def set_result_hash(hash)
+ @result_hash = hash
+ end
+
+ def update_result(worker_key,result)
+ @result_hash ||= {}
+ @result_hash[worker_key.to_sym] = result
+ end
+
+ def provide_workers(handler_instance,t_sock)
+ class << handler_instance
+ extend Forwardable
+ attr_accessor :workers,:connection,:reactor, :initialized,:signature
+ include NbioHelper
+ include Connection
+ def ask_worker(*args)
+ worker_name = args.shift
+ data_options = *args
+ worker_name_key = gen_worker_key(worker_name,data_options[:job_key])
+ data_options[:client_signature] = connection.fileno
+ reactor.live_workers[worker_name_key].send_request(data_options)
+ end
+
+ def_delegators(:@reactor, :start_server, :connect, :add_periodic_timer, \
+ :add_timer, :cancel_timer,:reconnect, :start_worker,:delete_worker)
+
+ end
+ handler_instance.workers = @live_workers
+ handler_instance.connection = t_sock
+ handler_instance.reactor = self
+ end
+
+ # FIXME: right now, each worker is tied to its connection and this can be problematic
+ # what if a worker wants to return results in a async manner
+ def handle_internal_messages(t_sock)
+ sock_fd = t_sock.fileno
+ worker_instance = @live_workers[sock_fd]
+ begin
+ raw_data = read_data(t_sock)
+ # t_data = Marshal.load(raw_data)
+ worker_instance.receive_data(raw_data) if worker_instance.respond_to?(:receive_data)
+ rescue DisconnectError => sock_error
+ remove_worker(t_sock)
+ end
+ end
+
+ def remove_worker(t_sock)
+ @live_workers.delete(t_sock.fileno)
+ read_ios.delete(t_sock)
+ end
+
+ def delete_worker(worker_options = {})
+ worker_name = worker_options[:worker]
+ worker_name_key = gen_worker_key(worker_name,worker_options[:job_key])
+ worker_options[:method] = :exit
+ @live_workers[worker_name_key].send_request(worker_options)
+ end
+
+ # method loads workers in new processes
+ # FIXME: this method can be fixed, so as worker code can be actually, required
+ # only in forked process and hence saving upon the memory involved
+ # where worker is actually required in master as well as in worker.
+ def load_workers
+ if defined?(WORKER_ROOT)
+ worker_root = WORKER_ROOT
+ else
+ worker_root = "#{PACKET_APP}/worker"
+ end
+ t_workers = Dir["#{worker_root}/**/*.rb"]
+ return if t_workers.empty?
+ t_workers.each do |b_worker|
+ worker_name = File.basename(b_worker,".rb")
+ require worker_name
+ worker_klass = Object.const_get(packet_classify(worker_name))
+ next if worker_klass.no_auto_load
+ fork_and_load(worker_klass)
+ end
+
+ # FIXME: easiest and yet perhaps a bit ugly; it's just to make sure that from each
+ # worker proxy one can access other workers
+ @live_workers.each do |key,worker_instance|
+ worker_instance.workers = @live_workers
+ end
+ end
+
+ def start_worker(worker_options = { })
+ worker_name = worker_options[:worker].to_s
+ worker_name_key = gen_worker_key(worker_name,worker_options[:job_key])
+ return if @live_workers[worker_name_key]
+ worker_options.delete(:worker)
+ begin
+ require worker_name
+ worker_klass = Object.const_get(packet_classify(worker_name))
+ fork_and_load(worker_klass,worker_options)
+ rescue MissingSourceFile
+ puts "no such worker #{worker_name}"
+ return
+ end
+ end
+
+ # method forks given worker file in a new process
+ # method should use job_key if provided in options hash.
+ def fork_and_load(worker_klass,worker_options = { })
+ t_worker_name = worker_klass.worker_name
+ worker_pimp = worker_klass.worker_proxy.to_s
+
+ # socket from which master process is going to read
+ master_read_end,worker_write_end = UNIXSocket.pair(Socket::SOCK_STREAM)
+ # socket to which master process is going to write
+ worker_read_end,master_write_end = UNIXSocket.pair(Socket::SOCK_STREAM)
+ #worker_read_fd,master_write_fd = UNIXSocket.pair
+
+ if((pid = fork()).nil?)
+ $0 = "ruby #{worker_klass.worker_name}"
+ [master_write_end,master_read_end].each { |x| x.close }
+
+ worker_klass.start_worker(:write_end => worker_write_end,:read_end => worker_read_end,\
+ :options => worker_options)
+ end
+ Process.detach(pid)
+
+ worker_name_key = gen_worker_key(t_worker_name,worker_options[:job_key])
+
+ if worker_pimp && !worker_pimp.empty?
+ require worker_pimp
+ pimp_klass = Object.const_get(packet_classify(worker_pimp))
+ @live_workers[worker_name_key,master_read_end.fileno] = pimp_klass.new(master_write_end,pid,self)
+ else
+ t_pimp = Packet::MetaPimp.new(master_write_end,pid,self)
+ t_pimp.worker_key = worker_name_key
+ t_pimp.worker_name = t_worker_name
+ @live_workers[worker_name_key,master_read_end.fileno] = t_pimp
+ end
+
+ worker_read_end.close
+ worker_write_end.close
+ #worker_read_fd.close
+ read_ios << master_read_end
+ end # end of fork_and_load method
+ end # end of Reactor class
+end # end of Packet module
27 framework/packet/periodic_event.rb
@@ -0,0 +1,27 @@
+module Packet
+ class PeriodicEvent
+ attr_accessor :block, :timer_signature, :interval, :cancel_flag
+ def initialize(interval, &block)
+ @cancel_flag = false
+ @timer_signature = Guid.hexdigest
+ @block = block
+ @scheduled_time = Time.now + interval
+ @interval = interval
+ end
+
+ def run_now?
+ return true if @scheduled_time <= Time.now
+ return false
+ end
+
+ def cancel
+ @cancel_flag = true
+ end
+
+ def run
+ @scheduled_time += @interval
+ @block.call
+ end
+
+ end
+end
32 framework/packet/pimp.rb
@@ -0,0 +1,32 @@
+module Packet
+ class Pimp
+ include NbioHelper
+ extend ClassHelpers
+ extend Forwardable
+ iattr_accessor :pimp_name
+ attr_accessor :lifeline, :pid, :signature
+ attr_accessor :fd_write_end
+ attr_accessor :workers, :reactor
+
+ def initialize(lifeline_socket,worker_pid,p_reactor)
+ @lifeline = lifeline_socket
+ @pid = worker_pid
+ @reactor = p_reactor
+ @signature = Guid.hexdigest
+ pimp_init if self.respond_to?(:pimp_init)
+ end
+
+ # encode the data, before writing to the socket
+ def send_data p_data
+ dump_object(p_data,@lifeline)
+ end
+
+ def send_fd sock_fd
+ @fd_write_end.send_io(sock_fd)
+ end
+
+ alias_method :do_work, :send_data
+ def_delegators :@reactor, :connections
+ end
+end
+
94 framework/packet/worker.rb
@@ -0,0 +1,94 @@
+# class implements general worker
+module Packet
+ class Worker
+ include Core
+ iattr_accessor :fd_reader,:msg_writer,:msg_reader,:worker_name
+ iattr_accessor :worker_proxy
+ iattr_accessor :no_auto_load
+
+ attr_accessor :worker_started, :worker_options
+ after_connection :provide_workers
+
+ # method initializes the eventloop for the worker
+ def self.start_worker(messengers = {})
+ # @fd_reader = args.shift if args.length > 2
+ @msg_writer = messengers[:write_end]
+ @msg_reader = messengers[:read_end]
+ # @fd_reader = messengers[:read_fd]
+ t_instance = new
+ t_instance.worker_options = messengers[:options]
+ t_instance.worker_init if t_instance.respond_to?(:worker_init)
+ t_instance.start_reactor
+ end
+
+ def initialize
+ super
+ @read_ios << msg_reader
+ # @read_ios << fd_reader
+ @tokenizer = BinParser.new
+ end
+
+ def send_data p_data
+ dump_object(p_data,msg_writer)
+ end
+
+ def send_request(options = {})
+ t_data = options[:data]
+ if t_callback = options[:callback]
+ callback_hash[t_callback.signature] = t_callback
+ send_data(:data => t_data,:function => options[:function],:callback_signature => t_callback.signature)
+ else
+ send_data(:data => t_data,:function => options[:function],:requested_worker => options[:worker],:requesting_worker => worker_name,:type => :request)
+ end
+ end
+
+ # method handles internal requests from internal sockets
+ def handle_internal_messages(t_sock)
+ t_data = read_data(t_sock)
+ receive_internal_data(t_data)
+ end
+
+ def receive_internal_data data
+ @tokenizer.extract(data) do |b_data|
+ data_obj = Marshal.load(b_data)
+ receive_data(data_obj)
+ end
+ end
+
+ # FIXME: this method is duplicated between the packet and worker classes; maybe it's a
+ # good idea to merge them.
+ def provide_workers(handler_instance,connection)
+ class << handler_instance
+ extend Forwardable
+ attr_accessor :worker, :connection, :reactor, :initialized, :signature
+ include NbioHelper
+ include Connection
+ def_delegators :@reactor, :start_server, :connect, :add_periodic_timer, :add_timer, :cancel_timer,:reconnect
+ end
+ handler_instance.connection = connection
+ handler_instance.worker = self
+ handler_instance.reactor = self
+ end
+
+ def log log_data
+ send_data(:requested_worker => :log_worker,:data => log_data,:type => :request)
+ end
+
+ # method receives data from external TCP Sockets
+ def receive_data p_data
+ raise "Not implemented for worker"
+ end
+
+ # method checks if client has asked to execute a internal function
+ def invoke_internal_function
+ raise "Not implemented for worker"
+ end
+
+ # method returns data to the parent process, using UNIX sockets
+ def invoke_callback
+ raise "Not implemented for worker"
+ end
+
+ end # end of class#Worker
+end
+
16 generators/worker/USAGE
@@ -0,0 +1,16 @@
+Description:
+ The worker generator creates stubs for a new BackgrounDRb worker.
+
+ The generator takes a worker name as its argument. The worker name may be
+ given in CamelCase or under_score and should not be suffixed with 'Worker'.
+
+ The generator creates a worker class in lib/workers and a test suite in
+ test/unit.
+
+Example:
+ ./script/generate worker Tail
+
+ This will create a Tail worker:
+ Worker: lib/workers/tail_worker.rb
+ Test: test/unit/tail_worker_test.rb
+
12 generators/worker/templates/unit_test.rb
@@ -0,0 +1,12 @@
+require File.dirname(__FILE__) + '<%= '/..' * class_nesting_depth %>/../test_helper'
+require "#{RAILS_ROOT}/lib/workers/<%= file_name %>_worker"
+require "#{RAILS_ROOT}/vendor/plugins/backgroundrb/lib/backgroundrb.rb"
+require 'drb'
+
+class <%= class_name %>WorkerTest < Test::Unit::TestCase
+
+ # Replace this with your real tests.
+ def test_truth
+ assert <%= class_name %>Worker.included_modules.include?(DRbUndumped)
+ end
+end
7 generators/worker/templates/worker.rb
@@ -0,0 +1,7 @@
+class <%= class_name %>Worker < BackgrounDRb::MetaWorker
+ set_worker_name :<%= file_name %>_worker
+ def create(args = nil)
+ # this method is called when the worker is loaded for the first time
+ end
+end
+
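For reference, running the example from the USAGE file above, ./script/generate worker Tail, would expand this template to roughly the following. This is a sketch of the expected generator output, not a file contained in this commit:

    class TailWorker < BackgrounDRb::MetaWorker
      set_worker_name :tail_worker
      def create(args = nil)
        # this method is called when the worker is loaded for the first time
      end
    end
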
16 generators/worker/worker_generator.rb
@@ -0,0 +1,16 @@
+class WorkerGenerator < Rails::Generator::NamedBase
+ def manifest
+ record do |m|
+ # Check for class naming collisions.
+ m.class_collisions class_path, class_name, "#{class_name}WorkerTest"
+
+ # Worker and test directories.
+ m.directory File.join('lib/workers', class_path)
+ #m.directory File.join('test/unit', class_path)
+
+ # Worker class and unit tests.
+ m.template 'worker.rb', File.join('lib/workers', class_path, "#{file_name}_worker.rb")
+ #m.template 'unit_test.rb', File.join('test/unit', class_path, "#{file_name}_worker_test.rb")
+ end
+ end
+end
2  init.rb
@@ -0,0 +1,2 @@
+# Include hook code here
+require 'backgroundrb'
1  install.rb
@@ -0,0 +1 @@
+# Install hook code here
131 lib/backgroundrb.rb
@@ -0,0 +1,131 @@
+# Backgroundrb
+# FIXME: check whether data written to the socket should end with a newline
+require "pathname"
+BACKGROUNDRB_ROOT = Pathname.new(RAILS_ROOT).realpath.to_s
+require File.dirname(__FILE__) + "/../framework/packet/bin_parser"
+require File.dirname(__FILE__) + "/../framework/packet/nbio"
+require "bdrb_conn_error"
+
+module BackgrounDRb
+end
+class BackgrounDRb::WorkerProxy
+ include Packet::NbioHelper
+ def self.init
+ # @@config = YAML.load(File.open("#{BACKGROUNDRB_ROOT}/config/backgroundrb.yml"))
+ @@config = YAML.load(ERB.new(IO.read("#{BACKGROUNDRB_ROOT}/config/backgroundrb.yml")).result)
+ @@server_ip = @@config[:backgroundrb][:ip]
+ @@server_port = @@config[:backgroundrb][:port]
+ new
+ end
+
+ def establish_connection
+ @tokenizer = BinParser.new
+ begin
+ timeout(3) do
+ @connection = TCPSocket.open(@@server_ip, @@server_port)
+ @connection.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
+ end
+ @connection_status = true
+ rescue Timeout::Error
+ @connection_status = false
+ rescue Exception => e
+ @connection_status = false
+ end
+ end
+
+ def ask_work p_data
+ p_data[:type] = :do_work
+ establish_connection()
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ # @connection.close
+ end
+
+ def new_worker p_data
+ p_data[:type] = :start_worker
+ establish_connection
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ return p_data[:job_key]
+ # @connection.close
+ end
+
+ def worker_info(p_data)
+ p_data[:type] = :worker_info
+ establish_connection
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ return read_from_bdrb()
+ end
+
+
+ def all_worker_info
+ p_data = { }
+ p_data[:type] = :all_worker_info
+ establish_connection
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ return read_from_bdrb
+ end
+
+ def delete_worker p_data
+ p_data[:type] = :delete_worker
+ establish_connection
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ # @connection.close
+ end
+
+ def read_object
+ sock_data = ""
+ begin
+ while(sock_data << @connection.read_nonblock(1023)); end
+ rescue Errno::EAGAIN
+ @tokenizer.extract(sock_data) { |b_data| return b_data }
+ rescue
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect")
+ end
+ end
+
+ def query_all_workers
+ p_data = { }
+ p_data[:type] = :all_worker_status
+ establish_connection
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ return read_from_bdrb
+ end
+
+ def ask_status(p_data)
+ p_data[:type] = :get_status
+ establish_connection()
+
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ return read_from_bdrb
+ end
+
+ def read_from_bdrb(timeout = 3)
+ begin
+ ret_val = select([@connection],nil,nil,timeout)
+ return nil unless ret_val
+ raw_response = read_object()
+ master_response = Marshal.load(raw_response)
+ return master_response
+ rescue
+ return nil
+ end
+ end
+
+ def send_request(p_data)
+ p_data[:type] = :get_result
+ establish_connection()
+
+ raise BackgrounDRb::BdrbConnError.new("Not able to connect") unless @connection_status
+ dump_object(p_data,@connection)
+ return read_from_bdrb(nil)
+ end
+end
+
+MiddleMan = BackgrounDRb::WorkerProxy.init
+
8 lib/bdrb_conn_error.rb
@@ -0,0 +1,8 @@
+module BackgrounDRb
+ class BdrbConnError < RuntimeError
+ attr_accessor :message
+ def initialize(message)
+ @message = message
+ end
+ end
+end
61 script/backgroundrb
@@ -0,0 +1,61 @@
+#!/usr/bin/env ruby
+
+rails_root = File.expand_path(File.join(File.dirname(__FILE__)+"/.."))
+RAILS_HOME = rails_root
+PACKET_APP = rails_root + "/vendor/plugins/backgroundrb"
+WORKER_ROOT = rails_root + "/lib/workers"
+SERVER_LOGGER = rails_root + "/log/backgroundrb_server.log"
+
+["server","framework","lib"].each { |x| $LOAD_PATH.unshift(PACKET_APP + "/#{x}")}
+$LOAD_PATH.unshift(WORKER_ROOT)
+
+require RAILS_HOME + '/config/boot.rb'
+require "active_record"
+require "active_support"
+require "rubygems"
+require "packet"
+require "meta_worker"
+require "cron_trigger"
+require "trigger"
+require "log_worker"
+require "yaml"
+require "erb"
+require "logger"
+require "master_worker"
+
+case ARGV[0]
+when 'start'
+ if fork
+ exit
+ else
+ path = "#{RAILS_HOME}/log/backgroundrb.pid"
+ config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result)
+ op = File.open(path, "w")
+ op.write(Process.pid().to_s)
+ op.close
+ if config_file[:backgroundrb][:log].nil? or config_file[:backgroundrb][:log] != 'foreground'
+ log_file = File.open(SERVER_LOGGER,"w+")
+ [STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)}
+ end
+
+ BackgrounDRb::MasterProxy.new()
+ end
+when 'stop'
+ path = "#{RAILS_HOME}/log/backgroundrb.pid"
+ pid = nil
+ File.open(path, "r") { |pid_handle| pid = pid_handle.gets.strip.chomp.to_i }
+ begin
+ pgid = Process.getpgid(pid)
+ Process.kill('TERM', pid)
+ Process.kill('-TERM', pgid)
+ Process.kill('KILL', pid)
+ rescue Errno::ESRCH => e
+ puts "Deleting pid file"
+ rescue
+ puts $!
+ ensure
+ File.delete(path) if File.exists?(path)
+ end
+else
+ BackgrounDRb::MasterProxy.new()
+end
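
Going by the case statement above, the script is invoked in one of three ways: script/backgroundrb start forks, writes the pid to log/backgroundrb.pid and, unless :log is set to 'foreground' in config/backgroundrb.yml, redirects output to log/backgroundrb_server.log; script/backgroundrb stop reads the pid file, signals the process and its process group, then removes the pid file; running it with no argument starts the master process in the foreground.
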
61 script/bdrb_test_helper.rb
@@ -0,0 +1,61 @@
+require File.dirname(__FILE__) + '/test_helper'
+WORKER_ROOT = RAILS_ROOT + "/lib/workers"
+$LOAD_PATH.unshift(WORKER_ROOT)
+require "mocha"
+
+class Object
+ def self.metaclass; class << self; self; end; end
+
+ def self.iattr_accessor *args
+ metaclass.instance_eval do
+ attr_accessor *args
+ args.each do |attr|
+ define_method("set_#{attr}") do |b_value|
+ self.send("#{attr}=",b_value)
+ end
+ end
+ end
+
+ args.each do |attr|
+ class_eval do
+ define_method(attr) do
+ self.class.send(attr)
+ end
+ define_method("#{attr}=") do |b_value|
+ self.class.send("#{attr}=",b_value)
+ end
+ end
+ end
+ end
+end
+
+module BackgrounDRb
+ class WorkerDummyLogger
+ def info(data)
+ end
+ def debug(data)
+ end
+ end
+ class MetaWorker
+ attr_accessor :logger
+ attr_accessor :thread_pool
+ iattr_accessor :worker_name
+ iattr_accessor :no_auto_load
+
+ def initialize
+ @logger = WorkerDummyLogger.new
+ @thread_pool = ThreadPool.new
+ end
+
+ def register_status(arg)
+ @status = arg
+ end
+ end
+
+ class ThreadPool
+ def defer(args,&block)
+ yield args
+ end
+ end
+end
+
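The stubs above replace the real MetaWorker and ThreadPool so that worker code can be exercised synchronously from ordinary unit tests. Below is a sketch of a test built on this helper; it is not part of this commit, FooWorker and its bar method are illustrative assumptions, and the require path assumes the test file sits in the same directory as the helper:

    require File.join(File.dirname(__FILE__), "bdrb_test_helper")
    require "foo_worker"   # resolved via WORKER_ROOT, which the helper pushes onto $LOAD_PATH

    class FooWorkerTest < Test::Unit::TestCase
      def test_bar_runs_without_error
        worker = FooWorker.new
        assert_nothing_raised { worker.bar }
      end
    end
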
235 server/cron_trigger.rb
@@ -0,0 +1,235 @@
+module BackgrounDRb
+ class CronTrigger
+
+ attr_reader :sec, :min, :hour, :day, :month, :wday, :year, :cron_expr
+
+ def initialize(expr)
+ self.cron_expr = expr
+ end
+
+ def cron_expr=(expr)
+ @cron_expr = expr
+ self.sec, self.min, self.hour, self.day, self.month, self.wday, self.year = @cron_expr.split(' ')
+ # puts inspect
+ end
+
+ def fire_time_after(time)
+ sec, min, hour, day, month, year, wday, yday, isdst, zone = time.to_a
+
+ loop do
+ # year
+ unless @year.nil? or @year.include?(year)
+ return nil if year > @year.max
+ year = @year.detect do |y| y > year end # next allowable year
+ end
+
+ # month
+ unless @month.include?(month)
+ # next allowable month
+ next_month = @month.detect(lambda { @month.min }) do |m| m > month end
+ # reset everything lower
+ day, hour, min, sec = @day.min, @hour.min, @min.min, @sec.min
+ # carry case
+ if next_month < month
+ month = next_month
+ year += 1
+ retry
+ end
+ month = next_month
+ end
+
+ # according to crontab(5):
+ # Note: The day of a command’s execution can be specified by two fields — day of month, and day of week.
+ # If both fields are restricted (i.e., aren’t *), the command will be run when either
+ # field matches the current time. For example, ‘‘30 4 1,15 * 5’’ would cause a command to be
+ # run at 4:30 am on the 1st and 15th of each month, plus every Friday.
+ if !day_restricted? and wday_restricted?
+ # unrestricted day, restricted wday. go by wday
+ unless @wday.include?(wday)
+ next_wday = @wday.detect(lambda { @wday.min }) do |w| w > wday end
+ hour, min, sec = @hour.min, @min.min, @sec.min
+ if next_wday < wday
+ # next week.
+ day += 7 - (wday - next_wday)
+ if day > month_days(year, month)
+ day -= month_days(year, month)
+ month += 1
+ end
+ wday = next_wday
+ retry
+ end
+
+ day += (next_wday - wday)
+ wday = next_wday
+ end
+ elsif !wday_restricted? and day_restricted?
+ # unrestricted wday, restricted day. go by day
+ month_days = (1 .. month_days(year, month))
+ days = @day.select do |d| month_days === d end
+ unless days.include?(day)
+ next_day = days.detect(lambda { days.min }) do |d| d > day end
+ hour, min, sec = @hour.min, @min.min, @sec.min
+ if next_day.nil? or next_day < day
+ day = next_day.nil? ? @day.min : next_day
+ month += 1
+ retry
+ end
+ day = next_day
+ end
+ else
+ # both @day and @wday are restricted, or unrestricted
+ month_days = (1 .. month_days(year, month))