Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #1 from mluban/master

Tweaks based on deployment and integration testing @ewhauser
  • Loading branch information...
commit 50f19484c9173e3862da194a5fed2abccce30832 2 parents 8b7035d + 112404c
@ewhauser authored
View
2  project/build/Project.scala
@@ -22,7 +22,7 @@ class Project(info: ProjectInfo) extends StandardLibraryProject(info)
val junitInterface = "com.novocode" % "junit-interface" % "0.6" % "test->default"
val zookeeper = "org.apache.zookeeper" % "zookeeper" % "3.3.3"
- val log4j = "log4j" % "log4j" % "1.2.15"
+ val log4j = "log4j" % "log4j" % "1.2.16"
val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
val flume = "com.cloudera" % "flume-core" % "0.9.4-cdh3u1"
val zkClient = "com.github.sgroschupf" % "zkclient" % "0.1"
View
2  project/plugins/project/build.properties
@@ -1,3 +1,3 @@
#Project properties
-#Fri Sep 16 09:41:20 EDT 2011
+#Sun Oct 02 05:56:49 EDT 2011
plugin.uptodate=true
View
12 src/main/java/org/apache/flume/kafka/KafkaSink.java
@@ -62,7 +62,7 @@ synchronized public void close() throws IOException {
@Override
synchronized public void open() throws IOException {
checkState(producer == null, "Kafka sink is already initialized. Looks like sink close() " +
- "hasn't been proceeded properly.");
+ "hasn't proceeded properly.");
Properties properties = new Properties();
properties.setProperty("zk.connect", zkConnect);
@@ -78,13 +78,13 @@ public static SinkBuilder builder() {
@Override
public EventSink build(Context context, String... argv) {
- checkArgument(argv.length < 1, USAGE);
+ checkArgument(argv.length == 2, USAGE);
String zkConnect = argv[0];
- String topic = argv[0];
+ String topic = argv[1];
- checkState(!isNullOrEmpty(zkConnect), "zk.connect cannot be empty");
- checkState(!isNullOrEmpty(topic), "topic cannot be empty");
+ checkArgument(!isNullOrEmpty(zkConnect), "zk.connect cannot be empty");
+ checkArgument(!isNullOrEmpty(topic), "topic cannot be empty");
return new KafkaSink(zkConnect, topic);
}
@@ -95,4 +95,4 @@ public EventSink build(Context context, String... argv) {
public static List<Pair<String, SinkFactory.SinkBuilder>> getSinkBuilders() {
return asList(new Pair<String, SinkFactory.SinkBuilder>("kafka", builder()));
}
-}
+}
View
13 src/main/java/org/apache/flume/kafka/KafkaSource.java
@@ -6,6 +6,7 @@
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSource;
+import com.cloudera.util.Pair;
import com.google.common.collect.Maps;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
@@ -24,6 +25,7 @@
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Arrays.asList;
public class KafkaSource extends EventSource.Base {
static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
@@ -59,12 +61,14 @@ public Thread newThread(Runnable runnable) {
});
}
+ @Override
public void close() throws IOException {
// make sure
shutdown = true;
try {
executor.shutdown();
executor.awaitTermination(2, TimeUnit.SECONDS);
+ LOG.info("Kafka source successfully closed");
} catch (InterruptedException e) {
LOG.debug("Waiting for Kafka consumer threads to exit was interrupted", e);
}
@@ -74,6 +78,7 @@ public void close() throws IOException {
* Blocks on either getting an event from the queue or process exit (at which
* point it throws an exception).
*/
+ @Override
public Event next() throws IOException {
Event evt;
try {
@@ -90,6 +95,7 @@ public Event next() throws IOException {
}
}
+ @Override
public void open() throws IOException {
Map<String, Integer> topicCountMap = Maps.newHashMapWithExpectedSize(1);
topicCountMap.put(topic, threads);
@@ -98,6 +104,8 @@ public void open() throws IOException {
for (KafkaMessageStream stream : consumerMap.get(topic)) {
executor.submit(new KafkaConsumerThread(stream));
}
+
+ LOG.info("Kafka source successfully opened");
}
class KafkaConsumerThread implements Callable<Object> {
@@ -147,5 +155,10 @@ public EventSource build(Context ctx, String... argv) {
};
}
+ @SuppressWarnings("unchecked")
+ public static List<Pair<String, SourceFactory.SourceBuilder>> getSourceBuilders() {
+ return asList(new Pair<String, SourceFactory.SourceBuilder>("kafka", kafkaSourceBuilder()));
+ }
+
}
View
4 .../org/apache/flume/kafka/TestKafaSink.java → ...org/apache/flume/kafka/TestKafkaSink.java
@@ -23,7 +23,7 @@
import static junit.framework.Assert.*;
-public class TestKafaSink {
+public class TestKafkaSink {
private EmbeddedZookeeper zkServer;
@@ -118,4 +118,4 @@ public void after() throws Exception {
zkServer.shutdown();
}
-}
+}
Please sign in to comment.
Something went wrong with that request. Please try again.