Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Java inputs, config support for all Java plugins #10211

Merged
merged 1 commit into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions logstash-core/lib/logstash/plugins/registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,11 @@ def namespace_lookup(type, name)
# @param name [String] plugin name
# @return [Boolean] true if klass is a valid plugin for name
def is_a_plugin?(klass, name)
(klass.class == Java::JavaClass && klass.simple_name.downcase == name) ||
(klass.class == Java::JavaClass && klass.simple_name.downcase == name.gsub('_','')) ||
(klass.ancestors.include?(LogStash::Plugin) && klass.respond_to?(:config_name) && klass.config_name == name)
end

def add_plugin(type, name, klass)

if klass.respond_to?("javaClass", true)
@registry[key_for(type, name)] = PluginSpecification.new(type, name, klass.javaClass)
elsif !exists?(type, name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,21 @@
*/
public final class Configuration {

private final Map<String, String> rawSettings;
private final Map<String, Object> rawSettings;

/**
* @param raw Configuration Settings Map. Values are serialized.
*/
public Configuration(final Map<String, String> raw) {
public Configuration(final Map<String, Object> raw) {
this.rawSettings = raw;
}

public <T> T get(final PluginConfigSpec<T> configSpec) {
// TODO: Implement
return null;
}

public String getRawValue(PluginConfigSpec<?> configSpec) {
String rawValue = rawSettings.get(configSpec.name());
return rawValue == null ? (String)configSpec.defaultValue() : rawValue;
public Object get(final PluginConfigSpec<?> configSpec) {
return rawSettings.get(configSpec.name());
}

public boolean contains(final PluginConfigSpec<?> configSpec) {
// TODO: Implement
return false;
return rawSettings.containsKey(configSpec.name());
}

public Collection<String> allKeys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ final class Mutate implements Filter {
* @param context Logstash Context
*/
public Mutate(final Configuration configuration, final Context context) {
this.field = configuration.get(FIELD_CONFIG);
this.value = configuration.get(VALUE_CONFIG);
this.field = (String)configuration.get(FIELD_CONFIG);
this.value = (String)configuration.get(VALUE_CONFIG);
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public final class RubyUtil {

public static final RubyClass JAVA_PIPELINE_CLASS;

public static final RubyClass JAVA_INPUT_WRAPPER_CLASS;

/**
* Logstash Ruby Module.
*/
Expand Down Expand Up @@ -450,6 +452,8 @@ public final class RubyUtil {
JAVA_PIPELINE_CLASS = setupLogstashClass(
ABSTRACT_PIPELINE_CLASS, JavaBasePipelineExt::new, JavaBasePipelineExt.class
);
JAVA_INPUT_WRAPPER_CLASS = setupLogstashClass(PluginFactoryExt.JavaInputWrapperExt::new,
PluginFactoryExt.JavaInputWrapperExt.class);
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
final RubyClass stdErr = RUBY.getStandardError();
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import org.logstash.plugins.PluginFactoryExt;
import org.logstash.plugins.discovery.PluginRegistry;
import org.logstash.ext.JrubyEventExtLibrary;

Expand Down Expand Up @@ -128,7 +129,7 @@ private Map<String, AbstractOutputDelegatorExt> setupOutputs() {
final SourceWithMetadata source = v.getSourceWithMetadata();
res.put(v.getId(), pluginFactory.buildOutput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments()
));
});
return res;
Expand All @@ -146,7 +147,7 @@ private Map<String, AbstractFilterDelegatorExt> setupFilters() {
final SourceWithMetadata source = vertex.getSourceWithMetadata();
res.put(vertex.getId(), pluginFactory.buildFilter(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments()
));
}
return res;
Expand All @@ -164,16 +165,21 @@ private Collection<IRubyObject> setupInputs() {
if (cls != null) {
try {
final Constructor<Input> ctor = cls.getConstructor(Configuration.class, Context.class);
javaInputs.add(ctor.newInstance(new Configuration(Collections.emptyMap()), new Context()));
javaInputs.add(ctor.newInstance(new Configuration(def.getArguments()), new Context()));
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
throw new IllegalStateException(ex);
}
} else {
final SourceWithMetadata source = v.getSourceWithMetadata();
nodes.add(pluginFactory.buildInput(
IRubyObject o = pluginFactory.buildInput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
));
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments());

if (o instanceof PluginFactoryExt.JavaInputWrapperExt) {
javaInputs.add(((PluginFactoryExt.JavaInputWrapperExt)o).getInput());
} else {
nodes.add(o);
}
}
});
return nodes;
Expand All @@ -196,7 +202,8 @@ private RubyHash convertArgs(final PluginDefinition def) {
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments())
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
def.getArguments()
);
} else {
toput = value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;

import java.util.Map;

/**
* Factory that can instantiate Java plugins as well as Ruby plugins.
*/
Expand Down Expand Up @@ -38,23 +40,28 @@ public Filter buildFilter(final String name, final String id, final Configuratio
}

@Override
public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) {
return rubyFactory.buildInput(name, line, column, args);
public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column,
final IRubyObject args, Map<String, Object> pluginArgs) {
return rubyFactory.buildInput(name, line, column, args, pluginArgs);
}

@Override
public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) {
return rubyFactory.buildOutput(name, line, column, args);
public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args,
final Map<String, Object> pluginArgs) {
return rubyFactory.buildOutput(name, line, column, args, pluginArgs);
}

@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) {
return rubyFactory.buildFilter(name, line, column, args);
public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args,
final Map<String, Object> pluginArgs) {
return rubyFactory.buildFilter(name, line, column, args, pluginArgs);
}

@Override
public IRubyObject buildCodec(final RubyString name, final IRubyObject args) {
return rubyFactory.buildCodec(name, args);
public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map<String, Object> pluginArgs) {
return rubyFactory.buildCodec(name, args, pluginArgs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;

import java.util.Map;

/**
* This class holds interfaces implemented by Ruby concrete classes.
*/
Expand All @@ -19,13 +21,14 @@ private RubyIntegration() {
public interface PluginFactory {

IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
IRubyObject args, Map<String, Object> pluginArgs);

AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
IRubyObject args, Map<String, Object> pluginArgs);

AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args);
AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args,
Map<String, Object> pluginArgs);

IRubyObject buildCodec(RubyString name, IRubyObject args);
IRubyObject buildCodec(RubyString name, IRubyObject args, Map<String, Object> pluginArgs);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.logstash.execution;

import java.util.Collection;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
Expand All @@ -14,7 +13,9 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;

import java.util.Collection;

@JRubyClass(name = "PipelineReporter")
public final class PipelineReporterExt extends RubyBasicObject {
Expand Down Expand Up @@ -163,7 +164,7 @@ private RubyArray outputInfo(final ThreadContext context) {
outputIterable = (Iterable<IRubyObject>) outputs.toJava(Iterable.class);
}
outputIterable.forEach(output -> {
final OutputDelegatorExt delegator = (OutputDelegatorExt) output;
final AbstractOutputDelegatorExt delegator = (AbstractOutputDelegatorExt) output;
final RubyHash hash = RubyHash.newHash(context.runtime);
hash.op_aset(context, TYPE_KEY, delegator.configName(context));
hash.op_aset(context, ID_KEY, delegator.getId());
Expand Down
Loading