Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

logging overhaul

  • Loading branch information...
commit e83f4d177d7874ac3f4ae067b10fd2885b492ac4 1 parent 53ded70
@fetep authored
View
2  cepmon.cfg
@@ -3,6 +3,8 @@
listen "127.0.0.1", 8989
+#log :path => cepmon.log, :debug => false
+
amqp_input :host => "127.0.0.1",
:port => "5672",
:user => "user",
View
6 lib/cepmon/alert.rb
@@ -36,12 +36,6 @@ def reason
end
public
- def to_s
- "#{Time.at(@data[:timestamp])} [ALERT] cluster=#{@data[:cluster]}/host=#{@data[:host]} " +
- "| name=#{@data[:name]} | value=#{@data[:value]} | statement=#{@data[:statement]}"
- end # def to_s
-
- public
def to_json(*args)
@data.merge({
:started => @started,
View
25 lib/cepmon/config.rb
@@ -1,8 +1,10 @@
+require "logger"
+
class CEPMon
class Config
attr_reader :amqp
attr_reader :host
- attr_reader :log_path
+ attr_reader :logger
attr_reader :port
attr_reader :statements
@@ -12,12 +14,26 @@ def initialize(cfg_file="cepmon.cfg")
@statements = {}
@host = "0.0.0.0"
@port = 8989
- @log_path = nil
+ @logger = Logger.new(STDOUT)
+ @logger.progname = "cepmon"
+ @logger.level = Logger::INFO
instance_eval(File.read(cfg_file))
end # def initialize
private
+ def log(params)
+ if params[:path]
+ @logger = Logger.new(params[:path])
+ @logger.level = Logger::INFO
+ end
+
+ if params[:debug]
+ @logger.level = Logger::DEBUG
+ end
+ end
+
+ private
def verify_present(params, required)
required.each do |key|
return false unless params.member?(key)
@@ -200,10 +216,5 @@ def listen(host, port = 8989)
@port = port.to_i
end
- private
- def logfile(path)
- @log_path = path
- end
-
end # class Config
end # class CEPMon
View
14 lib/cepmon/eventlistener.rb
@@ -28,12 +28,6 @@ def initialize(engine, config)
else
@exchange = nil
end
-
- if @config.log_path
- @log = File.open(@config.log_path, "a+")
- else
- @log = nil
- end
end
public
@@ -47,8 +41,7 @@ def update(new_events, old_events, statement, provider)
vars[:statement] = statement.getName
vars[:timestamp] = timestamp
- puts "event: #{statement.getName} @#{timestamp} (#{Time.at(timestamp.to_i)}) (engine.uptime=#{@engine.uptime}): #{vars.collect { |h, k| [h, k.inspect].join("=") }.join(" ")}"
- $stderr.puts "event vars=#{vars.inspect}"
+ @config.logger.debug("CEP event: #{statement.getName}: @#{timestamp} (#{Time.at(timestamp.to_i)}) (engine.uptime=#{@engine.uptime}): #{vars.inspect}")
record_alert(vars, statement.getName, @exchange)
end
end # def update
@@ -69,8 +62,9 @@ def record_alert(vars, statement_name, exchange)
@alerts[key].update(vars)
else
alert = CEPMon::Alert.new(vars)
+ @config.logger.info("NEW ALERT: #{statement_name}: cluster=#{alert.data[:cluster]} host=#{alert.data[:host]} reason=#{alert.reason}")
+ @config.logger.debug("NEW ALERT: json: #{alert.to_json}")
exchange.publish(alert.to_json) if exchange
- @log.puts alert.to_json if @log
@alerts[key] = alert
@history << alert
end
@@ -81,7 +75,7 @@ def expire_alerts
now = @engine.runtime.getCurrentTime / 1000
@alerts.each do |key, alert|
next unless alert.expired?(now)
- puts "ALERT EXPIRING: #{alert.to_json}"
+ @config.logger.info("ALERT EXPIRING: #{alert.data[:statement]} cluster=#{alert.data[:cluster]} host=#{alert.data[:host]}")
@alerts.delete(key)
end
end
View
3  lib/cepmon/mon.rb
@@ -46,13 +46,14 @@ def run(args)
end
end
- puts "connecting to rabbitmq..."
+ @config.logger.info("connecting to rabbitmq...")
amqp.start
queue = amqp.queue("cepmon-#{Process.pid}", :auto_delete => true)
exchange = amqp.exchange(@config.amqp[:exchange_metrics],
:type => :topic,
:durable => true)
queue.bind(exchange)
+ @config.logger.info("connected; bound queue cepmon-#{Process.pid} to topic #{config.amqp[:exchange_metrics]}")
Thread.new do
begin
View
2  test.cfg
@@ -3,6 +3,8 @@
listen "127.0.0.1", 8989
+log :debug => true
+
threshold('test_host_gt', 'test.foo',
:operator => '>',
:threshold => 1.0,
Please sign in to comment.
Something went wrong with that request. Please try again.