diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java index 8ab1149e17..b7dc0ce092 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.StringReader; +import java.lang.reflect.Constructor; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; @@ -349,6 +350,16 @@ protected IJobManager createJobManager() { @Override protected IStagingManager createStagingManager() { String directory = parameterService.getTempDirectory(); + String stagingManagerClassName = parameterService.getString(ParameterConstants.STAGING_MANAGER_CLASS); + if (stagingManagerClassName != null) { + try { + Constructor cons = Class.forName(stagingManagerClassName).getConstructor(ISymmetricEngine.class, String.class); + return (IStagingManager) cons.newInstance(this, directory); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return new BatchStagingManager(this, directory); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 4410f37a61..e859240307 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -401,6 +401,8 @@ private ParameterConstants() { public final static String LOG_CONFLICT_RESOLUTION = "log.conflict.resolution"; public final static String UPDATE_SERVICE_CLASS = "update.service.class"; + + public final static String STAGING_MANAGER_CLASS = "staging.manager.class"; public static Map getParameterMetaData() { return parameterMetaData; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java index 56067da213..aafd64c4f2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java @@ -42,5 +42,9 @@ public class ServerConstants { public static final String SERVER_ALLOW_DIR_LISTING = "server.allow.dir.list"; public static final String SERVER_ALLOW_HTTP_METHODS = "server.allow.http.methods"; public static final String SERVER_DISALLOW_HTTP_METHODS = "server.disallow.http.methods"; + + public final static String STREAM_TO_FILE_ENCRYPT_ENABLED = "stream.to.file.encrypt.enabled"; + public final static String STREAM_TO_FILE_COMPRESSION_ENABLED = "stream.to.file.compression.enabled"; + public final static String STREAM_TO_FILE_COMPRESSION_LEVEL = "stream.to.file.compression.level"; } \ No newline at end of file diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 256ff2c62c..f61a25db19 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -47,29 +47,29 @@ public class StagedResource implements IStagedResource { static final Logger log = LoggerFactory.getLogger(StagedResource.class); - private int references = 0; + protected int references = 0; - private File directory; + protected File directory; - private File file; + protected File file; - private String path; + protected String path; - private StringBuilder memoryBuffer; + protected StringBuilder memoryBuffer; - private long lastUpdateTime; + protected long lastUpdateTime; - private State state; + protected State state; - private OutputStream outputStream = null; + protected OutputStream outputStream = null; - private Map inputStreams = null; + protected Map inputStreams = null; - private Map readers = null; + protected Map readers = null; - private BufferedWriter writer; + protected BufferedWriter writer; - private StagingManager stagingManager; + protected StagingManager stagingManager; public StagedResource(File directory, String path, StagingManager stagingManager) { this.directory = directory; @@ -168,14 +168,14 @@ public void setState(State state) { this.file = buildFile(state); } - public synchronized BufferedReader getReader() { + @SuppressWarnings("resource") + public synchronized BufferedReader getReader() { Thread thread = Thread.currentThread(); BufferedReader reader = readers != null ? readers.get(thread) : null; if (reader == null) { if (file != null && file.exists()) { try { - reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), - IoConstants.ENCODING)); + reader = createReader(); createReadersMap(); readers.put(thread, reader); } catch (IOException ex) { @@ -194,6 +194,11 @@ public synchronized BufferedReader getReader() { return reader; } + protected BufferedReader createReader() throws IOException { + return new BufferedReader(new InputStreamReader(new FileInputStream(file), + IoConstants.ENCODING)); + } + private synchronized final void createReadersMap() { if (readers == null) { readers = new HashMap(path.contains("common") ? 10 : 1); @@ -261,7 +266,7 @@ public OutputStream getOutputStream() { file.delete(); } file.getParentFile().mkdirs(); - outputStream = new BufferedOutputStream(new FileOutputStream(file)); + outputStream = createOutputStream(); } return outputStream; } catch (FileNotFoundException e) { @@ -269,13 +274,18 @@ public OutputStream getOutputStream() { } } - public synchronized InputStream getInputStream() { + protected OutputStream createOutputStream() throws FileNotFoundException { + return new BufferedOutputStream(new FileOutputStream(file)); + } + + @SuppressWarnings("resource") + public synchronized InputStream getInputStream() { Thread thread = Thread.currentThread(); InputStream reader = inputStreams != null ? inputStreams.get(thread) : null; if (reader == null) { if (file != null && file.exists()) { try { - reader = new BufferedInputStream(new FileInputStream(file)); + reader = createInputStream(); createInputStreamsMap(); inputStreams.put(thread, reader); } catch (IOException ex) { @@ -289,6 +299,10 @@ public synchronized InputStream getInputStream() { return reader; } + protected InputStream createInputStream() throws FileNotFoundException { + return new BufferedInputStream(new FileInputStream(file)); + } + public BufferedWriter getWriter(long threshold) { if (writer == null) { if (file != null && file.exists()) { @@ -299,12 +313,15 @@ public BufferedWriter getWriter(long threshold) { this.memoryBuffer = null; } this.memoryBuffer = threshold > 0 ? new StringBuilder() : null; - writer = new BufferedWriter(new ThresholdFileWriter(threshold, this.memoryBuffer, - file)); + writer = createWriter(threshold); } return writer; } + protected BufferedWriter createWriter(long threshold) { + return new BufferedWriter(new ThresholdFileWriter(threshold, this.memoryBuffer, file)); + } + public long getSize() { if (file != null && file.exists()) { return file.length(); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index dbaaf5ca27..fef0951cf9 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -34,13 +34,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - - public class StagingManager implements IStagingManager { protected static final Logger log = LoggerFactory.getLogger(StagingManager.class); - private File directory; + protected File directory; protected Set resourcePaths; @@ -54,7 +52,7 @@ public StagingManager(String directory) { this.inUse = new ConcurrentHashMap(); refreshResourceList(); } - + public Set getResourceReferences() { synchronized (resourcePaths) { return new TreeSet(resourcePaths); @@ -149,8 +147,7 @@ public long clean(long ttlInMs) { */ public IStagedResource create(Object... path) { String filePath = buildFilePath(path); - IStagedResource resource = new StagedResource(directory, filePath, - this); + IStagedResource resource = createStagedResource(filePath); if (resource.exists()) { resource.delete(); } @@ -158,6 +155,10 @@ public IStagedResource create(Object... path) { this.resourcePaths.add(filePath); return resource; } + + protected IStagedResource createStagedResource(String filePath) { + return new StagedResource(directory, filePath, this); + } protected String buildFilePath(Object... path) { StringBuilder buffer = new StringBuilder(); @@ -177,7 +178,7 @@ protected String buildFilePath(Object... path) { public IStagedResource find(String path) { IStagedResource resource = inUse.get(path); if (resource == null && resourcePaths.contains(path)) { - resource = new StagedResource(directory, path, this); + resource = createStagedResource(path); } return resource; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/ThresholdFileWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/ThresholdFileWriter.java index 1b5a3326a6..83c7aecb93 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/ThresholdFileWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/ThresholdFileWriter.java @@ -40,13 +40,13 @@ */ public class ThresholdFileWriter extends Writer { - private File file; + protected File file; - private BufferedWriter fileWriter; + protected BufferedWriter fileWriter; - private StringBuilder buffer; + protected StringBuilder buffer; - private long threshhold; + protected long threshhold; /** * @param threshold The number of bytes at which to start writing to a file @@ -87,7 +87,7 @@ public void write(char[] cbuf, int off, int len) throws IOException { fileWriter.write(cbuf, off, len); } else if (buffer == null || len + buffer.length() > threshhold) { file.getParentFile().mkdirs(); - fileWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), IoConstants.ENCODING)); + fileWriter = getWriter(); if (buffer != null) { fileWriter.write(buffer.toString()); buffer.setLength(0); @@ -99,6 +99,10 @@ public void write(char[] cbuf, int off, int len) throws IOException { buffer.append(new String(cbuf), off, len); } } + + protected BufferedWriter getWriter() throws IOException { + return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), IoConstants.ENCODING)); + } public BufferedReader getReader() throws IOException { if (file != null && file.exists()) { diff --git a/symmetric-util/src/main/java/org/jumpmind/security/ISecurityService.java b/symmetric-util/src/main/java/org/jumpmind/security/ISecurityService.java index a71b6bb450..d9f3a71346 100644 --- a/symmetric-util/src/main/java/org/jumpmind/security/ISecurityService.java +++ b/symmetric-util/src/main/java/org/jumpmind/security/ISecurityService.java @@ -22,6 +22,8 @@ import java.security.KeyStore; +import javax.crypto.Cipher; + /** * Pluggable Service API that is responsible for encrypting and decrypting data. */ @@ -44,5 +46,7 @@ public interface ISecurityService { public KeyStore getKeyStore(); public KeyStore getTrustStore(); + + public Cipher getCipher(int cipherMode) throws Exception; } \ No newline at end of file diff --git a/symmetric-util/src/main/java/org/jumpmind/security/SecurityService.java b/symmetric-util/src/main/java/org/jumpmind/security/SecurityService.java index e73183a8bf..f0677bc82c 100644 --- a/symmetric-util/src/main/java/org/jumpmind/security/SecurityService.java +++ b/symmetric-util/src/main/java/org/jumpmind/security/SecurityService.java @@ -174,7 +174,7 @@ private String rot13(String text) { return sb.toString(); } - protected Cipher getCipher(int mode) throws Exception { + public Cipher getCipher(int mode) throws Exception { if (secretKey == null) { secretKey = getSecretKey(); }