Skip to content

Commit

Permalink
Merge 52c35e0 into fbc5a5d
Browse files Browse the repository at this point in the history
  • Loading branch information
MaheshKReddy committed May 7, 2019
2 parents fbc5a5d + 52c35e0 commit d6af79c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ Note: The following are in addition to the common parameters shown above.
|:exchange|:string|nil| What exchange should the queue bind to? |
|:exchange_type|:string|"direct"| Type of exchange ( direct, fanout, topic, headers, x-consistent-hash, x-modulus-hash )|
|:routing_key|:string|nil| What exchange should the queue bind to? |
|:include_headers|:bool|false| If true, include Message headers in the parsed payload with key "headers" |

### Example

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.13.0
0.13.1
22 changes: 15 additions & 7 deletions lib/fluent/plugin/in_amqp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class AMQPInput < Input
config_param :bind_exchange, :bool, default: false
config_param :exchange, :string, default: ""
config_param :routing_key, :string, default: "#" # The routing key used to bind queue to exchange - # = matches all, * matches section (tag.*.info)
config_param :include_headers, :bool, default: false

def configure(conf)
conf['format'] ||= conf['payload_format'] # legacy
Expand Down Expand Up @@ -83,8 +84,10 @@ def start
end

q.subscribe do |delivery, meta, msg|
log.debug "Recieved message #{@msg}"
payload = parse_payload(msg)
log.debug "Recieved message #{msg}"
log.debug "Recieved message MetaData #{meta}"
payload = parse_payload(msg, meta)
log.debug "Parsed Payload #{payload}"
router.emit(parse_tag(delivery, meta), parse_time(meta), payload)
end
end # AMQPInput#run
Expand All @@ -100,20 +103,25 @@ def multi_workers_ready?
end

private
def parse_payload(msg)
def parse_payload(msg, meta)
parsed = nil
if @parser
parsed = nil
@parser.parse msg do |_, payload|
if payload.nil?
log.warn "failed to parse #{msg}"
parsed = { "message" => msg }
parsed = { 'message' => msg }
else
parsed = payload
end
end
parsed
else
{ "message" => msg }
parsed = { 'message' => msg }
end
if @include_headers
log.debug 'Adding headers'
{ 'headers' => meta[:headers] }.merge(parsed)
else
parsed
end
end

Expand Down
95 changes: 95 additions & 0 deletions test/plugin/test_in_amqp-headers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
require_relative '../helper'
require 'fluent/test'
require 'fluent/test/driver/input'
require 'fluent/test/helpers'
require 'fluent/plugin/in_amqp'
require 'bunny-mock'

class AMPQInputTestForHeaders < Test::Unit::TestCase
include Fluent::Test::Helpers

CONFIG = %(
type amqp
format json
host localhost
port 5672
vhost /
user guest
pass guest
queue test_in_fanout
)

def setup
Fluent::Test.setup
end

def create_driver(conf)
Fluent::Test::Driver::Input.new(Fluent::Plugin::AMQPInput).configure(conf)
end

def get_plugin(configuration = CONFIG)
omit('BunnyMock is not avaliable') unless Object.const_defined?('BunnyMock')

@driver = create_driver(configuration)
plugin = @driver.instance
plugin.connection = BunnyMock.new

# Start the driver and wait while it initialises the threads etc
plugin.start
5.times { sleep 0.05 }
return plugin
end

sub_test_case 'headers' do

test 'test_omit_headers' do
plugin = get_plugin()

# Should have created the 'test_in_fanout' queue
assert_equal true, plugin.connection.queue_exists?('test_in_fanout')

# bind a testing queue to the exchange
queue = plugin.connection.channel.queue('test_in_fanout')
assert_equal true, (queue != nil)

hash = { 'foo' => 'bar' }
headers = { 'GroupId' => 'H123' }
expect_hash = hash.dup

@driver.run(expect_emits: 1) do
queue.publish( hash.to_json, headers: headers )
end

@driver.events.each do |event|
assert_equal expect_hash, event[2]
end
end

test 'test_emit_headers' do
conf = CONFIG.clone
conf << "\ninclude_headers true\n"
plugin = get_plugin(conf)

# Should have created the 'test_in_fanout' queue
assert_equal true, plugin.connection.queue_exists?('test_in_fanout')

# bind a testing queue to the exchange
queue = plugin.connection.channel.queue('test_in_fanout')
assert_equal true, (queue != nil)

hash = { 'foo' => 'bar' }
headers = { 'GroupId' => 'H123' }
expect_hash = hash.dup
expect_hash['headers'] = headers

# Emit an event through the plugins driver
@driver.run(expect_emits: 1) do
queue.publish( hash.to_json, headers: headers )
end

@driver.events.each do |event|
assert_equal expect_hash, event[2]
end
end
end
end

0 comments on commit d6af79c

Please sign in to comment.