Skip to content

Commit

Permalink
CAMEL-18299: use the FactoryFinder to lookup the adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
orpiske committed Jul 26, 2022
1 parent b52b772 commit f2a700a
Showing 1 changed file with 19 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@

package org.apache.camel.support.resume;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import java.util.Optional;

import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.spi.FactoryFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AdapterHelper {
private static final Logger LOG = LoggerFactory.getLogger(AdapterHelper.class);
private static final String ADAPTER_PROPERTIES = "/org/apache/camel/resume/Adapter";
private static final String PROP_ADAPTER_CLASS = "class";
private static final String ADAPTER_PROPERTIES = "/org/apache/camel/resume/";
private static final String ADAPTER_KEY = "Adapter";

private AdapterHelper() {
}
Expand All @@ -43,70 +41,26 @@ public static ResumeAdapter eval(CamelContext context, Consumer consumer) {
assert context != null;
assert consumer != null;

Object adapterInstance = context.getRegistry().lookupByName("resumeAdapter");
if (adapterInstance == null) {
adapterInstance = resolveAdapter(context, consumer);
LOG.debug("Using the factory finder to search for the resume adapter");
final FactoryFinder factoryFinder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(ADAPTER_PROPERTIES);

if (adapterInstance == null) {
throw new RuntimeException("Cannot find a resume adapter class in the consumer classpath or in the registry");
}
}

if (adapterInstance instanceof ResumeAdapter) {
ResumeAdapter resumeAdapter = (ResumeAdapter) adapterInstance;
LOG.debug("Creating a new resume adapter");
final Optional<ResumeAdapter> adapterOptional = factoryFinder.newInstance(ADAPTER_KEY, ResumeAdapter.class);

Object obj = context.getRegistry().lookupByName(ResumeCache.DEFAULT_NAME);
if (resumeAdapter instanceof Cacheable && obj instanceof ResumeCache) {
((Cacheable) resumeAdapter).setCache((ResumeCache<?>) obj);
} else {
LOG.debug("The resume adapter {} is not cacheable", resumeAdapter.getClass().getName());
}

return resumeAdapter;
} else {
LOG.error("Invalid resume adapter type: {}", getType(adapterInstance));
throw new IllegalArgumentException("Invalid resume adapter type: " + getType(adapterInstance));
if (!adapterOptional.isPresent()) {
throw new RuntimeException("Cannot find a resume adapter class in the consumer classpath or in the registry");
}
}

private static Object resolveAdapter(CamelContext context, Consumer consumer) {
try (InputStream adapterStream = consumer.getClass().getResourceAsStream(ADAPTER_PROPERTIES)) {

if (adapterStream == null) {
LOG.error("Cannot find a resume adapter class in the consumer {} classpath", consumer.getClass());
return null;
}

Properties properties = new Properties();
properties.load(adapterStream);

String adapterClass = properties.getProperty(PROP_ADAPTER_CLASS);
final ResumeAdapter resumeAdapter = adapterOptional.get();
LOG.debug("Using the acquired resume adapter: {}", resumeAdapter.getClass().getName());

if (ObjectHelper.isEmpty(adapterClass)) {
LOG.error("A resume adapter class is not defined in the adapter configuration");

return null;
}

LOG.debug("About to load an adapter class {} for consumer {}", adapterClass, consumer.getClass());
Class<?> clazz = context.getClassResolver().resolveClass(adapterClass);
if (clazz == null) {
LOG.error("Cannot find the resume adapter class in the classpath {}", adapterClass);

return null;
}

return clazz.getDeclaredConstructor().newInstance();
} catch (IOException e) {
LOG.error("Unable to read the resolve the resume adapter due to I/O error: {}", e.getMessage(), e);
} catch (InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
LOG.error("Unable to create a resume adapter instance: {}", e.getMessage(), e);
final Object cacheObj = context.getRegistry().lookupByName(ResumeCache.DEFAULT_NAME);
if (resumeAdapter instanceof Cacheable && cacheObj instanceof ResumeCache) {
((Cacheable) resumeAdapter).setCache((ResumeCache<?>) cacheObj);
} else {
LOG.debug("The resume adapter {} is not cacheable", resumeAdapter.getClass().getName());
}

return null;
}

private static Object getType(Object instance) {
return instance == null ? "null" : instance.getClass();
return resumeAdapter;
}
}

0 comments on commit f2a700a

Please sign in to comment.