Skip to content

Commit

Permalink
Java plugin API
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Dec 17, 2018
1 parent dc174b2 commit 3dd1288
Show file tree
Hide file tree
Showing 66 changed files with 4,420 additions and 357 deletions.
6 changes: 5 additions & 1 deletion logstash-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: '22.0'
// Do not upgrade this, later versions require GPL licensed code in javac-shaded that is
// Apache2 incompatible
compile 'com.google.googlejavaformat:google-java-format:1.1'
compile('com.google.googlejavaformat:google-java-format:1.1') {
exclude group: 'com.google.guava', module: 'guava'
}
compile 'org.javassist:javassist:3.22.0-GA'
compile 'com.google.guava:guava:20.0'
testCompile 'org.apache.logging.log4j:log4j-core:2.9.1:tests'
testCompile 'junit:junit:4.12'
testCompile 'net.javacrumbs.json-unit:json-unit:1.9.0'
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def compile_initializer
events.each{|e| block.call(e)}
end
if @generated_objects[:#{name}].respond_to?(:flush)
if !@generated_objects[:#{name}].nil? && @generated_objects[:#{name}].has_flush
@periodic_flushers << @generated_objects[:#{name}_flush] if @generated_objects[:#{name}].periodic_flush
@shutdown_flushers << @generated_objects[:#{name}_flush]
end
Expand Down
71 changes: 2 additions & 69 deletions logstash-core/lib/logstash/filter_delegator.rb
Original file line number Diff line number Diff line change
@@ -1,69 +1,2 @@
# encoding: utf-8
#
module LogStash
class FilterDelegator
extend Forwardable
DELEGATED_METHODS = [
:register,
:close,
:threadsafe?,
:do_close,
:do_stop,
:periodic_flush,
:reloadable?
]
def_delegators :@filter, *DELEGATED_METHODS

attr_reader :id

def initialize(filter, id)
@klass = filter.class
@id = id
@filter = filter

# Scope the metrics to the plugin
namespaced_metric = filter.metric
@metric_events = namespaced_metric.namespace(:events)
@metric_events_in = @metric_events.counter(:in)
@metric_events_out = @metric_events.counter(:out)
@metric_events_time = @metric_events.counter(:duration_in_millis)
namespaced_metric.gauge(:name, config_name)

# Not all the filters will do bufferings
define_flush_method if @filter.respond_to?(:flush)
end

def config_name
@klass.config_name
end

def multi_filter(events)
@metric_events_in.increment(events.size)

start_time = java.lang.System.nano_time
new_events = @filter.multi_filter(events)
@metric_events_time.increment((java.lang.System.nano_time - start_time) / 1_000_000)

# There is no guarantee in the context of filter
# that EVENTS_IN == EVENTS_OUT, see the aggregates and
# the split filter
c = new_events.count { |event| !event.cancelled? }
@metric_events_out.increment(c) if c > 0
new_events
end

private
def define_flush_method
define_singleton_method(:flush) do |options = {}|
# we also need to trace the number of events
# coming from a specific filters.
new_events = @filter.flush(options)

# Filter plugins that does buffering or spooling of events like the
# `Logstash-filter-aggregates` can return `NIL` and will flush on the next flush ticks.
@metric_events_out.increment(new_events.size) if new_events && new_events.size > 0
new_events
end
end
end
end
# The contents of this file have been ported to Java. It is included for for compatibility
# with plugins that directly include it.
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)

@worker_threads = []

@java_inputs_controller = org.logstash.execution.InputsController.new(lir_execution.javaInputs)

@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == "memory"

@events_filtered = java.util.concurrent.atomic.LongAdder.new
Expand Down Expand Up @@ -241,6 +243,7 @@ def start_workers

def wait_inputs
@input_threads.each(&:join)
@java_inputs_controller.awaitStop
end

def start_inputs
Expand All @@ -259,6 +262,7 @@ def start_inputs

# then after all input plugins are successfully registered, start them
inputs.each { |input| start_input(input) }
@java_inputs_controller.startInputs(self)
end

def start_input(plugin)
Expand Down Expand Up @@ -324,6 +328,7 @@ def wait_for_workers
def stop_inputs
@logger.debug("Closing inputs", default_logging_keys)
inputs.each(&:do_stop)
@java_inputs_controller.stopInputs
@logger.debug("Closed inputs", default_logging_keys)
end

Expand Down
7 changes: 5 additions & 2 deletions logstash-core/lib/logstash/plugins/registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,14 @@ def namespace_lookup(type, name)
# @param name [String] plugin name
# @return [Boolean] true if klass is a valid plugin for name
def is_a_plugin?(klass, name)
klass.ancestors.include?(LogStash::Plugin) && klass.respond_to?(:config_name) && klass.config_name == name
(klass.class == Java::JavaClass && klass.simple_name.downcase == name.gsub('_','')) ||
(klass.ancestors.include?(LogStash::Plugin) && klass.respond_to?(:config_name) && klass.config_name == name)
end

def add_plugin(type, name, klass)
if !exists?(type, name)
if klass.respond_to?("javaClass", true)
@registry[key_for(type, name)] = PluginSpecification.new(type, name, klass.javaClass)
elsif !exists?(type, name)
specification_klass = type == :universal ? UniversalPluginSpecification : PluginSpecification
@registry[key_for(type, name)] = specification_klass.new(type, name, klass)
else
Expand Down
14 changes: 2 additions & 12 deletions logstash-core/spec/logstash/filter_delegator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def filter(event)
end

it "defines a flush method" do
expect(subject.respond_to?(:flush)).to be_truthy
expect(subject.has_flush).to be_truthy
end

context "when the flush return events" do
Expand Down Expand Up @@ -128,7 +128,7 @@ def filter(event)
end

it "doesnt define a flush method" do
expect(subject.respond_to?(:flush)).to be_falsey
expect(subject.has_flush).to be_falsey
end

it "increments the in/out of the metric" do
Expand All @@ -145,14 +145,4 @@ def filter(event)
end
end

context "delegate methods to the original plugin" do
# I am not testing the behavior of these methods
# this is done in the plugin tests. I just want to make sure
# the proxy delegates the methods.
LogStash::FilterDelegator::DELEGATED_METHODS.each do |method|
it "delegate method: `#{method}` to the filter" do
expect(subject.respond_to?(method))
end
end
end
end
12 changes: 1 addition & 11 deletions logstash-core/spec/logstash/java_filter_delegator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

java_import org.logstash.RubyUtil

describe LogStash::JavaFilterDelegator do
describe LogStash::FilterDelegator do

class MockGauge
def increment(_)
Expand Down Expand Up @@ -182,14 +182,4 @@ def filter(event)
end
end

context "delegate methods to the original plugin" do
# I am not testing the behavior of these methods
# this is done in the plugin tests. I just want to make sure
# the proxy delegates the methods.
LogStash::FilterDelegator::DELEGATED_METHODS.each do |method|
it "delegate method: `#{method}` to the filter" do
expect(subject.respond_to?(method))
end
end
end
end
106 changes: 106 additions & 0 deletions logstash-core/src/main/java/co/elastic/logstash/api/Configuration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package co.elastic.logstash.api;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;

/**
* Configuration for Logstash Java plugins.
*/
public final class Configuration {

private final Map<String, Object> rawSettings;

/**
* @param raw Configuration Settings Map. Values are serialized.
*/
public Configuration(final Map<String, Object> raw) {
this.rawSettings = raw;
}

@SuppressWarnings("unchecked")
public <T> T get(final PluginConfigSpec<T> configSpec) {
if (rawSettings.containsKey(configSpec.name())) {
Object o = rawSettings.get(configSpec.name());
if (configSpec.type().isAssignableFrom(o.getClass())) {
return (T) o;
} else {
throw new IllegalStateException(
String.format("Setting value for '%s' of type '%s' incompatible with defined type of '%s'",
configSpec.name(), o.getClass(), configSpec.type()));
}
} else {
return configSpec.defaultValue();
}
}

public Object getRawValue(final PluginConfigSpec<?> configSpec) {
return rawSettings.get(configSpec.name());
}

public boolean contains(final PluginConfigSpec<?> configSpec) {
return rawSettings.containsKey(configSpec.name());
}

public Collection<String> allKeys() {
return rawSettings.keySet();
}

public static PluginConfigSpec<String> stringSetting(final String name) {
return new PluginConfigSpec<>(
name, String.class, null, false, false
);
}

public static PluginConfigSpec<String> stringSetting(final String name, final String defaultValue) {
return new PluginConfigSpec<>(
name, String.class, defaultValue, false, false
);
}

public static PluginConfigSpec<String> requiredStringSetting(final String name) {
return new PluginConfigSpec<>(name, String.class, null, false, true);
}

public static PluginConfigSpec<Long> numSetting(final String name) {
return new PluginConfigSpec<>(
name, Long.class, null, false, false
);
}

public static PluginConfigSpec<Long> numSetting(final String name, final long defaultValue) {
return new PluginConfigSpec<>(
name, Long.class, defaultValue, false, false
);
}

public static PluginConfigSpec<Path> pathSetting(final String name) {
return new PluginConfigSpec<>(name, Path.class, null, false, false);
}

public static PluginConfigSpec<Boolean> booleanSetting(final String name) {
return new PluginConfigSpec<>(name, Boolean.class, null, false, false);
}

@SuppressWarnings("unchecked")
public static PluginConfigSpec<Map<String, String>> hashSetting(final String name) {
return new PluginConfigSpec(name, Map.class, null, false, false);
}

@SuppressWarnings("unchecked")
public static <T> PluginConfigSpec<Map<String, T>> requiredFlatHashSetting(
final String name, Class<T> type) {
//TODO: enforce subtype
return new PluginConfigSpec(
name, Map.class, null, false, true
);
}

@SuppressWarnings("unchecked")
public static PluginConfigSpec<Map<String, Configuration>> requiredNestedHashSetting(
final String name, final Collection<PluginConfigSpec<?>> spec) {
return new PluginConfigSpec(
name, Map.class, null, false, true, spec
);
}
}
13 changes: 13 additions & 0 deletions logstash-core/src/main/java/co/elastic/logstash/api/Context.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package co.elastic.logstash.api;

import org.logstash.common.io.DeadLetterQueueWriter;

/**
* Holds Logstash Environment.
*/
public final class Context {

public DeadLetterQueueWriter dlqWriter() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package co.elastic.logstash.api;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Logstash plugin annotation for finding plugins on the classpath and setting their name as used
* in the configuration syntax.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface LogstashPlugin {
String name();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package co.elastic.logstash.api;

import java.util.Collection;

public interface Plugin {

Collection<PluginConfigSpec<?>> configSchema();
}
Loading

0 comments on commit 3dd1288

Please sign in to comment.