Skip to content

Commit

Permalink
added test case and fix for CAMEL-95 - many thanks to Aaron Crickenbe…
Browse files Browse the repository at this point in the history
…rger!

git-svn-id: https://svn.apache.org/repos/asf/activemq/camel/trunk@564495 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jstrachan committed Aug 10, 2007
1 parent c5cbb7f commit abba495
Show file tree
Hide file tree
Showing 15 changed files with 624 additions and 52 deletions.
1 change: 1 addition & 0 deletions camel-core/src/main/data/bar.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<hello>bar</hello>
1 change: 1 addition & 0 deletions camel-core/src/main/data/foo.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<hello>foo</hello>
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
*/
package org.apache.camel.component.file;

import java.io.File;

import org.apache.camel.Processor;
import org.apache.camel.component.file.strategy.FileStrategy;
import org.apache.camel.component.file.strategy.FileProcessStrategy;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.File;

/**
* @version $Revision: 523016 $
*/
Expand All @@ -47,41 +47,57 @@ protected void poll() throws Exception {
protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
if (!fileOrDirectory.isDirectory()) {
pollFile(fileOrDirectory); // process the file
} else if (processDir) {
}
else if (processDir) {
if (isValidFile(fileOrDirectory)) {
LOG.debug("Polling directory " + fileOrDirectory);
File[] files = fileOrDirectory.listFiles();
for (int i = 0; i < files.length; i++) {
pollFileOrDirectory(files[i], isRecursive()); // self-recursion
}
}
} else {
}
else {
LOG.debug("Skipping directory " + fileOrDirectory);
}
}

protected void pollFile(final File file) {
if (file.exists() && file.lastModified() > lastPollTime) {
if (isValidFile(file)) {
FileStrategy strategy = endpoint.getFileStrategy();
FileExchange exchange = endpoint.createExchange(file);

try {
if (!file.exists()) {
return;
}
if (isValidFile(file)) {
// we only care about file modified times if we are not deleting/moving files
if (endpoint.isNoop()) {
long fileModified = file.lastModified();
if (fileModified <= lastPollTime) {
if (LOG.isDebugEnabled()) {
LOG.debug("About to process file: " + file + " using exchange: " + exchange);
LOG.debug("Ignoring file: " + file + " as modified time: " + fileModified + " less than last poll time: " + lastPollTime);
}
if (strategy.begin(endpoint, exchange, file)) {
getProcessor().process(exchange);
strategy.commit(endpoint, exchange, file);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " cannot process file: " + file);
}
return;
}
}

FileProcessStrategy processStrategy = endpoint.getFileStrategy();
FileExchange exchange = endpoint.createExchange(file);

try {
if (LOG.isDebugEnabled()) {
LOG.debug("About to process file: " + file + " using exchange: " + exchange);
}
if (processStrategy.begin(endpoint, exchange, file)) {
getProcessor().process(exchange);
processStrategy.commit(endpoint, exchange, file);
}
else {
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " cannot process file: " + file);
}
} catch (Throwable e) {
handleException(e);
}
}
catch (Throwable e) {
handleException(e);
}
}
}

Expand All @@ -103,9 +119,19 @@ protected boolean isMatched(File file) {
}
}
String[] prefixes = endpoint.getExcludedNamePrefixes();
for (String prefix : prefixes) {
if (name.startsWith(prefix)) {
return false;
if (prefixes != null) {
for (String prefix : prefixes) {
if (name.startsWith(prefix)) {
return false;
}
}
}
String[] postfixes = endpoint.getExcludedNamePostfixes();
if (postfixes != null) {
for (String postfix : postfixes) {
if (name.endsWith(postfix)) {
return false;
}
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
*/
package org.apache.camel.component.file;

import java.io.File;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.file.strategy.DeleteFileStrategy;
import org.apache.camel.component.file.strategy.FileStrategy;
import org.apache.camel.component.file.strategy.NoOpFileStrategy;
import org.apache.camel.component.file.strategy.RenameFileStrategy;
import org.apache.camel.component.file.strategy.DefaultFileRenamer;
import org.apache.camel.component.file.strategy.DeleteFileProcessStrategy;
import org.apache.camel.component.file.strategy.FileProcessStrategy;
import org.apache.camel.component.file.strategy.FileProcessStrategySupport;
import org.apache.camel.component.file.strategy.NoOpFileProcessStrategy;
import org.apache.camel.component.file.strategy.RenameFileProcessStrategy;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.File;

/**
* A <a href="http://activemq.apache.org/camel/file.html">File Endpoint</a> for
* working with file systems
Expand All @@ -38,7 +40,7 @@
public class FileEndpoint extends ScheduledPollEndpoint<FileExchange> {
private static final transient Log LOG = LogFactory.getLog(FileEndpoint.class);
private File file;
private FileStrategy fileStrategy;
private FileProcessStrategy fileProcessStrategy;
private boolean autoCreate = true;
private boolean lock = true;
private boolean delete;
Expand All @@ -47,6 +49,8 @@ public class FileEndpoint extends ScheduledPollEndpoint<FileExchange> {
private String moveNamePrefix;
private String moveNamePostfix;
private String[] excludedNamePrefixes = {"."};
private String[] excludedNamePostfixes = { FileProcessStrategySupport.DEFAULT_LOCK_FILE_POSTFIX };
private int bufferSize = 128 * 1024;

protected FileEndpoint(File file, String endpointUri, FileComponent component) {
super(endpointUri, component);
Expand Down Expand Up @@ -117,22 +121,22 @@ public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}

public FileStrategy getFileStrategy() {
if (fileStrategy == null) {
fileStrategy = createFileStrategy();
LOG.debug("" + this + " using strategy: " + fileStrategy);
public FileProcessStrategy getFileStrategy() {
if (fileProcessStrategy == null) {
fileProcessStrategy = createFileStrategy();
LOG.debug("" + this + " using strategy: " + fileProcessStrategy);
}
return fileStrategy;
return fileProcessStrategy;
}

/**
* Sets the strategy to be used when the file has been processed such as
* deleting or renaming it etc.
*
* @param fileStrategy the new stategy to use
* @param fileProcessStrategy the new stategy to use
*/
public void setFileStrategy(FileStrategy fileStrategy) {
this.fileStrategy = fileStrategy;
public void setFileStrategy(FileProcessStrategy fileProcessStrategy) {
this.fileProcessStrategy = fileProcessStrategy;
}

public boolean isDelete() {
Expand Down Expand Up @@ -160,7 +164,7 @@ public String getMoveNamePostfix() {
* the files from * to *.done set this value to ".done"
*
* @param moveNamePostfix
* @see RenameFileStrategy#setNamePostfix(String)
* @see DefaultFileRenamer#setNamePostfix(String)
*/
public void setMoveNamePostfix(String moveNamePostfix) {
this.moveNamePostfix = moveNamePostfix;
Expand All @@ -175,7 +179,7 @@ public String getMoveNamePrefix() {
* processed files into a hidden directory called ".camel" set this value to
* ".camel/"
*
* @see RenameFileStrategy#setNamePrefix(String)
* @see DefaultFileRenamer#setNamePrefix(String)
*/
public void setMoveNamePrefix(String moveNamePrefix) {
this.moveNamePrefix = moveNamePrefix;
Expand All @@ -193,13 +197,25 @@ public void setExcludedNamePrefixes(String[] excludedNamePrefixes) {
this.excludedNamePrefixes = excludedNamePrefixes;
}

public String[] getExcludedNamePostfixes() {
return excludedNamePostfixes;
}

/**
* Sets the excluded file name postfixes, such as {@link FileProcessStrategySupport#DEFAULT_LOCK_FILE_POSTFIX}
* to ignore lock files by default.
*/
public void setExcludedNamePostfixes(String[] excludedNamePostfixes) {
this.excludedNamePostfixes = excludedNamePostfixes;
}

public boolean isNoop() {
return noop;
}

/**
* If set to true then the default {@link FileStrategy} will be to use the
* {@link NoOpFileStrategy} to not move or copy processed files
* If set to true then the default {@link FileProcessStrategy} will be to use the
* {@link NoOpFileProcessStrategy} to not move or copy processed files
*
* @param noop
*/
Expand All @@ -221,22 +237,33 @@ public void setAppend(boolean append) {
this.append = append;
}

public int getBufferSize() {
return bufferSize;
}

/**
* Sets the buffer size used to read/write files
*/
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}

/**
* A strategy method to lazily create the file strategy
*/
protected FileStrategy createFileStrategy() {
protected FileProcessStrategy createFileStrategy() {
if (isNoop()) {
return new NoOpFileStrategy();
return new NoOpFileProcessStrategy();
} else if (moveNamePostfix != null || moveNamePrefix != null) {
if (isDelete()) {
throw new IllegalArgumentException(
"You cannot set the deleteFiles property and a moveFilenamePostfix or moveFilenamePrefix");
}
return new RenameFileStrategy(isLock(), moveNamePrefix, moveNamePostfix);
return new RenameFileProcessStrategy(isLock(), moveNamePrefix, moveNamePostfix);
} else if (isDelete()) {
return new DeleteFileStrategy(isLock());
return new DeleteFileProcessStrategy(isLock());
} else {
return new RenameFileStrategy(isLock());
return new RenameFileProcessStrategy(isLock());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
* A {@link Producer} implementation for File
*
*
* @version $Revision: 523016 $
*/
public class FileProducer extends DefaultProducer {
Expand All @@ -57,6 +58,58 @@ public void process(Exchange exchange) throws Exception {
}

public void process(FileExchange exchange) throws Exception {
InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
File file = createFileName(exchange);
buildDirectory(file);
if (LOG.isDebugEnabled()) {
LOG.debug("About to write to: " + file + " from exchange: " + exchange);
}
FileChannel fc = null;
try {
if (getEndpoint().isAppend()) {
fc = new RandomAccessFile(file, "rw").getChannel();
fc.position(fc.size());
}
else {
fc = new FileOutputStream(file).getChannel();
}
int size = getEndpoint().getBufferSize();
byte[] buffer = new byte[size];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
while (true) {
int count = in.read(buffer);
if (count <= 0) {
break;
}
else if (count < size) {
byteBuffer = ByteBuffer.wrap(buffer, 0, count);
fc.write(byteBuffer);
break;
}
else {
fc.write(byteBuffer);
}
}
}
finally {
if (in != null) {
try {
in.close();
}
catch (IOException e) {
LOG.warn("Failed to close input: " + e, e);
}
}
if (fc != null) {
try {
fc.close();
}
catch (IOException e) {
LOG.warn("Failed to close output: " + e, e);
}
}
}
/*
ByteBuffer payload = exchange.getIn().getBody(ByteBuffer.class);
if (payload == null) {
InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
Expand Down Expand Up @@ -87,6 +140,7 @@ public void process(FileExchange exchange) throws Exception {
fc.close();
}
}
*/
}

protected File createFileName(FileExchange exchange) {
Expand All @@ -98,13 +152,15 @@ protected File createFileName(FileExchange exchange) {
File answer = new File(endpointFile, name);
if (answer.isDirectory()) {
return new File(answer, fileName);
} else {
}
else {
return answer;
}
}
if (endpointFile != null && endpointFile.isDirectory()) {
return new File(endpointFile, fileName);
} else {
}
else {
return new File(fileName);
}
}
Expand Down
Loading

0 comments on commit abba495

Please sign in to comment.