Java API Proposal #9137 (Closed)
1 change: 1 addition & 0 deletions logstash-core/build.gradle
@@ -121,6 +121,7 @@ dependencies {
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jacksonVersion}"
compile "org.jruby:jruby-complete:${jrubyVersion}"
compile 'com.google.googlejavaformat:google-java-format:1.5'
compile 'org.reflections:reflections:0.9.11'
testCompile 'org.apache.logging.log4j:log4j-core:2.9.1:tests'
testCompile 'junit:junit:4.12'
testCompile 'net.javacrumbs.json-unit:json-unit:1.9.0'
32 changes: 32 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java
@@ -0,0 +1,32 @@
package org.logstash.execution;

import java.lang.reflect.Constructor;
import java.util.Set;
import org.reflections.Reflections;

/**
* Quick demo of plugin discovery showing that the solution wouldn't require anything beyond
* the plugin classes on the classpath.
*/
public final class DiscoverPlugins {

public static void main(final String... args) throws NoSuchMethodException {
Contributor:
What are your thoughts on isolated class loaders per plugin?

Member Author:
@jakelandis I think I'd stay away from that for the time being (meaning, probably for as long as JRuby is around ...). I do think this is something we can work towards once we've gotten rid of JRuby types and the Event only contains standard JDK types. But as long as Event holds RubyString etc., isolation introduces a lot of potentially complex constraints on the JRuby version that we'd have to enforce on plugins, and I'm not even sure it's easily possible in principle, since JRuby sets up its own classloader for the JRuby classes.

Contributor:
I agree with your assessment that this introduces a lot of complexity. However, if a security-managed LS is in the near future, it may be worth tackling that complexity sooner rather than later, since it's usually easier to bake things like that in than to layer them in later. Also, if a security-managed LS is in the future, then maybe Java 9 modules provide some wins there without isolated class loading (just speculation, I haven't researched).

Contributor:
I don't think a security-managed LS is that close. I'd rather bite off that complexity a bit later.
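
For context, here is a minimal sketch of what per-plugin isolation could look like if this is revisited later. This is hypothetical code, not part of the PR, and the JRuby classloader constraints discussed above are exactly what it would have to contend with:

import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch: one URLClassLoader per plugin jar, parented on the core
// loader so that shared API types (LsPlugin, Event, ...) resolve to one class.
public final class IsolatedPluginLoader {

    private IsolatedPluginLoader() {
    }

    public static Class<?> loadPluginClass(final URL pluginJar, final String className)
        throws ClassNotFoundException {
        final URLClassLoader loader =
            new URLClassLoader(new URL[]{pluginJar}, LsPlugin.class.getClassLoader());
        return loader.loadClass(className);
    }
}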

Reflections reflections = new Reflections("org.logstash");
Set<Class<?>> annotated = reflections.getTypesAnnotatedWith(LogstashPlugin.class);
for (final Class<?> cls : annotated) {
System.out.println(cls.getName());
// getAnnotation is safer than indexing getAnnotations(), whose order isn't guaranteed
System.out.println(cls.getAnnotation(LogstashPlugin.class).name());
final Constructor<?> ctor = cls.getConstructor(LsConfiguration.class, LsContext.class);
System.out.println("Found Ctor at : " + ctor.getName());
if (Filter.class.isAssignableFrom(cls)) {
System.out.println("Filter");
}
if (Output.class.isAssignableFrom(cls)) {
System.out.println("Output");
}
if (Input.class.isAssignableFrom(cls)) {
System.out.println("Input");
}
}
}
}
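
For completeness, a hedged sketch of the instantiation step that would follow the discovery loop above (the empty settings map is illustrative, and LsConfiguration.get() is still a stub in this PR):

import java.util.Collections;

// Hypothetical follow-up to the discovery demo: instantiate a discovered
// plugin class through the required (LsConfiguration, LsContext) constructor.
static LsPlugin instantiate(final Class<? extends LsPlugin> cls)
    throws ReflectiveOperationException {
    return cls.getConstructor(LsConfiguration.class, LsContext.class)
        .newInstance(new LsConfiguration(Collections.emptyMap()), new LsContext());
}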
132 changes: 132 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/Filter.java
@@ -0,0 +1,132 @@
package org.logstash.execution;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.logstash.Event;

/**
* A Filter is simply a mapping of {@link QueueReader} to a new {@link QueueReader}.
*/
public interface Filter extends LsPlugin {

QueueReader filter(QueueReader reader);

@LogstashPlugin(name = "mutate")
final class Mutate implements Filter {

private static final PluginConfigSpec<String> FIELD_CONFIG =
LsConfiguration.requiredStringSetting("field");

private static final PluginConfigSpec<String> VALUE_CONFIG =
LsConfiguration.requiredStringSetting("value");

private final String field;

private final String value;

/**
* Required Constructor Signature only taking a {@link LsConfiguration}.
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Mutate(final LsConfiguration configuration, final LsContext context) {
this.field = configuration.get(FIELD_CONFIG);
this.value = configuration.get(VALUE_CONFIG);
}

@Override
public QueueReader filter(final QueueReader reader) {
return new QueueReader() {
@Override
public long poll(final Event event) {
Contributor:
We really do want the ability for filtering to be done batch-wise. This is the only way to efficiently do things like Elasticsearch lookups. We kind of get away with not doing this in the JDBC filters, but that's only possible due to caching. Even with that, we can't exploit the ability to search multiple values in a single query for the cache-seeding portion.

Contributor:
Does this API let you create new events? I understand we can drop events with Event.cancel; how would the split filter work?

Member Author:
@andrewvc the API doesn't prevent you from first collecting x Events before actually doing anything to them in a batched way and then returning another Reader that wraps the batched results :) That's the beauty here imo ... if your plugin needs batches, it can do batching; if it doesn't, it can save a bunch of memory by going one by one.

Also, there's nothing preventing you from creating new events here. Simply overwrite the given Event with the new event, don't increment the sequence number, and don't call poll on the QueueReader, and you've created a new Event :)

Contributor:
Ah, I see! While I do like that, I feel we should provide higher-level default implementations here for real-world plugins. The problem with leaving this as a pattern is that it's an extra place for bugs and code duplication. Previously each output would do its own batching, and there'd be occasional bugs and differences between them.

Additionally, the batch size is set at the Logstash level. We don't expose that option to the plugin, and each plugin would have to know what that size is to batch as the user expects. A default implementation could do that correctly.

Can we add the ability for plugin authors to instead define different callbacks as shown below? I think keeping the low-level API is fine, but I don't think it should be the default.

Something like...

// For the common case of writing a filter that takes one event and modifies (or cancels) it
public void filterSingle(Event event);

// For the common case of filters that take a batch
public void filterBatch(Collection<Event> events);
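
To make the proposal concrete, here is one hedged sketch of how a filterSingle default could be layered on top of the low-level API in this PR; SingleEventFilter and filterSingle are illustrative names from the comment above, not part of the PR:

// Sketch: adapts the proposed per-event callback onto the QueueReader-wrapping
// API, so plugin authors only implement filterSingle.
public abstract class SingleEventFilter implements Filter {

    protected abstract void filterSingle(Event event);

    @Override
    public final QueueReader filter(final QueueReader reader) {
        return new QueueReader() {
            @Override
            public long poll(final Event event) {
                final long seq = reader.poll(event);
                if (seq > -1L) {
                    filterSingle(event);
                }
                return seq;
            }

            @Override
            public long poll(final Event event, final long millis) {
                final long seq = reader.poll(event, millis);
                if (seq > -1L) {
                    filterSingle(event);
                }
                return seq;
            }

            @Override
            public void acknowledge(final long sequenceNum) {
                reader.acknowledge(sequenceNum);
            }
        };
    }
}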

Contributor:
WRT creating new events, I realize that's relatively rare, but I think some sugar would be nice. Implementing the split filter would be awkward as is, because you need to extract stuff out of the event, then queue it waiting for poll to be invoked again.

Where do these Event objects come from in this API? Are they from an Event pool? Would there be a way to simply ask for a new event from the pool instead without waiting for a new poll?

Contributor:
@original-brownbear so, in this example, I can get a new event by invoking what? It'd help me to see a short version of the clone filter written out here if you have the chance.

Member Author:
@andrewvc I think this is how I'd do clone here:


private Event clone;

private long lastSeq = -1L;

@Override
public long poll(final Event event) {
    if (clone != null) {
        event.overwrite(clone);
        clone = null;
        return lastSeq;
    }
    final long seq = reader.poll(event);
    lastSeq = seq;
    if (seq > -1L) {
        clone = event.clone();
    }
    return seq;
}

Just don't poll upstream if there is a clone to return and don't increment the sequence number since it's still the result of the last "real" input event that was polled from upstream.
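
The split case raised earlier could follow the same buffering pattern. A hedged sketch, where splitIntoExtraEvents is a hypothetical helper that builds the additional events:

private final java.util.Deque<Event> pending = new java.util.ArrayDeque<>();

private long lastSeq = -1L;

@Override
public long poll(final Event event) {
    if (!pending.isEmpty()) {
        // Drain a buffered split result without advancing upstream.
        event.overwrite(pending.poll());
        return lastSeq;
    }
    final long seq = reader.poll(event);
    lastSeq = seq;
    if (seq > -1L) {
        pending.addAll(splitIntoExtraEvents(event)); // hypothetical helper
    }
    return seq;
}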

Contributor:
After meeting with @original-brownbear we decided to use a filter instance per pipeline pattern as we do today. This means that there will be one instance used across pipeline worker threads. These instances will not be invoked concurrently to prevent people from shooting themselves in the foot. There will be an annotation for opting out of this, much like the concurrency :shared option we have today.

We won't tackle sugaring the clone/split filter use case initially.

Contributor:
@andrewvc - can you clarify?

"we decided to use a filter instance per pipeline pattern as we do today. This means that there will be one instance used across pipeline worker threads."

These seem to be saying opposite things... do you mean one instance per pipeline worker thread, or do you really mean a single instance shared across all pipeline worker threads?

Contributor:
One instance across all threads, just as we do today. One instance per pipeline, shared across all worker threads.
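
The opt-out annotation might look something like this; purely hypothetical, with a name that mirrors today's concurrency :shared option and the LogstashPlugin annotation included in this PR:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical: marks a filter as safe for concurrent invocation, opting out
// of the default one-instance, non-concurrent guarantee described above.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ConcurrentSafe {
}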

final long seq = reader.poll(event);
if (seq > -1L) {
Contributor:
When would the sequence be negative?

Member Author:
@andrewvc see https://github.com/elastic/logstash/pull/9137/files#diff-cf259dc78531f006beca5e143b529554R13 ... -1 indicates a failure to poll an event (think interrupts, timeouts).

Contributor:
Ah, of course :)
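
Since QueueReader itself isn't included in this excerpt, here is its contract as reconstructed from usage in this diff (hedged; the linked file is the authoritative version):

public interface QueueReader {

    // Fills the passed Event and returns its sequence number, or -1 if no
    // event could be polled (interrupt, timeout).
    long poll(Event event);

    // Same as above, but gives up after the given number of milliseconds.
    long poll(Event event, long millis);

    // Signals that the event with the given sequence number has been handled.
    void acknowledge(long sequenceNum);
}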

event.setField(field, value);
}
return seq;
}

@Override
public long poll(final Event event, final long millis) {
Contributor:
Thinking through this, most people will need to write the same polling code over and over again.

I think we should create some simple default implementations here that people can use. I put two up here: https://github.com/original-brownbear/logstash/pull/1/files

WDYT? Batching is necessary for things like the ES filter, etc. It's pretty complicated to implement in terms of this API (I'm sure there are bugs in my code up there, for instance).

I'd rather we promote those two filter APIs (and make similar ones for the Output class) than have people writing this boilerplate over and over again.

Member Author:
@andrewvc

Batching is necessary for things like the ES filter, etc

Yea maybe, but the efficient way of doing it is also highly plugin dependent. As of right now, I think it's fair to say that there are no plugins out there that actively make use of the batched interface in a quantifiable way. Otherwise, I wouldn't have found #8428 by mere chance :)

=> I'm not really comfortable with defining a batched API like that (yet). I'd much rather build a few plugins and see what works in the real world than guess at the right approach here now, tbh.

Contributor:
I don't see how it's plugin dependent? The only reason filters don't batch today is that it isn't possible because they can only get one event at a time. I'd rather not box us in.

Contributor:
BTW, how does the efficiency of https://github.com/original-brownbear/logstash/pull/1/files#diff-eb733fe2c6eb811410b0e0a88226263d look? It should be garbage-free, aside from a single new Event[] call per batch, which is pretty cheap given a reasonable batch size.

Also, looking forward to your feedback on the SimpleFilter API.

I'd rather we only expose these APIs than the low-level one here. The low-level one is so implementation-dependent that it boxes us in. I'm also not sure who would actually want to use them directly; they're much more finicky, and I'm not sure how much they buy a programmer in any situation I can think of.

final long seq = reader.poll(event, millis);
if (seq > -1L) {
event.setField(field, value);
}
return seq;
}

@Override
public void acknowledge(final long sequenceNum) {
reader.acknowledge(sequenceNum);
}
};
}

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Arrays.asList(FIELD_CONFIG, VALUE_CONFIG);
}
}

@LogstashPlugin(name = "clone")
final class Clone implements Filter {

private Event clone;

private long lastSeq = -1L;

/**
* Required Constructor Signature only taking a {@link LsConfiguration}.
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Clone(final LsConfiguration configuration, final LsContext context) {
}

@Override
public QueueReader filter(final QueueReader reader) {
return new QueueReader() {
@Override
public long poll(final Event event) {
if (clone != null) {
event.overwrite(clone);
clone = null;
return lastSeq;
}
final long seq = reader.poll(event);
lastSeq = seq;
if (seq > -1L) {
clone = event.clone();
}
return seq;
}

@Override
public long poll(final Event event, final long millis) {
if (clone != null) {
event.overwrite(clone);
clone = null;
return lastSeq;
}
final long seq = reader.poll(event, millis);
lastSeq = seq;
if (seq > -1L) {
clone = event.clone();
}
return seq;
}

@Override
public void acknowledge(final long sequenceNum) {
reader.acknowledge(sequenceNum);
}
};
}

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Collections.emptyList();
}
}
}
79 changes: 79 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/Input.java
@@ -0,0 +1,79 @@
package org.logstash.execution;

import java.util.Collection;
import java.util.Collections;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;

/**
* A Logstash Pipeline Input pushes to a {@link QueueWriter}.
*/
public interface Input extends LsPlugin {

/**
* Start pushing {@link org.logstash.Event} to given {@link QueueWriter}.
* @param writer Queue Writer to Push to
*/
void start(QueueWriter writer);

/**
* Stop the input.
* Stopping happens asynchronously, use {@link #awaitStop()} to make sure that the input has
* finished.
*/
void stop();

/**
* Blocks until the input execution has finished.
* @throws InterruptedException On Interrupt
*/
void awaitStop() throws InterruptedException;

@LogstashPlugin(name = "stream")
final class StreamInput implements Input {

private Scanner inpt;

private final CountDownLatch done = new CountDownLatch(1);

private volatile boolean stopped;

/**
* Required Constructor Signature only taking a {@link LsConfiguration}.
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public StreamInput(final LsConfiguration configuration, final LsContext context) {
// Do whatever
}

@Override
public void start(final QueueWriter writer) {
// Scanner's two-argument constructor takes a charset name, not a delimiter.
inpt = new Scanner(System.in).useDelimiter("\n");
try {
while (!stopped && inpt.hasNext()) {
final String message = inpt.next();
writer.push(Collections.singletonMap("message", message));
Contributor:
Can you talk me through why the writer writes Map objects, not Event objects? Was that more efficient given that a Map is more lightweight?

Member Author:
@andrewvc yea, that was just an idea for optimizing things a little more. It's kinda annoying that we have all these Event instances that don't hold any state. For filters and outputs I can see the value in having Event around for serialization and such, but for inputs it's just a waste of memory to create an Event up front when all inputs in fact simply create a Map<String, Object>.
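
QueueWriter isn't included in this excerpt either; reconstructed from StreamInput's usage above, its shape is roughly this (hedged):

import java.util.Map;

public interface QueueWriter {

    // Inputs push raw field maps; building the Event is deferred to the queue
    // instead of being repeated in every input.
    void push(Map<String, Object> fields);
}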

}
} finally {
stopped = true;
done.countDown();
}
}

@Override
public void stop() {
stopped = true;
}

@Override
public void awaitStop() throws InterruptedException {
done.await();
}

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Collections.emptyList();
}
}
}
16 changes: 16 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/LogstashPlugin.java
@@ -0,0 +1,16 @@
package org.logstash.execution;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Logstash plugin annotation for finding plugins on the classpath and setting their name as used
* in the configuration syntax.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface LogstashPlugin {
String name();
}
75 changes: 75 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java
@@ -0,0 +1,75 @@
package org.logstash.execution;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;

/**
* LS Configuration example. Should be implemented like Spark config or Hadoop job config classes.
*/
public final class LsConfiguration {

/**
* @param raw Configuration Settings Map. Values are serialized.
*/
public LsConfiguration(final Map<String, String> raw) {

}

public <T> T get(final PluginConfigSpec<T> configSpec) {
// TODO: Implement
return null;
}

public boolean contains(final PluginConfigSpec<?> configSpec) {
// TODO: Implement
return false;
}

public Collection<String> allKeys() {
return null;
}

public static PluginConfigSpec<String> stringSetting(final String name) {
return new PluginConfigSpec<>(
name, String.class, null, false, false
);
}

public static PluginConfigSpec<String> requiredStringSetting(final String name) {
return new PluginConfigSpec<>(name, String.class, null, false, true);
}

public static PluginConfigSpec<Long> numSetting(final String name) {
return new PluginConfigSpec<>(
name, Long.class, null, false, false
);
}

public static PluginConfigSpec<Long> numSetting(final String name, final long defaultValue) {
return new PluginConfigSpec<>(
name, Long.class, defaultValue, false, false
);
}

public static PluginConfigSpec<Path> pathSetting(final String name) {
return new PluginConfigSpec<>(name, Path.class, null, false, false);
}

public static PluginConfigSpec<Boolean> booleanSetting(final String name) {
return new PluginConfigSpec<>(name, Boolean.class, null, false, false);
}

@SuppressWarnings("unchecked")
public static PluginConfigSpec<Map<String, String>> hashSetting(final String name) {
return new PluginConfigSpec(name, Map.class, null, false, false);
}

@SuppressWarnings("unchecked")
public static PluginConfigSpec<Map<String, LsConfiguration>> requiredHashSetting(
final String name, final Collection<PluginConfigSpec<?>> spec) {
return new PluginConfigSpec(
name, Map.class, null, false, true
);
}
}
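
A hedged usage sketch tying the spec factories to the constructor above (shape only: get() is still a stub, so the value comes back null for now):

import java.util.Collections;

// Declare a typed spec, build a configuration from raw (serialized) settings,
// then read the value back through the spec.
final PluginConfigSpec<String> fieldSpec = LsConfiguration.requiredStringSetting("field");
final LsConfiguration config =
    new LsConfiguration(Collections.singletonMap("field", "message"));
final String field = config.get(fieldSpec); // null until get() is implemented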
15 changes: 15 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/LsContext.java
@@ -0,0 +1,15 @@
package org.logstash.execution;

import org.logstash.common.io.DeadLetterQueueWriter;

/**
* Holds Logstash Environment.
*/
public final class LsContext {

// TODO: Add getters for metrics, logger etc.

public DeadLetterQueueWriter dlqWriter() {
return null;
}
}
8 changes: 8 additions & 0 deletions logstash-core/src/main/java/org/logstash/execution/LsPlugin.java
@@ -0,0 +1,8 @@
package org.logstash.execution;

import java.util.Collection;

public interface LsPlugin {

Collection<PluginConfigSpec<?>> configSchema();
}