Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Implemented gzip encoding, closes #5

  • Loading branch information...
commit bb107c2c7d1d5b13b4cbb9d17eedb5ff2e6e51c4 1 parent cf54d36
Theo Hultberg iconara authored
63 lib/autobahn/encoder.rb
@@ -2,10 +2,10 @@
2 2
3 3 module Autobahn
4 4 class StringEncoder
5   - CONTENT_TYPE = 'application/octet-stream'.freeze
  5 + PROPERTIES = {:content_type => 'application/octet-stream'.freeze}.freeze
6 6
7   - def content_type
8   - CONTENT_TYPE
  7 + def properties
  8 + PROPERTIES
9 9 end
10 10
11 11 def encodes_batches?
@@ -25,11 +25,11 @@ def decode(str)
25 25 require 'json'
26 26
27 27 class JsonEncoder
28   - CONTENT_TYPE = 'application/json'.freeze
  28 + PROPERTIES = {:content_type => 'application/json'.freeze}.freeze
29 29
30   - def content_type
31   - CONTENT_TYPE
32   - end
  30 + def properties
  31 + PROPERTIES
  32 + end
33 33
34 34 def encodes_batches?
35 35 true
@@ -50,10 +50,10 @@ def decode(str)
50 50 require 'msgpack'
51 51
52 52 class MsgPackEncoder
53   - CONTENT_TYPE = 'application/msgpack'.freeze
  53 + PROPERTIES = {:content_type => 'application/msgpack'.freeze}.freeze
54 54
55   - def content_type
56   - CONTENT_TYPE
  55 + def properties
  56 + PROPERTIES
57 57 end
58 58
59 59 def encodes_batches?
@@ -65,7 +65,7 @@ def encode(obj)
65 65 end
66 66
67 67 def decode(str)
68   - MessagePack.unpack(str.force_encoding(Encoding::ASCII_8BIT))
  68 + MessagePack.unpack(str.force_encoding(Encoding::BINARY))
69 69 end
70 70 end
71 71 rescue LoadError
@@ -75,7 +75,7 @@ def decode(str)
75 75 require 'bson'
76 76
77 77 class BsonEncoder
78   - CONTENT_TYPE = 'application/bson'.freeze
  78 + PROPERTIES = {:content_type => 'application/bson'.freeze}.freeze
79 79
80 80 def content_type
81 81 CONTENT_TYPE
@@ -95,4 +95,43 @@ def decode(str)
95 95 end
96 96 rescue LoadError
97 97 end
  98 +
  99 + begin
  100 + require 'zlib'
  101 + require 'stringio'
  102 +
  103 + class GzipEncoder
  104 + CONTENT_ENCODING = 'gzip'.freeze
  105 +
  106 + def initialize(decorated_encoder)
  107 + @decorated_encoder = decorated_encoder
  108 + end
  109 +
  110 + def properties
  111 + @properties ||= begin
  112 + p = @decorated_encoder.properties.dup
  113 + p[:content_encoding] = CONTENT_ENCODING
  114 + p
  115 + end
  116 + end
  117 +
  118 + def encodes_batches?
  119 + @decorated_encoder.encodes_batches?
  120 + end
  121 +
  122 + def encode(obj)
  123 + io = StringIO.new
  124 + gz = Zlib::GzipWriter.new(io)
  125 + gz.print(@decorated_encoder.encode(obj))
  126 + gz.close
  127 + io.string
  128 + end
  129 +
  130 + def decode(str)
  131 + io = StringIO.new(str)
  132 + gz = Zlib::GzipReader.new(io)
  133 + @decorated_encoder.decode(gz.read).tap { gz.close }
  134 + end
  135 + end
  136 + end
98 137 end
2  lib/autobahn/publisher.rb
@@ -14,7 +14,7 @@ def publish(message)
14 14 rk = @strategy.select_routing_key(routing_keys, message)
15 15 ex = exchanges_by_routing_key[rk]
16 16 em = @encoder.encode(message)
17   - op = {:routing_key => rk, :properties => {:content_type => @encoder.content_type}}
  17 + op = {:routing_key => rk, :properties => @encoder.properties}
18 18 ex.publish(em, op)
19 19 end
20 20
70 spec/integration/autobahn_intg_spec.rb
@@ -92,16 +92,48 @@ def counting_down(n, options={})
92 92 end
93 93
94 94 context 'with encoded messages' do
  95 + before do
  96 + @encoded_transport_system = Autobahn.transport_system(api_uri, exchange_name, :encoder => Autobahn::JsonEncoder.new)
  97 + end
  98 +
  99 + after do
  100 + @encoded_transport_system.disconnect! if @encoded_transport_system
  101 + end
  102 +
95 103 it 'uses the provided encoder to encode messages' do
96   - begin
97   - transport_system = Autobahn.transport_system(api_uri, exchange_name, :encoder => Autobahn::JsonEncoder.new)
98   - publisher = transport_system.publisher
99   - publisher.publish({'hello' => 'world'})
100   - sleep(0.1) # allow time for delivery
101   - @queues.map { |q| h, m = q.get; m }.compact.first.should == '{"hello":"world"}'
102   - ensure
103   - transport_system.disconnect! if transport_system
104   - end
  104 + publisher = @encoded_transport_system.publisher
  105 + publisher.publish({'hello' => 'world'})
  106 + sleep(0.1) # allow time for delivery
  107 + @queues.map { |q| h, m = q.get; m }.compact.first.should == '{"hello":"world"}'
  108 + end
  109 + end
  110 +
  111 + context 'with compressed messages' do
  112 + before do
  113 + encoder = Autobahn::GzipEncoder.new(Autobahn::JsonEncoder.new)
  114 + options = {:encoder => encoder}
  115 + @compressed_transport_system = Autobahn.transport_system(api_uri, exchange_name, options)
  116 + publisher = @compressed_transport_system.publisher
  117 + publisher.publish({'hello' => 'world'})
  118 + sleep(0.1) # allow time for delivery
  119 + end
  120 +
  121 + after do
  122 + @compressed_transport_system.disconnect! if @compressed_transport_system
  123 + end
  124 +
  125 + it 'uses the provided encoder to also compress messages' do
  126 + compressed_message = [31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 171, 86, 202, 72, 205, 201, 201, 87, 178, 82, 42, 207, 47, 202, 73, 81, 170, 5, 0, 209, 65, 9, 216, 17, 0, 0, 0].pack('C*')
  127 + actual_message = @queues.map { |q| h, m = q.get; m }.compact.first
  128 + actual_message.should == compressed_message
  129 + end
  130 +
  131 + it 'sets the content-encoding header' do
  132 + @queues.map { |q| h, m = q.get; h.properties.content_encoding if h }.compact.first.should == 'gzip'
  133 + end
  134 +
  135 + it 'sets the content-type header' do
  136 + @queues.map { |q| h, m = q.get; h.properties.content_type if h }.compact.first.should == 'application/json'
105 137 end
106 138 end
107 139
@@ -271,6 +303,26 @@ def counting_down(n, options={})
271 303 end
272 304 end
273 305
  306 + context 'with compressed messages' do
  307 + before do
  308 + options = {:encoder => Autobahn::GzipEncoder.new(Autobahn::JsonEncoder.new)}
  309 + @compressed_transport_system = Autobahn.transport_system(api_uri, exchange_name, options)
  310 + compressed_message = [31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 171, 86, 202, 72, 205, 201, 201, 87, 178, 82, 42, 207, 47, 202, 73, 81, 170, 5, 0, 209, 65, 9, 216, 17, 0, 0, 0].pack('C*')
  311 + @exchange.publish(compressed_message, :routing_key => routing_keys.sample)
  312 + sleep(0.1) # allow time for delivery
  313 + end
  314 +
  315 + after do
  316 + @compressed_transport_system.disconnect! if @compressed_transport_system
  317 + end
  318 +
  319 + it 'uses the provided encoder to decode messages' do
  320 + consumer = @compressed_transport_system.consumer
  321 + h, m = consumer.next(5)
  322 + m.should == {'hello' => 'world'}
  323 + end
  324 + end
  325 +
274 326 context 'with batched messages' do
275 327 before do
276 328 @encoder = Autobahn::JsonEncoder.new

0 comments on commit bb107c2

Please sign in to comment.
Something went wrong with that request. Please try again.