Skip to content

Commit

Permalink
CAMEL-18152: implement auto-configuration for the adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
orpiske committed Jul 26, 2022
1 parent f2a700a commit ac1e460
Show file tree
Hide file tree
Showing 23 changed files with 62 additions and 119 deletions.
@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.component.atom.UpdatedDateFilter
@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.component.atom.UpdatedDateFilter
Expand Up @@ -20,13 +20,16 @@

import org.apache.abdera.model.Entry;
import org.apache.camel.component.feed.EntryFilter;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.spi.annotations.JdkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Filters out all entries which occur before the last time of the entry we saw (assuming entries arrive sorted in
* order).
*/
@JdkService(ResumeAdapter.RESUME_ADAPTER_FACTORY)
public class UpdatedDateFilter implements EntryFilter<Entry> {

private static final Logger LOG = LoggerFactory.getLogger(UpdatedDateFilter.class);
Expand Down
Expand Up @@ -88,6 +88,11 @@ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}

@Override
public String adapterFactoryService() {
return "atom-adapter-factory";
}

protected abstract void resetList();

protected abstract void populateList(Object feed) throws Exception;
Expand Down

This file was deleted.

@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.component.aws2.kinesis.consumer.KinesisDefaultResumeAdapter
Expand Up @@ -23,12 +23,15 @@
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.spi.annotations.JdkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

@JdkService(ResumeAdapter.RESUME_ADAPTER_FACTORY)
public class KinesisDefaultResumeAdapter implements KinesisResumeAdapter, Cacheable, Deserializable {
private static final Logger LOG = LoggerFactory.getLogger(KinesisDefaultResumeAdapter.class);

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.component.file.consumer.adapters.FileResumeAdapterDelegate
@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.component.file.consumer.adapters.FileResumeAdapterDelegate
Expand Up @@ -340,4 +340,8 @@ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}

@Override
public String adapterFactoryService() {
return "file-adapter-factory";
}
}
Expand Up @@ -29,7 +29,9 @@
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.spi.annotations.JdkService;

@JdkService("file")
public class FileResumeAdapterDelegate
implements FileResumeAdapter, Cacheable, Deserializable, FileOffsetResumeAdapter, DirectoryEntriesResumeAdapter {
private final DefaultDirectoryEntriesResumeAdapter directoryEntriesResumeAdapter
Expand Down

This file was deleted.

Expand Up @@ -29,9 +29,11 @@
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.cluster.CamelClusterView;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.resume.AdapterHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -139,8 +141,13 @@ private synchronized void onLeadershipTaken() throws Exception {
}

if (delegatedConsumer instanceof ResumeAware) {
final ResumeAware resumeAwareConsumer = (ResumeAware) delegatedConsumer;
LOG.info("Setting up the resume adapter for the resume strategy in the delegated consumer");
ResumeAdapter resumeAdapter = AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer);
resumeStrategy.setAdapter(resumeAdapter);

LOG.info("Setting up the resume strategy for the delegated consumer");
((ResumeAware) delegatedConsumer).setResumeStrategy(resumeStrategy);
resumeAwareConsumer.setResumeStrategy(resumeStrategy);
}

ServiceHelper.startService(delegatedEndpoint, delegatedConsumer);
Expand Down
Expand Up @@ -28,6 +28,7 @@
* resume API, as well as to offer component-specific interfaces that can be specialized by other integrations.
*/
public interface ResumeAdapter {
String RESUME_ADAPTER_FACTORY = "adapter-factory";

/**
* Execute the resume logic for the adapter
Expand Down
Expand Up @@ -35,4 +35,17 @@ public interface ResumeAware<T extends ResumeStrategy> {
* @return the resume strategy
*/
T getResumeStrategy();


/**
* Allows the implementation to provide custom adapter factories. It binds the service name provided in the
* {@link org.apache.camel.spi.annotations.JdkService} annotation in the adapter with the resume aware class. This
* allows the adapter to be resolved automatically in runtime while also allowing fallback to reusable adapters
* when available.
*
* @return
*/
default String adapterFactoryService() {
return ResumeAdapter.RESUME_ADAPTER_FACTORY;
}
}
Expand Up @@ -644,7 +644,7 @@ protected void gatherRootServices(List<Service> services) throws Exception {
}

if (consumer instanceof ResumeAware && resumeStrategy != null) {
ResumeAdapter resumeAdapter = AdapterHelper.eval(getCamelContext(), consumer);
ResumeAdapter resumeAdapter = AdapterHelper.eval(getCamelContext(), (ResumeAware) consumer);
resumeStrategy.setAdapter(resumeAdapter);
((ResumeAware) consumer).setResumeStrategy(resumeStrategy);
}
Expand Down
@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.support.resume.ResumeActionAwareAdapter
Expand Up @@ -20,34 +20,34 @@
import java.util.Optional;

import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.spi.FactoryFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AdapterHelper {
private static final Logger LOG = LoggerFactory.getLogger(AdapterHelper.class);
private static final String ADAPTER_PROPERTIES = "/org/apache/camel/resume/";
private static final String ADAPTER_KEY = "Adapter";

private AdapterHelper() {
}

public static ResumeAdapter eval(CamelContext context, Consumer consumer) {
public static ResumeAdapter eval(CamelContext context, ResumeAware resumeAware) {
assert context != null;
assert consumer != null;
assert resumeAware != null;

LOG.debug("Using the factory finder to search for the resume adapter");
final FactoryFinder factoryFinder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(ADAPTER_PROPERTIES);
final FactoryFinder factoryFinder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(FactoryFinder.DEFAULT_PATH);


LOG.debug("Creating a new resume adapter");
final Optional<ResumeAdapter> adapterOptional = factoryFinder.newInstance(ADAPTER_KEY, ResumeAdapter.class);
Optional<ResumeAdapter> adapterOptional = factoryFinder.newInstance(resumeAware.adapterFactoryService(), ResumeAdapter.class);

if (!adapterOptional.isPresent()) {

throw new RuntimeException("Cannot find a resume adapter class in the consumer classpath or in the registry");
}

Expand Down
Expand Up @@ -25,14 +25,17 @@
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.ResumeAction;
import org.apache.camel.resume.ResumeActionAware;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.spi.annotations.JdkService;

/**
* A simple resume adapter that support caching, deserialization and actions. This is usually suitable for supporting
* resume operations that have simple cache storage requirements, but delegate the resume action to the integrations
* (i.e.: such as when resuming from database components, where the resume operation can only be determined by the
* integration itself)
*/
@JdkService(ResumeAdapter.RESUME_ADAPTER_FACTORY)
public class ResumeActionAwareAdapter implements ResumeActionAware, Cacheable, Deserializable {
private ResumeCache<Object> cache;
private ResumeAction resumeAction;
Expand Down

0 comments on commit ac1e460

Please sign in to comment.