Skip to content

Commit

Permalink
[CAMEL-10069] Update to use ClassResolver to help search for the part…
Browse files Browse the repository at this point in the history
…itioner and serializers

Also pull search for Partitioner out into separate try block to allow for use with 0.8 kafka client (which doesn't have partitioner)
  • Loading branch information
dkulp committed Jun 22, 2016
1 parent b8f5da7 commit ccef28f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
Expand Up @@ -16,7 +16,6 @@
*/ */
package org.apache.camel.component.kafka; package org.apache.camel.component.kafka;


import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
Expand All @@ -30,7 +29,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/ */
package org.apache.camel.component.kafka; package org.apache.camel.component.kafka;


import java.lang.reflect.Field;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
Expand All @@ -30,15 +31,20 @@
import org.apache.camel.CamelExchangeException; import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.util.CastUtils;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaProducer extends DefaultAsyncProducer { public class KafkaProducer extends DefaultAsyncProducer {

private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);

private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint; private final KafkaEndpoint endpoint;
private ExecutorService workerPool; private ExecutorService workerPool;
Expand All @@ -49,55 +55,63 @@ public KafkaProducer(KafkaEndpoint endpoint) {
this.endpoint = endpoint; this.endpoint = endpoint;
} }



<T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) {
Class<?> loadClass(Object o, ClassLoader loader) {
if (o == null || o instanceof Class) { if (o == null || o instanceof Class) {
return (Class<?>)o; return CastUtils.cast((Class<?>)o);
} }
String name = o.toString(); String name = o.toString();
Class<?> c; Class<T> c = resolver.resolveClass(name, type);
try {
c = Class.forName(name, true, loader);
} catch (ClassNotFoundException e) {
c = null;
}
if (c == null) { if (c == null) {
try { c = resolver.resolveClass(name, type, getClass().getClassLoader());
c = Class.forName(name, true, getClass().getClassLoader());
} catch (ClassNotFoundException e) {
c = null;
}
} }
if (c == null) { if (c == null) {
try { c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
c = Class.forName(name, true, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
} catch (ClassNotFoundException e) {
c = null;
}
} }
return c; return c;
} }
void replaceWithClass(Properties props, String key, ClassLoader loader, Class<?> type) { void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type) {
Class<?> c = loadClass(props.get(key), loader); Class<?> c = loadClass(props.get(key), resolver, type);
if (c != null) { if (c != null) {
props.put(key, c); props.put(key, c);
} }
} }


Properties getProps() { Properties getProps() {
Properties props = endpoint.getConfiguration().createProducerProperties(); Properties props = endpoint.getConfiguration().createProducerProperties();
if (endpoint.getCamelContext() != null) { try {
ClassLoader loader = endpoint.getCamelContext().getApplicationContextClassLoader(); if (endpoint.getCamelContext() != null) {
replaceWithClass(props, "key.serializer", loader, Serializer.class); ClassResolver resolver = endpoint.getCamelContext().getClassResolver();
replaceWithClass(props, "value.serializer", loader, Serializer.class); replaceWithClass(props, "key.serializer", resolver, Serializer.class);
replaceWithClass(props, "partitioner.class", loader, Partitioner.class); replaceWithClass(props, "value.serializer", resolver, Serializer.class);

try {
//doesn't exist in old version of Kafka client so detect and only call the method if
//the field/config actually exists
Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG");
if (f != null) {
loadParitionerClass(resolver, props);
}
} catch (NoSuchFieldException e) {
//ignore
} catch (SecurityException e) {
//ignore
}
}
} catch (Throwable t) {
//can ignore and Kafka itself might be able to handle it, if not, it will throw an exception
LOG.debug("Problem loading classes for Serializers", t);
} }
if (endpoint.getBrokers() != null) { if (endpoint.getBrokers() != null) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
} }
return props; return props;
} }


private void loadParitionerClass(ClassResolver resolver, Properties props) {
replaceWithClass(props, "partitioner.class", resolver, Partitioner.class);
}


public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() { public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
return kafkaProducer; return kafkaProducer;
} }
Expand Down Expand Up @@ -184,6 +198,11 @@ public ProducerRecord next() {
} }
return new ProducerRecord(msgTopic, msgList.next()); return new ProducerRecord(msgTopic, msgList.next());
} }

@Override
public void remove() {
msgList.remove();
}
}; };
} }
ProducerRecord record; ProducerRecord record;
Expand Down
Expand Up @@ -99,7 +99,7 @@ public static void after() {
} }


@Override @Override
protected RoutesBuilder createRouteBuilder() throws Exception { protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() { return new RouteBuilder() {
@Override @Override
public void configure() throws Exception { public void configure() throws Exception {
Expand Down

0 comments on commit ccef28f

Please sign in to comment.