Skip to content

Commit

Permalink
0002699: Copy directly to staging when nodes are on same server
Browse files Browse the repository at this point in the history
use native file copy if it's available, else use streams
  • Loading branch information
erilong committed Aug 1, 2016
1 parent 80ae9da commit 822859a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
Expand Up @@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -139,6 +138,7 @@
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.util.SymmetricUtils;
import org.jumpmind.util.Statistics;

/**
Expand Down Expand Up @@ -930,16 +930,19 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
ISymmetricEngine targetEngine = AbstractSymmetricEngine.findEngineByUrl(targetNode.getSyncUrl());
if (targetEngine != null) {
try {
long memoryThresholdInBytes = parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
long memoryThresholdInBytes = extractedBatch.isFileResource() ? 0 :
targetEngine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
Node sourceNode = nodeService.findIdentity();
IStagedResource targetResource = targetEngine.getStagingManager().create(memoryThresholdInBytes,
Constants.STAGING_CATEGORY_INCOMING, Batch.getStagedLocation(false, sourceNode.getNodeId()),
currentBatch.getBatchId());
BufferedReader sourceReader = extractedBatch.getReader();
BufferedWriter targetWriter = targetResource.getWriter();
IOUtils.copy(sourceReader, targetWriter);
extractedBatch.close();
targetResource.close();
if (extractedBatch.isFileResource()) {
SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile());
} else {
IOUtils.copy(extractedBatch.getReader(), targetResource.getWriter());
extractedBatch.close();
targetResource.close();
}
targetResource.setState(State.READY);
isRetry = true;
} catch (Exception e) {
Expand Down
Expand Up @@ -20,11 +20,25 @@
*/
package org.jumpmind.symmetric.util;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Array;
import java.lang.reflect.Method;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.db.ISymmetricDialect;

final public class SymmetricUtils {

protected static boolean isJava7 = true;

protected static Method copyMethod;

protected static Method fileMethod;

protected static Object optionArray;

private SymmetricUtils() {
}

Expand All @@ -37,4 +51,31 @@ public static String quote(ISymmetricDialect symmetricDialect, String name) {
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public static void copyFile(File source, File target) throws IOException {
if (isJava7) {
try {
if (copyMethod == null) {
Class filesClass = Class.forName("java.nio.file.Files");
Class pathClass = Class.forName("java.nio.file.Path");
Class optionArrayClass = Class.forName("[Ljava.nio.file.CopyOption;");
Class optionClass = Class.forName("java.nio.file.CopyOption");
Class standardOptionClass = Class.forName("java.nio.file.StandardCopyOption");

copyMethod = filesClass.getMethod("copy", new Class[] { pathClass, pathClass, optionArrayClass });
fileMethod = File.class.getMethod("toPath", (Class[]) null);
optionArray = Array.newInstance(optionClass, 1);
Array.set(optionArray, 0, Enum.valueOf(standardOptionClass, "REPLACE_EXISTING"));
}

Object sourcePath = fileMethod.invoke(source, (Object[]) null);
Object targetPath = fileMethod.invoke(target, (Object[]) null);
copyMethod.invoke(null, new Object[] { sourcePath, targetPath, optionArray });
return;
} catch (Exception e) {
isJava7 = false;
}
}
FileUtils.copyFile(source, target);
}
}

0 comments on commit 822859a

Please sign in to comment.