Skip to content

Commit

Permalink
Merge branch 'issue-322-kafka-inputs' into 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed May 22, 2015
2 parents ae78d64 + 63cae8a commit be31766
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 5 deletions.
@@ -0,0 +1,89 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.gelf.kafka;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.inputs.codecs.GelfCodec;
import org.graylog2.inputs.transports.KafkaTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;

import javax.inject.Inject;

public class GELFKafkaInput extends MessageInput {
private static final String NAME = "GELF Kafka";

@AssistedInject
public GELFKafkaInput(@Assisted Configuration configuration,
MetricRegistry metricRegistry,
KafkaTransport.Factory transport,
GelfCodec.Factory codec,
LocalMetricRegistry localRegistry,
Config config,
Descriptor descriptor, ServerStatus serverStatus) {
this(metricRegistry,
configuration,
transport.create(configuration),
codec.create(configuration),
localRegistry,
config,
descriptor, serverStatus);
}

protected GELFKafkaInput(MetricRegistry metricRegistry,
Configuration configuration,
KafkaTransport radioKafkaTransport,
GelfCodec codec,
LocalMetricRegistry localRegistry,
MessageInput.Config config,
MessageInput.Descriptor descriptor, ServerStatus serverStatus) {
super(metricRegistry, configuration, radioKafkaTransport, localRegistry, codec, config, descriptor, serverStatus);
}

@FactoryClass
public interface Factory extends MessageInput.Factory<GELFKafkaInput> {
@Override
GELFKafkaInput create(Configuration configuration);

@Override
Config getConfig();

@Override
Descriptor getDescriptor();
}

public static class Descriptor extends MessageInput.Descriptor {
@Inject
public Descriptor() {
super(NAME, false, "https://www.graylog.org/documentation/sending/kafka/");
}
}

@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
public Config(KafkaTransport.Factory transport, GelfCodec.Factory codec) {
super(transport.getConfig(), codec.getConfig());
}
}
}
Expand Up @@ -17,7 +17,6 @@
package org.graylog2.inputs.kafka;

import com.codahale.metrics.MetricRegistry;
import javax.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.inputs.codecs.RadioMessageCodec;
Expand All @@ -26,9 +25,12 @@
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;

public class KafkaInput extends MessageInput {
import javax.inject.Inject;

public class KafkaInput extends MessageInput {
private static final String NAME = "Kafka Input";

@AssistedInject
Expand Down Expand Up @@ -58,6 +60,7 @@ protected KafkaInput(MetricRegistry metricRegistry,
super(metricRegistry, configuration, radioKafkaTransport, localRegistry, radioMessageCodec, config, descriptor, serverStatus);
}

@FactoryClass
public interface Factory extends MessageInput.Factory<KafkaInput> {
@Override
KafkaInput create(Configuration configuration);
Expand All @@ -76,6 +79,7 @@ public Descriptor() {
}
}

@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
public Config(KafkaTransport.Factory transport, RadioMessageCodec.Factory codec) {
Expand Down
@@ -0,0 +1,89 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.raw.kafka;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.inputs.codecs.RawCodec;
import org.graylog2.inputs.transports.KafkaTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;

import javax.inject.Inject;

public class RawKafkaInput extends MessageInput {
private static final String NAME = "Raw/Plaintext Kafka";

@AssistedInject
public RawKafkaInput(@Assisted Configuration configuration,
MetricRegistry metricRegistry,
KafkaTransport.Factory transport,
RawCodec.Factory codec,
LocalMetricRegistry localRegistry,
Config config,
Descriptor descriptor, ServerStatus serverStatus) {
this(metricRegistry,
configuration,
transport.create(configuration),
codec.create(configuration),
localRegistry,
config,
descriptor, serverStatus);
}

protected RawKafkaInput(MetricRegistry metricRegistry,
Configuration configuration,
KafkaTransport radioKafkaTransport,
RawCodec codec,
LocalMetricRegistry localRegistry,
MessageInput.Config config,
MessageInput.Descriptor descriptor, ServerStatus serverStatus) {
super(metricRegistry, configuration, radioKafkaTransport, localRegistry, codec, config, descriptor, serverStatus);
}

@FactoryClass
public interface Factory extends MessageInput.Factory<RawKafkaInput> {
@Override
RawKafkaInput create(Configuration configuration);

@Override
Config getConfig();

@Override
Descriptor getDescriptor();
}

public static class Descriptor extends MessageInput.Descriptor {
@Inject
public Descriptor() {
super(NAME, false, "https://www.graylog.org/documentation/sending/kafka/");
}
}

@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
public Config(KafkaTransport.Factory transport, RawCodec.Factory codec) {
super(transport.getConfig(), codec.getConfig());
}
}
}
@@ -0,0 +1,89 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.syslog.kafka;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.inputs.codecs.SyslogCodec;
import org.graylog2.inputs.transports.KafkaTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;

import javax.inject.Inject;

public class SyslogKafkaInput extends MessageInput {
private static final String NAME = "Syslog Kafka";

@AssistedInject
public SyslogKafkaInput(@Assisted Configuration configuration,
MetricRegistry metricRegistry,
KafkaTransport.Factory transport,
SyslogCodec.Factory codec,
LocalMetricRegistry localRegistry,
Config config,
Descriptor descriptor, ServerStatus serverStatus) {
this(metricRegistry,
configuration,
transport.create(configuration),
codec.create(configuration),
localRegistry,
config,
descriptor, serverStatus);
}

protected SyslogKafkaInput(MetricRegistry metricRegistry,
Configuration configuration,
KafkaTransport radioKafkaTransport,
SyslogCodec codec,
LocalMetricRegistry localRegistry,
MessageInput.Config config,
MessageInput.Descriptor descriptor, ServerStatus serverStatus) {
super(metricRegistry, configuration, radioKafkaTransport, localRegistry, codec, config, descriptor, serverStatus);
}

@FactoryClass
public interface Factory extends MessageInput.Factory<SyslogKafkaInput> {
@Override
SyslogKafkaInput create(Configuration configuration);

@Override
Config getConfig();

@Override
Descriptor getDescriptor();
}

public static class Descriptor extends MessageInput.Descriptor {
@Inject
public Descriptor() {
super(NAME, false, "https://www.graylog.org/documentation/sending/kafka/");
}
}

@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
public Config(KafkaTransport.Factory transport, SyslogCodec.Factory codec) {
super(transport.getConfig(), codec.getConfig());
}
}
}
Expand Up @@ -20,23 +20,23 @@
import org.graylog2.inputs.codecs.CodecsModule;
import org.graylog2.inputs.gelf.amqp.GELFAMQPInput;
import org.graylog2.inputs.gelf.http.GELFHttpInput;
import org.graylog2.inputs.gelf.kafka.GELFKafkaInput;
import org.graylog2.inputs.gelf.tcp.GELFTCPInput;
import org.graylog2.inputs.gelf.udp.GELFUDPInput;
import org.graylog2.inputs.kafka.KafkaInput;
import org.graylog2.inputs.misc.jsonpath.JsonPathInput;
import org.graylog2.inputs.misc.metrics.LocalMetricsInput;
import org.graylog2.inputs.random.FakeHttpMessageInput;
import org.graylog2.inputs.raw.kafka.RawKafkaInput;
import org.graylog2.inputs.raw.tcp.RawTCPInput;
import org.graylog2.inputs.raw.udp.RawUDPInput;
import org.graylog2.inputs.syslog.kafka.SyslogKafkaInput;
import org.graylog2.inputs.syslog.tcp.SyslogTCPInput;
import org.graylog2.inputs.syslog.udp.SyslogUDPInput;
import org.graylog2.inputs.transports.TransportsModule;
import org.graylog2.plugin.inject.Graylog2Module;
import org.graylog2.plugin.inputs.MessageInput;

/**
* @author Dennis Oelkers <dennis@torch.sh>
*/
public class MessageInputBindings extends Graylog2Module {
@Override
protected void configure() {
Expand All @@ -47,13 +47,16 @@ protected void configure() {
// new style inputs, using transports and codecs
installInput(inputMapBinder, RawTCPInput.class, RawTCPInput.Factory.class);
installInput(inputMapBinder, RawUDPInput.class, RawUDPInput.Factory.class);
installInput(inputMapBinder, RawKafkaInput.class, RawKafkaInput.Factory.class);
installInput(inputMapBinder, SyslogTCPInput.class, SyslogTCPInput.Factory.class);
installInput(inputMapBinder, SyslogUDPInput.class, SyslogUDPInput.Factory.class);
installInput(inputMapBinder, SyslogKafkaInput.class, SyslogKafkaInput.Factory.class);
installInput(inputMapBinder, FakeHttpMessageInput.class, FakeHttpMessageInput.Factory.class);
installInput(inputMapBinder, GELFTCPInput.class, GELFTCPInput.Factory.class);
installInput(inputMapBinder, GELFHttpInput.class, GELFHttpInput.Factory.class);
installInput(inputMapBinder, GELFUDPInput.class, GELFUDPInput.Factory.class);
installInput(inputMapBinder, GELFAMQPInput.class, GELFAMQPInput.Factory.class);
installInput(inputMapBinder, GELFKafkaInput.class, GELFKafkaInput.Factory.class);
installInput(inputMapBinder, KafkaInput.class, KafkaInput.Factory.class);
installInput(inputMapBinder, JsonPathInput.class, JsonPathInput.Factory.class);
installInput(inputMapBinder, LocalMetricsInput.class, LocalMetricsInput.Factory.class);
Expand Down

0 comments on commit be31766

Please sign in to comment.