diff --git a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java index 76a393729b..69fb8f29ae 100644 --- a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java +++ b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java @@ -104,7 +104,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) { protected IStagingManager createStagingManager() { String directory = androidContext.getCacheDir().toString(); log.info("Staging manager directory: " + directory); - return new StagingManager(directory); + return new StagingManager(directory,false); } @Override diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java index 91ef4710f0..10ac5eb81a 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java @@ -54,7 +54,7 @@ public static void setup() throws Exception { .equals("net.sourceforge.jtds.jdbc.Driver")) { platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT); platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false); - stagingManager = new StagingManager("target/tmp"); + stagingManager = new StagingManager("target/tmp",false); } } diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java index 1c3fa8eb14..f1773f1c11 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java @@ -45,7 +45,7 @@ public static void setup() throws Exception { .equals("com.mysql.jdbc.Driver")) { platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT); platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false); - stagingManager = new StagingManager("tmp"); + stagingManager = new StagingManager("tmp",false); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java index 5e31d22253..baf73d4a2f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java @@ -18,7 +18,7 @@ public class BatchStagingManager extends StagingManager { ISymmetricEngine engine; public BatchStagingManager(ISymmetricEngine engine, String directory) { - super(directory); + super(directory,engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED)); this.engine = engine; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index dbaaf5ca27..8ff9bf5c87 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -45,13 +45,16 @@ public class StagingManager implements IStagingManager { protected Set resourcePaths; protected Map inUse; + + boolean clusterEnabled; - public StagingManager(String directory) { + public StagingManager(String directory, boolean clusterEnabled) { log.info("The staging directory was initialized at the following location: " + directory); this.directory = new File(directory); this.directory.mkdirs(); this.resourcePaths = Collections.synchronizedSet(new TreeSet()); this.inUse = new ConcurrentHashMap(); + this.clusterEnabled = clusterEnabled; refreshResourceList(); } @@ -176,8 +179,15 @@ protected String buildFilePath(Object... path) { public IStagedResource find(String path) { IStagedResource resource = inUse.get(path); - if (resource == null && resourcePaths.contains(path)) { - resource = new StagedResource(directory, path, this); + if (resource == null) { + boolean foundResourcePath = resourcePaths.contains(path); + if (clusterEnabled) { + refreshResourceList(); + foundResourcePath = resourcePaths.contains(path); + } + if (foundResourcePath) { + resource = new StagedResource(directory, path, this); + } } return resource; } diff --git a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriterTest.java b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriterTest.java index dbcd91b5e5..08260e367e 100644 --- a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriterTest.java +++ b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriterTest.java @@ -78,7 +78,7 @@ public void readThenWrite(long threshold) throws Exception { String origCsv = IOUtils.toString(is); is.close(); - StagingManager stagingManager = new StagingManager(DIR.getAbsolutePath()); + StagingManager stagingManager = new StagingManager(DIR.getAbsolutePath(),false); ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, "test", origCsv); StagingDataWriter writer = new StagingDataWriter(threshold, false, "aaa", "test", stagingManager, new BatchListener()); DataProcessor processor = new DataProcessor(reader, writer, "test");