Permalink
Browse files

Updated collector to track NATS latency

Change-Id: I620e98b40e4afa0450b6bee54e9298d04d6f58d7
  • Loading branch information...
1 parent 54045f7 commit 636d46d99b180a3bdcdceb03a22c75dc92d6575f @olegshaldybin olegshaldybin committed Jan 31, 2012
Showing with 139 additions and 39 deletions.
  1. +1 −0 .gitignore
  2. +3 −1 collector/Gemfile
  3. +44 −22 collector/Gemfile.lock
  4. +6 −1 collector/Rakefile
  5. +3 −0 collector/config/config.yml
  6. +23 −2 collector/lib/collector.rb
  7. +26 −7 collector/lib/collector/config.rb
  8. +3 −3 collector/lib/collector/handlers/dea.rb
  9. +1 −2 collector/lib/collector/handlers/health_manager.rb
  10. +29 −1 collector/spec/unit/collector_spec.rb
  11. BIN collector/vendor/cache/daemons-1.1.3.gem
  12. BIN collector/vendor/cache/daemons-1.1.6.gem
  13. BIN collector/vendor/cache/diff-lcs-1.1.2.gem
  14. BIN collector/vendor/cache/diff-lcs-1.1.3.gem
  15. BIN collector/vendor/cache/escape_utils-0.2.3.gem
  16. BIN collector/vendor/cache/escape_utils-0.2.4.gem
  17. BIN collector/vendor/cache/eventmachine-0.12.10.gem
  18. BIN collector/vendor/cache/eventmachine-0.12.11.cloudfoundry.3.gem
  19. BIN collector/vendor/cache/json_pure-1.5.1.gem
  20. BIN collector/vendor/cache/json_pure-1.6.5.gem
  21. BIN collector/vendor/cache/little-plugger-1.1.3.gem
  22. BIN collector/vendor/cache/logging-1.6.2.gem
  23. BIN collector/vendor/cache/multi_json-1.0.4.gem
  24. BIN collector/vendor/cache/nats-0.4.10.gem
  25. BIN collector/vendor/cache/nats-0.4.22.beta.8.gem
  26. BIN collector/vendor/cache/posix-spawn-0.3.6.gem
  27. BIN collector/vendor/cache/rack-1.4.1.gem
  28. BIN collector/vendor/cache/rake-0.9.2.2.gem
  29. BIN collector/vendor/cache/rcov-0.9.9.gem
  30. BIN collector/vendor/cache/rspec-2.5.0.gem
  31. BIN collector/vendor/cache/rspec-2.8.0.gem
  32. BIN collector/vendor/cache/rspec-core-2.5.1.gem
  33. BIN collector/vendor/cache/rspec-core-2.8.0.gem
  34. BIN collector/vendor/cache/rspec-expectations-2.5.0.gem
  35. BIN collector/vendor/cache/rspec-expectations-2.8.0.gem
  36. BIN collector/vendor/cache/rspec-mocks-2.5.0.gem
  37. BIN collector/vendor/cache/rspec-mocks-2.8.0.gem
  38. BIN collector/vendor/cache/simplecov-0.4.2.gem
  39. BIN collector/vendor/cache/simplecov-0.5.4.gem
  40. BIN collector/vendor/cache/simplecov-html-0.4.5.gem
  41. BIN collector/vendor/cache/simplecov-html-0.5.3.gem
  42. BIN collector/vendor/cache/thin-1.3.1.gem
  43. BIN collector/vendor/cache/vcap_common-1.0.3.gem
  44. BIN collector/vendor/cache/vcap_logging-0.1.0.gem
  45. BIN collector/vendor/cache/vcap_logging-0.1.3.gem
  46. BIN collector/vendor/cache/yajl-ruby-0.8.2.gem
  47. BIN collector/vendor/cache/yajl-ruby-0.8.3.gem
View
@@ -2,3 +2,4 @@
.bundle
.yardoc
.DS_Store
+collector/config/dev*.yml
View
@@ -1,8 +1,10 @@
source :rubygems
+gem "rake"
gem "nats"
gem "em-http-request"
-gem "eventmachine"
+gem "eventmachine", "~> 0.12.11.cloudfoundry.3"
+gem "vcap_common", "~> 1.0.3"
gem "vcap_logging"
gem "yajl-ruby"
View
@@ -2,43 +2,65 @@ GEM
remote: http://rubygems.org/
specs:
addressable (2.2.6)
- daemons (1.1.3)
- diff-lcs (1.1.2)
+ daemons (1.1.6)
+ diff-lcs (1.1.3)
em-http-request (0.3.0)
addressable (>= 2.0.0)
escape_utils
eventmachine (>= 0.12.9)
- escape_utils (0.2.3)
- eventmachine (0.12.10)
- json_pure (1.5.1)
- nats (0.4.10)
- daemons (>= 1.1.0)
+ escape_utils (0.2.4)
+ eventmachine (0.12.11.cloudfoundry.3)
+ json_pure (1.6.5)
+ little-plugger (1.1.3)
+ logging (1.6.2)
+ little-plugger (>= 1.1.3)
+ multi_json (1.0.4)
+ nats (0.4.22.beta.8)
+ daemons (>= 1.1.4)
eventmachine (>= 0.12.10)
- json_pure (>= 1.5.1)
- rcov (0.9.9)
- rspec (2.5.0)
- rspec-core (~> 2.5.0)
- rspec-expectations (~> 2.5.0)
- rspec-mocks (~> 2.5.0)
- rspec-core (2.5.1)
- rspec-expectations (2.5.0)
+ json_pure (>= 1.6.1)
+ thin (>= 1.3.1)
+ posix-spawn (0.3.6)
+ rack (1.4.1)
+ rake (0.9.2.2)
+ rcov (1.0.0)
+ rspec (2.8.0)
+ rspec-core (~> 2.8.0)
+ rspec-expectations (~> 2.8.0)
+ rspec-mocks (~> 2.8.0)
+ rspec-core (2.8.0)
+ rspec-expectations (2.8.0)
diff-lcs (~> 1.1.2)
- rspec-mocks (2.5.0)
- simplecov (0.4.2)
- simplecov-html (~> 0.4.4)
- simplecov-html (0.4.5)
- vcap_logging (0.1.0)
- yajl-ruby (0.8.2)
+ rspec-mocks (2.8.0)
+ simplecov (0.5.4)
+ multi_json (~> 1.0.3)
+ simplecov-html (~> 0.5.3)
+ simplecov-html (0.5.3)
+ thin (1.3.1)
+ daemons (>= 1.0.9)
+ eventmachine (>= 0.12.6)
+ rack (>= 1.0.0)
+ vcap_common (1.0.3)
+ eventmachine (~> 0.12.11.cloudfoundry.3)
+ logging (>= 1.5.0)
+ nats (~> 0.4.22.beta.4)
+ posix-spawn (~> 0.3.6)
+ thin (~> 1.3.1)
+ yajl-ruby (~> 0.8.3)
+ vcap_logging (0.1.3)
+ yajl-ruby (0.8.3)
PLATFORMS
ruby
DEPENDENCIES
em-http-request
- eventmachine
+ eventmachine (~> 0.12.11.cloudfoundry.3)
nats
+ rake
rcov
rspec
simplecov
+ vcap_common (~> 1.0.3)
vcap_logging
yajl-ruby
View
@@ -1,7 +1,12 @@
require "bundler"
require "rspec/core/rake_task"
-RSpec::Core::RakeTask.new(:spec)
+ENV["BUNDLE_GEMFILE"] = "Gemfile"
+
+RSpec::Core::RakeTask.new("spec") do |t|
+ t.rspec_opts = ["--format", "documentation", "--colour"]
+ t.pattern = "spec/unit/**/*_spec.rb"
+end
task :default => :spec
@@ -1,4 +1,5 @@
---
+index: 0
logging:
level: info
tsdb:
@@ -9,4 +10,6 @@ intervals:
discover: 60
varz: 10
healthz: 5
+ local_metrics: 10
prune: 300
+ nats_ping: 10
View
@@ -9,6 +9,7 @@
require "eventmachine"
require "nats/client"
require "vcap/logging"
+require "vcap/rolling_metric"
require "collector/config"
require "collector/handler"
@@ -23,7 +24,6 @@ module Collector
# Varz collector
class Collector
-
ANNOUNCE_SUBJECT = "vcap.component.announce"
DISCOVER_SUBJECT = "vcap.component.discover"
@@ -40,6 +40,7 @@ def initialize
HEALTH_MANAGER_COMPONENT, ROUTER_COMPONENT])
@tsdb_connection = EventMachine.connect(Config.tsdb_host, Config.tsdb_port, TsdbConnection)
+ @nats_latency = VCAP::RollingMetric.new(60)
NATS.on_error do |e|
@logger.fatal("Exiting, NATS error")
@@ -48,6 +49,7 @@ def initialize
end
@nats = NATS.connect(:uri => Config.nats_uri) do
+ @logger.info("Connected to NATS")
# Send initially to discover what's already running
@nats.subscribe(ANNOUNCE_SUBJECT) {|message| process_component_discovery(message)}
@@ -56,6 +58,10 @@ def initialize
@nats.publish(DISCOVER_SUBJECT, "", @inbox)
+ @nats.subscribe("collector.nats.ping") do |message|
+ process_nats_ping(message.to_f)
+ end
+
setup_timers
end
end
@@ -66,6 +72,15 @@ def setup_timers
EM.add_periodic_timer(Config.varz_interval) { fetch_varz }
EM.add_periodic_timer(Config.healthz_interval) { fetch_healthz }
EM.add_periodic_timer(Config.prune_interval) { prune_components }
+ EM.add_periodic_timer(Config.nats_ping_interval) { @nats.publish("collector.nats.ping", Time.now.to_f.to_s) }
+ EM.add_periodic_timer(Config.local_metrics_interval) { send_local_metrics }
+ end
+
+ # Processes NATS ping in order to calculate NATS roundtrip latency
+ #
+ # @param [Float] ping_timestamp UNIX timestamp when the ping was sent
+ def process_nats_ping(ping_timestamp)
+ @nats_latency << ((Time.now.to_f - ping_timestamp) * 1000).to_i
end
# Processes a discovered component message, recording it's location for varz/healthz probes.
@@ -99,6 +114,12 @@ def prune_components
@logger.warn(e)
end
+ # Generates metrics that don't require any interactions with varz or healthz
+ def send_local_metrics
+ handler = Handler.handler(@tsdb_connection, "collector", Config.index, Time.now.to_i)
+ handler.send_latency_metric("nats.latency.1m", @nats_latency.value)
+ end
+
# Fetches the varzs from all the components and calls the proper {Handler} to record the metrics in the TSDB server
def fetch_varz
@components.each do |job, instances|
@@ -165,4 +186,4 @@ def get_job_tags(type)
end
end
-end
+end
@@ -2,25 +2,44 @@ module Collector
# Singleton config used throughout
class Config
class << self
- [:logger, :tsdb_host, :tsdb_port, :nats_uri,
- :discover_interval, :varz_interval, :healthz_interval, :prune_interval].each { |option| attr_accessor option }
+
+ OPTIONS = [
+ :index,
+ :logger,
+ :tsdb_host,
+ :tsdb_port,
+ :nats_uri,
+ :discover_interval,
+ :varz_interval,
+ :healthz_interval,
+ :prune_interval,
+ :nats_ping_interval,
+ :local_metrics_interval
+ ]
+
+ OPTIONS.each { |option| attr_accessor option }
# Configures the various attributes
#
# @param [Hash] config the config Hash
def configure(config)
+ @index = config["index"].to_i
VCAP::Logging.setup_from_config(config["logging"])
@logger = VCAP::Logging.logger("collector")
@tsdb_host = config["tsdb"]["host"]
@tsdb_port = config["tsdb"]["port"]
@nats_uri = config["mbus"]
- @discover_interval = config["intervals"]["discover"]
- @varz_interval = config["intervals"]["varz"]
- @healthz_interval = config["intervals"]["healthz"]
- @prune_interval = config["intervals"]["prune"]
+ intervals = config["intervals"]
+
+ @discover_interval = intervals["discover"]
+ @varz_interval = intervals["varz"]
+ @healthz_interval = intervals["healthz"]
+ @prune_interval = intervals["prune"]
+ @nats_ping_interval = intervals["nats_ping"]
+ @local_metrics_interval = intervals["local_metrics"]
end
end
end
-end
+end
@@ -8,7 +8,7 @@ def process(varz)
if varz["frameworks"]
varz["frameworks"].each do |framework, metrics|
["used_memory", "reserved_memory", "used_disk"].each do |metric_name|
- send_metric("frameworks.#{metric_name}", metrics[metric_name] / 1000, :framework => framework)
+ send_metric("frameworks.#{metric_name}", metrics[metric_name] / 1024, :framework => framework)
end
send_metric("frameworks.used_cpu", metrics["used_cpu"], :framework => framework)
end
@@ -17,7 +17,7 @@ def process(varz)
if varz["runtimes"]
varz["runtimes"].each do |runtime, metrics|
["used_memory", "reserved_memory", "used_disk"].each do |metric_name|
- send_metric("runtimes.#{metric_name}", metrics[metric_name] / 1000, :runtime => runtime)
+ send_metric("runtimes.#{metric_name}", metrics[metric_name] / 1024, :runtime => runtime)
end
send_metric("runtimes.used_cpu", metrics["used_cpu"], :runtime => runtime)
end
@@ -27,4 +27,4 @@ def process(varz)
end
end
-end
+end
@@ -48,9 +48,8 @@ def process(varz)
end
send_metric("total_users", varz["total_users"]) if varz["total_users"]
- send_latency_metric("nats.latency.1m", varz["nats_latency"]) if varz["nats_latency"]
end
end
end
-end
+end
@@ -183,6 +183,34 @@ def setup_healthz_request
end
+ describe :local_metrics do
+ def send_local_metrics
+ Time.stub!(:now).and_return(1000)
+
+ create_fake_collector do |collector, tsdb, nats|
+ collector.process_nats_ping(997)
+ collector.process_nats_ping(998)
+ collector.process_nats_ping(999)
+
+ handler = mock(:Handler)
+ yield handler
+
+ Collector::Handler.should_receive(:handler).
+ with(tsdb, "collector", 0, 1000).
+ and_return(handler)
+
+ collector.send_local_metrics
+ end
+ end
+
+ it "should send nats latency rolling metric" do
+ send_local_metrics do |handler|
+ latency = {:value => 6000, :samples => 3}
+ handler.should_receive(:send_latency_metric).with("nats.latency.1m", latency)
+ end
+ end
+ end
+
describe :get_job_args do
it "should mark the core components" do
create_fake_collector do |collector, _, _|
@@ -209,4 +237,4 @@ def setup_healthz_request
end
end
-end
+end
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 636d46d

Please sign in to comment.