
[Feature Branch] Java API #9342

Closed · andrewvc wants to merge 50 commits from feature/java-api

Conversation

andrewvc (Contributor) commented Apr 9, 2018

@original-brownbear just opening this so we can track the deltas on this branch.

andrewvc (Contributor, Author) commented Apr 9, 2018

One thing that occurs to me, since we'll want to publish an artifact: we'll have to stop using the org.logstash package name because we don't actually own that.

We'll have to use net.logstash or co.elastic (or org.elasticsearch) instead.

@Override
public Collection<Event> filter(final Collection<Event> events) {
//TODO: Impl.
return events;
andrewvc (Contributor, Author) commented Apr 9, 2018


I assume this just returns a new collection. Something like:

Collection<Event> cloned = new ArrayList<>(events.size() * 2);
events.forEach(e -> { cloned.add(e); cloned.add(e.clone()); });
return cloned;

Member:

@andrewvc jup will adjust soon :)

original-brownbear self-assigned this Apr 9, 2018
original-brownbear (Member):

@andrewvc yikes on the package change, that'll break at least the date filter. Should we maybe make that move in master independently of this PR to keep the noise level lower here?

elastic deleted a comment from original-brownbear · Apr 12, 2018

original-brownbear (Member):

@danhermann we gotta fix the build here :) Taking a look at the ITs now.

original-brownbear (Member):

@danhermann it's back alive :) Fixed all merge conflicts and at least locally tests were green again.

yaauie (Member) left a comment

In general, I think this is pretty amazing stuff 🎉


I've left some comments in-line, but my primary issues are:

  • mismatch in abstractions between Codec#decode(ByteBuffer) and Codec#encode(OutputStream) feels awkward and a little half-baked. This may be just a side effect of us using "codec" within Logstash to encapsulate two related but different concepts (in which case I'm begrudgingly okay with it)
  • doc blocks in API interface methods could use clarity; separating implementation requirements from client requirements may make things clearer.
  • validation gap; is Plugin#configSchema() used anywhere?


/**
* Decodes events from the specified {@link ByteBuffer} and passes them to the provided
* {@link Consumer}. Clients of the codec are responsible for ensuring that the input buffer
Member:

At my first reading (and second and third, honestly), it looked like we were putting a lot of requirements on the implementation of the Codec, but I now understand that we are attempting to describe the whole relationship to both clients and implementations.


I think this section could benefit from bullet points:

  • The client (typically an {@link Input}) must provide a ByteBuffer that is ready for reading (with {@link ByteBuffer#position} indicating the next position to read and {@link ByteBuffer#limit} indicating the first byte in the buffer that is not safe to read)
  • Implementations of {@link Codec} must ensure that the {@link ByteBuffer#position} indicates the last-read position before returning control.
  • The client must then return the buffer to write mode (typically with {@link ByteBuffer#compact}) before writing more bytes.
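
A minimal sketch of the round trip those bullets describe, assuming a channel-backed input loop. Codec and Event below are placeholder stand-ins for the interfaces under review, with the decode signature shaped after the javadoc in this PR:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Consumer;

// Placeholder stand-ins for the API types under review.
interface Event {}
interface Codec { void decode(ByteBuffer buffer, Consumer<Event> eventConsumer); }

final class DecodeLoopSketch {
    static void readLoop(ReadableByteChannel channel, Codec codec,
                         Consumer<Event> eventConsumer) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(64 * 1024); // starts in write mode
        while (channel.read(buffer) >= 0) {
            buffer.flip();                       // read mode: position=0, limit=bytes available
            codec.decode(buffer, eventConsumer); // codec advances position past what it consumed
            buffer.compact();                    // back to write mode; unread bytes are preserved
        }
    }
}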

* Decodes events from the specified {@link ByteBuffer} and passes them to the provided
* {@link Consumer}. Clients of the codec are responsible for ensuring that the input buffer
* is in a valid state for reading. Upon completion of {@link Codec#decode}, the codec is
* responsible for ensuring that {@link ByteBuffer#limit} reflects the last point at which
Member:

Do we mean ByteBuffer#position?


I don't think that the "ensuring that {@link ByteBuffer#limit} reflects the last point at which input bytes were decoded to events" requirement is quite aligned with the spirit of Buffer#limit:

A buffer's limit is the index of the first element that should not be read or written
-- java.nio.Buffer

By setting ByteBuffer#limit to the point at which the codec stopped processing, we prevent the client from being able to reliably use ByteBuffer#compact() as we later declare in this comment that it should:

The bytes between the buffer's current position and its limit, if any, are copied to the beginning of the buffer. That is, the byte at index p = position() is copied to index zero, the byte at index p + 1 is copied to index one, and so forth until the byte at index limit() - 1 is copied to index n = limit() - 1 - p. The buffer's position is then set to n+1 and its limit is set to its capacity. The mark, if defined, is discarded.
-- ByteBuffer#compact()

By requiring that implementations set the ByteBuffer#limit to reflect "the last point at which input bytes were decoded to events", we are ensuring that bytes beyond that point are not moved up to the beginning of the byte buffer and setting them up to be overwritten.
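
A toy illustration (not from this PR) of the difference: compact() only carries forward the bytes between position and limit, so a codec that shrinks limit to the last-decoded byte causes the client's compact() to drop a trailing partial event.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompactDemo {
    public static void main(String[] args) {
        byte[] data = "event1\npart".getBytes(StandardCharsets.UTF_8);

        // Codec advances position past the decoded "event1\n" (the intended contract):
        ByteBuffer ok = ByteBuffer.wrap(data.clone());
        ok.position(7);
        ok.compact();
        System.out.println(ok.position()); // 4 -- the partial "part" is carried over

        // Codec instead sets limit to the last-decoded byte (as the javadoc currently reads):
        ByteBuffer bad = ByteBuffer.wrap(data.clone());
        bad.position(7);
        bad.limit(7);
        bad.compact();
        System.out.println(bad.position()); // 0 -- "part" is not carried over and will be overwritten
    }
}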

Contributor:

Yes, the code functions properly using ByteBuffer#position but the comment mistakenly specifies ByteBuffer#limit. I'll fix it.

* @param event The event to encode.
* @param output The stream to which the encoded event should be written.
*/
void encode(Event event, OutputStream output);
Member:

The ByteBuffer/OutputStream mismatch between the encode and decode operations feels a bit awkward, like we're using one class for two distinct things (admittedly: we are). Can we converge on one shared abstraction?

Contributor:

I think the shared abstraction is that decode takes a stream of bytes through the provided ByteBuffer and returns one or more Events, and encode provides the reverse operation by taking an Event and writing a stream of bytes to the provided OutputStream. The logical interface is the same as the Ruby codecs, in which the decode method takes a byte array and yields one or more Events and the encode method takes an Event and yields a string. Am I misunderstanding the shared abstraction that you'd like to see?

Member:

To me, the OutputStream is a higher level of abstraction than ByteBuffer, so it seems unbalanced that the Output gets to take advantage of simpler requirements while the Input has to juggle flipping the buffer between read and write modes.

It would make more sense to me if either both used ByteBuffer or if decode used an InputStream to match encode's OutputStream.

praseodym:

I think it would make sense to use ByteBuffers for both encoding and decoding.

A peculiar thing about the current codec implementation in Logstash is that it does not differentiate between stream-based codecs (e.g. reading from a TCP stream) and event-based codecs (e.g. reading from a Kafka topic). If a single interface is defined for both use cases, I think a ByteBuffer-based one would make the most sense.
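
For illustration, a symmetric ByteBuffer-based shape could look something like the sketch below; this is only a mock-up of the suggestion, not an interface from this PR, and Event is a placeholder:

import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Placeholder for the API's event type.
interface Event {}

// Hypothetical symmetric shape: both directions work against a ByteBuffer
// prepared by the caller (read mode for decode, write mode for encode).
interface ByteBufferCodec {
    // Consume as many complete events as the readable bytes allow,
    // leaving position at the last consumed byte.
    void decode(ByteBuffer input, Consumer<Event> eventConsumer);

    // Serialize one event into the buffer; return false if the remaining
    // space is insufficient so the caller can flush and retry.
    boolean encode(Event event, ByteBuffer output);
}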

Contributor:

@yaauie, @praseodym, I see what you're saying, and I agree that using a ByteBuffer for encode, as decode already does, could make sense. I'm going to work on the other changes first and see if I can get this one in before feature freeze. If not, I'll add it to the requirements list for the beta phase if that's acceptable.

praseodym:

@danhermann, the explicit no_codec is great, I hadn't seen that before. Ensuring that codecs stay optional should be good enough for now.

For the near future, it'd be good to have a way to package 'codec plugins' that do not strictly conform to the Logstash codec interface, so that it would be possible to, e.g., package a custom Netty ChannelInboundHandlerAdapter for use in a Netty ChannelPipeline, or a custom Kafka Serde class. That is related to #9521 and I'm not sure if that's already in scope for this PR.

Member:

The more I look at it, the more I think that java.nio.channels.WritableByteChannel and java.nio.channels.ReadableByteChannel make the most sense for Codec#encode and Codec#decode respectively. There are a lot of usable implementations, including ones that efficiently wrap InputStream and OutputStream to eliminate unnecessary copying, and they allow buffering (or the lack thereof) to be solely a concern of the codecs, which makes sense because the codecs are the things that need to hold onto state (especially while decoding).

I have an 80% WIP to swap that out that I hope to complete in the morning, so keep an eye out.
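
For reference, a hypothetical shape of that channel-based alternative (not what was ultimately merged; Event is again a placeholder):

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.function.Consumer;

// Placeholder for the API's event type.
interface Event {}

// Hypothetical channel-based codec: buffering and partial-event state live
// entirely inside the codec implementation rather than in the caller.
interface ChannelCodec {
    void decode(ReadableByteChannel input, Consumer<Event> eventConsumer) throws IOException;

    void encode(Event event, WritableByteChannel output) throws IOException;
}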

Contributor:

I can see the point in using ByteBuffer for both encode and decode since ByteBuffer is equally usable for either stream-based or channel-based IO, but I think going all the way to Channel for both encode and decode unnecessarily disadvantages plugins that use stream-based IO. Most Java libraries use stream-based rather than channel-based IO, so I wouldn't want to impose that penalty on them.

Member:

There are the Channels.newChannel(InputStream) and Channels.newChannel(OutputStream) wrappers that allow us to consume streams as channels with very minimal overhead (e.g., no extra memory copying), so I still think it is a "nicer" abstraction to work with than passing ByteBuffers around. I'd like to at least get to a POC before ruling it out.
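
For reference, the JDK adapters mentioned here; stream-oriented plugin code could wrap its streams like this before handing them to a channel-based codec:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

public class ChannelAdapterExample {
    public static void main(String[] args) {
        // Adapt existing streams to channels via the standard library wrappers.
        ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream("hello".getBytes()));
        WritableByteChannel out = Channels.newChannel(new ByteArrayOutputStream());
        // ... a channel-based codec could then read from `in` and write to `out` ...
    }
}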

But the channel-based approach does diverge from the Ruby API enough that it would require shim implementations to support pipelines that mix plugins from the two APIs, and that is more than I can commit to before the coming feature freeze.


I am okay with merging as-is, with ByteBuffer used in one and OutputStream in the other, IFF we include something like a v0 in the API's package name (e.g., making this co.elastic.logstash.api.v0.Codec) and document that the v0 Java API is not yet stable and requires compiling your plugin against the exact major/minor version pair of Logstash. That would give us the freedom to iterate without breaking expectations.

Contributor:

I thought we always documented features in experimental status as subject to breaking changes?

}

@LogstashPlugin(name = "stream")
final class StreamInput implements Input {
Member:

would it be worth moving these example implementations either to their own files or to the test package?

Contributor:

Definitely. These are examples from the really early days of the Java API and I'll probably just remove them since the example Java plugins in https://github.com/logstash-plugins should take their place.

buffer.compact();
}
} catch (AsynchronousCloseException e2) {
// do nothing -- this happens when stop is called during a pending read
Member:

worth logging?

Contributor:

That exception occurs in response to the user action of either shutting down LS or reloading the pipeline when the input loop is waiting on the input.read(buffer) statement rather than in the codec.decode statements (in practice, it's usually the former). I'm not sure that's useful information to log, but I'm open to reasons why it might be.

Member:

If we have accepted bytes from the wire (e.g., TCP ACK), and those bytes don't end up contributing to events due to a shutdown or pipeline reload, having a clue somewhere in the logs could save a whole lot of debugging.

// do nothing -- this happens when stop is called during a pending read
} catch (IOException e) {
stopRequested = true;
throw new IllegalStateException(e);
Member:

logging here may prove helpful
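
A sketch of what that could look like, assuming a log4j2 logger and the read-loop shape from the hunks above; the class, field, and message text are illustrative, not taken from the PR:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ReadableByteChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

final class LoggingReadLoopSketch {
    private static final Logger LOGGER = LogManager.getLogger(LoggingReadLoopSketch.class);
    private volatile boolean stopRequested;

    void run(ReadableByteChannel input, ByteBuffer buffer) {
        try {
            while (!stopRequested && input.read(buffer) >= 0) {
                buffer.flip();
                // ... codec.decode(buffer, consumer) ...
                buffer.compact();
            }
        } catch (AsynchronousCloseException e) {
            // stop() closed the channel mid-read; bytes already accepted from the
            // wire will not become events, so leave a breadcrumb for debugging.
            LOGGER.debug("Input channel closed during a pending read; buffered bytes discarded", e);
        } catch (IOException e) {
            stopRequested = true;
            LOGGER.error("I/O error in input read loop; stopping input", e);
            throw new IllegalStateException(e);
        }
    }
}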

}

Stdout(final Configuration configuration, final Context context, OutputStream targetStream) {
printer = new PrintStream(targetStream); // replace this with a codec
Member:

unresolved TODO

}

@SuppressWarnings("unchecked")
public <T> T get(final PluginConfigSpec<T> configSpec) {
Member:

I think we have a gap in validation that could lead to the silent acceptance of bad input that is then ignored.

Because this method is only ever called from the implementations with the PluginConfigSpecs that the particular implementation knows about, I believe we have nothing to validate that plugins correctly report errors when a pipeline configuration attempts to use config directives that the implementation does not know about.

Example:

input {
  stdin {
    codek => typo
  }
} 

I do see that we have a Plugin#configSchema that must return a Collection<PluginConfigSpec<?>>, but I do not see it used anywhere to perform this validation.
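
Purely as an illustration of the missing check, something along these lines could compare a pipeline's settings against the names declared by Plugin#configSchema(); the helper below is hypothetical, with PluginConfigSpec reduced to a bare setting name:

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical validation helper; not part of this PR.
final class ConfigValidationSketch {
    // Returns the settings present in the pipeline config that the plugin
    // did not declare, e.g. {"codek"} for the stdin example above.
    static Set<String> unknownSettings(Map<String, Object> pipelineConfig,
                                       Collection<String> declaredSettingNames) {
        Set<String> unknown = new HashSet<>(pipelineConfig.keySet());
        unknown.removeAll(declaredSettingNames);
        return unknown;
    }
}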

Contributor:

Agreed. It wasn't explicit that config validation would be part of the beta phase of the API, so I just now added it as a requirement on the Java API meta issue.

import java.util.Map;

/**
* LS Configuration example. Should be implemented like Spark config or Hadoop job config classes.
Member:

is it an example, or is it the implementation?

Contributor:

It's the implementation. I'll update the javadoc accordingly.

danhermann (Contributor):

@yaauie, thanks for the detailed review! I will add some replies and make a number of changes later this afternoon.

danhermann (Contributor):

@yaauie, I think I have addressed all of your review comments except for the change to the codec interface which I might defer to the beta phase if that's ok. I'm working on resolving the many merge conflicts between this PR and the 6.x branch.

yaauie (Member) commented Dec 17, 2018

It looks like we have a couple errors in CI; once these are resolved, I'm 👍 for a merge.

17:49:42     An error occurred while loading ./logstash-core/spec/logstash/filter_delegator_spec.rb.
17:49:42     Failure/Error: Unable to find org/jruby/RubyModule.java to read failed line
17:49:42     
17:49:42     NameError:
17:49:42       uninitialized constant LogStash::FilterDelegator::DELEGATED_METHODS
17:49:42     # ./logstash-core/spec/logstash/filter_delegator_spec.rb:152:in `block in (root)'
17:49:42     # ./logstash-core/spec/logstash/filter_delegator_spec.rb:148:in `block in (root)'
17:49:42     # ./logstash-core/spec/logstash/filter_delegator_spec.rb:8:in `<main>'

danhermann (Contributor):

This work was completed in #10216

danhermann closed this Dec 18, 2018
jsvd deleted the feature/java-api branch October 11, 2019 14:53