Skip to content

Commit

Permalink
Distinct seperation of Mapper and Agent in terms of code and usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Leitch committed Feb 14, 2009
1 parent a68e0fa commit d94e8ce
Show file tree
Hide file tree
Showing 30 changed files with 711 additions and 989 deletions.
17 changes: 9 additions & 8 deletions README.rdoc
Expand Up @@ -49,7 +49,7 @@ is used for the default request dispatching based on least loaded server.
You can change what is advertised as status to anything you want that is
comparable(<=>) by doing something like this in your agent's init.rb file:

Nanite.status_proc = lambda { MyApp.some_statistic_indicating_load }
status_proc = lambda { MyApp.some_statistic_indicating_load }

This proc will be recalled every @ping_time and sent to the mappers.

Expand Down Expand Up @@ -218,12 +218,12 @@ Now lets run a few agents. Each of these is a long running process and needs to
first shell

cd examples/simpleagent
nanite --token fred
nanite-agent --token fred

second shell

cd examples/simpleagent
nanite --token bob
nanite-agent --token bob

Now run a mapper. Mappers can be run from within your Merb or Rails app, from an interactive irb shell, or from the command line. For this example we'll run it from the command line so open a third shell window and run the following:

Expand All @@ -241,23 +241,24 @@ If you want to try requesting work to be done by an agent from an interactive co

cd nanite
./bin/nanite-mapper -i -u mapper -p testing -v /nanite
>> Nanite.request('/simple/echo') {|res| p res }
Starting mapper console
>> request('/simple/echo') {|res| p res }

By default this will dispatch to the agent with the lowest reported load average.

There are a few other selectors as well:

# run this request on *all* agents that expose the /foo/bar Foo#bar actor
>> Nanite.request('/foo/bar', 'hi', :selector => :all) {|res| p res }
>> request('/foo/bar', 'hi', :selector => :all) {|res| p res }

# run this request on one random agent that expose the /whatever/hello Whatever#hello actor
>> Nanite.request('/whatever/hello', 42, :selector => :random) {|res| p res }
>> request('/whatever/hello', 42, :selector => :random) {|res| p res }

You can create your own selectors based on arbitrary stuff you put in status from your agents see mapper.rb for examples of how least_loaded, all and random are implemented.
You can create your own selectors based on arbitrary stuff you put in status from your agents see cluster.rb for examples of how least_loaded, all and random are implemented.

You can run as many mappers as you want, they will all be hot masters.

The calls are asynchronous. This means the block you pass to Nanite.request is not run until the response from the agent(s) have returned. So keep that in mind. Should you need to poll from an ajax web app for results you should have your block stuff the results in the database for any web front end to pick up with the next poll.
The calls are asynchronous. This means the block you pass to Nanite::Agent#request is not run until the response from the agent(s) have returned. So keep that in mind. Should you need to poll from an ajax web app for results you should have your block stuff the results in the database for any web front end to pick up with the next poll.

Have fun!

Expand Down
2 changes: 1 addition & 1 deletion Rakefile
Expand Up @@ -31,7 +31,7 @@ spec = Gem::Specification.new do |s|
s.homepage = HOMEPAGE

s.bindir = "bin"
s.executables = %w( nanite nanite-mapper nanite-admin )
s.executables = %w( nanite-agent nanite-mapper nanite-admin )

s.add_dependency "extlib"
s.add_dependency('amqp', '>= 0.6.0')
Expand Down
15 changes: 15 additions & 0 deletions TODO
Expand Up @@ -4,3 +4,18 @@ TODO:
documented as part of a working example.
- examples/async_rack_front/async_rack_front.ru needs to be documented and verified working.

Ian:
- Sync Mapper/Agent#start with nanite-mapper/agent
- Update docs for Agent#start and Mapper#start
- Update docs in nanite-agent and nanite-mapper
- What's the :log option?
- Ensure file transfer works
- Ensure admin works
- Check secure stuff still works
- Check custom status_proc works
- Update README
- Check documentation, only document public methods

Points:

- Is exchanges.rb old code?
5 changes: 2 additions & 3 deletions bin/nanite-admin
Expand Up @@ -35,9 +35,8 @@ require 'thin'
# When you need to update this Thin install you should be able to do a 'git pull' on the
# "async_for_rack" branch.


EM.run do
agent = Nanite.start(:log_level => 'info', :mapper => true, :host => 'localhost', :user => 'mapper', :pass => 'testing', :vhost => '/nanite')
mapper = Nanite::Mapper.start(:log_level => 'info', :host => 'localhost', :user => 'mapper', :pass => 'testing', :vhost => '/nanite')
puts "starting nanite-admin"
Rack::Handler::Thin.run Nanite::Admin.new(agent), :Port => 4000
Rack::Handler::Thin.run Nanite::Admin.new(mapper), :Port => 4000
end
7 changes: 3 additions & 4 deletions bin/nanite → bin/nanite-agent
Expand Up @@ -79,11 +79,10 @@ opts = OptionParser.new do |opts|
puts "Nanite Version #{opts.version}"
exit
end

end

opts.parse!

EM.run {
Nanite.start options
}
EM.run do
Nanite::Agent.start(options)
end
15 changes: 11 additions & 4 deletions bin/nanite-mapper
Expand Up @@ -47,6 +47,10 @@ opts = OptionParser.new do |opts|
options[:pass] = pass
end

opts.on("-a", "--agent-timeout", "How long to wait before an agent is considered to be offline and thus removed from the list of available agents.") do |timeout|
options[:agent_timeout] = timeout
end

opts.on("--ping-time PINGTIME", "Specify the ping time (how often the nanites contact the mappers)") do |ping|
options[:ping_time] = ping
end
Expand All @@ -63,15 +67,18 @@ opts = OptionParser.new do |opts|
options[:secure] = true
end

opts.on("-r", "--offline-redelivery-frequency", "The frequency in seconds that messages stored in the offline queue will be retrieved for attempted redelivery to the nanites. Default is 10 seconds.") do |frequency|
options[:offline_redelivery_frequency] = frequency
end

opts.on("--version", "Show the nanite version number") do |res|
puts "Nanite Version #{opts.version}"
exit
end

end

opts.parse!

EM.run {
Nanite.start(options)
}
EM.run do
Nanite::Mapper.start(options)
end
19 changes: 7 additions & 12 deletions examples/async_rack_front/async_rack_front.ru
Expand Up @@ -10,34 +10,29 @@ require 'nanite/mapper'
# git pull origin async_for_rack
# rake install
# thin -R async_rack_front.ru -p 4000 start

class NaniteApp

AsyncResponse = [-1, {}, []].freeze

def call(env)
AMQP.start :host => 'localhost', :user => 'mapper', :pass => 'testing',
:vhost => '/nanite'
Nanite.identity = "mapper"
Nanite.mapper = Nanite::Mapper.new(15)
mapper = Nanite::Mapper.start
def call(env)
env.delete('rack.errors')
input = env.delete('rack.input')
async_callback = env.delete('async.callback')
Nanite.request('/rack/call', env, :selector => :random, :timeout => 15) do |response|

mapper.request('/rack/call', env, :selector => :random, :timeout => 15) do |response|
if response
async_callback.call response.values.first
else
async_callback.call [500, {'Content-Type' => 'text/html'}, "Request Timeout"]
end
end
end
AsyncResponse
end
[200, {'Content-Type' => 'text/html'}, "warmed up nanite mapper"]
end

end


run NaniteApp.new
11 changes: 4 additions & 7 deletions examples/cli.rb
Expand Up @@ -18,21 +18,18 @@
# those agents and their methods. When this process is presumed complete after
# 16 seconds we can finally send the nanite agent the task to execute.


EM.run {
EM.run do
# start up a new mapper with a ping time of 15 seconds
Nanite.start :mapper => true, :host => 'localhost',
:user => 'mapper', :pass => 'testing',
:vhost => '/nanite', :log_level => 'info'
mapper = Nanite::Mapper.start(:host => 'localhost', :user => 'mapper', :pass => 'testing', :vhost => '/nanite', :log_level => 'info')

# have this run after 16 seconds so we can be pretty sure that the mapper
# has already received pings from running nanites and registered them.
EM.add_timer(16) do
# call our /simple/echo nanite, and pass it a string to echo back
Nanite.request("/simple/echo", "hello world!") do |res|
mapper.request("/simple/echo", "hello world!") do |res|
p res
EM.stop_event_loop
end
end
}
end

2 changes: 1 addition & 1 deletion examples/crew.rb
Expand Up @@ -9,7 +9,7 @@ def process_exists?(str)

def run_agent(name, num, root)
if !process_exists?(name)
system("#{File.dirname(__FILE__)}/nanite -u #{name} -p testing -t #{name} -n #{root} -j &")
system("#{File.dirname(__FILE__)}/nanite-agent -u #{name} -p testing -t #{name} -n #{root} -j &")
end
end

Expand Down
141 changes: 9 additions & 132 deletions lib/nanite.rb
Expand Up @@ -3,148 +3,25 @@
require 'mq'
require 'json'
require 'logger'
require 'yaml'

$:.unshift File.dirname(__FILE__)
require 'nanite/amqp'
require 'extlib'
require 'nanite/packets'
require 'nanite/reducer'
require 'nanite/identity'
require 'nanite/console'
require 'nanite/daemonize'
require 'nanite/mapper'
require 'nanite/dispatcher'
require 'nanite/actor'
require 'nanite/streaming'
require 'nanite/agent'
require 'nanite/exchanges'
require 'nanite/marshal'
require 'nanite/console'
require 'nanite/agent'
require 'nanite/cluster'
require 'nanite/reaper'
require 'nanite/serializer'
require 'nanite/log'

module Nanite

VERSION = '0.2.0' unless defined?(Nanite::VERSION)

class AgentNotRunning < StandardError; end

class << self

attr_reader :agent

# Registers actor instance with given prefix
def register(actor_instance, prefix = nil)
@agent.register(actor_instance, prefix)
end

# Initializes a new agent and establishes
# AMQP connection. To run agent as Mapper, pass :mapper => true.
# This must be used inside EM.run block or if EventMachine reactor
# is already started, for instance, by a Thin server that your Merb/Rails
# application runs on.
#
# Agent options:
#
# identity : identity of this agent, may be any string
#
# status_proc : a callable object that returns agent load as a string,
# defaults to load averages string extracted from `uptime`
# format : format to use for packets serialization. One of the two:
# :marshall or :json. Defaults to
# Ruby's Marshall format. For interoperability with
# AMQP clients implemented in other languages, use JSON.
#
# Note that Nanite uses JSON gem,
# and ActiveSupport's JSON encoder may cause clashes
# if ActiveSupport is loaded after JSON gem.
#
# root : application root for this agent, defaults to Dir.pwd
#
# log_dir : path to directory where agent stores it's log file
# if not given, app_root is used.
#
# file_root : path to directory to files this agent provides
# defaults to app_root/files
#
# ping_time : time interval in seconds between two subsequent heartbeat messages
# this agent broadcasts. Default value is 15.
#
# log_file : log file path, defaults to log_dir/nanite.[identity].log
#
# threaded_actors : when true, each message agent handles is handled in a separate thread
#
# console : true tells Nanite to start interactive console
#
# daemonize : true tells Nanite to daemonize
#
# services : list of services provided by this agent, by default
# all methods exposed by actors are listed
#
# Mapper options:
#
# offline_redelivery_frequency : The frequency in seconds that messages stored in the offline queue will be retrieved
# for attempted redelivery to the nanites. Default is 10 seconds.
#
# Connection options:
#
# vhost : AMQP broker vhost that should be used
#
# user : AMQP broker user
#
# pass : AMQP broker password
#
# host : host AMQP broker (or node of interest) runs on,
# defaults to 0.0.0.0
#
# port : port AMQP broker (or node of interest) runs on,
# this defaults to 5672, port used by some widely
# used AMQP brokers (RabbitMQ and ZeroMQ)
#
#
# On start Nanite reads config.yml, so it is common to specify
# options in the YAML file. However, when both Ruby code options
# and YAML file specify option, Ruby code options take precedence.
#
# Command line runner provided with Nanite out of the box parses
# command line options and then uses this method, so it is safe to
# consider it a single initialization point for every Nanite agent.
#
# @api :public:

def start(options)
@agent = Agent.new(options)
@agent.start
@agent
end

def request(*args, &blk)
check_agent
@agent.request(*args, &blk)
end

def push(*args, &blk)
check_agent
@agent.push(*args, &blk)
end

def log(*args)
check_agent
@agent.log(*args)
end

def gensym
values = [
rand(0x0010000),
rand(0x0010000),
rand(0x0010000),
rand(0x0010000),
rand(0x0010000),
rand(0x1000000),
rand(0x1000000),
]
"%04x%04x%04x%04x%04x%06x%06x" % values
end

private
def check_agent
raise AgentNotRunning, "An agent needs to be started via Nanite.start" unless @agent
end
end
end

0 comments on commit d94e8ce

Please sign in to comment.