From 822859a92a7533736cc90a752e89d15e3c82038e Mon Sep 17 00:00:00 2001 From: elong Date: Mon, 1 Aug 2016 09:16:23 -0400 Subject: [PATCH] 0002699: Copy directly to staging when nodes are on same server use native file copy if it's available, else use streams --- .../service/impl/DataExtractorService.java | 17 ++++---- .../symmetric/util/SymmetricUtils.java | 41 +++++++++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 4afe411c17..f254b759ee 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -20,7 +20,6 @@ */ package org.jumpmind.symmetric.service.impl; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; @@ -139,6 +138,7 @@ import org.jumpmind.symmetric.statistic.IStatisticManager; import org.jumpmind.symmetric.transport.IOutgoingTransport; import org.jumpmind.symmetric.transport.TransportUtils; +import org.jumpmind.symmetric.util.SymmetricUtils; import org.jumpmind.util.Statistics; /** @@ -930,16 +930,19 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo ISymmetricEngine targetEngine = AbstractSymmetricEngine.findEngineByUrl(targetNode.getSyncUrl()); if (targetEngine != null) { try { - long memoryThresholdInBytes = parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); + long memoryThresholdInBytes = extractedBatch.isFileResource() ? 0 : + targetEngine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); Node sourceNode = nodeService.findIdentity(); IStagedResource targetResource = targetEngine.getStagingManager().create(memoryThresholdInBytes, Constants.STAGING_CATEGORY_INCOMING, Batch.getStagedLocation(false, sourceNode.getNodeId()), currentBatch.getBatchId()); - BufferedReader sourceReader = extractedBatch.getReader(); - BufferedWriter targetWriter = targetResource.getWriter(); - IOUtils.copy(sourceReader, targetWriter); - extractedBatch.close(); - targetResource.close(); + if (extractedBatch.isFileResource()) { + SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile()); + } else { + IOUtils.copy(extractedBatch.getReader(), targetResource.getWriter()); + extractedBatch.close(); + targetResource.close(); + } targetResource.setState(State.READY); isRetry = true; } catch (Exception e) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java index c07d2df14d..0967bd0cf4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java @@ -20,11 +20,25 @@ */ package org.jumpmind.symmetric.util; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Array; +import java.lang.reflect.Method; + +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.jumpmind.symmetric.db.ISymmetricDialect; final public class SymmetricUtils { + + protected static boolean isJava7 = true; + + protected static Method copyMethod; + + protected static Method fileMethod; + protected static Object optionArray; + private SymmetricUtils() { } @@ -37,4 +51,31 @@ public static String quote(ISymmetricDialect symmetricDialect, String name) { } } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static void copyFile(File source, File target) throws IOException { + if (isJava7) { + try { + if (copyMethod == null) { + Class filesClass = Class.forName("java.nio.file.Files"); + Class pathClass = Class.forName("java.nio.file.Path"); + Class optionArrayClass = Class.forName("[Ljava.nio.file.CopyOption;"); + Class optionClass = Class.forName("java.nio.file.CopyOption"); + Class standardOptionClass = Class.forName("java.nio.file.StandardCopyOption"); + + copyMethod = filesClass.getMethod("copy", new Class[] { pathClass, pathClass, optionArrayClass }); + fileMethod = File.class.getMethod("toPath", (Class[]) null); + optionArray = Array.newInstance(optionClass, 1); + Array.set(optionArray, 0, Enum.valueOf(standardOptionClass, "REPLACE_EXISTING")); + } + + Object sourcePath = fileMethod.invoke(source, (Object[]) null); + Object targetPath = fileMethod.invoke(target, (Object[]) null); + copyMethod.invoke(null, new Object[] { sourcePath, targetPath, optionArray }); + return; + } catch (Exception e) { + isJava7 = false; + } + } + FileUtils.copyFile(source, target); + } }