From 1070eed6a14b2a0288045bd8647cf190b3258898 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 30 Jan 2018 11:38:26 +0100 Subject: [PATCH 1/8] start --- logstash-core/build.gradle | 1 + .../logstash/execution/DiscoverPlugins.java | 32 ++++++++ .../java/org/logstash/execution/Filter.java | 68 ++++++++++++++++ .../java/org/logstash/execution/Input.java | 77 +++++++++++++++++++ .../logstash/execution/LogstashPlugin.java | 16 ++++ .../logstash/execution/LsConfiguration.java | 17 ++++ .../java/org/logstash/execution/Output.java | 75 ++++++++++++++++++ .../org/logstash/execution/QueueReader.java | 31 ++++++++ .../org/logstash/execution/QueueWriter.java | 60 +++++++++++++++ 9 files changed, 377 insertions(+) create mode 100644 logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/Filter.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/Input.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/LogstashPlugin.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/Output.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/QueueReader.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/QueueWriter.java diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 6b921972dd9..2377b5ccac3 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -121,6 +121,7 @@ dependencies { compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jacksonVersion}" compile "org.jruby:jruby-complete:${jrubyVersion}" compile 'com.google.googlejavaformat:google-java-format:1.5' + compile 'org.reflections:reflections:0.9.11' testCompile 'org.apache.logging.log4j:log4j-core:2.9.1:tests' testCompile 'junit:junit:4.12' testCompile 'net.javacrumbs.json-unit:json-unit:1.9.0' diff --git a/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java b/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java new file mode 100644 index 00000000000..e9d2dfc42bc --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java @@ -0,0 +1,32 @@ +package org.logstash.execution; + +import java.lang.reflect.Constructor; +import java.util.Set; +import org.reflections.Reflections; + +/** + * Quick demo of plugin discovery showing that the solution wouldn't require anything beyond + * the plugin classes on the classpath. + */ +public final class DiscoverPlugins { + + public static void main(final String... args) throws NoSuchMethodException { + Reflections reflections = new Reflections("org.logstash"); + Set> annotated = reflections.getTypesAnnotatedWith(LogstashPlugin.class); + for (final Class cls : annotated) { + System.out.println(cls.getName()); + System.out.println(((LogstashPlugin) cls.getAnnotations()[0]).name()); + final Constructor ctor = cls.getConstructor(LsConfiguration.class); + System.out.println("Found Ctor at : " + ctor.getName()); + if (Filter.class.isAssignableFrom(cls)) { + System.out.println("Filter"); + } + if (Output.class.isAssignableFrom(cls)) { + System.out.println("Output"); + } + if (Input.class.isAssignableFrom(cls)) { + System.out.println("Input"); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/execution/Filter.java b/logstash-core/src/main/java/org/logstash/execution/Filter.java new file mode 100644 index 00000000000..37c85a32d3e --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/Filter.java @@ -0,0 +1,68 @@ +package org.logstash.execution; + +import org.logstash.Event; + +/** + * A Filter is simply a mapping of {@link QueueReader} to a new {@link QueueReader}. + */ +public interface Filter extends AutoCloseable { + + QueueReader filter(QueueReader reader); + + void flush(boolean isShutdown); + + @LogstashPlugin(name = "mutate") + final class Mutate implements Filter { + + private final String field; + + private final String value; + + /** + * Required Constructor Signature only taking a {@link LsConfiguration}. + * @param configuration Logstash Configuration + */ + public Mutate(final LsConfiguration configuration) { + this.field = configuration.getString("ls.plugin.mutate.field"); + this.value = configuration.getString("ls.plugin.mutate.value"); + } + + @Override + public QueueReader filter(final QueueReader reader) { + return new QueueReader() { + @Override + public long poll(final Event event) { + final long seq = reader.poll(event); + if (seq > -1L) { + event.setField(field, value); + } + return seq; + } + + @Override + public long poll(final Event event, final long millis) { + final long seq = reader.poll(event, millis); + if (seq > -1L) { + event.setField(field, value); + } + return seq; + } + + @Override + public void acknowledge(final long sequenceNum) { + reader.acknowledge(sequenceNum); + } + }; + } + + @Override + public void flush(final boolean isShutdown) { + // Nothing to do here + } + + @Override + public void close() { + // Nothing to do here + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/execution/Input.java b/logstash-core/src/main/java/org/logstash/execution/Input.java new file mode 100644 index 00000000000..3c800a6d9be --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/Input.java @@ -0,0 +1,77 @@ +package org.logstash.execution; + +import java.util.Collections; +import java.util.Scanner; +import java.util.concurrent.CountDownLatch; + +/** + * A Logstash Pipeline Input pushes to a {@link QueueWriter}. + */ +public interface Input extends AutoCloseable { + + /** + * Start pushing {@link org.logstash.Event} to given {@link QueueWriter}. + * @param writer Queue Writer to Push to + */ + void start(QueueWriter writer); + + /** + * Stop the input. + * Stopping happens asynchronously, use {@link #awaitStop()} to make sure that the input has + * finished. + */ + void stop(); + + /** + * Blocks until the input execution has finished. + * @throws InterruptedException On Interrupt + */ + void awaitStop() throws InterruptedException; + + @LogstashPlugin(name = "stream") + final class StreamInput implements Input { + + private Scanner inpt; + + private final CountDownLatch done = new CountDownLatch(1); + + private volatile boolean stopped; + + /** + * Required Constructor Signature only taking a {@link LsConfiguration}. + * @param configuration Logstash Configuration + */ + public StreamInput(final LsConfiguration configuration) { + // Do whatever + } + + @Override + public void start(final QueueWriter writer) { + inpt = new Scanner(System.in, "\n"); + try { + while (!stopped && inpt.hasNext()) { + final String message = inpt.next(); + writer.push(Collections.singletonMap("message", message)); + } + } finally { + stopped = true; + done.countDown(); + } + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public void awaitStop() throws InterruptedException { + done.await(); + } + + @Override + public void close() { + inpt.close(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/execution/LogstashPlugin.java b/logstash-core/src/main/java/org/logstash/execution/LogstashPlugin.java new file mode 100644 index 00000000000..db25ffb0dd0 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/LogstashPlugin.java @@ -0,0 +1,16 @@ +package org.logstash.execution; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Logstash plugin annotation for finding plugins on the classpath and setting their name as used + * in the configuration syntax. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface LogstashPlugin { + String name(); +} diff --git a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java new file mode 100644 index 00000000000..0eade299222 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java @@ -0,0 +1,17 @@ +package org.logstash.execution; + +/** + * LS Configuration example. Should be implemented like Spark config or Hadoop job config classes. + */ +public final class LsConfiguration { + + public String getString(final String key) { + return ""; + } + + public int getInt(final String key) { + return 0; + } + + //TODO: all types we care about +} diff --git a/logstash-core/src/main/java/org/logstash/execution/Output.java b/logstash-core/src/main/java/org/logstash/execution/Output.java new file mode 100644 index 00000000000..93ceec605f8 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/Output.java @@ -0,0 +1,75 @@ +package org.logstash.execution; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import org.logstash.Event; + +/** + * A Logstash Pipeline Output consumes a {@link QueueReader}. + */ +public interface Output extends AutoCloseable { + + /** + * Polls events from event reader and runs output action. + * @param reader Reader to poll events from. + */ + void output(QueueReader reader); + + void stop(); + + void awaitStop() throws InterruptedException; + + @LogstashPlugin(name = "output") + final class StreamOutput implements Output { + + private final PrintStream outpt; + + private volatile boolean stopped; + + private final CountDownLatch done = new CountDownLatch(1); + + /** + * Required Constructor Signature only taking a {@link LsConfiguration}. + * @param configuration Logstash Configuration + */ + public StreamOutput(final LsConfiguration configuration) { + this.outpt = new PrintStream(System.out); + } + + @Override + public void output(final QueueReader reader) { + final Event event = new Event(); + try { + long sequence = reader.poll(event); + while (!stopped && sequence > -1L) { + try { + outpt.println(event.toJson()); + reader.acknowledge(sequence); + } catch (final IOException ex) { + throw new IllegalStateException(ex); + } + sequence = reader.poll(event); + } + } finally { + stopped = true; + done.countDown(); + } + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public void awaitStop() throws InterruptedException { + done.await(); + } + + @Override + public void close() { + outpt.close(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReader.java b/logstash-core/src/main/java/org/logstash/execution/QueueReader.java new file mode 100644 index 00000000000..f8856803b07 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReader.java @@ -0,0 +1,31 @@ +package org.logstash.execution; + +import org.logstash.Event; + +/** + * Reads from the Queue. + */ +public interface QueueReader { + + /** + * Polls for the next event without timeout. + * @param event Event Pointer to write next Event to + * @return Sequence Number of the event, -1 on failure to poll an event + */ + long poll(Event event); + + /** + * Polls for the next event with a timeout. + * @param event Event Pointer to write next event to + * @param millis Timeout for polling the next even in ms + * @return Sequence Number of the event, -1 on failure to poll an event + */ + long poll(Event event, long millis); + + /** + * Acknowledges that an Event has passed through the pipeline and can be acknowledged to the + * input. + * @param sequenceNum Sequence number of the acknowledged event + */ + void acknowledge(long sequenceNum); +} diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java b/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java new file mode 100644 index 00000000000..bedab47221b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java @@ -0,0 +1,60 @@ +package org.logstash.execution; + +import java.util.Collection; +import java.util.Map; + +/** + * Writes to the Queue. + */ +public interface QueueWriter { + + /** + * Pushes a single event to the Queue, blocking indefinitely if the Queue is not ready for a + * write. + * @param event Logstash Event Data + * @return Sequence number of the event or -1 if push failed + */ + long push(Map event); + + /** + * Pushes a single event to the Queue, blocking for the given timeout if the Queue is not ready + * for a write. + * @param event Logstash Event Data + * @param millis Timeout in millis + * @return Sequence number of the event or -1 if push failed + */ + long push(Map event, long millis); + + /** + * Pushes a multiple events to the Queue, blocking for the given timeout if the Queue is not + * ready for a write. + * Guarantees that a return {@code != -1} means that all events were pushed to the Queue + * successfully and no partial writes of only a subset of the input events will ever occur. + * @param events Logstash Events Data + * @return Sequence number of the first event or -1 if push failed + */ + long push(Collection> events); + + /** + * Pushes a multiple events to the Queue, blocking for the given timeout if the Queue is not + * ready for a write. + * Guarantees that a return {@code != -1} means that all events were pushed to the Queue + * successfully and no partial writes of only a subset of the input events will ever occur. + * @param events Logstash Events Data + * @param millis Timeout in millis + * @return Sequence number of the first event or -1 if push failed + */ + long push(Collection> events, long millis); + + /** + * Returns the upper bound for acknowledged sequence numbers. + * @return upper bound for acknowledged sequence numbers + */ + long watermark(); + + /** + * Returns the upper bound for unacknowledged sequence numbers. + * @return upper bound for unacknowledged sequence numbers + */ + long highWatermark(); +} From 0f6940c92ab74d9990fc6da50b37fa62caf9c186 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 13 Feb 2018 21:22:40 +0100 Subject: [PATCH 2/8] LS context --- .../org/logstash/execution/DiscoverPlugins.java | 2 +- .../main/java/org/logstash/execution/Filter.java | 3 ++- .../main/java/org/logstash/execution/Input.java | 3 ++- .../java/org/logstash/execution/LsContext.java | 15 +++++++++++++++ .../main/java/org/logstash/execution/Output.java | 3 ++- 5 files changed, 22 insertions(+), 4 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/execution/LsContext.java diff --git a/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java b/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java index e9d2dfc42bc..266a207036f 100644 --- a/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java +++ b/logstash-core/src/main/java/org/logstash/execution/DiscoverPlugins.java @@ -16,7 +16,7 @@ public static void main(final String... args) throws NoSuchMethodException { for (final Class cls : annotated) { System.out.println(cls.getName()); System.out.println(((LogstashPlugin) cls.getAnnotations()[0]).name()); - final Constructor ctor = cls.getConstructor(LsConfiguration.class); + final Constructor ctor = cls.getConstructor(LsConfiguration.class, LsContext.class); System.out.println("Found Ctor at : " + ctor.getName()); if (Filter.class.isAssignableFrom(cls)) { System.out.println("Filter"); diff --git a/logstash-core/src/main/java/org/logstash/execution/Filter.java b/logstash-core/src/main/java/org/logstash/execution/Filter.java index 37c85a32d3e..ec1d3093669 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Filter.java +++ b/logstash-core/src/main/java/org/logstash/execution/Filter.java @@ -21,8 +21,9 @@ final class Mutate implements Filter { /** * Required Constructor Signature only taking a {@link LsConfiguration}. * @param configuration Logstash Configuration + * @param context Logstash Context */ - public Mutate(final LsConfiguration configuration) { + public Mutate(final LsConfiguration configuration, final LsContext context) { this.field = configuration.getString("ls.plugin.mutate.field"); this.value = configuration.getString("ls.plugin.mutate.value"); } diff --git a/logstash-core/src/main/java/org/logstash/execution/Input.java b/logstash-core/src/main/java/org/logstash/execution/Input.java index 3c800a6d9be..56f00a6c437 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Input.java +++ b/logstash-core/src/main/java/org/logstash/execution/Input.java @@ -40,8 +40,9 @@ final class StreamInput implements Input { /** * Required Constructor Signature only taking a {@link LsConfiguration}. * @param configuration Logstash Configuration + * @param context Logstash Context */ - public StreamInput(final LsConfiguration configuration) { + public StreamInput(final LsConfiguration configuration, final LsContext context) { // Do whatever } diff --git a/logstash-core/src/main/java/org/logstash/execution/LsContext.java b/logstash-core/src/main/java/org/logstash/execution/LsContext.java new file mode 100644 index 00000000000..142b3c97549 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/LsContext.java @@ -0,0 +1,15 @@ +package org.logstash.execution; + +import org.logstash.common.io.DeadLetterQueueWriter; + +/** + * Holds Logstash Environment. + */ +public final class LsContext { + + // TODO: Add getters for metrics, logger etc. + + public DeadLetterQueueWriter dlqWriter() { + return null; + } +} diff --git a/logstash-core/src/main/java/org/logstash/execution/Output.java b/logstash-core/src/main/java/org/logstash/execution/Output.java index 93ceec605f8..bf8f255fdb9 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Output.java +++ b/logstash-core/src/main/java/org/logstash/execution/Output.java @@ -32,8 +32,9 @@ final class StreamOutput implements Output { /** * Required Constructor Signature only taking a {@link LsConfiguration}. * @param configuration Logstash Configuration + * @param context Logstash Context */ - public StreamOutput(final LsConfiguration configuration) { + public StreamOutput(final LsConfiguration configuration, final LsContext context) { this.outpt = new PrintStream(System.out); } From 79b379dcf5286417cb244635adffa8decc05ef8c Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 14 Feb 2018 15:55:12 +0100 Subject: [PATCH 3/8] back --- .../java/org/logstash/execution/Filter.java | 70 ++++++++++++++++++- .../logstash/execution/LsConfiguration.java | 16 +++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/Filter.java b/logstash-core/src/main/java/org/logstash/execution/Filter.java index ec1d3093669..7cae3c8c7d9 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Filter.java +++ b/logstash-core/src/main/java/org/logstash/execution/Filter.java @@ -24,8 +24,8 @@ final class Mutate implements Filter { * @param context Logstash Context */ public Mutate(final LsConfiguration configuration, final LsContext context) { - this.field = configuration.getString("ls.plugin.mutate.field"); - this.value = configuration.getString("ls.plugin.mutate.value"); + this.field = configuration.getString("field"); + this.value = configuration.getString("value"); } @Override @@ -66,4 +66,70 @@ public void close() { // Nothing to do here } } + + @LogstashPlugin(name = "clone") + final class Clone implements Filter { + + private Event clone; + + private long lastSeq = -1L; + + /** + * Required Constructor Signature only taking a {@link LsConfiguration}. + * @param configuration Logstash Configuration + * @param context Logstash Context + */ + public Clone(final LsConfiguration configuration, final LsContext context) { + } + + @Override + public QueueReader filter(final QueueReader reader) { + return new QueueReader() { + @Override + public long poll(final Event event) { + if (clone != null) { + event.overwrite(clone); + clone = null; + return lastSeq; + } + final long seq = reader.poll(event); + lastSeq = seq; + if (seq > -1L) { + clone = event.clone(); + } + return seq; + } + + @Override + public long poll(final Event event, final long millis) { + if (clone != null) { + event.overwrite(clone); + clone = null; + return lastSeq; + } + final long seq = reader.poll(event, millis); + lastSeq = seq; + if (seq > -1L) { + clone = event.clone(); + } + return seq; + } + + @Override + public void acknowledge(final long sequenceNum) { + reader.acknowledge(sequenceNum); + } + }; + } + + @Override + public void flush(final boolean isShutdown) { + // Nothing to do here + } + + @Override + public void close() { + // Nothing to do here + } + } } diff --git a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java index 0eade299222..1d83f3726d7 100644 --- a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java +++ b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java @@ -1,5 +1,8 @@ package org.logstash.execution; +import java.util.Collection; +import java.util.Properties; + /** * LS Configuration example. Should be implemented like Spark config or Hadoop job config classes. */ @@ -13,5 +16,18 @@ public int getInt(final String key) { return 0; } + public Collection allProperties() { + // TODO: Return list of all defined properties + return null; + } + //TODO: all types we care about + + public static final class Property { + + public Property(final Class type, final String name, final boolean deprecated) { + // TODO: So on and so forth add + } + + } } From 58f95f522bf0befee079a6e5b0175244dbf95768 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 20 Feb 2018 14:17:09 +0100 Subject: [PATCH 4/8] outline config spec --- .../java/org/logstash/execution/Filter.java | 16 +++++--- .../java/org/logstash/execution/Input.java | 7 ++-- .../java/org/logstash/execution/LsPlugin.java | 8 ++++ .../java/org/logstash/execution/Output.java | 9 +++-- .../logstash/execution/PluginConfigSpec.java | 37 +++++++++++++++++++ 5 files changed, 66 insertions(+), 11 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/execution/LsPlugin.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java diff --git a/logstash-core/src/main/java/org/logstash/execution/Filter.java b/logstash-core/src/main/java/org/logstash/execution/Filter.java index 7cae3c8c7d9..c2e63dd7cba 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Filter.java +++ b/logstash-core/src/main/java/org/logstash/execution/Filter.java @@ -1,11 +1,14 @@ package org.logstash.execution; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import org.logstash.Event; /** * A Filter is simply a mapping of {@link QueueReader} to a new {@link QueueReader}. */ -public interface Filter extends AutoCloseable { +public interface Filter extends LsPlugin { QueueReader filter(QueueReader reader); @@ -62,8 +65,11 @@ public void flush(final boolean isShutdown) { } @Override - public void close() { - // Nothing to do here + public Collection> configSchema() { + return Arrays.asList( + new PluginConfigSpec<>("field", String.class, null, false), + new PluginConfigSpec<>("value", String.class, null, false) + ); } } @@ -128,8 +134,8 @@ public void flush(final boolean isShutdown) { } @Override - public void close() { - // Nothing to do here + public Collection> configSchema() { + return Collections.emptyList(); } } } diff --git a/logstash-core/src/main/java/org/logstash/execution/Input.java b/logstash-core/src/main/java/org/logstash/execution/Input.java index 56f00a6c437..53e907e0051 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Input.java +++ b/logstash-core/src/main/java/org/logstash/execution/Input.java @@ -1,5 +1,6 @@ package org.logstash.execution; +import java.util.Collection; import java.util.Collections; import java.util.Scanner; import java.util.concurrent.CountDownLatch; @@ -7,7 +8,7 @@ /** * A Logstash Pipeline Input pushes to a {@link QueueWriter}. */ -public interface Input extends AutoCloseable { +public interface Input extends LsPlugin { /** * Start pushing {@link org.logstash.Event} to given {@link QueueWriter}. @@ -71,8 +72,8 @@ public void awaitStop() throws InterruptedException { } @Override - public void close() { - inpt.close(); + public Collection> configSchema() { + return Collections.emptyList(); } } } diff --git a/logstash-core/src/main/java/org/logstash/execution/LsPlugin.java b/logstash-core/src/main/java/org/logstash/execution/LsPlugin.java new file mode 100644 index 00000000000..ce7e60cbd81 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/LsPlugin.java @@ -0,0 +1,8 @@ +package org.logstash.execution; + +import java.util.Collection; + +public interface LsPlugin { + + Collection> configSchema(); +} diff --git a/logstash-core/src/main/java/org/logstash/execution/Output.java b/logstash-core/src/main/java/org/logstash/execution/Output.java index bf8f255fdb9..6c20be7aa8f 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Output.java +++ b/logstash-core/src/main/java/org/logstash/execution/Output.java @@ -2,13 +2,15 @@ import java.io.IOException; import java.io.PrintStream; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import org.logstash.Event; /** * A Logstash Pipeline Output consumes a {@link QueueReader}. */ -public interface Output extends AutoCloseable { +public interface Output extends LsPlugin { /** * Polls events from event reader and runs output action. @@ -60,6 +62,7 @@ public void output(final QueueReader reader) { @Override public void stop() { + outpt.close(); stopped = true; } @@ -69,8 +72,8 @@ public void awaitStop() throws InterruptedException { } @Override - public void close() { - outpt.close(); + public Collection> configSchema() { + return Collections.emptyList(); } } } diff --git a/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java b/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java new file mode 100644 index 00000000000..55eee3d1b23 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java @@ -0,0 +1,37 @@ +package org.logstash.execution; + +public final class PluginConfigSpec { + + private final String name; + + private final Class type; + + private final boolean deprecated; + + private final T defaultValue; + + public PluginConfigSpec(final String name, final Class type, + final T defaultValue, final boolean deprecated) { + this.name = name; + this.type = type; + this.defaultValue = defaultValue; + this.deprecated = deprecated; + } + + public boolean deprecated() { + return this.deprecated; + } + + public T defaultValue() { + return this.defaultValue; + } + + public String name() { + return name; + } + + public Class type() { + return type; + } + +} From fdc6702ab9bc7f5af63fd46bf6963791359bd1f0 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 21 Feb 2018 08:54:53 +0100 Subject: [PATCH 5/8] remove timeout and batch writes --- .../org/logstash/execution/QueueWriter.java | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java b/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java index bedab47221b..d48c0b24a1c 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueWriter.java @@ -1,6 +1,5 @@ package org.logstash.execution; -import java.util.Collection; import java.util.Map; /** @@ -16,36 +15,6 @@ public interface QueueWriter { */ long push(Map event); - /** - * Pushes a single event to the Queue, blocking for the given timeout if the Queue is not ready - * for a write. - * @param event Logstash Event Data - * @param millis Timeout in millis - * @return Sequence number of the event or -1 if push failed - */ - long push(Map event, long millis); - - /** - * Pushes a multiple events to the Queue, blocking for the given timeout if the Queue is not - * ready for a write. - * Guarantees that a return {@code != -1} means that all events were pushed to the Queue - * successfully and no partial writes of only a subset of the input events will ever occur. - * @param events Logstash Events Data - * @return Sequence number of the first event or -1 if push failed - */ - long push(Collection> events); - - /** - * Pushes a multiple events to the Queue, blocking for the given timeout if the Queue is not - * ready for a write. - * Guarantees that a return {@code != -1} means that all events were pushed to the Queue - * successfully and no partial writes of only a subset of the input events will ever occur. - * @param events Logstash Events Data - * @param millis Timeout in millis - * @return Sequence number of the first event or -1 if push failed - */ - long push(Collection> events, long millis); - /** * Returns the upper bound for acknowledged sequence numbers. * @return upper bound for acknowledged sequence numbers From df9bb6cfdfa079f816ecbc647ed150744a493016 Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 23 Feb 2018 12:36:56 +0100 Subject: [PATCH 6/8] Config spec --- .../java/org/logstash/execution/Filter.java | 15 +++-- .../logstash/execution/LsConfiguration.java | 63 +++++++++++++++---- .../logstash/execution/PluginConfigSpec.java | 9 ++- .../logstash/execution/inputs/HttpPoller.java | 63 +++++++++++++++++++ 4 files changed, 131 insertions(+), 19 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java diff --git a/logstash-core/src/main/java/org/logstash/execution/Filter.java b/logstash-core/src/main/java/org/logstash/execution/Filter.java index c2e63dd7cba..6d19990f63e 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Filter.java +++ b/logstash-core/src/main/java/org/logstash/execution/Filter.java @@ -17,6 +17,12 @@ public interface Filter extends LsPlugin { @LogstashPlugin(name = "mutate") final class Mutate implements Filter { + private static final PluginConfigSpec FIELD_CONFIG = + LsConfiguration.requiredStringSetting("field"); + + private static final PluginConfigSpec VALUE_CONFIG = + LsConfiguration.requiredStringSetting("value"); + private final String field; private final String value; @@ -27,8 +33,8 @@ final class Mutate implements Filter { * @param context Logstash Context */ public Mutate(final LsConfiguration configuration, final LsContext context) { - this.field = configuration.getString("field"); - this.value = configuration.getString("value"); + this.field = configuration.get(FIELD_CONFIG); + this.value = configuration.get(VALUE_CONFIG); } @Override @@ -66,10 +72,7 @@ public void flush(final boolean isShutdown) { @Override public Collection> configSchema() { - return Arrays.asList( - new PluginConfigSpec<>("field", String.class, null, false), - new PluginConfigSpec<>("value", String.class, null, false) - ); + return Arrays.asList(FIELD_CONFIG, VALUE_CONFIG); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java index 1d83f3726d7..a63ee4c2e34 100644 --- a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java +++ b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java @@ -1,33 +1,72 @@ package org.logstash.execution; +import java.nio.file.Path; import java.util.Collection; -import java.util.Properties; +import java.util.Map; /** * LS Configuration example. Should be implemented like Spark config or Hadoop job config classes. */ public final class LsConfiguration { - public String getString(final String key) { - return ""; + /** + * @param raw Configuration Settings Map. Values are serialized. + */ + public LsConfiguration(final Map raw) { + + } + + public T get(final PluginConfigSpec configSpec) { + // TODO: Implement + return null; } - public int getInt(final String key) { - return 0; + public boolean contains(final PluginConfigSpec configSpec) { + // TODO: Implement + return false; } - public Collection allProperties() { - // TODO: Return list of all defined properties + public Collection allKeys() { return null; } - //TODO: all types we care about + public static PluginConfigSpec stringSetting(final String name) { + return new PluginConfigSpec<>( + name, String.class, null, false, false + ); + } + + public static PluginConfigSpec requiredStringSetting(final String name) { + return new PluginConfigSpec<>(name, String.class, null, false, true); + } - public static final class Property { + public static PluginConfigSpec numSetting(final String name) { + return new PluginConfigSpec<>( + name, Long.class, null, false, false + ); + } - public Property(final Class type, final String name, final boolean deprecated) { - // TODO: So on and so forth add - } + public static PluginConfigSpec numSetting(final String name, final long defaultValue) { + return new PluginConfigSpec<>( + name, Long.class, defaultValue, false, false + ); + } + + public static PluginConfigSpec pathSetting(final String name) { + return new PluginConfigSpec<>(name, Path.class, null, false, false); + } + + public static PluginConfigSpec booleanSetting(final String name) { + return new PluginConfigSpec<>(name, Boolean.class, null, false, false); + } + + @SuppressWarnings("unchecked") + public static PluginConfigSpec> hashSetting(final String name) { + return new PluginConfigSpec(name, Map.class, null, false, false); + } + @SuppressWarnings("unchecked") + public static PluginConfigSpec> requiredHashSetting(final String name) { + return new PluginConfigSpec(name, Map.class, null, false, true); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java b/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java index 55eee3d1b23..c9a37238498 100644 --- a/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java +++ b/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java @@ -8,20 +8,27 @@ public final class PluginConfigSpec { private final boolean deprecated; + private final boolean required; + private final T defaultValue; public PluginConfigSpec(final String name, final Class type, - final T defaultValue, final boolean deprecated) { + final T defaultValue, final boolean deprecated, final boolean required) { this.name = name; this.type = type; this.defaultValue = defaultValue; this.deprecated = deprecated; + this.required = required; } public boolean deprecated() { return this.deprecated; } + public boolean required() { + return this.required; + } + public T defaultValue() { return this.defaultValue; } diff --git a/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java b/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java new file mode 100644 index 00000000000..74580421082 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java @@ -0,0 +1,63 @@ +package org.logstash.execution.inputs; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import org.logstash.execution.Input; +import org.logstash.execution.LsConfiguration; +import org.logstash.execution.LsContext; +import org.logstash.execution.PluginConfigSpec; +import org.logstash.execution.QueueWriter; + +public final class HttpPoller implements Input { + + private static final PluginConfigSpec USER_CONFIG = + LsConfiguration.stringSetting("user"); + + private static final PluginConfigSpec PASSWORD_CONFIG = + LsConfiguration.stringSetting("password"); + + private static final PluginConfigSpec AUTOMATIC_RETRIES_CONFIG = + LsConfiguration.numSetting("automatic_retries", 1L); + + private static final PluginConfigSpec CA_CERT_CONFIG = + LsConfiguration.pathSetting("cacert"); + + private static final PluginConfigSpec> URLS_CONFIG = + LsConfiguration.requiredHashSetting("urls"); + + private final LsConfiguration configuration; + + public HttpPoller(final LsConfiguration configuration, final LsContext context) { + this.configuration = configuration; + } + + @Override + public void start(final QueueWriter writer) { + final String user = configuration.get(USER_CONFIG); + final String password; + if (configuration.contains(PASSWORD_CONFIG)) { + // password things + } else { + // no password things + } + } + + @Override + public void stop() { + + } + + @Override + public void awaitStop() throws InterruptedException { + + } + + @Override + public Collection> configSchema() { + return Arrays.asList( + USER_CONFIG, PASSWORD_CONFIG, AUTOMATIC_RETRIES_CONFIG, CA_CERT_CONFIG, URLS_CONFIG + ); + } +} From 1b16b09a60bcb184fcdc74aa81bd6c66af83c6d7 Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 23 Feb 2018 12:39:14 +0100 Subject: [PATCH 7/8] Remove flush --- .../src/main/java/org/logstash/execution/Filter.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/Filter.java b/logstash-core/src/main/java/org/logstash/execution/Filter.java index 6d19990f63e..d32293e131a 100644 --- a/logstash-core/src/main/java/org/logstash/execution/Filter.java +++ b/logstash-core/src/main/java/org/logstash/execution/Filter.java @@ -12,8 +12,6 @@ public interface Filter extends LsPlugin { QueueReader filter(QueueReader reader); - void flush(boolean isShutdown); - @LogstashPlugin(name = "mutate") final class Mutate implements Filter { @@ -65,11 +63,6 @@ public void acknowledge(final long sequenceNum) { }; } - @Override - public void flush(final boolean isShutdown) { - // Nothing to do here - } - @Override public Collection> configSchema() { return Arrays.asList(FIELD_CONFIG, VALUE_CONFIG); @@ -131,11 +124,6 @@ public void acknowledge(final long sequenceNum) { }; } - @Override - public void flush(final boolean isShutdown) { - // Nothing to do here - } - @Override public Collection> configSchema() { return Collections.emptyList(); From 11f27182bd11a8776302b2b8063bfd23e522ad87 Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 23 Feb 2018 21:14:00 +0100 Subject: [PATCH 8/8] Config spec --- .../logstash/execution/LsConfiguration.java | 7 +++++-- .../logstash/execution/PluginConfigSpec.java | 20 +++++++++++++++++++ .../logstash/execution/inputs/HttpPoller.java | 14 +++++++++++-- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java index a63ee4c2e34..a0143434b96 100644 --- a/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java +++ b/logstash-core/src/main/java/org/logstash/execution/LsConfiguration.java @@ -66,7 +66,10 @@ public static PluginConfigSpec> hashSetting(final String nam } @SuppressWarnings("unchecked") - public static PluginConfigSpec> requiredHashSetting(final String name) { - return new PluginConfigSpec(name, Map.class, null, false, true); + public static PluginConfigSpec> requiredHashSetting( + final String name, final Collection> spec) { + return new PluginConfigSpec( + name, Map.class, null, false, true + ); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java b/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java index c9a37238498..22f0db0809d 100644 --- a/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java +++ b/logstash-core/src/main/java/org/logstash/execution/PluginConfigSpec.java @@ -1,5 +1,9 @@ package org.logstash.execution; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + public final class PluginConfigSpec { private final String name; @@ -12,13 +16,29 @@ public final class PluginConfigSpec { private final T defaultValue; + private final Collection> children; + public PluginConfigSpec(final String name, final Class type, final T defaultValue, final boolean deprecated, final boolean required) { + this(name, type, defaultValue, deprecated, required, Collections.emptyList()); + } + + public PluginConfigSpec(final String name, final Class type, + final T defaultValue, final boolean deprecated, final boolean required, + final Collection> children) { this.name = name; this.type = type; this.defaultValue = defaultValue; this.deprecated = deprecated; this.required = required; + if (!children.isEmpty() && !Map.class.isAssignableFrom(type)) { + throw new IllegalArgumentException("Only map type settings can have defined children."); + } + this.children = children; + } + + public Collection> children() { + return children; } public boolean deprecated() { diff --git a/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java b/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java index 74580421082..894f7bc2130 100644 --- a/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java +++ b/logstash-core/src/main/java/org/logstash/execution/inputs/HttpPoller.java @@ -24,8 +24,13 @@ public final class HttpPoller implements Input { private static final PluginConfigSpec CA_CERT_CONFIG = LsConfiguration.pathSetting("cacert"); - private static final PluginConfigSpec> URLS_CONFIG = - LsConfiguration.requiredHashSetting("urls"); + private static final PluginConfigSpec URL_METHOD_CONFIG = + LsConfiguration.stringSetting("method"); + + private static final PluginConfigSpec> URLS_CONFIG = + LsConfiguration.requiredHashSetting( + "urls", Arrays.asList(URL_METHOD_CONFIG, USER_CONFIG) + ); private final LsConfiguration configuration; @@ -42,6 +47,11 @@ public void start(final QueueWriter writer) { } else { // no password things } + final Map urls = configuration.get(URLS_CONFIG); + urls.forEach((key, config) -> { + System.out.println("Schema on method " + key + " is " + config.get(URL_METHOD_CONFIG)); + System.out.println("User on method " + key + " is " + config.get(USER_CONFIG)); + }); } @Override