Skip to content

Commit

Permalink
0003125: Encrypt and/or compress staging
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 25, 2017
1 parent a6a84e9 commit b08b001
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 33 deletions.
Expand Up @@ -24,6 +24,7 @@

import java.io.File;
import java.io.StringReader;
import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -349,6 +350,16 @@ protected IJobManager createJobManager() {
@Override
protected IStagingManager createStagingManager() {
String directory = parameterService.getTempDirectory();
String stagingManagerClassName = parameterService.getString(ParameterConstants.STAGING_MANAGER_CLASS);
if (stagingManagerClassName != null) {
try {
Constructor<?> cons = Class.forName(stagingManagerClassName).getConstructor(ISymmetricEngine.class, String.class);
return (IStagingManager) cons.newInstance(this, directory);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return new BatchStagingManager(this, directory);
}

Expand Down
Expand Up @@ -401,6 +401,8 @@ private ParameterConstants() {
public final static String LOG_CONFLICT_RESOLUTION = "log.conflict.resolution";

public final static String UPDATE_SERVICE_CLASS = "update.service.class";

public final static String STAGING_MANAGER_CLASS = "staging.manager.class";

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
Expand Down
Expand Up @@ -42,5 +42,9 @@ public class ServerConstants {
public static final String SERVER_ALLOW_DIR_LISTING = "server.allow.dir.list";
public static final String SERVER_ALLOW_HTTP_METHODS = "server.allow.http.methods";
public static final String SERVER_DISALLOW_HTTP_METHODS = "server.disallow.http.methods";

public final static String STREAM_TO_FILE_ENCRYPT_ENABLED = "stream.to.file.encrypt.enabled";
public final static String STREAM_TO_FILE_COMPRESSION_ENABLED = "stream.to.file.compression.enabled";
public final static String STREAM_TO_FILE_COMPRESSION_LEVEL = "stream.to.file.compression.level";

}
Expand Up @@ -47,29 +47,29 @@ public class StagedResource implements IStagedResource {

static final Logger log = LoggerFactory.getLogger(StagedResource.class);

private int references = 0;
protected int references = 0;

private File directory;
protected File directory;

private File file;
protected File file;

private String path;
protected String path;

private StringBuilder memoryBuffer;
protected StringBuilder memoryBuffer;

private long lastUpdateTime;
protected long lastUpdateTime;

private State state;
protected State state;

private OutputStream outputStream = null;
protected OutputStream outputStream = null;

private Map<Thread, InputStream> inputStreams = null;
protected Map<Thread, InputStream> inputStreams = null;

private Map<Thread, BufferedReader> readers = null;
protected Map<Thread, BufferedReader> readers = null;

private BufferedWriter writer;
protected BufferedWriter writer;

private StagingManager stagingManager;
protected StagingManager stagingManager;

public StagedResource(File directory, String path, StagingManager stagingManager) {
this.directory = directory;
Expand Down Expand Up @@ -168,14 +168,14 @@ public void setState(State state) {
this.file = buildFile(state);
}

public synchronized BufferedReader getReader() {
@SuppressWarnings("resource")
public synchronized BufferedReader getReader() {
Thread thread = Thread.currentThread();
BufferedReader reader = readers != null ? readers.get(thread) : null;
if (reader == null) {
if (file != null && file.exists()) {
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(file),
IoConstants.ENCODING));
reader = createReader();
createReadersMap();
readers.put(thread, reader);
} catch (IOException ex) {
Expand All @@ -194,6 +194,11 @@ public synchronized BufferedReader getReader() {
return reader;
}

protected BufferedReader createReader() throws IOException {
return new BufferedReader(new InputStreamReader(new FileInputStream(file),
IoConstants.ENCODING));
}

private synchronized final void createReadersMap() {
if (readers == null) {
readers = new HashMap<Thread, BufferedReader>(path.contains("common") ? 10 : 1);
Expand Down Expand Up @@ -261,21 +266,26 @@ public OutputStream getOutputStream() {
file.delete();
}
file.getParentFile().mkdirs();
outputStream = new BufferedOutputStream(new FileOutputStream(file));
outputStream = createOutputStream();
}
return outputStream;
} catch (FileNotFoundException e) {
throw new IoException(e);
}
}

public synchronized InputStream getInputStream() {
protected OutputStream createOutputStream() throws FileNotFoundException {
return new BufferedOutputStream(new FileOutputStream(file));
}

@SuppressWarnings("resource")
public synchronized InputStream getInputStream() {
Thread thread = Thread.currentThread();
InputStream reader = inputStreams != null ? inputStreams.get(thread) : null;
if (reader == null) {
if (file != null && file.exists()) {
try {
reader = new BufferedInputStream(new FileInputStream(file));
reader = createInputStream();
createInputStreamsMap();
inputStreams.put(thread, reader);
} catch (IOException ex) {
Expand All @@ -289,6 +299,10 @@ public synchronized InputStream getInputStream() {
return reader;
}

protected InputStream createInputStream() throws FileNotFoundException {
return new BufferedInputStream(new FileInputStream(file));
}

public BufferedWriter getWriter(long threshold) {
if (writer == null) {
if (file != null && file.exists()) {
Expand All @@ -299,12 +313,15 @@ public BufferedWriter getWriter(long threshold) {
this.memoryBuffer = null;
}
this.memoryBuffer = threshold > 0 ? new StringBuilder() : null;
writer = new BufferedWriter(new ThresholdFileWriter(threshold, this.memoryBuffer,
file));
writer = createWriter(threshold);
}
return writer;
}

protected BufferedWriter createWriter(long threshold) {
return new BufferedWriter(new ThresholdFileWriter(threshold, this.memoryBuffer, file));
}

public long getSize() {
if (file != null && file.exists()) {
return file.length();
Expand Down
Expand Up @@ -34,13 +34,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class StagingManager implements IStagingManager {

protected static final Logger log = LoggerFactory.getLogger(StagingManager.class);

private File directory;
protected File directory;

protected Set<String> resourcePaths;

Expand All @@ -54,7 +52,7 @@ public StagingManager(String directory) {
this.inUse = new ConcurrentHashMap<String, IStagedResource>();
refreshResourceList();
}

public Set<String> getResourceReferences() {
synchronized (resourcePaths) {
return new TreeSet<String>(resourcePaths);
Expand Down Expand Up @@ -149,15 +147,18 @@ public long clean(long ttlInMs) {
*/
public IStagedResource create(Object... path) {
String filePath = buildFilePath(path);
IStagedResource resource = new StagedResource(directory, filePath,
this);
IStagedResource resource = createStagedResource(filePath);
if (resource.exists()) {
resource.delete();
}
this.inUse.put(filePath, resource);
this.resourcePaths.add(filePath);
return resource;
}

protected IStagedResource createStagedResource(String filePath) {
return new StagedResource(directory, filePath, this);
}

protected String buildFilePath(Object... path) {
StringBuilder buffer = new StringBuilder();
Expand All @@ -177,7 +178,7 @@ protected String buildFilePath(Object... path) {
public IStagedResource find(String path) {
IStagedResource resource = inUse.get(path);
if (resource == null && resourcePaths.contains(path)) {
resource = new StagedResource(directory, path, this);
resource = createStagedResource(path);
}
return resource;
}
Expand Down
Expand Up @@ -40,13 +40,13 @@
*/
public class ThresholdFileWriter extends Writer {

private File file;
protected File file;

private BufferedWriter fileWriter;
protected BufferedWriter fileWriter;

private StringBuilder buffer;
protected StringBuilder buffer;

private long threshhold;
protected long threshhold;

/**
* @param threshold The number of bytes at which to start writing to a file
Expand Down Expand Up @@ -87,7 +87,7 @@ public void write(char[] cbuf, int off, int len) throws IOException {
fileWriter.write(cbuf, off, len);
} else if (buffer == null || len + buffer.length() > threshhold) {
file.getParentFile().mkdirs();
fileWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), IoConstants.ENCODING));
fileWriter = getWriter();
if (buffer != null) {
fileWriter.write(buffer.toString());
buffer.setLength(0);
Expand All @@ -99,6 +99,10 @@ public void write(char[] cbuf, int off, int len) throws IOException {
buffer.append(new String(cbuf), off, len);
}
}

protected BufferedWriter getWriter() throws IOException {
return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), IoConstants.ENCODING));
}

public BufferedReader getReader() throws IOException {
if (file != null && file.exists()) {
Expand Down
Expand Up @@ -22,6 +22,8 @@

import java.security.KeyStore;

import javax.crypto.Cipher;

/**
* Pluggable Service API that is responsible for encrypting and decrypting data.
*/
Expand All @@ -44,5 +46,7 @@ public interface ISecurityService {
public KeyStore getKeyStore();

public KeyStore getTrustStore();

public Cipher getCipher(int cipherMode) throws Exception;

}
Expand Up @@ -174,7 +174,7 @@ private String rot13(String text) {
return sb.toString();
}

protected Cipher getCipher(int mode) throws Exception {
public Cipher getCipher(int mode) throws Exception {
if (secretKey == null) {
secretKey = getSecretKey();
}
Expand Down

0 comments on commit b08b001

Please sign in to comment.