Skip to content

Commit

Permalink
0002960: Save on memory: StagingManager creates lots of empty maps that
Browse files Browse the repository at this point in the history
just take up memory as staging grows
  • Loading branch information
chenson42 committed Jan 16, 2017
1 parent ced86fe commit 792adf7
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 52 deletions.
Expand Up @@ -238,7 +238,7 @@ protected void flush() {
protected void createStagingFile() {
//TODO: We should use constants for dir structure path,
// but we don't want to depend on symmetric core.
this.stagedInputFile = stagingManager.create(0, "bulkloaddir",
this.stagedInputFile = stagingManager.create("bulkloaddir",
table.getName() + this.getBatch().getBatchId() + ".csv");
}

Expand Down
Expand Up @@ -287,7 +287,7 @@ protected byte[] escape(byte[] byteData) {
protected void createStagingFile() {
//TODO: We should use constants for dir structure path,
// but we don't want to depend on symmetric core.
this.stagedInputFile = stagingManager.create(0, "bulkloaddir",
this.stagedInputFile = stagingManager.create("bulkloaddir",
table.getName() + this.getBatch().getBatchId() + ".csv");
}

Expand Down
Expand Up @@ -128,7 +128,7 @@ public void write(CsvData data) {
try {
String[] parsedData = data.getParsedData(CsvData.ROW_DATA);
String formattedData = CsvUtils.escapeCsvData(parsedData, '\n', '"', CsvWriter.ESCAPE_MODE_DOUBLED, "\\N");
stagedInputFile.getWriter().write(formattedData);
stagedInputFile.getWriter(0).write(formattedData);
loadedRows++;
loadedBytes += formattedData.getBytes().length;
} catch (Exception ex) {
Expand Down Expand Up @@ -211,7 +211,7 @@ protected void flush() {
}

protected void createStagingFile() {
stagedInputFile = stagingManager.create(0, "bulkloaddir", table.getName() + getBatch().getBatchId() + ".csv");
stagedInputFile = stagingManager.create("bulkloaddir", table.getName() + getBatch().getBatchId() + ".csv");
}

}
Expand Up @@ -5,7 +5,6 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -41,11 +40,11 @@ public long clean(long ttlInMs) {
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);
synchronized (StagingManager.class) {
log.trace("Purging staging area");
Set<String> keys = new HashSet<String>(resourceList.keySet());
Set<String> keys = getResourceReferences();
long purgedFileCount = 0;
long purgedFileSize = 0;
for (String key : keys) {
IStagedResource resource = resourceList.get(key);
IStagedResource resource = find(key);
String[] path = key.split("/");
/*
* resource could have deleted itself between the time the keys
Expand Down
Expand Up @@ -1004,13 +1004,13 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
long memoryThresholdInBytes = extractedBatch.isFileResource() ? 0 :
targetEngine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
Node sourceNode = nodeService.findIdentity();
IStagedResource targetResource = targetEngine.getStagingManager().create(memoryThresholdInBytes,
IStagedResource targetResource = targetEngine.getStagingManager().create(
Constants.STAGING_CATEGORY_INCOMING, Batch.getStagedLocation(false, sourceNode.getNodeId()),
currentBatch.getBatchId());
if (extractedBatch.isFileResource()) {
SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile());
} else {
IOUtils.copy(extractedBatch.getReader(), targetResource.getWriter());
IOUtils.copy(extractedBatch.getReader(), targetResource.getWriter(memoryThresholdInBytes));
extractedBatch.close();
targetResource.close();
}
Expand Down
Expand Up @@ -93,7 +93,7 @@ protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest re
batches, channel.getMaxBatchSize(), processInfo) {
@Override
protected IDataWriter buildWriter(long memoryThresholdInBytes) {
IStagedResource stagedResource = stagingManager.create(memoryThresholdInBytes,
IStagedResource stagedResource = stagingManager.create(
fileSyncService.getStagingPathComponents(outgoingBatch));

log.info("Exacting file sync batch {} to {}", outgoingBatch.getNodeBatchId(), stagedResource);
Expand Down
Expand Up @@ -496,7 +496,7 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
stagedResource = previouslyStagedResource;
} else {
if (dataWriter == null) {
stagedResource = stagingManager.create(memoryThresholdInBytes,
stagedResource = stagingManager.create(
Constants.STAGING_CATEGORY_OUTGOING, processInfo.getSourceNodeId(),
targetNode.getNodeId(), "filesync.zip");
dataWriter = new FileSyncZipDataWriter(maxBytesToSync, this,
Expand Down
Expand Up @@ -125,9 +125,9 @@ public void process() throws IOException {
batch.getBatchId());
if (resource == null || resource.getState() == State.DONE) {
log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
resource = stagingManager.create(memoryThresholdInBytes, category, location, batch.getBatchId());
resource = stagingManager.create(category, location, batch.getBatchId());
}
writer = resource.getWriter();
writer = resource.getWriter(memoryThresholdInBytes);
writeLine(nodeLine);
writeLine(binaryLine);
writeLine(channelLine);
Expand Down
Expand Up @@ -83,7 +83,7 @@ protected IStagedResource getStagedResource(Batch batch) {
resource = stagingManager.find(category, location, batch.getBatchId());
if (resource == null || resource.getState() == State.DONE) {
log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
resource = stagingManager.create(memoryThresholdInBytes, category, location, batch.getBatchId());
resource = stagingManager.create(category, location, batch.getBatchId());
}
stagedResources.put(batch, resource);
}
Expand All @@ -106,7 +106,7 @@ protected void print(Batch batch, String data) {
log.debug("Writing staging data: {}", FormatUtils.abbreviateForLogging(data));
}
IStagedResource resource = getStagedResource(batch);
BufferedWriter writer = resource.getWriter();
BufferedWriter writer = resource.getWriter(memoryThresholdInBytes);
try {
int size = data.length();
for (int i = 0; i < size; i = i + 1024) {
Expand Down
Expand Up @@ -39,7 +39,7 @@ public String getExtensionName() {

public BufferedReader getReader();

public BufferedWriter getWriter();
public BufferedWriter getWriter(long threshold);

public OutputStream getOutputStream();

Expand Down
Expand Up @@ -20,18 +20,18 @@
*/
package org.jumpmind.symmetric.io.stage;

import java.util.Collection;
import java.util.Set;

public interface IStagingManager {

public IStagedResource find(Object... path);

public IStagedResource find(String path);

public IStagedResource create(long memoryThresholdInBytes, Object... path);
public IStagedResource create(Object... path);

public long clean(long timeToLiveInMs);

public Collection<String> getResourceReferences();
public Set<String> getResourceReferences();

}
Expand Up @@ -47,8 +47,6 @@ public class StagedResource implements IStagedResource {

static final Logger log = LoggerFactory.getLogger(StagedResource.class);

private long threshold;

private File directory;

private String path;
Expand All @@ -69,15 +67,10 @@ public class StagedResource implements IStagedResource {

private StagingManager stagingManager;

public StagedResource(long threshold, File directory, File file, StagingManager stagingManager) {
this.threshold = threshold;
public StagedResource(File directory, File file, StagingManager stagingManager) {
this.directory = directory;
this.stagingManager = stagingManager;
this.path = file.getAbsolutePath();
this.path = this.path.replaceAll("\\\\", "/");
this.path = this.path.substring(directory.getAbsolutePath().length(), file
.getAbsolutePath().length());
this.path = this.path.substring(1, path.lastIndexOf("."));
this.path = toPath(directory, file);
if (file.exists()) {
lastUpdateTime = file.lastModified();
String fileName = file.getName();
Expand All @@ -88,14 +81,28 @@ public StagedResource(long threshold, File directory, File file, StagingManager
file.getAbsolutePath()));
}
}

public StagedResource(long threshold, File directory, String path, StagingManager stagingManager) {
this.threshold = threshold;

public StagedResource(File directory, String path, StagingManager stagingManager) {
this.directory = directory;
this.path = path;
this.stagingManager = stagingManager;
lastUpdateTime = System.currentTimeMillis();
this.state = State.CREATE;
lastUpdateTime = System.currentTimeMillis();
if (buildFile(State.READY).exists()) {
this.state = State.READY;
} else if (buildFile(State.DONE).exists()){
this.state = State.DONE;
} else {
this.state = State.CREATE;
}
}

/**
 * Converts a file located under the staging directory into the relative
 * resource path used as the staging key: forward-slash separated, relative
 * to {@code directory}, with the file name's extension removed.
 *
 * <p>The extension is only stripped when the last {@code '.'} occurs inside
 * the file name itself (after the last {@code '/'}). A file without an
 * extension is returned unchanged rather than failing or being truncated at
 * a dot that belongs to a parent directory name.
 *
 * @param directory the root staging directory
 * @param file a file located beneath {@code directory}
 * @return the relative, extension-less path of {@code file}
 * @throws StringIndexOutOfBoundsException if the absolute path of
 *         {@code file} is not longer than that of {@code directory}
 */
protected static String toPath(File directory, File file) {
    // Normalize Windows separators; a plain char replace avoids the regex engine.
    String path = file.getAbsolutePath().replace('\\', '/');
    // Drop the directory prefix together with the separator that follows it.
    path = path.substring(directory.getAbsolutePath().length() + 1);
    int extensionStart = path.lastIndexOf('.');
    // Only treat the dot as an extension marker when it is part of the file name.
    if (extensionStart > path.lastIndexOf('/')) {
        path = path.substring(0, extensionStart);
    }
    return path;
}

public boolean isInUse() {
Expand Down Expand Up @@ -237,6 +244,16 @@ public void close() {
inputStreams.remove(thread);
closeInputStreamsMap();
}

boolean isFileResource = this.isFileResource();

if (isFileResource || this.state == State.DONE) {
stagingManager.inUse.remove(path);
}

if (!isFileResource && this.state == State.DONE) {
stagingManager.resourcePaths.remove(path);
}
}

public OutputStream getOutputStream() {
Expand Down Expand Up @@ -278,7 +295,7 @@ public synchronized InputStream getInputStream() {
return reader;
}

public BufferedWriter getWriter() {
public BufferedWriter getWriter(long threshold) {
if (writer == null) {
File file = buildFile(state);
if (file.exists()) {
Expand Down Expand Up @@ -336,7 +353,8 @@ public boolean delete() {
}

if (deleted) {
stagingManager.resourceList.remove(getPath());
stagingManager.resourcePaths.remove(path);
stagingManager.inUse.remove(path);
}

return deleted;
Expand Down
Expand Up @@ -22,9 +22,10 @@

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.io.FileUtils;
Expand All @@ -34,24 +35,29 @@
import org.slf4j.LoggerFactory;



public class StagingManager implements IStagingManager {

protected static final Logger log = LoggerFactory.getLogger(StagingManager.class);

protected File directory;

protected Map<String, IStagedResource> resourceList = new ConcurrentHashMap<String, IStagedResource>();

protected Set<String> resourcePaths;

protected Map<String, IStagedResource> inUse;

public StagingManager(String directory) {
log.info("The staging directory was initialized at the following location: " + directory);
this.directory = new File(directory);
this.directory.mkdirs();
this.resourcePaths = Collections.synchronizedSet(new TreeSet<String>());
this.inUse = new ConcurrentHashMap<String, IStagedResource>();
refreshResourceList();
}

public Collection<String> getResourceReferences() {
public Set<String> getResourceReferences() {
synchronized (StagingManager.class) {
return resourceList.keySet();
return new TreeSet<String>(resourcePaths);
}
}

Expand All @@ -62,11 +68,9 @@ protected void refreshResourceList() {
State.DONE.getExtensionName() }, true);
for (File file : files) {
try {
StagedResource resource = new StagedResource(0, directory,
file, this);
String path = resource.getPath();
if (!resourceList.containsKey(path)) {
resourceList.put(path, resource);
String path = StagedResource.toPath(directory, file);
if (!resourcePaths.contains(path)) {
resourcePaths.add(path);
}
} catch (IllegalStateException ex) {
log.warn(ex.getMessage());
Expand All @@ -85,13 +89,13 @@ protected void refreshResourceList() {
public long clean(long ttlInMs) {
synchronized (StagingManager.class) {
log.trace("Cleaning staging area");
Set<String> keys = new HashSet<String>(resourceList.keySet());
Set<String> keys = getResourceReferences();
long purgedFileCount = 0;
long purgedFileSize = 0;
long purgedMemCount = 0;
long purgedMemSize = 0;
for (String key : keys) {
IStagedResource resource = resourceList.get(key);
IStagedResource resource = new StagedResource(directory, key, this);
/* resource could have deleted itself between the time the keys were cloned and now */
if (resource != null) {
boolean resourceIsOld = (System.currentTimeMillis() - resource
Expand All @@ -111,7 +115,7 @@ public long clean(long ttlInMs) {
purgedMemCount++;
purgedMemSize += size;
}
resourceList.remove(key);
resourcePaths.remove(key);
} else {
log.warn("Failed to delete the '{}' staging resource",
resource.getPath());
Expand Down Expand Up @@ -149,14 +153,15 @@ public long clean(long ttlInMs) {
/**
* Create a handle that can be written to
*/
public IStagedResource create(long memoryThresholdInBytes, Object... path) {
public IStagedResource create(Object... path) {
String filePath = buildFilePath(path);
StagedResource resource = new StagedResource(memoryThresholdInBytes, directory, filePath,
StagedResource resource = new StagedResource(directory, filePath,
this);
if (resource.exists()) {
resource.delete();
}
this.resourceList.put(filePath, resource);
this.inUse.put(filePath, resource);
this.resourcePaths.add(filePath);
return resource;
}

Expand All @@ -176,7 +181,11 @@ protected String buildFilePath(Object... path) {
}

public IStagedResource find(String path) {
return resourceList.get(path);
IStagedResource resource = inUse.get(path);
if (resource == null && resourcePaths.contains(path)) {
resource = new StagedResource(directory, path, this);
}
return resource;
}

public IStagedResource find(Object... path) {
Expand Down

0 comments on commit 792adf7

Please sign in to comment.