Skip to content

Commit

Permalink
Add code to disable individual actions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgeiger committed Mar 8, 2010
1 parent 3adfa38 commit a13a3f4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 43 deletions.
6 changes: 5 additions & 1 deletion config/config.example.yml
Expand Up @@ -45,9 +45,13 @@
:login: [your login name]
:password: [your password]

# Disable the default built-in actions
# Disable all the default built-in actions
# :disable_default_actions: true

# Disable specific actions for the node
# Use this if you want to disable a limited number of actions
# :disabled_actions: ['word_count']

# By default, CloudCrowd looks for installed actions inside the 'actions'
# subdirectory of this configuration folder. 'actions_path' allows you to load
# additional actions from a location of your choice.
Expand Down
84 changes: 42 additions & 42 deletions lib/cloud_crowd/node.rb
@@ -1,57 +1,57 @@
module CloudCrowd

# A Node is a Sinatra/Thin application that runs a single instance per-machine
# It registers with the central server, receives WorkUnits, and forks off
# It registers with the central server, receives WorkUnits, and forks off
# Workers to process them. The actions are:
#
# [get /heartbeat] Returns 200 OK to let monitoring tools know the server's up.
# [post /work] The central server hits <tt>/work</tt> to dispatch a WorkUnit to this Node.
class Node < Sinatra::Default

# A Node's default port. You only run a single node per machine, so they
# can all use the same port without any problems.
DEFAULT_PORT = 9063
# A list of regex scrapers, which let us extract the one-minute load

# A list of regex scrapers, which let us extract the one-minute load
# average and the amount of free memory on different flavors of UNIX.

SCRAPE_UPTIME = /\d+\.\d+/
SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE = /page size of (\d+) bytes/

# The interval at which the node monitors the machine's load and memory use
# (if configured to do so in config.yml).
MONITOR_INTERVAL = 3

# The interval at which the node regularly checks in with central (5 min).
CHECK_IN_INTERVAL = 300

# The response sent back when this node is overloaded.
OVERLOADED_MESSAGE = 'Node Overloaded'

attr_reader :enabled_actions, :host, :port, :central

set :root, ROOT
set :authorization_realm, "CloudCrowd"

helpers Helpers

# methodoverride allows the _method param.
enable :methodoverride

# Enabling HTTP Authentication turns it on for all requests.
# This works the same way as in the central CloudCrowd::Server.
before do
login_required if CloudCrowd.config[:http_authentication]
end
# To monitor a Node with Monit, God, Nagios, or another tool, you can hit

# To monitor a Node with Monit, God, Nagios, or another tool, you can hit
# /heartbeat to make sure its still online.
get '/heartbeat' do
"buh-bump"
end

# Posts a WorkUnit to this Node. Forks a Worker and returns the process id.
# Returns a 503 if this Node is overloaded.
post '/work' do
Expand All @@ -61,22 +61,22 @@ class Node < Sinatra::Default
Process.detach(pid)
json :pid => pid
end

# When creating a node, specify the port it should run on.
def initialize(port=nil, daemon=false)
require 'json'
CloudCrowd.identity = :node
@central = CloudCrowd.central_server
@host = Socket.gethostname
@enabled_actions = CloudCrowd.actions.keys
@enabled_actions = CloudCrowd.actions.keys - (CloudCrowd.config[:disabled_actions] || [])
@port = port || DEFAULT_PORT
@daemon = daemon
@overloaded = false
@max_load = CloudCrowd.config[:max_load]
@min_memory = CloudCrowd.config[:min_free_memory]
start unless test?
end

# Starting up a Node registers with the central server and begins to listen
# for incoming WorkUnits.
def start
Expand All @@ -94,9 +94,9 @@ def start
monitor_system if @max_load || @min_memory
@server_thread.join
end
# Checking in with the central server informs it of the location and
# configuration of this Node. If it can't check-in, there's no point in

# Checking in with the central server informs it of the location and
# configuration of this Node. If it can't check-in, there's no point in
# starting.
def check_in(critical=false)
@central["/node/#{@host}"].put(
Expand All @@ -109,31 +109,31 @@ def check_in(critical=false)
puts "Failed to connect to the central server (#{@central.to_s})."
raise SystemExit if critical
end

# Before exiting, the Node checks out with the central server, releasing all
# of its WorkUnits for other Nodes to handle
def check_out
@central["/node/#{@host}"].delete
end

# Lazy-initialize the asset_store, preferably after the Node has launched.
def asset_store
@asset_store ||= AssetStore.new
end
# Is the node overloaded? If configured, checks if the load average is

# Is the node overloaded? If configured, checks if the load average is
# greater than 'max_load', or if the available RAM is less than
# 'min_free_memory'.
def overloaded?
(@max_load && load_average > @max_load) ||
(@min_memory && free_memory < @min_memory)
end

# The current one-minute load average.
def load_average
`uptime`.match(SCRAPE_UPTIME).to_s.to_f
end

# The current amount of free memory in megabytes.
def free_memory
case RUBY_PLATFORM
Expand All @@ -147,12 +147,12 @@ def free_memory
raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
end
end


private
# Launch a monitoring thread that periodically checks the node's load
# average and the amount of free memory remaining. If we transition out of

# Launch a monitoring thread that periodically checks the node's load
# average and the amount of free memory remaining. If we transition out of
# the overloaded state, let central know.
def monitor_system
@monitor_thread = Thread.new do
Expand All @@ -164,9 +164,9 @@ def monitor_system
end
end
end
# If communication is interrupted for external reasons, the central server
# will assume that the node has gone down. Checking in will let central know

# If communication is interrupted for external reasons, the central server
# will assume that the node has gone down. Checking in will let central know
# it's still online.
def check_in_periodically
@check_in_thread = Thread.new do
Expand All @@ -176,15 +176,15 @@ def check_in_periodically
end
end
end

# Trap exit signals in order to shut down cleanly.
def trap_signals
Signal.trap('QUIT') { shut_down }
Signal.trap('INT') { shut_down }
Signal.trap('KILL') { shut_down }
Signal.trap('TERM') { shut_down }
end

# At shut down, de-register with the central server before exiting.
def shut_down
@check_in_thread.kill if @check_in_thread
Expand All @@ -193,7 +193,7 @@ def shut_down
@server_thread.kill if @server_thread
Process.exit
end

end

end

0 comments on commit a13a3f4

Please sign in to comment.