Add synchronous codec encode method to aid in testing, debugging #3486
I agree the callback mechanism makes it awkward to test.
Given the current state of things there are some tricky compatibility issues here. I initially hoped that there might be some way to create a backwards-compatible implementation, but I think the best path forward would be to just start adding this method on a plugin-by-plugin basis, instead of putting it formally in the core at the moment. There aren't that many codec plugins, so it is doable. @jordansissel @ph What are your thoughts?
@andrewvc I agree with you that we should do it on a plugin-by-plugin basis and keep the old method for backward compatibility. Also, as soon as we update a plugin to use the sync method, we need to make sure to update its dependency on a specific version of the core. But forcing a plugin to require a recent core version would break it on older Logstash releases. So maybe we should target this change for 2.0?
@ph agreed on 2.0. Just tagged it as such.
@jordansissel @ph if it's a 2.0 target, should we just alter #encode to be sync exclusively? Are any codecs actually dependent on the async behavior? The only case I can think of is something using EventMachine or similar to do some external IO, which should probably be in a filter.
@andrewvc Well, if we target 2.0, it's a good time to just change the behavior and drop backward support. Also, 2.0 is a good time to see if we could benefit from redefining the codec to be more…
We need to revive this issue in light of the new NG pipeline. If codec encoding has an async contract we can't guarantee backpressure, and can't guarantee that we've persisted to disk safely. The compress-spooler codec is particularly problematic. It is the only codec we have (I checked the source of all of them) that has its own internal buffer and does truly async encoding. There is no way to make this work with persistence. We need backpressure, and we need to do it through the callstack. @guyboertje's proposal #4432 for inputs has been mentioned as a possible approach here, but I'm not sure it's a workable approach for outputs/codecs because we need backpressure to always be synchronous. I'd love your feedback here @guyboertje. AFAICT we should just chain function calls, because asynchronous contexts and backpressure are not going to be used here.

Approach

Proposed interface:

```ruby
class HypotheticalJsonCodec < LogStash::Codec
  # For a hypothetical JSON encoder
  def encode(event)
    encoded = event.to_json
    @on_event.call(event, encoded) if @on_event # Backwards compatibility
    [event, encoded]
  end

  # This is important for codecs like compress spooler.
  # This will let them use the external pipeline buffer.
  def multi_encode(events)
    events.map { |event| encode(event) }
  end
end

class HypotheticalHTTPOutput < LogStash::Output
  def multi_receive(encoded_events)
    encoded_events.each do |event, encoded|
      # interpolate a (hypothetical) configured URL template per event
      send_request(url: event.sprintf(@url), body: encoded)
    end
  end
end
```
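To make the synchronous contract concrete, here is a minimal sketch of how a pipeline worker might drive these two hypothetical classes; the SizedQueue-based batching and the batch size below are my assumptions, not part of the proposal:

```ruby
require 'thread' # SizedQueue

# Hypothetical wiring; the codec and output are the classes sketched above.
codec  = HypotheticalJsonCodec.new
output = HypotheticalHTTPOutput.new
queue  = SizedQueue.new(1000)

batch = Array.new(125) { queue.pop }        # dequeue a fixed-size batch (assumed size)
encoded_events = codec.multi_encode(batch)  # returns [[event, encoded], ...]
output.multi_receive(encoded_events)        # blocks until every request returns
# Only now is it safe to ack/persist: every call above has returned synchronously.
```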
@andrewvc - with components the call chain is synchronous, as is the return unwind, so we just need to pass a flag back on the return unwind.
@guyboertje can you post a code sample of what that would look like, from an output plugin author's standpoint?
@andrewvc - sure, if you first post a sample output that wishes to communicate backpressure upstream 😄 I will show how components can facilitate this backpressure communication.
@guyboertje can you use the hypothetical ones I listed above? All outputs should now exhibit backpressure, since they all by definition perform IO.
Ahh - then synchronous blocking back-pressure is inherent in the call chain. Now the discussion turns to when and how the batch elements are handled, aka de-batching.

De-batch at the encode component:

```ruby
class DebatchingEncoderComponent
  include Component

  # the pipeline will dequeue a batch and call
  # accept here with a context and data (batch)
  def add_codec(codec)
    @codec = codec
    @codec.on_event do |event, encoded|
      # de-batch here
      # if downstream blocks then this will too
      deliver(event, encoded)
    end
    self
  end

  # will we need a context?
  def do_work(context, data)
    # here data is a batch of events
    data.each do |event|
      # if on_event blocks then this call will too
      # if I block here then the pipeline dequeuing
      # call to my accept method will block too
      @codec.encode(event)
    end
  end
end

class OutputComponent
  include Component

  def add_output(output)
    @output = output
    self
  end

  def accept(event, data)
    @output.receive(event, data)
  end
end
```

De-batch at the output component:

```ruby
class BulkEncoderComponent
  include Component

  # the pipeline will dequeue a batch and call
  # accept here with a context and data (batch)
  # if downstream blocks then accept blocks
  # and the dequeue loop blocks
  def add_codec(codec)
    @codec = codec
    @codec.on_event do |event, encoded|
      # this call does not block
      assemble(event, encoded)
    end
    self
  end

  # will we need a context?
  def do_work(context, data)
    @assembled = []
    # here data is a batch of events
    data.each do |event|
      @codec.encode(event)
    end
    # if downstream blocks then this blocks too
    deliver(context, @assembled)
  end

  def assemble(event, encoded)
    @assembled.push([event, encoded])
  end
end

class DebatchingOutputComponent
  include Component

  def add_output(output)
    @output = output
    self
  end

  def accept(context, data)
    # here data is an array of [event, encoded]
    # if this call blocks then upstream deliver will block
    data.each do |event, encoded|
      @output.receive(event, encoded)
    end
  end
end
```

Output that natively handles bulk write:

```ruby
class BulkEncoderComponent
  include Component

  # the pipeline will dequeue a batch and call
  # accept here with a context and data (batch)
  # if downstream blocks then accept blocks
  # and the dequeue loop blocks
  def add_codec(codec)
    @codec = codec
    @codec.on_event do |event, encoded|
      # this call does not block
      assemble(event, encoded)
    end
    self
  end

  # will we need a context?
  def do_work(context, data)
    @assembled = []
    # here data is a batch of events
    data.each do |event|
      @codec.encode(event)
    end
    # if downstream blocks then this blocks too
    deliver(context, @assembled)
  end

  def assemble(event, encoded)
    @assembled.push([event, encoded])
  end
end

class BulkOutputComponent
  include Component

  def add_output(output)
    @output = output
    self
  end

  def accept(context, data)
    # here data is an array of [event, encoded]
    # if this call blocks then upstream deliver will block
    @output.multi_receive(context, data)
  end
end
```

Using components will separate the concerns of encoding, output writing, and batch handling. Also, we can change the codec to use a listener callback instead of a block:

```ruby
class BulkEncoderComponent
  include Component

  # modified codec to use a listener based callback
  # codec will call process and not a block
  def add_codec(codec)
    @codec = codec.add_listener(self) # add_listener returns the codec (self)
    self
  end

  # will we need a context?
  def do_work(context, data)
    @assembled = []
    # here data is a batch of events
    data.each do |event|
      @codec.encode(event)
    end
    # if downstream blocks then this blocks too
    deliver(context, @assembled)
  end

  def process(event, encoded)
    @assembled.push([event, encoded])
  end
end
```
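For reference, the examples above all assume a `Component` mixin along the lines of the #4432 proposal. A minimal sketch of what it might provide — only the `accept`/`deliver`/`do_work` names come from the examples; the chaining method and its name are my assumptions:

```ruby
module Component
  # chain this component to the next one downstream; returns self for chaining
  def connect_to(next_component)
    @next_component = next_component
    self
  end

  # called by the pipeline or the upstream component; because this is a plain
  # method call, it blocks for as long as do_work blocks (backpressure)
  def accept(context, data)
    do_work(context, data)
  end

  # hand results downstream; if the next component blocks, we block too
  def deliver(context, data)
    @next_component.accept(context, data) if @next_component
  end
end
```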
I would think that the channel of components is assembled in the output and not in the pipeline, because the output config is how the user communicates the desired output behaviour; the pipeline should not be concerned with this.
@guyboertje thanks for the verbose examples! Now that I have a clearer idea of what we're doing, I think your proposal is to some extent orthogonal to what I'm discussing (I think the components are good, but I'm actually talking about something different). The main thing I'm interested in fixing here is the async nature of codecs. Additionally, I think we should add a synchronous encode method.

WRT the pipeline handling the encoding: one advantage of the pipeline handling this would be that in the case where two outputs use the same codec, the pipeline would only encode once. I would say that is a niche case, however.

I have one final question about components in outputs, however: what new use cases would they enable?
@andrewvc - I can't see that the compress_spooler breaks the on_event contract by offering one arg to the block. The output receives the single argument just fine, because a block, unlike a lambda, doesn't enforce arity:

```
irb(main):001:0> l = ->(a,b) { puts a, b }
=> #<Proc:0x3122b117@(irb):1 (lambda)>
irb(main):002:0> l.call(3)
ArgumentError: wrong number of arguments (1 for 2)
	from org/jruby/RubyProc.java:267:in `call'
	from (irb):2:in `evaluate'
	from org/jruby/RubyKernel.java:1079:in `eval'
	from org/jruby/RubyKernel.java:1479:in `loop'
	from org/jruby/RubyKernel.java:1242:in `catch'
	from org/jruby/RubyKernel.java:1242:in `catch'
	from /Users/guy/.rubies/jruby-1.7.22/bin/irb:13:in `(root)'
irb(main):003:0> def ll(&block) block.call(42); end
=> nil
irb(main):004:0> ll {|a,b| puts a, b }
42

=> nil
```

Hmmm. The json codec:

```ruby
def encode(event)
  @on_event.call(event, event.to_json)
end # def encode
```

The compress_spooler codec:

```ruby
def encode(event)
  # use normalize to make sure returned Hash is pure Ruby for
  # MessagePack#pack which relies on pure Ruby object recognition
  @buffer << LogStash::Util.normalize(event.to_hash).merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)

  # If necessary, we flush the buffer and get the data compressed
  if @buffer.length >= @spool_size
    @on_event.call(compress(@buffer, @compress_level))
    @buffer.clear
  end
end # def encode

def close
  return if @buffer.empty?
  @on_event.call(compress(@buffer, @compress_level))
  @buffer.clear
end

private

def compress(data, level)
  z = Zlib::Deflate.new(level)
  result = z.deflate(MessagePack.pack(data), Zlib::FINISH)
  z.close
  result
end
```

In summary - in practice, today, we don't have working async codecs in use in outputs.
Acking back to the persistent queue, maybe. E.g. a tail component is an AckingComponent and comes after the output component. Metrics link components. Remember, this is about separation of concerns: let codecs encode and outputs write, and let components link/communicate.
@guyboertje WRT acking back to the persistent queue, my question is: why does that need to be explicit? In my mind, if we have a synchronous API we just need to make sure the calls return successfully before we consider a batch acked.
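A sketch of that idea, reusing the multi_encode/multi_receive shape from above (the queue methods here are hypothetical): acking falls out of the return path, with no explicit ack protocol between components:

```ruby
# Hypothetical pipeline step: nothing below is real Logstash API.
def process_one_batch(queue, codec, output)
  batch = queue.take_batch
  begin
    encoded = codec.multi_encode(batch) # raises on encode failure
    output.multi_receive(encoded)       # blocks until IO completes; raises on failure
    queue.ack(batch)                    # reached only if everything above returned
  rescue => e
    queue.nack(batch)                   # hypothetical retry / dead-letter path; e could be logged
  end
end
```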
@andrewvc - What do you think of making the 'batch' that we pass around into an object that wraps the initial array? This Batch object can then hold some state and have behaviour. Imagine a batch size of 10, a spool_size of 15, batches A, B, C, and the CompressSpooler codec.

I understand the synchronous return of success or fail as a mechanism to consider a batch acked.
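A rough sketch of such a Batch object (the names and mechanics below are hypothetical): with a batch size of 10 and a spool_size of 15, batch A is only fully 'done' after the spooler's first flush, which also consumes the first 5 events of batch B, so the wrapper tracks per-event completion:

```ruby
require 'set'

class Batch
  def initialize(events)
    @events  = events
    @pending = Set.new(events) # events not yet flushed downstream
  end

  # a codec/component marks an event done once it has truly been written out
  def mark_done(event)
    @pending.delete(event)
  end

  # only a fully-done batch may be acked back to the persistent queue
  def done?
    @pending.empty?
  end
end
```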
@guyboertje I think your design is good, but I believe it solves a different problem than the one we have here. The whole reason we need to spool is that the old pipeline processes events one at a time. With the new pipeline the compress spooler needn't spool; the batch size would == the spool size. The codec would just provide a multi_encode. Once we do that we can keep things simple, as in my example above with multi_receive. WRT errors in the return path, we can guard against those with a rescue.
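For illustration, a sketch of what the compress_spooler's encoder could collapse to under that contract, reusing the normalize/compress code quoted earlier in the thread (the [events, payload] return shape is my assumption about what multi_encode would hand to multi_receive):

```ruby
# Hypothetical batch-aware compress_spooler: the pipeline batch replaces @buffer.
def multi_encode(events)
  hashes = events.map do |event|
    LogStash::Util.normalize(event.to_hash)
                  .merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)
  end
  # one compressed payload for the whole batch
  [[events, compress(hashes, @compress_level)]]
end
```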
@andrewvc - spool size, being a config setting, and batch size, a command line option, are not guaranteed to be equal. Of course, we could force this. I am proposing solutions that are more generic, in the context of channels where knowing when an event is 'done' is more deterministic. OTOH, if we are going to modify outputs and codecs to be batch aware, then by using one Batch object vs one Event we can hide the multi-ness inside the objects. However, components mean minimal patching to codecs and outputs.
@guyboertje my point is that there is no need for spooling if we have the multi_encode API.
Mocking out a codec for plugin testing is irritatingly hard due to the async API. While the async API is fine for production use, I believe the synchronous logic should be wrapped in a blocking #encode_sync method; a sketch of the before and after follows.
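The contrast, assuming an RSpec-style plugin test; #encode_sync is the hypothetical method proposed above, assumed here to return an [event, encoded] pair:

```ruby
# Before: the async API forces the test to capture output via the callback
results = []
codec.on_event { |event, encoded| results << encoded }
codec.encode(event)
expect(results.first).to eq(expected_payload)

# After: a blocking call returns the encoded payload directly
_ev, encoded = codec.encode_sync(event)
expect(encoded).to eq(expected_payload)
```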
This was inspired by the discussion over this PR on the SNS input: https://github.com/logstash-plugins/logstash-output-sns/pull/6/files#r32961911