How To: Do Log Processing

Let's build a log processing framework that takes JSON-structured logs and counts the number and types of errors that occur, in real time.

Quick start

Set up a Gemfile:

source 'https://rubygems.org'
gem 'sneakers'
gem 'json'
gem 'redis'

And a first iteration of a worker:

# boot.rb
require 'sneakers'

class Processor
  include Sneakers::Worker
  from_queue :logs

  def work(msg)
    logger.info msg
    ack! # acknowledge, so the message is taken off the queue
  end
end

Let's test it out quickly from the command line:

sneakers work Processor --require boot.rb

We just told Sneakers to spawn a worker named Processor, but first to --require a file that we dedicate to setting up the environment: workers, configuration, and what-not.

For simplicity, we also ran Sneakers in the foreground by not passing --daemonize. In general I don't daemonize processes unless I must; daemonizing tends to make failures less obvious. Instead, I run processes in the foreground and let the operating system daemonize them for me with Upstart, init.d, runit, systemd, or what have you.
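
As a sketch of that approach, here is what a minimal systemd unit could look like (the service name, paths, and working directory are made up for illustration; adapt them to your deployment):

# /etc/systemd/system/sneakers-logs.service (hypothetical path)
[Unit]
Description=Sneakers log processor
After=network.target

[Service]
# run in the foreground; systemd handles supervision and restarts
WorkingDirectory=/srv/log-processor
ExecStart=/usr/bin/env bundle exec sneakers work Processor --require boot.rb
Restart=on-failure

[Install]
WantedBy=multi-user.target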

I also like to run workers in their own shell and project; see How To: Running a stand-alone worker when you're ready. Without spoiling too much, it presents a clean and modular worker project, and shows how to run and maintain a worker as a first-class citizen.

Back to business. If you go to your RabbitMQ admin now, you'll see a new queue named logs was created. Push a couple of messages, and this is the output you should see in your terminal:

2013-10-11T19:26:36Z p-4718 t-ovqgyb31o DEBUG: [worker-logs:1:213mmy][#<Thread:0x007fae6b05cc58>][logs][{:prefetch=>10, :durable=>true, :ack=>true, :heartbeat_interval=>2, :exchange=>"sneakers"}] Working off: log log
2013-10-11T19:26:36Z p-4718 t-ovqgyrxu4 INFO: log log
2013-10-11T19:26:40Z p-4719 t-ovqgyb364 DEBUG: [worker-logs:1:h23iit][#<Thread:0x007fae6b05cd98>][logs][{:prefetch=>10, :durable=>true, :ack=>true, :heartbeat_interval=>2, :exchange=>"sneakers"}] Working off: log log
2013-10-11T19:26:40Z p-4719 t-ovqgyrx8g INFO: log log
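
If you don't have a publisher handy, here is a minimal sketch using Bunny (the RabbitMQ client Sneakers is built on) to push test messages; it assumes RabbitMQ is running locally with default credentials:

# publish.rb -- push a test message at the logs queue (sketch)
require 'bunny'

conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue('logs', durable: true) # matches the durable queue Sneakers declares
q.publish('log log')
conn.close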

We're basically done with the ceremonies, and all that's left is to do some real work.

Doing real work

We'll count errors and error types with Redis, specifically for errors that look like this:

{
   "type": "error",
   "message": "HALP!",
   "error": "CODE001"
}

Let's update the worker code:

require 'sneakers'
require 'redis'
require 'json'

$redis = Redis.new

class Processor
  include Sneakers::Worker
  from_queue :logs

  def work(msg)
    err = JSON.parse(msg)
    if err["type"] == "error"
      # keep one counter per error code, e.g. processor:CODE001
      $redis.incr "processor:#{err["error"]}"
    end

    ack!
  end
end
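
To exercise it, push the error document from above, reusing the queue handle from the earlier Bunny sketch:

q.publish('{"type":"error","message":"HALP!","error":"CODE001"}')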

Since tracing is enabled with a debug-level logger, you'll see when a worker pulls a message off the queue, and what that message actually was:

2013-10-11T19:38:49Z p-7545 t-oxeymqln8 DEBUG: [worker-logs:1:o49a6u][#<Thread:0x007febbc035c28>][logs][{:prefetch=>10, :durable=>true, :ack=>true, :heartbeat_interval=>2, :exchange=>"sneakers"}] Working off: {
   "type": "error",
   "message": "HALP!",
   "error": "CODE001"
}

Soon you'll see this on the Redis end:

➜  ~  redis-cli monitor
1381520329.888581 [0 127.0.0.1:49182] "incr" "processor:CODE001"
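
You can read the counter back the same way the worker writes it, for example from an irb session (assuming the same local Redis):

require 'redis'
Redis.new.get('processor:CODE001') # => "1" after one error was counted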

Looking at metrics

Let's use the logging_metrics provider, just for the fun of seeing the metrics as they happen.

# boot.rb
require 'sneakers'
require 'redis'
require 'json'
require 'sneakers/metrics/logging_metrics'
Sneakers.configure :metrics => Sneakers::Metrics::LoggingMetrics.new

# ... rest of code

Now push a message again and you'll see:

2013-10-11T19:44:37Z p-9219 t-oxh8owywg INFO: INC: work.Processor.started
2013-10-11T19:44:37Z p-9219 t-oxh8owywg INFO: TIME: work.Processor.time 0.00242
2013-10-11T19:44:37Z p-9219 t-oxh8owywg INFO: INC: work.Processor.handled.ack

This increments a counter when a unit of work starts and when it is handled (here, with an ack), and times the work itself.
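
The surface area behind this is small: a metrics provider responds to increment(metric) and timing(metric) { ... }. A bare-bones sketch in the spirit of LoggingMetrics might look like this (the interface is inferred from the output above; verify it against the Sneakers version you run):

# a hand-rolled metrics provider (sketch)
class CountingMetrics
  def initialize
    @counts = Hash.new(0)
  end

  def increment(metric)
    @counts[metric] += 1
  end

  def timing(metric)
    started = Time.now
    yield
    puts "#{metric}: #{Time.now - started}s, counts so far: #{@counts.inspect}"
  end
end

Sneakers.configure :metrics => CountingMetrics.new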

Finale

That's it for now. You've seen how to interactively build and tweak a worker. If you use the --daemonize flag, you'll have a production-ready processor, spitting logs into sneakers.log by default.

In production, you can also use the StatsdMetrics module to ship metrics to a Statsd server.
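
A minimal sketch of that wiring, assuming the statsd-ruby gem provides the connection:

# boot.rb (production variant, sketch)
require 'sneakers'
require 'sneakers/metrics/statsd_metrics'
require 'statsd' # from the statsd-ruby gem (assumed)

Sneakers.configure :metrics => Sneakers::Metrics::StatsdMetrics.new(Statsd.new('localhost', 8125))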