Skip to content
This repository has been archived by the owner on Nov 28, 2023. It is now read-only.

Commit

Permalink
Make Kafka Producer's Managed
Browse files Browse the repository at this point in the history
Producer's should be properly managed by the Dropwizard lifecycle.

Also fixes incorrect capitalization in the producer package name.
  • Loading branch information
nicktelford committed Sep 1, 2014
1 parent 8e0fa6c commit dc21341
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
@@ -1,7 +1,8 @@
package com.datasift.dropwizard.kafka;

import com.datasift.dropwizard.kafka.Producer.InstrumentedProducer;
import com.datasift.dropwizard.kafka.Producer.KafkaProducer;
import com.datasift.dropwizard.kafka.producer.InstrumentedProducer;
import com.datasift.dropwizard.kafka.producer.KafkaProducer;
import com.datasift.dropwizard.kafka.producer.ManagedProducer;
import com.datasift.dropwizard.kafka.util.Compression;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
Expand Down Expand Up @@ -265,8 +266,10 @@ public <K, V> KafkaProducer<K, V> build(final Class<? extends Encoder<K>> keyEnc
final Class<Partitioner> partitioner,
final Environment environment,
final String name) {
final Producer<K, V> producer = build(keyEncoder, messageEncoder, partitioner, name);
environment.lifecycle().manage(new ManagedProducer(producer));
return new InstrumentedProducer<>(
build(keyEncoder, messageEncoder, partitioner, name),
producer,
environment.metrics(),
name);
}
Expand Down
@@ -1,4 +1,4 @@
package com.datasift.dropwizard.kafka.Producer;
package com.datasift.dropwizard.kafka.producer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
Expand Down
@@ -1,4 +1,4 @@
package com.datasift.dropwizard.kafka.Producer;
package com.datasift.dropwizard.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
Expand Down
@@ -0,0 +1,26 @@
package com.datasift.dropwizard.kafka.producer;

import io.dropwizard.lifecycle.Managed;
import kafka.javaapi.producer.Producer;

/**
* Manages a Kafka {@link Producer} as part of the application lifecycle..
*/
public class ManagedProducer implements Managed {

private final Producer<?, ?> producer;

public ManagedProducer(final Producer<?, ?> producer) {
this.producer = producer;
}

@Override
public void start() throws Exception {
// nothing to do, already started
}

@Override
public void stop() throws Exception {
producer.close();
}
}

0 comments on commit dc21341

Please sign in to comment.