Skip to content

Commit

Permalink
INT-2855 Fix for Max OSX; Other Improvements
Browse files Browse the repository at this point in the history
 - Schedule Instead of Immediate Re-Run
     Mac OSX 'tail' terminates immediately when the file is
     missing.
 - Add delay when rescheduling a failed process
 - Add support for 'end' and 'reopen' options to Apache Tailer implementation
 - Clean up process destruction logic after failures
 - Add more debugging to threads
 - Add a JUnit @rule so the OSDFTMP test doesn't fail on Windows
 - Add file path normalization so the tests pass on Windows
  • Loading branch information
garyrussell committed May 17, 2013
1 parent e1e3f3a commit b34e311
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 47 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.springframework.integration.file.tail.ApacheCommonsFileTailingMessageProducer;
import org.springframework.integration.file.tail.FileTailingMessageProducerSupport;
import org.springframework.integration.file.tail.OSDelegatingFileTailingMessageProducer;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;

/**
Expand All @@ -40,10 +41,16 @@ public class FileTailInboundChannelAdapterFactoryBean extends AbstractFactoryBea

private volatile TaskExecutor taskExecutor;

private volatile TaskScheduler taskScheduler;

private volatile Long delay;

private volatile Long fileDelay;

private volatile Boolean end;

private volatile Boolean reopen;

private volatile FileTailingMessageProducerSupport adapter;

private volatile String beanName;
Expand All @@ -66,6 +73,10 @@ public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

public void setDelay(long delay) {
this.delay = delay;
}
Expand All @@ -74,6 +85,14 @@ public void setFileDelay(long fileDelay) {
this.fileDelay = fileDelay;
}

public void setEnd(Boolean end) {
this.end = end;
}

public void setReopen(Boolean reopen) {
this.reopen = reopen;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
Expand All @@ -99,24 +118,33 @@ public Class<?> getObjectType() {
@Override
protected FileTailingMessageProducerSupport createInstance() throws Exception {
FileTailingMessageProducerSupport adapter;
if (this.delay == null && this.fileDelay == null) {
if (this.delay == null && this.end == null && this.reopen == null) {
adapter = new OSDelegatingFileTailingMessageProducer();
if (this.nativeOptions != null) {
((OSDelegatingFileTailingMessageProducer) adapter).setOptions(this.nativeOptions);
}
}
else {
Assert.isNull(this.nativeOptions, "Cannot have 'delay' or 'file-delay' with a native adapter");
Assert.isNull(this.nativeOptions, "Cannot have 'delay', 'end' or 'reopen' with a native adapter");
adapter = new ApacheCommonsFileTailingMessageProducer();
if (this.delay != null) {
((ApacheCommonsFileTailingMessageProducer) adapter).setPollingDelay(this.delay);
}
if (this.fileDelay != null) {
((ApacheCommonsFileTailingMessageProducer) adapter).setMissingFileDelay(this.fileDelay);
if (this.end != null) {
((ApacheCommonsFileTailingMessageProducer) adapter).setEnd(this.end);
}
if (this.reopen != null) {
((ApacheCommonsFileTailingMessageProducer) adapter).setReopen(this.reopen);
}
}
adapter.setFile(this.file);
adapter.setTaskExecutor(this.taskExecutor);
if (this.taskScheduler != null) {
adapter.setTaskScheduler(this.taskScheduler);
}
if (this.fileDelay != null) {
adapter.setTailAttemptsDelay(this.fileDelay);
}
adapter.setOutputChannel(outputChannel);
adapter.setBeanName(this.beanName);
if (this.autoStartup != null) {
Expand Down
Expand Up @@ -35,10 +35,16 @@ public class FileTailInboundChannelAdapterParser extends AbstractChannelAdapterP
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(FileTailInboundChannelAdapterFactoryBean.class);

if (element.hasAttribute("native-command")) {
if (element.hasAttribute("delay") || element.hasAttribute("file-delay")) {
if (element.hasAttribute("delay") || element.hasAttribute("end") || element.hasAttribute("reopen")) {
if (element.hasAttribute("native-options")) {
parserContext.getReaderContext().error(
"You cannot have 'delay' or 'file-delay' if 'native-command' is provided", element);
"You cannot have 'native-options' if one or more of " +
"'delay', 'end' or 'reopen' is provided", element);
}
if (element.hasAttribute("task-scheduler")) {
parserContext.getReaderContext().error(
"You cannot have 'task-scheduler' one or more of " +
"'delay', 'end' or 'reopen' is provided", element);
}
}

Expand All @@ -48,8 +54,11 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "native-options");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "file");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-scheduler");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "delay");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "file-delay");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "end");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reopen");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase");

Expand Down
Expand Up @@ -32,14 +32,34 @@ public class ApacheCommonsFileTailingMessageProducer extends FileTailingMessageP

private volatile long pollingDelay = 1000;

private volatile long missingFileDelay = 5000;
private volatile boolean end = true;

private volatile boolean reopen = false;

/**
* The delay between checks of the file for new content in milliseconds.
* @param pollingDelay The delay.
*/
public void setPollingDelay(long pollingDelay) {
this.pollingDelay = pollingDelay;
}

public void setMissingFileDelay(long missingFileDelay) {
this.missingFileDelay = missingFileDelay;
/**
* If true, tail from the end of the file, otherwise
* include all lines from the beginning. Default true.
* @param end true or false
*/
public void setEnd(boolean end) {
this.end = end;
}

/**
* If true, close and reopen the file between reading chunks;
* default false.
* @param reopen true or false.
*/
public void setReopen(boolean reopen) {
this.reopen = reopen;
}

@Override
Expand All @@ -50,7 +70,7 @@ public String getComponentType() {
@Override
protected void doStart() {
super.doStart();
Tailer tailer = new Tailer(this.getFile(), this, this.pollingDelay);
Tailer tailer = new Tailer(this.getFile(), this, this.pollingDelay, this.end, this.reopen);
this.getTaskExecutor().execute(tailer);
this.tailer = tailer;
}
Expand All @@ -69,7 +89,7 @@ public void init(Tailer tailer) {
public void fileNotFound() {
this.publish("File not found:" + this.getFile().getAbsolutePath());
try {
Thread.sleep(this.missingFileDelay);
Thread.sleep(this.getMissingFileDelay());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Expand Up @@ -44,12 +44,17 @@ public abstract class FileTailingMessageProducerSupport extends MessageProducerS

private volatile TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

private volatile long tailAttemptsDelay = 5000;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.eventPublisher = applicationEventPublisher;
}

/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
Expand All @@ -62,11 +67,29 @@ protected File getFile() {
return this.file;
}

/**
* A task executor; default is a {@link SimpleAsyncTaskExecutor}.
* @param taskExecutor
*/
public void setTaskExecutor(TaskExecutor taskExecutor) {
Assert.notNull("'taskExecutor' cannot be null");
this.taskExecutor = taskExecutor;
}

/**
* The delay in milliseconds between attempts to tail a non-existent file,
* or between attempts to execute a process if it fails for any reason.
* @param missingFileDelay the delay.
*/
public void setTailAttemptsDelay(long tailAttemptsDelay) {
Assert.isTrue(tailAttemptsDelay > 0, "'tailAttemptsDelay' must be > 0");
this.tailAttemptsDelay = tailAttemptsDelay;
}

protected long getMissingFileDelay() {
return tailAttemptsDelay;
}

protected TaskExecutor getTaskExecutor() {
return this.taskExecutor;
}
Expand Down

0 comments on commit b34e311

Please sign in to comment.