Merge pull request huginn#6 in SQ/huginn from samsa to master
* commit '45d95128b487d11ba514ac2aa5abe0345f9a5f39':
  Curl instructions for spamming errors.
  SamsaConsumerAgent
  Add every_1s to our list of changes.
  Speed up delayed worker to match every_1s schedule.
xaviershay committed Jul 28, 2014
2 parents d08bf1f + 45d9512 commit 06995d9
Showing 7 changed files with 289 additions and 2 deletions.
7 changes: 6 additions & 1 deletion Gemfile
@@ -1,4 +1,5 @@
source 'https://rubygems.org'
source 'https://nexus.corp.squareup.com/content/repositories/square-gems/'
source 'https://nexus.corp.squareup.com/content/repositories/rubygems-org-cache/'

# Bundler <1.5 does not recognize :x64_mingw as a valid platform name.
# Unfortunately, it can't self-update because it errors when encountering :x64_mingw.
@@ -36,6 +37,8 @@ gem 'delayed_job', '~> 4.0.0'
gem 'delayed_job_active_record', '~> 4.0.0'
gem 'daemons', '~> 1.1.9'

gem 'poseidon', '~> 0.0.4'

# To enable DelayedJobWeb, see the 'Enable DelayedJobWeb' section of the README.
# gem 'delayed_job_web'

@@ -113,3 +116,5 @@ else
gem 'rails_12factor', platform: :ruby_18
end

gem 'sq-protos'
gem 'snappy'
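
For context: `poseidon` is a pure-Ruby Kafka client, `snappy` wraps the Snappy compression codec used for Kafka message payloads, and `sq-protos` carries Square's protobuf definitions (it pulls in `protobuf` and `money`, as the lockfile below shows). A quick load check after `bundle install`, as a sketch:

    # Sketch: confirm the new gems resolve and the Samsa protos load (run under `bundle exec`).
    require 'poseidon'
    require 'snappy'
    require 'sq/protos'
    require 'squareup/samsa/common.pb'
    puts defined?(Squareup::Samsa::Common::SamsaMessage) ? 'Samsa protos loaded' : 'protos missing'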
19 changes: 18 additions & 1 deletion Gemfile.lock
@@ -9,7 +9,8 @@ GIT
simple_oauth (~> 0.2.0)

GEM
remote: https://rubygems.org/
remote: https://nexus.corp.squareup.com/content/repositories/square-gems/
remote: https://nexus.corp.squareup.com/content/repositories/rubygems-org-cache/
specs:
ace-rails-ap (2.0.1)
actionmailer (4.1.4)
@@ -172,9 +173,12 @@ GEM
memoizable (0.4.2)
thread_safe (~> 0.3, >= 0.3.1)
method_source (0.8.2)
middleware (0.1.0)
mime-types (1.25.1)
mini_portile (0.6.0)
minitest (5.3.5)
money (6.1.1)
i18n (~> 0.6.4)
mqtt (0.2.0)
multi_json (1.10.1)
multi_xml (0.5.5)
@@ -191,8 +195,14 @@ GEM
rack (~> 1.2)
pg (0.17.1)
polyglot (0.3.5)
poseidon (0.0.4)
protected_attributes (1.0.8)
activemodel (>= 4.0.1, < 5.0)
protobuf (3.0.5)
activesupport (>= 3.2)
middleware
multi_json
thor
pry (0.10.0)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
@@ -278,6 +288,7 @@ GEM
simplecov-html (0.8.0)
slack-notifier (0.5.0)
slop (3.5.0)
snappy (0.0.10)
sprockets (2.11.0)
hike (~> 1.2)
multi_json (~> 1.0)
@@ -287,6 +298,9 @@ GEM
actionpack (>= 3.0)
activesupport (>= 3.0)
sprockets (~> 2.8)
sq-protos (1.1.20140728.135543)
money
protobuf (~> 3.0.0)
systemu (2.6.4)
term-ansicolor (1.3.0)
tins (~> 1.0)
@@ -382,6 +396,7 @@ DEPENDENCIES
mysql2 (~> 0.3.16)
nokogiri (~> 1.6.1)
pg
poseidon (~> 0.0.4)
protected_attributes (~> 1.0.8)
pry
quiet_assets
@@ -398,6 +413,8 @@ DEPENDENCIES
select2-rails (~> 3.5.4)
shoulda-matchers
slack-notifier (~> 0.5.0)
snappy
sq-protos
therubyracer (~> 0.12.1)
twilio-ruby (~> 3.11.5)
twitter (~> 5.8.0)
8 changes: 8 additions & 0 deletions SQUARE.md
@@ -5,3 +5,11 @@ Square customizations
`noreply@squareup.com`. Note that the test fixtures still contain `bob` and
`jane` users - this is intentional to minimize our diff from upstream.
* **No public/private scenarios.** All scenarios are "public".
* **Provides 1s recurring agents.**
* **Provides SamsaErrorAgent.** Use `script/samsa_tunnel` to test; create fake
exceptions on exemplar with:

curl http://localhost:22280/services/squareup.testing.ExampleService/ThrowError \
-X POST \
-d '{"server_error": true, "error_code": 501, "message": "test"}' \
--header "Content-Type: application/json"
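
Before firing the curl above, it can help to confirm the tunnel is actually carrying traffic. One way (a sketch, assuming `script/samsa_tunnel` is running and using the `SamsaConsumer` class added below) is to list the matching topics from a Rails console:

    # Topics for the default app/env/dc should appear once exemplar has logged anything.
    SamsaConsumer.topics.grep(/^logging\.event\.exemplar\.staging\.sjc1/)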
79 changes: 79 additions & 0 deletions app/models/agents/samsa_error_agent.rb
@@ -0,0 +1,79 @@
require 'squareup/logging/events/ext/logging_event.pb'

module Agents
class SamsaErrorAgent < Agent
cannot_receive_events!

description <<-MD
TODO
MD

event_description <<-MD
TODO
MD

default_schedule "every_1s"

def validate_options
unless options['dc'].present? &&
options['env'].present? &&
options['app'].present?
errors.add(:base, "dc, env, and app are required fields")
end
end

def default_options
{
'dc' => 'sjc1',
'env' => 'staging',
'app' => 'exemplar',
}
end

def check
topic_prefix = "logging.event.%s.%s.%s" %
options.values_at(:app, :env, :dc)

SamsaConsumer.topics.grep(/^#{topic_prefix}/).each do |topic|
host = topic.split('.')[-2..-1].reverse.join('.') + '.square'
key = "offset.#{topic}"
offset = memory.fetch(key, :latest_offset)
sc = SamsaConsumer.new(topic, offset)

sc.consume do |msg|
e = msg.ext_logging_event.log4j_logging_event

next unless e.level == 'ERROR'

create_event :payload => {
:app => options[:app],
:dc => options[:dc],
:env => options[:env],
:host => host,
:message => e.logging_event_message,
:backtrace => e.throwable_str_rep.to_a.join("\n"),
:request => mdc_value(e, 'com.squareup.logging.HttpRequest'),
:action => mdc_value(e, 'com.squareup.logging.ActionName'),
:user_token => mdc_value(e, 'com.squareup.logging.user_token'),
}
end
memory[key] = sc.next_offset
end
rescue => e
$stderr.puts e.message
$stderr.puts e.backtrace
$stderr.puts
end

def working?
true
end

def mdc_value(e, key)
tuple = e.mdc.detect {|x|
x.key == key
}
tuple ? tuple.value : ''
end
end
end
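
For reviewers, a sketch of driving the agent by hand from a Rails console, assuming a Huginn user exists and the Samsa tunnel is open; the option keys follow `default_options` above:

    agent = Agents::SamsaErrorAgent.new(name: 'Samsa errors (exemplar staging)')
    agent.user    = User.first
    agent.options = agent.default_options   # dc: sjc1, env: staging, app: exemplar
    agent.save!

    agent.check                     # one Kafka fetch per matching topic; ERROR lines become events
    agent.events.last.try(:payload) # payload of the most recent error event, if any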
168 changes: 168 additions & 0 deletions app/services/samsa_consumer.rb
@@ -0,0 +1,168 @@
require 'poseidon'
require 'snappy'
require 'stringio'
require 'sq/protos'
require 'squareup/samsa/common.pb'

# Helps consume events from Samsa and unpack the protos
class SamsaConsumer
attr_reader :consumer

def initialize(topic, offset)
@consumer = build_consumer(topic, offset)
end

# Consumes events and yields a single message at a time
# @option opts [true, false] :once If true, returns after the first fetched batch
# @yieldparam [Squareup::Samsa::Common::SamsaMessage]
def each(opts = {}, &block)
each_batch(opts) { |batch| batch.each { |m| yield m } }
end

# Consumes events and yields a batch
# @yieldparam [Array<Squareup::Samsa::Common::SamsaMessage>]
def each_batch(opts = {}, &block)
loop do
messages = consumer.fetch

yield messages.map { |m| decode(m) }

return if opts[:once]
end
end

def consume(&block)
consumer.fetch.map {|m| decode(m) }.each(&block)
end

def next_offset
consumer.next_offset
end

def self.topics
connection = Poseidon::Connection.new('localhost', 4321, Socket.gethostname)
connection.topic_metadata([]).topics.map(&:name)
end

protected

def decode(message)
raw_proto = message.value
::Squareup::Samsa::Common::SamsaMessage.decode(raw_proto)
end

def brokers
if Rails.env.production?
['samsa-kafkaserver.vip.sjc1b.square:27330']
else # see script/samsa_tunnel
['localhost:4321']
end
end

def consumer_name
hostname = Socket.gethostname
"huginn-#{hostname}"
end

def build_consumer(topic, offset)
Poseidon::PartitionConsumer.consumer_for_partition(
consumer_name, brokers, topic, 0, offset
)
end
end

# Remove once either of these PRs is merged upstream:
# - https://github.com/bpot/poseidon/pull/28
# - https://github.com/bpot/poseidon/pull/52
module Poseidon
module Compression
module SnappyCodec
def self.decompress(s)
io = StringIO.new(s)
io.set_encoding("ASCII-8BIT")
::Snappy::Reader.new(io).read
end
end
end
end

require 'poseidon'

# The current version of poseidon uses #slice not #byteslice, which gets super
# cranky with our binary protobuf payloads at Square.
# Remove once these changes are merged:
# - https://github.com/bpot/poseidon/pull/28
module Poseidon
module Protocol
class ResponseBuffer
def initialize(response)
@s = response
@pos = 0
end

def int8
byte = @s.byteslice(@pos, 1).unpack("C").first
@pos += 1
byte
end

def int16
short = @s.byteslice(@pos, 2).unpack("s>").first
@pos += 2
short
end

def int32
int = @s.byteslice(@pos, 4).unpack("l>").first
@pos += 4
int
end

def int64
long = @s.byteslice(@pos, 8).unpack("q>").first
@pos += 8
long
end

def string
len = int16
string = @s.byteslice(@pos, len)
@pos += len
string
end

def read(bytes)
data = @s.byteslice(@pos, bytes)
@pos += bytes
data
end

def peek(bytes)
@s.byteslice(@pos, bytes)
end

def bytes
n = int32
if n == -1
return nil
else
read(n)
end
end

def bytes_remaining
@s.size - @pos
end

def eof?
@pos == @s.size
end

def to_s
@s
end
end
end
end
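
Also not part of the diff: a short usage sketch of `SamsaConsumer` on its own, with a hypothetical topic name, assuming the tunnel from `script/samsa_tunnel` is up so the non-production broker list (`localhost:4321`) is reachable:

    # Start at the newest message and process whatever a single fetch returns.
    # (The ext logging event proto is required at the top of samsa_error_agent.rb.)
    sc = SamsaConsumer.new('logging.event.exemplar.staging.sjc1.example-host', :latest_offset)
    sc.consume do |msg|
      event = msg.ext_logging_event.log4j_logging_event
      puts "#{event.level}: #{event.logging_event_message}"
    end

    # Persist this (as SamsaErrorAgent does in `memory`) to resume from the same place later.
    resume_from = sc.next_offset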
2 changes: 2 additions & 0 deletions bin/threaded.rb
@@ -37,6 +37,8 @@ def safely(&block)
threads << Thread.new do
safely do
require 'delayed/command'
Delayed::Worker.read_ahead = 1
Delayed::Worker.sleep_delay = 0.2
@dj = Delayed::Worker.new
@dj.start
puts "Delayed job stopped ..."
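
The two new lines tighten the Delayed::Job polling loop so the `every_1s` schedule is actually honoured: `sleep_delay` is the pause between polls when the queue is empty, and `read_ahead` is how many candidate jobs each poll considers. A rough sketch of the effect, assuming delayed_job's usual defaults of 5 seconds and 5 jobs:

    # delayed_job's defaults (sleep_delay 5, read_ahead 5) mean an every_1s job
    # can wait several seconds before a worker picks it up; these settings drop
    # the poll interval to ~200ms so 1-second schedules run close to on time.
    Delayed::Worker.sleep_delay = 0.2   # pause between polls when the queue is empty
    Delayed::Worker.read_ahead  = 1     # candidate jobs considered per poll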
8 changes: 8 additions & 0 deletions script/samsa_tunnel
@@ -0,0 +1,8 @@
#!/bin/bash

LOCAL_PORT=4321
SAMSA_PORT=27330

echo "Opening SSH tunnel to Samsa/Kafka, ^C me when you're done."
exec ssh -L $LOCAL_PORT:samsa-kafkaserver.stage.vip.sjc1.square:$SAMSA_PORT \
aia12.sjc1.square -N
