Skip to content

Commit

Permalink
cleaning up after refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Rhoads committed Jan 16, 2013
1 parent 1a10f74 commit 70e4c09
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 118 deletions.
2 changes: 2 additions & 0 deletions .nerve.rc
@@ -0,0 +1,2 @@
export DATA_BAG_DIR=/Users/martin/Dropbox/airbnb/src/chef/data_bags
export COOKBOOK_DIR=/Users/martin/Dropbox/airbnb/src/chef/cookbooks
128 changes: 38 additions & 90 deletions lib/nerve.rb
@@ -1,18 +1,24 @@
require 'nerve/version'
require 'nerve/base'
require 'logger'
require 'json'

require 'zk'

require_relative './nerve/log'
require_relative './nerve/ring_buffer'
require_relative './nerve/zk_helper'
require_relative './nerve/service_watcher'
require_relative './nerve/service_watcher/tcp'
require_relative './nerve/service_watcher/http'
require_relative './nerve/machine_watcher'
require_relative './nerve/machine_watcher/cpuidle'



## a config might look like this:
config = {
'instance_name' => '$instance_id',
'instance_id' => '$instance_id',
'voter_status' => {
'metric' => 'cpuidle',
'hold' => '60',
Expand All @@ -37,121 +43,63 @@
},
},
},
},
},
}


module Nerve
# Your code goes here...
class Nerve < Base
attr_reader :instance_id, :service_port, :zk_path, :service_watchers, :machine_watcher

include Logging

def initialize(opts={})

log.info "starting nerve"

# required options
%w{instance_id zk_path}.each do |required|
log.debug "checking for required inputs"
%w{instance_id service_checks machine_check}.each do |required|
raise ArgumentError, "you need to specify required argument #{required}" unless opts[required]
instance_variable_set("@#{required}",opts[required])
end

# internal settings
@zk = nil
@failedure_threshold = 2
@exiting = false

# create service watcher objects
puts "creating service watcher objects"
log.debug "creating service watchers"
opts['service_checks'] ||= {}
@service_watchers=[]
opts['service_checks'].each do |name,params|
@service_watchers << ServiceWatcher.new(params.merge({'name' => name}))
@service_watchers << ServiceWatcher.new(params.merge({'instance_id' => @instance_id, 'name' => name}))
end

# create machine watcher object
puts "creating machine watcher"
if opts['machine_check']
machine_check_class_name = opts['machine_check']['metric'].split('_').map(&:capitalize).join
machine_check_class_name << 'MachineCheck'
begin
machine_check_class = MachineCheck.const_get(machine_check_class_name)
rescue
raise ArgumentError, "machine check #{opts['machine_check']['metric']} is not valid"
end
@machine_check = machine_check_class.new(opts['machine_check'])
else
@machine_check = nil
end
puts "end of init function"
log.debug "creating machine watcher"
@machine_check = MachineWatcher.new(opts['machine_check'].merge({'instance_id' => @instance_id}))

log.debug 'completed init for nerve'
end

def run
puts "starting run..."
@zk = ZKHelper()
log.info "starting run"
begin
puts "registering machine..."
register_thread = Thread.new{register_machine}

puts "registering service"
Thread.new{register_service}

puts "waiting for children"
register_thread.join()
ensure
@exiting = true
end
end



def register_machine
failed = true
machine_node_path = "/machines/#{@instance_id}"

while not @exiting
begin
@zk.ping?
rescue Zookeeeper::Exceptions::NotConnected => e
failed = true
else
# write json hash {service:service_name, host:host, cpu_idle:cpu_idle} into ephemeral node
if failed or not @zk.exists?(machine_node_path)
create_ephemeral_node(
machine_node_path,
{'vote' => @machine_check.vote })
failed = false
end
end

sleep(5)
end
end

def run_service_checks
# TODO(mkr): could consider doing this in parallel
@service_checks.each do |service_check|
return false unless service_check.check
end
retrun true
end

def register_service
return unless (@service_port && run_service_checks)

failures = @failure_threshold
while not @exiting
puts "looping inside register service"
passed = run_service_checks
if passed
failures -= 1 unless failures == 0
else
failures += 1 unless failures == @failure_threshold
children = []
log.debug "launching machine check thread"
children << Thread.new{@machine_check.run}

# log.debug "launching service check threads"
# @service_watchers.each do |watcher|
# children << Thread.new{watcher.run}
# end

log.info "waiting for children"
children.each do |child|
child.join
end

create_service_node unless failures > 0
delete_service_node if failures == @failure_threshold

sleep(5)
ensure
self.class.const_set(:EXIT,true)
end
log.info "ending run"
end


end
end
24 changes: 24 additions & 0 deletions lib/nerve/log.rb
@@ -0,0 +1,24 @@
module Nerve
module Logging

def log
@logger ||= Logging.logger_for(self.class.name)
end

# Use a hash class-ivar to cache a unique Logger per class:
@loggers = {}

class << self
def logger_for(classname)
@loggers[classname] ||= configure_logger_for(classname)
end

def configure_logger_for(classname)
logger = Logger.new(STDOUT)
logger.progname = classname
logger
end

end
end
end
24 changes: 16 additions & 8 deletions lib/nerve/machine_watcher.rb
@@ -1,9 +1,13 @@
module Nerve
class MachineWatcher
include Logging
def initialize(opts={})
log.debug 'creating machine watcher'
# required inputs
%w{metric zk_path instance_id}.each do |required|
raise ArgumentError, "you need to specify required argument #{required}" unless opts[required]
instance_variable_set("@#{required}",opts[required])
log.debug "set @#{required} to #{opts[required]}"
end

machine_check_class_name = @metric.split('_').map(&:capitalize).join
Expand All @@ -13,31 +17,35 @@ def initialize(opts={})
rescue
raise ArgumentError, "machine check #{@metric} is not valid"
end

@machine_check = machine_check_class.new(opts)
@exiting = false
@previous_vote = nil
end

def run
log.info 'watching machine'
@zk = ZKHelper.new(@zk_path)
@zk.create_ephemeral_node(@instance_id,{vote: 0})
unless defined?(EXIT)
@zk.create_ephemeral_node(@instance_id,{'vote'=>0})

until defined?(EXIT)
begin
@zk.ping?
vote = @machine_check.vote
if vote != @previous_vote
@zk.update(@instance_id,{vote: vote})
end

@previous_vote = vote
sleep 1
ensure
EXIT = true
rescue Object => o
log.error "hit an error, setting exit: "
log.error o.inspect
log.error o.backtrace
self.class.const_set(:EXIT,true)
end
end

log.info "ending machine watch"
end
end
end
13 changes: 6 additions & 7 deletions lib/nerve/machine_watcher/cpuidle.rb
@@ -1,25 +1,24 @@
module Nerve
module MachineCheck
class CpuidleMachineCheck

include Logging
def initialize(opts={})
puts "creating machine check"
log.debug "creating machine check"
%w{hold up down}.each do |required|
raise ArgumentError, "you need to provide #{required}" unless opts[required]
instance_variable_set("@#{required}",opts[required])
end

@exiting = false
puts "creating polling thread"
Thread.new{poll}
puts "returning from machine watcher init"
log.debug "returning from cpuidle check init"
end


def poll
log.debug "creating polling thread"
# keep the last hold time of info
@buffer = RingBuffer.new(@hold)
while not @exiting
until defined?(EXIT)
@buffer.push get_idle
sleep 1
end
Expand Down
17 changes: 12 additions & 5 deletions lib/nerve/service_watcher.rb
@@ -1,8 +1,11 @@
module Nerve
class ServiceWatcher
attr_reader :name, :port, :host, :zk_path, :service_checks

include Logging

def initialize(opts={})
%w{name port zk_path instance_id}.each do |required|
log.debug "creating service watcher object"
%w{port zk_path instance_id name}.each do |required|
raise ArgumentError, "you need to specify required argument #{required}" unless opts[required]
instance_variable_set("@#{required}",opts['required'])
end
Expand All @@ -22,6 +25,7 @@ def initialize(opts={})
end

def run()
log.info "watching service #{@name}"
@zk = ZKHelper.new(@zk_path)
@zk.delete(@instance_id)
ring_buffer = RingBuffer(@threshold)
Expand All @@ -36,8 +40,11 @@ def run()
else
@zk.ensure_ephemeral_node(@instance_id)
end
ensure
EXIT = true
rescue Object => o
log.error "hit an error, setting exit: "
log.error o.inspect
log.error o.backtrace
self.class.const_set(:EXIT,true)
end
end
end
Expand All @@ -48,6 +55,6 @@ def check?
end
return true
end

end
end

0 comments on commit 70e4c09

Please sign in to comment.