Skip to content

Commit

Permalink
Allow to use plugin resources without aggregator/processor
Browse files Browse the repository at this point in the history
  • Loading branch information
smecsia committed May 15, 2015
1 parent 7d589e2 commit bd5c35c
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,5 @@ public interface PluginsService {
*/
String getEngineName();

boolean pluginCanConsume(Plugin plugin);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import ru.yandex.qatools.camelot.core.builders.BuildersFactory;
import ru.yandex.qatools.camelot.core.builders.BuildersFactoryImpl;
import ru.yandex.qatools.camelot.core.builders.ResourceBuilder;
import ru.yandex.qatools.camelot.error.MetadataException;
import ru.yandex.qatools.camelot.error.PluginsSystemException;

import javax.xml.bind.JAXBContext;
Expand All @@ -28,6 +29,7 @@
import java.net.URISyntaxException;
import java.util.*;

import static java.lang.String.format;
import static jodd.util.StringUtil.isEmpty;
import static ru.yandex.qatools.camelot.api.Constants.Headers.BODY_CLASS;
import static ru.yandex.qatools.camelot.util.IOUtils.readResource;
Expand Down Expand Up @@ -309,6 +311,15 @@ public void setEngineName(String engineName) {
this.engineName = engineName;
}

/**
* Returns true if plugin contains aggregator or processor
* @param plugin
*/
@Override
public boolean pluginCanConsume(Plugin plugin) {
return !isEmpty(plugin.getAggregator()) || !isEmpty(plugin.getProcessor());
}

/**
* Is currently plugins are reloading
*/
Expand Down Expand Up @@ -423,7 +434,7 @@ protected void initBasicPluginRoutes(final Plugin plugin) throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
if (isListenersEnabled()) {
if (isListenersEnabled() && pluginCanConsume(plugin)) {
from(plugin.getContext().getEndpoints().getEndpointListenerUri())
.log(LoggingLevel.DEBUG, "Plugin " + plugin.getId() + " endpoint listener input " +
"${in.headers." + BODY_CLASS + "}")
Expand Down Expand Up @@ -471,7 +482,14 @@ protected void initPluginsContexts(PluginsConfig config) throws Exception {
protected void initPluginContext(PluginsSource source, final Plugin plugin, PluginContext context, ClassLoader classLoader) throws Exception {
context.setSource(source);
plugin.setContext(context);
context.setPluginClass(isEmpty(plugin.getAggregator()) ? plugin.getProcessor() : plugin.getAggregator());
if (pluginCanConsume(plugin)) {
context.setPluginClass(isEmpty(plugin.getAggregator()) ? plugin.getProcessor() : plugin.getAggregator());
} else {
context.setPluginClass(plugin.getResource());
}
if (isEmpty(context.getPluginClass())) {
throw new MetadataException(format("Plugin class cannot be empty for plugin %s!", plugin.getId()));
}
if (isEmpty(plugin.getId())) {
plugin.setId(defaultPluginId(plugin));
}
Expand All @@ -483,15 +501,21 @@ protected void initPluginContext(PluginsSource source, final Plugin plugin, Plug
context.setId(plugin.getId());
context.setClassLoader(classLoader);
context.setInterop(interop);
context.setStorage(repositoryBuilder.initStorage(plugin));
context.setAggregationRepo(repositoryBuilder.initWritable(plugin));
context.setRepository(repositoryBuilder.initReadonly(plugin));
context.setOutput(initEventProducer(camelContext, endpoints.getProducerUri()));
context.setMainInput(initEventProducer(camelContext, endpoints.getMainInputUri()));
context.setInput(initEventProducer(camelContext, endpoints.getConsumerUri()));
context.setClientSendersProvider(new ClientSendersProviderImpl(camelContext, endpoints.getClientSendUri()));
context.setListener(new EndpointListenerImpl(context));
context.setInjector(getContextInjector());

if (pluginCanConsume(plugin)) {
context.setInput(initEventProducer(camelContext, endpoints.getConsumerUri()));
context.setStorage(repositoryBuilder.initStorage(plugin));
context.setAggregationRepo(repositoryBuilder.initWritable(plugin));
context.setRepository(repositoryBuilder.initReadonly(plugin));
context.setListener(new EndpointListenerImpl(context));
} else {
logger.warn("Plugin {} does not contain processing code! It contains resource only", plugin.getId());
}

context.setAppConfig(initPluginAppConfig(getAppConfig(), classLoader, PROPS_PATH));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ public void setPluginInitializer(PluginInitializer pluginInitializer) {
private void buildRoutes(final PluginsConfig config) throws Exception {
for (final PluginsSource source : config.getSources()) {
for (final Plugin plugin : source.getPlugins()) {
addPluginRoutes(plugin);
if (pluginCanConsume(plugin)) {
addPluginRoutes(plugin);
}
}
}
}
Expand Down Expand Up @@ -210,8 +212,9 @@ public void configure() throws Exception {
}
// The rest of the plugins messages go to OUTPUT
for (String fromId : pluginsMap.keySet()) {
if (!consumersMap.keySet().contains(fromId)) {
final PluginEndpoints endpoints = pluginsMap.get(fromId).getContext().getEndpoints();
final Plugin plugin = pluginsMap.get(fromId);
if (!consumersMap.keySet().contains(fromId) && pluginCanConsume(plugin)) {
final PluginEndpoints endpoints = plugin.getContext().getEndpoints();
from(endpoints.getOutputUri())
.routeId(endpoints.getOutputRouteId())
.multicast()
Expand Down Expand Up @@ -249,12 +252,14 @@ public Object transform(Object pluginId) {
private Map<String, Set<String>> getPluginsConsumersMap(final Map<String, Plugin> pluginsMap) {
Map<String, Set<String>> result = new HashMap<>();
for (final Plugin plugin : pluginsMap.values()) {
final String from = plugin.getSource();
final String id = plugin.getId();
if (!result.containsKey(from)) {
result.put(from, new HashSet<String>());
if (pluginCanConsume(plugin)) {
final String from = plugin.getSource();
final String id = plugin.getId();
if (!result.containsKey(from)) {
result.put(from, new HashSet<String>());
}
result.get(from).add(id);
}
result.get(from).add(id);
}
return result;
}
Expand All @@ -274,9 +279,11 @@ private void addPluginRoutes(final Plugin plugin) throws Exception {
* Invoke init methods upon the plugin
*/
protected void initPlugin(Plugin plugin) throws Exception {
getPluginInitializer().init(plugin);
plugin.getContext().setSchedulerBuilder(schedulerBuildersFactory.build(plugin));
plugin.getContext().getSchedulerBuilder().schedule();
if (pluginCanConsume(plugin)) {
getPluginInitializer().init(plugin);
plugin.getContext().setSchedulerBuilder(schedulerBuildersFactory.build(plugin));
plugin.getContext().getSchedulerBuilder().schedule();
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion camelot-core/src/main/resources/config.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</xsd:complexType>
<xsd:complexType name="Plugin">
<xsd:sequence>
<xsd:choice minOccurs="1" maxOccurs="1">
<xsd:choice minOccurs="0">
<xsd:element name="aggregator" type="xsd:string" maxOccurs="1" minOccurs="1"/>
<xsd:element name="processor" type="xsd:string" maxOccurs="1" minOccurs="1"/>
</xsd:choice>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import ru.yandex.qatools.camelot.api.EndpointListener;
import ru.yandex.qatools.camelot.api.EventProducer;
import ru.yandex.qatools.camelot.api.Storage;
import ru.yandex.qatools.camelot.api.annotations.Input;
import ru.yandex.qatools.camelot.api.annotations.Listener;
import ru.yandex.qatools.camelot.api.annotations.PluginStorage;
import ru.yandex.qatools.camelot.api.annotations.Repository;
import ru.yandex.qatools.camelot.api.annotations.*;
import ru.yandex.qatools.camelot.core.beans.CounterState;
import ru.yandex.qatools.camelot.core.builders.ReadonlyAggregatorRepository;

Expand All @@ -31,6 +28,9 @@ public class AllSkippedService {
@Input
EventProducer producer;

@MainInput
EventProducer mainProducer;

@Repository
private AggregatorRepository repository;

Expand All @@ -47,6 +47,10 @@ public EventProducer getProducer() {
return producer;
}

public EventProducer getMainProducer() {
return mainProducer;
}

public AggregatorRepository getRepository() {
return repository;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ru.yandex.qatools.camelot.core.plugins;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import ru.yandex.qatools.camelot.api.AggregatorRepository;
import ru.yandex.qatools.camelot.api.EventProducer;
import ru.yandex.qatools.camelot.api.annotations.Input;
import ru.yandex.qatools.camelot.api.annotations.MainInput;
import ru.yandex.qatools.camelot.api.annotations.Repository;
import ru.yandex.qatools.camelot.core.beans.CounterState;
import ru.yandex.qatools.camelot.core.builders.ReadonlyAggregatorRepository;

import javax.ws.rs.Path;

/**
* @author Ilya Sadykov (mailto: smecsia@yandex-team.ru)
*/
@Component
@Scope("request")
@Path("/resonly")
public class ResourceOnlyService {

@Input
EventProducer producer;

@MainInput
EventProducer mainProducer;

@Repository
private AggregatorRepository repository;

@Repository(ByLabelBrokenAggregator.class)
private ReadonlyAggregatorRepository<CounterState> counterRepo;

public EventProducer getProducer() {
return producer;
}

public EventProducer getMainProducer() {
return mainProducer;
}

public AggregatorRepository getRepository() {
return repository;
}

public ReadonlyAggregatorRepository<CounterState> getCounterRepo() {
return counterRepo;
}
}
3 changes: 3 additions & 0 deletions camelot-core/src/test/resources/plugins-two.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@
<plugin id="dependent" source="by-custom-header">
<aggregator>ru.yandex.qatools.camelot.core.plugins.DependentAggregator</aggregator>
</plugin>
<plugin id="resource-only">
<resource>ru.yandex.qatools.camelot.core.plugins.ResourceOnlyService</resource>
</plugin>
</source>
</plugins-config>
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import ru.yandex.qatools.camelot.core.beans.StopAllSkipped;
import ru.yandex.qatools.camelot.core.plugins.AllSkippedService;
import ru.yandex.qatools.camelot.core.plugins.ResourceOnlyService;

import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.AFTER_CLASS;
import static ru.yandex.qatools.camelot.core.util.TestEventGenerator.createTestSkipped;
import static ru.yandex.qatools.camelot.core.util.TestEventsUtils.copyOf;
Expand All @@ -30,21 +29,34 @@ public class ResourceTest extends BasicAggregatorsTest {
@Autowired
ApplicationContext applicationContext;

@Test
public void testResourceWithoutPluginClass() throws Exception {
final Object bean = applicationContext.getBean(pluginResourceBeanName("resource-only"));
assertNotNull("Resource must be added to the context", bean);
assertTrue("Resource must be added to the context", bean instanceof ResourceOnlyService);
final ResourceOnlyService resOnly = (ResourceOnlyService) bean;
assertNull("Resource must not contain producer", resOnly.getProducer());
assertNotNull("Resource must contain main producer", resOnly.getMainProducer());
assertNull("Resource must not contain repository", resOnly.getRepository());
assertNotNull("Resource must contain foreign repository", resOnly.getCounterRepo());
resOnly.getMainProducer().produce(createTestSkipped()); // test can produce to main queue (no error)
}

@Test
public void testResourceIsAdded() throws Exception {
final Object bean = applicationContext.getBean(pluginResourceBeanName("all-skipped"));
assertNotNull("Resouce must be added to the context", bean);
assertTrue("Resouce must be added to the context", bean instanceof AllSkippedService);
assertNotNull("Resource must be added to the context", bean);
assertTrue("Resource must be added to the context", bean instanceof AllSkippedService);
final AllSkippedService allSkipped = (AllSkippedService) bean;
assertNotNull("Resouce must contain producer", allSkipped.getProducer());
assertNotNull("Resouce must contain repository", allSkipped.getRepository());
assertNotNull("Resource must contain producer", allSkipped.getProducer());
assertNotNull("Resource must contain repository", allSkipped.getRepository());
assertThat(allSkipped.getCounterRepo(), not(sameInstance(allSkipped.getRepository())));
}

@Test
public void testResourceEndpointListener() throws Exception {
final AllSkippedService service = (AllSkippedService) applicationContext.getBean(pluginResourceBeanName("all-skipped"));
assertNotNull("Resouce must contain endpoint listener ", service.getListener());
assertNotNull("Resource must contain endpoint listener ", service.getListener());
new Thread() {
@Override
public void run() {
Expand Down
3 changes: 3 additions & 0 deletions camelot-web/src/test/resources/plugins-web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@
<plugin id="test-started" brokerConfig="?maxConcurrentConsumers=5">
<aggregator>ru.yandex.qatools.camelot.core.plugins.TestStartedCounterAggregator</aggregator>
</plugin>
<plugin id="resource-only">
<resource>ru.yandex.qatools.camelot.core.plugins.ResourceOnlyService</resource>
</plugin>
</source>
</plugins-config>

0 comments on commit bd5c35c

Please sign in to comment.