Discuss the introduction of a channel of components inside an input. #4432

Closed
guyboertje opened this issue Jan 7, 2016 · 3 comments

Comments

@guyboertje
Contributor

Motivation

  • inputs have a mechanism to 'source' data externally, let's call this the source. Sources usually have their own internal loop that pumps data into the input via some callback mechanism: a block or a subscriber callback. Often enough, the source has context that should ride with the data, e.g. a path in the case of the file input.
  • the existing call input.run, when codecs are involved, calls decode on the codec with a block. The block binding closes over the queue and any number of other input ivars or source context. The codec does not know the content of this binding, and the binding can be different for each decode call. The codec cannot cache any one block and call it later if there is buffered data that it must flush asynchronously; it would have to cache each block along with the buffered data.
  • some inputs have extra event decoration that is dependent on the context from the source, and again this decoration is done in the input via the block that the codec yields to.
  • it is not possible for codecs, let's generically think of them as event processors, to use the context given by the source.
  • it is not possible to introduce new event processors/codecs before or after the existing codec, either declaratively or dynamically. For example, the stdin input's source does not produce line oriented data and can behave incorrectly if it is used with a multiline codec that expects line oriented data. To overcome this we have fudged it somewhat by adding composite codecs, e.g. the json_lines codec that internally chains the line and json codecs.
  • it is not possible for the source to communicate other 'events' to the input. For example, filewatch as a source could communicate when a file is opened, closed, renamed or has reached EOF, but presently it can only communicate data 'events' to the file input.

Concepts

  • A channel is a doubly linked list of components. It can be built statically or dynamically. The 'head' of the channel is the source component and the 'tail' is the queuing component. Intermediate components are called 'links'. Head, tail or link is also known as the component_type.
  • A component wraps a unit of work in a simple chainable interface. It is stateless and knows about its upstream and downstream neighbours. When it receives a call from upstream it executes the unit of work and calls the downstream component with the modified data and context.
  • The data call flow is from upstream downwards and the control call flow is from downstream upwards. Initially, I expect the control flow to be backpressure.
  • The input holds the channel and its components and can exert control on the components or channel if necessary.
  • A component can take metadata, see accept_meta below, from the input when it builds the channel, usually some configuration values it would need for its unit of work.
  • The preliminary API as implemented by the base component is:
  def initialize(component_type, upstream, downstream)
    @component_type, @upstream, @downstream = component_type, upstream, downstream
    @logger = Cabin::Channel.get(LogStash)
  end

  def accept_meta(opts = {})
    @meta = opts
    self
  end

  def accept(context, data = nil)
    do_work(context, data)
  end

  def do_work(context, data)
    deliver(context, data)
  end

  def deliver(context, data)
    @downstream.accept(context, data)
  end
  • A head component should redefine the deliver method.
  • A tail component should redefine the accept method.
  • A link component should redefine the do_work method.
  • All components redefining a base method should call super after they have done some local work.
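
To make the head/link/tail rules above concrete, here is a minimal sketch of a three component channel. It is not the POC code: the base class copies the preliminary API but drops the logger, and PathHead, ChompLink and QueueTail are hypothetical names invented for illustration. The tail in this sketch does not call super because it has no downstream neighbour, which is an assumption on my part.

```ruby
# Sketch of the base component: same shape as the preliminary API, no logger.
class Component
  attr_reader :component_type
  attr_accessor :upstream, :downstream

  def initialize(component_type, upstream = nil, downstream = nil)
    @component_type, @upstream, @downstream = component_type, upstream, downstream
  end

  def accept_meta(opts = {})
    @meta = opts
    self
  end

  def accept(context, data = nil)
    do_work(context, data)
  end

  def do_work(context, data)
    deliver(context, data)
  end

  def deliver(context, data)
    @downstream.accept(context, data)
  end
end

# Hypothetical head: redefines deliver to attach source context, then calls super.
class PathHead < Component
  def deliver(context, data)
    super(context.merge("path" => @meta[:path]), data)
  end
end

# Hypothetical link: redefines do_work to transform the data, then calls super.
class ChompLink < Component
  def do_work(context, data)
    super(context, data.chomp)
  end
end

# Hypothetical tail: redefines accept and pushes onto a plain Array "queue".
# Assumption: no super call here, because nothing is downstream of the tail.
class QueueTail < Component
  def accept(context, data = nil)
    @meta[:queue] << { "message" => data }.merge(context)
  end
end

queue = []
head = PathHead.new("head").accept_meta(path: "/var/log/app.log")
link = ChompLink.new("link")
tail = QueueTail.new("tail").accept_meta(queue: queue)

# Wire the doubly linked list by hand.
head.downstream = link
link.upstream, link.downstream = head, tail
tail.upstream = link

head.accept({}, "hello\n")
```

After the last call, queue holds one Hash carrying both the chomped message and the path supplied by the head, without any block closing over input state.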

POC Implementation

Implications

  • This proposal paves the way for codec chaining in the configuration.
  • Each component should be unit tested.
  • Some components can be shared across inputs.
  • Proper backpressure signals can flow up to the source.
  • Acking can flow up through the channel, especially when the persistent queue persists the event. This is a future extension, but I expect something like this:
  def ack(ctx)
    respond_to_ack_locally_if_needed(ctx)
    @upstream.ack(ctx) if @component_type != 'head'
  end
  • Dynamic channels can be created where necessary.
  • We can break down some of the block heavy designs seen in some inputs.
  • Difficult problems like contextual multiline match patterns might be easier to solve.
  • It is possible to have predicate components that allow or deny data flow using rules applied to context or data.
  • It is possible to have multi-channels that share a head component and a predicate component as the first link after the head. The predicate component will have multiple downstream chains and the predicate rule outcome selects which downstream to call accept on.
  • It may be interesting to consider this proposal for other parts of the pipeline, e.g. outputs or filters + output.
  • It will make writing a new input more 'interesting'. On the one hand, only the head component needs much thought, and for most cases we can use a generic codec component. On the other hand, the general move to Java will make writing any plugin 'interesting' for the community in the future.
  • This proposal will make the transition to pure Java inputs easier because we will have removed the block calls.
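
The predicate and multi-channel points above could look roughly like the sketch below. Everything here is hypothetical: PredicateLink, CollectTail, the routes Hash and the rule block are names I made up, and the base class is a cut-down stand-in for the real one.

```ruby
# Cut-down stand-in for the base component (sketch, not the POC code).
class Component
  attr_accessor :upstream, :downstream

  def initialize(component_type, upstream = nil, downstream = nil)
    @component_type, @upstream, @downstream = component_type, upstream, downstream
  end

  def accept(context, data = nil)
    do_work(context, data)
  end

  def do_work(context, data)
    deliver(context, data)
  end

  def deliver(context, data)
    @downstream.accept(context, data)
  end
end

# Hypothetical predicate link: holds several downstream chains and
# uses the outcome of a rule to pick which one receives the data.
class PredicateLink < Component
  def initialize(upstream, routes, &rule)
    super("link", upstream, nil)
    @routes = routes # Hash of rule outcome => downstream component
    @rule = rule
  end

  def deliver(context, data)
    target = @routes[@rule.call(context, data)]
    # No route for this outcome means the data is denied (dropped).
    target.accept(context, data) if target
  end
end

# Hypothetical tail that just remembers what it was given.
class CollectTail < Component
  attr_reader :seen

  def initialize(upstream = nil)
    super("tail", upstream, nil)
    @seen = []
  end

  def accept(context, data = nil)
    @seen << data
  end
end

errors = CollectTail.new
others = CollectTail.new
predicate = PredicateLink.new(nil, { true => errors, false => others }) { |context, _data|
  context["path"].end_with?("error.log")
}

predicate.accept({ "path" => "/var/log/error.log" }, "boom")
predicate.accept({ "path" => "/var/log/access.log" }, "GET /")
```

Here the rule only looks at the context, which is the case the current block based design cannot express, since codecs never see the source context.
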
@andrewvc
Contributor

I really like this proposal. Regarding the output side of this, I've added some thoughts on a relevant issue here: #3486 (comment). From what I can tell, the outputs have different concerns here.

@guyboertje
Contributor Author

Think of a component as both a Facade and an Adapter

The Facade is what it presents to the caller (pipeline or plugin).
The Adapter is what is used to communicate with what the component wraps.

---> Facade ----> Facade ----> Facade ----> Facade ----> push
        |            |            |            |
     Adapter      Adapter      Adapter      Persistent
      Input        Codec        Codec         Queue

As we have not coded the persistent queue API, a queue component would not need an adapter. It just needs a mechanism to receive data (an event) from upstream, persist it, and push data downstream (a batch of events). On LS restart the queue component will block the receive side and push unacked data downstream.
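
A rough sketch of the Facade/Adapter split for a codec component follows. All class names are hypothetical, FakeLineCodec only stands in for a real codec whose decode yields events to a block, and events are plain Hashes here rather than real event objects.

```ruby
# Stand-in for a real codec: buffers text and yields one Hash per complete line.
class FakeLineCodec
  def initialize
    @buffer = String.new
  end

  def decode(data)
    @buffer << data
    while (index = @buffer.index("\n"))
      line = @buffer.slice!(0..index).chomp
      yield({ "message" => line })
    end
  end
end

# Adapter: the only place that knows the wrapped codec wants a block.
class CodecAdapter
  def initialize(codec)
    @codec = codec
  end

  # Collects the yielded events into an Array so no block leaves the adapter.
  def decode(data)
    events = []
    @codec.decode(data) { |event| events << event }
    events
  end
end

# Facade: what the upstream neighbour sees, just accept(context, data).
class CodecComponent
  attr_accessor :downstream

  def initialize(adapter)
    @adapter = adapter
  end

  def accept(context, data = nil)
    @adapter.decode(data).each do |event|
      # Decorate each event with the source context before passing it on.
      @downstream.accept(context, event.merge(context))
    end
  end
end

# Hypothetical queue component without an adapter: it only receives and stores.
class ArrayQueueComponent
  attr_reader :events

  def initialize
    @events = []
  end

  def accept(context, data = nil)
    @events << data
  end
end

queue = ArrayQueueComponent.new
codec = CodecComponent.new(CodecAdapter.new(FakeLineCodec.new))
codec.downstream = queue

codec.accept({ "host" => "example" }, "one\ntw")
codec.accept({ "host" => "example" }, "o\n")
```

The second call completes the line that was left buffered by the first one, so the queue component ends up with two decorated events.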

@colinsurprenant
Contributor

I am +1 on moving this idea forward.

  • I'd like to see if/how we can come up with a phased & backward compatible implementation.
  • conceptually, the idea of encapsulating units of work into "bite-size" components is powerful: they will be easier to test and will facilitate reusability, which in turn will improve quality.
  • following our recent codecs discussion, we should look into moving the components chain out of the input and directly into the pipeline.
