diff --git a/logstash-core/lib/logstash/plugins/registry.rb b/logstash-core/lib/logstash/plugins/registry.rb index 26f0fa1cdfe..161354799e7 100644 --- a/logstash-core/lib/logstash/plugins/registry.rb +++ b/logstash-core/lib/logstash/plugins/registry.rb @@ -262,12 +262,11 @@ def namespace_lookup(type, name) # @param name [String] plugin name # @return [Boolean] true if klass is a valid plugin for name def is_a_plugin?(klass, name) - (klass.class == Java::JavaClass && klass.simple_name.downcase == name) || + (klass.class == Java::JavaClass && klass.simple_name.downcase == name.gsub('_','')) || (klass.ancestors.include?(LogStash::Plugin) && klass.respond_to?(:config_name) && klass.config_name == name) end def add_plugin(type, name, klass) - if klass.respond_to?("javaClass", true) @registry[key_for(type, name)] = PluginSpecification.new(type, name, klass.javaClass) elsif !exists?(type, name) diff --git a/logstash-core/src/main/java/co/elastic/logstash/api/Configuration.java b/logstash-core/src/main/java/co/elastic/logstash/api/Configuration.java index 2762e621d9c..000dbc90e3a 100644 --- a/logstash-core/src/main/java/co/elastic/logstash/api/Configuration.java +++ b/logstash-core/src/main/java/co/elastic/logstash/api/Configuration.java @@ -9,28 +9,21 @@ */ public final class Configuration { - private final Map rawSettings; + private final Map rawSettings; /** * @param raw Configuration Settings Map. Values are serialized. */ - public Configuration(final Map raw) { + public Configuration(final Map raw) { this.rawSettings = raw; } - public T get(final PluginConfigSpec configSpec) { - // TODO: Implement - return null; - } - - public String getRawValue(PluginConfigSpec configSpec) { - String rawValue = rawSettings.get(configSpec.name()); - return rawValue == null ? (String)configSpec.defaultValue() : rawValue; + public Object get(final PluginConfigSpec configSpec) { + return rawSettings.get(configSpec.name()); } public boolean contains(final PluginConfigSpec configSpec) { - // TODO: Implement - return false; + return rawSettings.containsKey(configSpec.name()); } public Collection allKeys() { diff --git a/logstash-core/src/main/java/co/elastic/logstash/api/Filter.java b/logstash-core/src/main/java/co/elastic/logstash/api/Filter.java index e62670ad1de..4074a0dc11a 100644 --- a/logstash-core/src/main/java/co/elastic/logstash/api/Filter.java +++ b/logstash-core/src/main/java/co/elastic/logstash/api/Filter.java @@ -31,8 +31,8 @@ final class Mutate implements Filter { * @param context Logstash Context */ public Mutate(final Configuration configuration, final Context context) { - this.field = configuration.get(FIELD_CONFIG); - this.value = configuration.get(VALUE_CONFIG); + this.field = (String)configuration.get(FIELD_CONFIG); + this.value = (String)configuration.get(VALUE_CONFIG); } @Override diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index de408750eb2..70c7ad3ec4e 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -207,6 +207,8 @@ public final class RubyUtil { public static final RubyClass JAVA_PIPELINE_CLASS; + public static final RubyClass JAVA_INPUT_WRAPPER_CLASS; + /** * Logstash Ruby Module. */ @@ -450,6 +452,8 @@ public final class RubyUtil { JAVA_PIPELINE_CLASS = setupLogstashClass( ABSTRACT_PIPELINE_CLASS, JavaBasePipelineExt::new, JavaBasePipelineExt.class ); + JAVA_INPUT_WRAPPER_CLASS = setupLogstashClass(PluginFactoryExt.JavaInputWrapperExt::new, + PluginFactoryExt.JavaInputWrapperExt.class); final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json"); final RubyClass stdErr = RUBY.getStandardError(); LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder( diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 23889b1a5c1..e74d1990c2c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -22,6 +22,7 @@ import co.elastic.logstash.api.Input; import co.elastic.logstash.api.Configuration; import co.elastic.logstash.api.Context; +import org.logstash.plugins.PluginFactoryExt; import org.logstash.plugins.discovery.PluginRegistry; import org.logstash.ext.JrubyEventExtLibrary; @@ -128,7 +129,7 @@ private Map setupOutputs() { final SourceWithMetadata source = v.getSourceWithMetadata(); res.put(v.getId(), pluginFactory.buildOutput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def) + RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments() )); }); return res; @@ -146,7 +147,7 @@ private Map setupFilters() { final SourceWithMetadata source = vertex.getSourceWithMetadata(); res.put(vertex.getId(), pluginFactory.buildFilter( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def) + RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments() )); } return res; @@ -164,16 +165,21 @@ private Collection setupInputs() { if (cls != null) { try { final Constructor ctor = cls.getConstructor(Configuration.class, Context.class); - javaInputs.add(ctor.newInstance(new Configuration(Collections.emptyMap()), new Context())); + javaInputs.add(ctor.newInstance(new Configuration(def.getArguments()), new Context())); } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { throw new IllegalStateException(ex); } } else { final SourceWithMetadata source = v.getSourceWithMetadata(); - nodes.add(pluginFactory.buildInput( + IRubyObject o = pluginFactory.buildInput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def) - )); + RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments()); + + if (o instanceof PluginFactoryExt.JavaInputWrapperExt) { + javaInputs.add(((PluginFactoryExt.JavaInputWrapperExt)o).getInput()); + } else { + nodes.add(o); + } } }); return nodes; @@ -196,7 +202,8 @@ private RubyHash convertArgs(final PluginDefinition def) { final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); toput = pluginFactory.buildCodec( RubyUtil.RUBY.newString(codec.getName()), - Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()) + Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), + def.getArguments() ); } else { toput = value; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java index fd88f5219af..a95a8feb6b8 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java @@ -8,6 +8,8 @@ import co.elastic.logstash.api.Filter; import co.elastic.logstash.api.Input; +import java.util.Map; + /** * Factory that can instantiate Java plugins as well as Ruby plugins. */ @@ -38,23 +40,28 @@ public Filter buildFilter(final String name, final String id, final Configuratio } @Override - public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) { - return rubyFactory.buildInput(name, line, column, args); + public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, + final IRubyObject args, Map pluginArgs) { + return rubyFactory.buildInput(name, line, column, args, pluginArgs); } @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) { - return rubyFactory.buildOutput(name, line, column, args); + public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, + final RubyInteger column, final IRubyObject args, + final Map pluginArgs) { + return rubyFactory.buildOutput(name, line, column, args, pluginArgs); } @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) { - return rubyFactory.buildFilter(name, line, column, args); + public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, + final RubyInteger column, final IRubyObject args, + final Map pluginArgs) { + return rubyFactory.buildFilter(name, line, column, args, pluginArgs); } @Override - public IRubyObject buildCodec(final RubyString name, final IRubyObject args) { - return rubyFactory.buildCodec(name, args); + public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map pluginArgs) { + return rubyFactory.buildCodec(name, args, pluginArgs); } } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index 73edf0454cc..ef528c88702 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -4,6 +4,8 @@ import org.jruby.RubyString; import org.jruby.runtime.builtin.IRubyObject; +import java.util.Map; + /** * This class holds interfaces implemented by Ruby concrete classes. */ @@ -19,13 +21,14 @@ private RubyIntegration() { public interface PluginFactory { IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, - IRubyObject args); + IRubyObject args, Map pluginArgs); AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column, - IRubyObject args); + IRubyObject args, Map pluginArgs); - AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args); + AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args, + Map pluginArgs); - IRubyObject buildCodec(RubyString name, IRubyObject args); + IRubyObject buildCodec(RubyString name, IRubyObject args, Map pluginArgs); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java index 70955c0921d..70003d67a14 100644 --- a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java @@ -1,6 +1,5 @@ package org.logstash.execution; -import java.util.Collection; import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyBasicObject; @@ -14,7 +13,9 @@ import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; -import org.logstash.config.ir.compiler.OutputDelegatorExt; +import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; + +import java.util.Collection; @JRubyClass(name = "PipelineReporter") public final class PipelineReporterExt extends RubyBasicObject { @@ -163,7 +164,7 @@ private RubyArray outputInfo(final ThreadContext context) { outputIterable = (Iterable) outputs.toJava(Iterable.class); } outputIterable.forEach(output -> { - final OutputDelegatorExt delegator = (OutputDelegatorExt) output; + final AbstractOutputDelegatorExt delegator = (AbstractOutputDelegatorExt) output; final RubyHash hash = RubyHash.newHash(context.runtime); hash.op_aset(context, TYPE_KEY, delegator.configName(context)); hash.op_aset(context, ID_KEY, delegator.getId()); diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java index e225eb0b290..de98fce62b0 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java @@ -1,11 +1,13 @@ package org.logstash.plugins; +import co.elastic.logstash.api.Input; import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyBasicObject; import org.jruby.RubyClass; import org.jruby.RubyHash; import org.jruby.RubyInteger; +import org.jruby.RubyObject; import org.jruby.RubyString; import org.jruby.RubySymbol; import org.jruby.anno.JRubyClass; @@ -37,7 +39,6 @@ import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Locale; @@ -103,12 +104,12 @@ public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryEx @SuppressWarnings("unchecked") @Override - public IRubyObject buildInput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args) { + public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, + final IRubyObject args, Map pluginArgs) { return plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(), line.getIntValue(), column.getIntValue(), - (Map) args + (Map) args, pluginArgs ); } @@ -116,18 +117,19 @@ public IRubyObject buildInput(final RubyString name, final RubyInteger line, public IRubyObject buildInput(final ThreadContext context, final IRubyObject[] args) { return buildInput( (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), - args[3] + args[3], null ); } @SuppressWarnings("unchecked") @Override public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args) { + final RubyInteger column, final IRubyObject args, + Map pluginArgs) { return (AbstractOutputDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(), line.getIntValue(), column.getIntValue(), - (Map) args + (Map) args, pluginArgs ); } @@ -135,18 +137,19 @@ public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyI public AbstractOutputDelegatorExt buildOutput(final ThreadContext context, final IRubyObject[] args) { return buildOutput( - (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3] + (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3], null ); } @SuppressWarnings("unchecked") @Override public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args) { + final RubyInteger column, final IRubyObject args, + Map pluginArgs) { return (AbstractFilterDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(), line.getIntValue(), column.getIntValue(), - (Map) args + (Map) args, pluginArgs ); } @@ -154,22 +157,22 @@ public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyI public IRubyObject buildFilter(final ThreadContext context, final IRubyObject[] args) { return buildFilter( (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), - args[3] + args[3], null ); } @SuppressWarnings("unchecked") @Override - public IRubyObject buildCodec(final RubyString name, final IRubyObject args) { + public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map pluginArgs) { return plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - name.asJavaString(), 0, 0, (Map) args + name.asJavaString(), 0, 0, (Map) args, pluginArgs ); } @JRubyMethod(required = 4) public IRubyObject buildCodec(final ThreadContext context, final IRubyObject[] args) { - return buildCodec((RubyString) args[0], args[1]); + return buildCodec((RubyString) args[0], args[1], null); } @SuppressWarnings("unchecked") @@ -181,13 +184,15 @@ public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) args[1].asJavaString(), args[2].convertToInteger().getIntValue(), args[3].convertToInteger().getIntValue(), - args.length > 4 ? (Map) args[4] : new HashMap<>() + args.length > 4 ? (Map) args[4] : new HashMap<>(), + null ); } @SuppressWarnings("unchecked") private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name, - final int line, final int column, final Map args) { + final int line, final int column, final Map args, + Map pluginArgs) { final String id; if (type == PluginLookup.PluginType.CODEC) { id = UUID.randomUUID().toString(); @@ -257,7 +262,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi if (cls != null) { try { final Constructor ctor = cls.getConstructor(Configuration.class, Context.class); - output = ctor.newInstance(new Configuration(Collections.EMPTY_MAP /*def.getArguments()*/), new Context()); + output = ctor.newInstance(new Configuration(pluginArgs), new Context()); } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { throw new IllegalStateException(ex); } @@ -274,7 +279,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi if (cls != null) { try { final Constructor ctor = cls.getConstructor(Configuration.class, Context.class); - filter = ctor.newInstance(new Configuration(Collections.EMPTY_MAP /*def.getArguments()*/), new Context()); + filter = ctor.newInstance(new Configuration(pluginArgs), new Context()); } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { throw new IllegalStateException(ex); } @@ -285,13 +290,50 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } else { throw new IllegalStateException("Unable to instantiate filter: " + pluginClass); } + } else if (type == PluginLookup.PluginType.INPUT) { + final Class cls = (Class) pluginClass.klass(); + Input input = null; + if (cls != null) { + try { + final Constructor ctor = cls.getConstructor(Configuration.class, Context.class); + input = ctor.newInstance(new Configuration(pluginArgs), new Context()); + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { + throw new IllegalStateException(ex); + } + } + + if (input != null) { + return JavaInputWrapperExt.create(context, input); + } else { + throw new IllegalStateException("Unable to instantiate input: " + pluginClass); + } } else { - return context.nil; + throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString()); } } } } + @JRubyClass(name = "JavaInputWrapper") + public static final class JavaInputWrapperExt extends RubyObject { + + private Input input; + + public JavaInputWrapperExt(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + + public static JavaInputWrapperExt create(ThreadContext context, Input input) { + JavaInputWrapperExt inputWrapper = new JavaInputWrapperExt(context.runtime, RubyUtil.JAVA_INPUT_WRAPPER_CLASS); + inputWrapper.input = input; + return inputWrapper; + } + + public Input getInput() { + return input; + } + } + @JRubyClass(name = "ExecutionContextFactory") public static final class ExecutionContext extends RubyBasicObject { diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java b/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java index 962a02c88e0..d5e923ea806 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java @@ -52,7 +52,6 @@ public Object klass() { : klass; return new PluginLookup.PluginClass() { - @Override public PluginLookup.PluginLanguage language() { return language; @@ -70,6 +69,10 @@ public interface PluginClass { PluginLookup.PluginLanguage language(); Object klass(); + + default String toReadableString() { + return String.format("Plugin class [%s], language [%s]", klass(), language()); + } } public enum PluginLanguage { diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java index 2c895956eb0..6b3989f6f3f 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java @@ -49,9 +49,9 @@ public class Line implements Codec { private String remainder = ""; public Line(final Configuration configuration, final Context context) { - delimiter = configuration.getRawValue(DELIMITER_CONFIG); - charset = Charset.forName(configuration.getRawValue(CHARSET_CONFIG)); - format = configuration.getRawValue(FORMAT_CONFIG); + delimiter = (String)configuration.get(DELIMITER_CONFIG); + charset = Charset.forName((String)configuration.get(CHARSET_CONFIG)); + format = (String)configuration.get(FORMAT_CONFIG); decoder = charset.newDecoder(); decoder.onMalformedInput(CodingErrorAction.IGNORE); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/inputs/Stdin.java b/logstash-core/src/main/java/org/logstash/plugins/inputs/Stdin.java index 271173b507b..240c6583ebf 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/inputs/Stdin.java +++ b/logstash-core/src/main/java/org/logstash/plugins/inputs/Stdin.java @@ -57,7 +57,11 @@ public Stdin(final Configuration configuration, final Context context) { } catch (UnknownHostException e) { hostname = "[unknownHost]"; } - codec = PluginRegistry.getCodec(configuration.getRawValue(CODEC_CONFIG), configuration, context); + String codecName = (String)configuration.get(CODEC_CONFIG); + codec = PluginRegistry.getCodec(codecName, configuration, context); + if (codec == null) { + throw new IllegalStateException(String.format("Unable to obtain codec '%a'", codecName)); + } input = inputChannel; } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index 453c681c9e7..9d9e5229291 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -33,8 +33,6 @@ import co.elastic.logstash.api.Filter; import co.elastic.logstash.api.Input; import co.elastic.logstash.api.Context; -import co.elastic.logstash.api.Output; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * Tests for {@link CompiledPipeline}. @@ -440,36 +438,27 @@ private static final class MockPluginFactory implements PluginFactory { @Override public IRubyObject buildInput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args) { + final RubyInteger column, final IRubyObject args, Map pluginArgs) { return setupPlugin(name, inputs); } @Override public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args) { + final RubyInteger column, final IRubyObject args, Map pluginArgs) { return PipelineTestUtil.buildOutput(setupPlugin(name, outputs)); } - @Override - public AbstractOutputDelegatorExt buildJavaOutput(String name, int line, int column, Output output, IRubyObject args) { - throw new NotImplementedException(); - } - @Override public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args) { + final RubyInteger column, final IRubyObject args, + Map pluginArgs) { return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) .initForTesting(setupPlugin(name, filters)); } @Override - public AbstractFilterDelegatorExt buildJavaFilter(String name, int line, int column, Filter filter, IRubyObject args) { - throw new NotImplementedException(); - } - - @Override - public IRubyObject buildCodec(final RubyString name, final IRubyObject args) { + public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map pluginArgs) { throw new IllegalStateException("No codec setup expected in this test."); } diff --git a/logstash-core/src/test/java/org/logstash/plugins/codecs/LineTest.java b/logstash-core/src/test/java/org/logstash/plugins/codecs/LineTest.java index 75de0f7cc72..88a52ea0281 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/codecs/LineTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/codecs/LineTest.java @@ -205,7 +205,7 @@ private static void compareMessages(String[] expectedMessages, List config = new HashMap<>(); + Map config = new HashMap<>(); if (delimiter != null) { config.put("delimiter", delimiter); }