diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 2f9fa9d539..1c19de6b7e 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -61,13 +61,13 @@ public class StagedResource implements IStagedResource { private State state; - private Map outputStreams = null; + private OutputStream outputStream = null; private Map inputStreams = null; private Map readers = new HashMap(); - private Map writers = new HashMap(); + private BufferedWriter writer; private StagingManager stagingManager; @@ -104,9 +104,9 @@ public StagedResource(long threshold, File directory, String path, StagingManage } public boolean isInUse() { - return readers.size() > 0 || writers.size() > 0 || + return readers.size() > 0 || writer != null || (inputStreams != null && inputStreams.size() > 0) || - (outputStreams != null && outputStreams.size() > 0); + outputStream != null; } public boolean isFileResource() { @@ -126,6 +126,10 @@ public void setState(State state) { File newFile = buildFile(state); if (!newFile.equals(file)) { if (newFile.exists()) { + if (writer != null || outputStream != null) { + throw new IoException("Could not write '{}' it is currently being written to", newFile.getAbsolutePath()); + } + if (!FileUtils.deleteQuietly(newFile)) { log.warn("Failed to delete '{}' in preparation for renaming '{}'", newFile.getAbsolutePath(), file.getAbsoluteFile()); if (readers.size() > 0) { @@ -136,14 +140,6 @@ public void setState(State state) { } } - if (writers.size() > 0) { - for (Thread thread : writers.keySet()) { - BufferedWriter writer = writers.get(thread); - log.warn("Closing unwanted writer for '{}' that had been created on thread '{}'", newFile.getAbsolutePath(), thread.getName()); - IOUtils.closeQuietly(writer); - } - } - if (!FileUtils.deleteQuietly(newFile)) { log.warn("Failed to delete '{}' for a second time", newFile.getAbsolutePath()); } @@ -199,18 +195,12 @@ public void close() { readers.remove(thread); } - BufferedWriter writer = writers.get(thread); if (writer != null) { IOUtils.closeQuietly(writer); - writers.remove(thread); } - if (outputStreams != null) { - OutputStream outputStream = outputStreams.get(thread); - if (outputStream != null) { - IOUtils.closeQuietly(outputStream); - outputStreams.remove(thread); - } + if (outputStream != null) { + IOUtils.closeQuietly(outputStream); } if (inputStreams != null) { @@ -220,28 +210,20 @@ public void close() { inputStreams.remove(thread); } } - - } public OutputStream getOutputStream() { - try { - if (outputStreams == null) { - outputStreams = new HashMap(); - } - Thread thread = Thread.currentThread(); - OutputStream writer = outputStreams.get(thread); - if (writer == null) { + try { + if (outputStream == null) { if (file.exists()) { log.warn("We had to delete {} because it already existed", file.getAbsolutePath()); file.delete(); } file.getParentFile().mkdirs(); - writer = new BufferedOutputStream(new FileOutputStream(file)); - outputStreams.put(thread, writer); + outputStream = new BufferedOutputStream(new FileOutputStream(file)); } - return writer; + return outputStream; } catch (FileNotFoundException e) { throw new IoException(e); } @@ -270,8 +252,6 @@ public InputStream getInputStream() { } public BufferedWriter getWriter() { - Thread thread = Thread.currentThread(); - BufferedWriter writer = writers.get(thread); if (writer == null) { if (file.exists()) { log.warn("We had to delete {} because it already existed", file.getAbsolutePath()); @@ -283,7 +263,6 @@ public BufferedWriter getWriter() { this.memoryBuffer = new StringBuilder(); writer = new BufferedWriter(new ThresholdFileWriter(threshold, this.memoryBuffer, this.file)); - writers.put(thread, writer); } return writer; }