Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[SYMMETRICDS-88] - If enabled, write initial load to a file first, during extract and before data load, in order to decrease the time that database and network resources are in use. Enabled by default.
  • Loading branch information
chenson42 committed May 17, 2009
1 parent c2f4757 commit 7364f72
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 65 deletions.
Expand Up @@ -54,9 +54,11 @@ public class ParameterConstants {
public final static String AUTO_CONFIGURE_REGISTRATION_SERVER_SQL_SCRIPT = "auto.config.registration.svr.sql.script";
public final static String AUTO_CONFIGURE_REGISTRATION_SERVER_DDLUTIL_XML = "auto.config.registration.svr.ddlutil.xml";
public final static String AUTO_UPGRADE = "auto.upgrade";
public final static String AUTO_DELETE_BEFORE_RELOAD = "initial.load.delete.first";
public final static String AUTO_DELETE_BEFORE_RELOAD = "initial.load.delete.first";
public final static String AUTO_CREATE_SCHEMA_BEFORE_RELOAD = "initial.load.create.first";
public final static String AUTO_UPDATE_NODE_VALUES = "auto.update.node.values.from.properties";

public final static String STREAM_TO_FILE_ENABLED = "stream.to.file.enabled";

public final static String PARAMETER_REFRESH_PERIOD_IN_MS = "parameter.reload.timeout.ms";

Expand Down
Expand Up @@ -50,8 +50,6 @@ public void extractInitialLoadWithinBatchFor(Node node, Trigger trigger, Buffere
*/
public boolean extract(Node node, IOutgoingTransport transport) throws Exception;

public boolean extract(Node node, final IExtractListener handler) throws Exception;

public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
throws Exception;

Expand Down
Expand Up @@ -22,6 +22,7 @@
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
Expand All @@ -32,10 +33,12 @@
import java.util.Date;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.config.TriggerSelector;
import org.jumpmind.symmetric.db.IDbDialect;
Expand All @@ -62,6 +65,7 @@
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.transport.file.FileOutgoingTransport;
import org.jumpmind.symmetric.upgrade.UpgradeConstants;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -106,10 +110,10 @@ public void extractConfigurationStandalone(Node node, OutputStream out) throws I
* failure.
*/
public void extractConfigurationStandalone(Node node, BufferedWriter writer) throws IOException {

try {
OutgoingBatch batch = new OutgoingBatch(node, Constants.CHANNEL_CONFIG, BatchType.INITIAL_LOAD);
if (Version.isOlderThanVersion(node.getSymmetricVersion(), UpgradeConstants.VERSION_FOR_NEW_REGISTRATION_PROTOCOL)) {
if (Version.isOlderThanVersion(node.getSymmetricVersion(),
UpgradeConstants.VERSION_FOR_NEW_REGISTRATION_PROTOCOL)) {
outgoingBatchService.insertOutgoingBatch(batch);
OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
history.setStatus(OutgoingBatchHistory.Status.SE);
Expand All @@ -133,15 +137,15 @@ public void extractConfigurationStandalone(Node node, BufferedWriter writer) thr

dataExtractor.commit(batch, writer);


} finally {
writer.flush();
}
}

public void extractConfiguration(Node node, BufferedWriter writer, DataExtractorContext ctx) throws IOException {
List<Trigger> triggers = new TriggerSelector(configurationService.getActiveTriggersForSourceNodeGroup(parameterService.getNodeGroupId()),
Constants.CHANNEL_CONFIG, node.getNodeGroupId()).select();
List<Trigger> triggers = new TriggerSelector(configurationService
.getActiveTriggersForSourceNodeGroup(parameterService.getNodeGroupId()), Constants.CHANNEL_CONFIG, node
.getNodeGroupId()).select();
if (node != null && node.isVersionGreaterThanOrEqualTo(1, 5, 0)) {
for (int i = triggers.size() - 1; i >= 0; i--) {
Trigger trigger = triggers.get(i);
Expand Down Expand Up @@ -224,13 +228,12 @@ protected void writeInitialLoad(Node node, Trigger trigger, BufferedWriter write
* @param hist
* @param transport
* @param batch
* If null, then assume this 'initial load' is part of
* another batch.
* If null, then assume this 'initial load' is part of another
* batch.
* @param ctx
*/
protected void writeInitialLoad(Node node, final Trigger trigger, final TriggerHistory hist,
final BufferedWriter writer, final OutgoingBatch batch, final DataExtractorContext ctx) {

final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);

final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node
Expand Down Expand Up @@ -272,17 +275,8 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
});
}

public boolean extract(Node node, IOutgoingTransport transport) throws Exception {
public boolean extract(Node node, IOutgoingTransport targetTransport) throws Exception {
IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
return extract(node, handler);
}

/**
* Allow a handler callback to do the work so we can route the extracted
* data to other types of handlers for processing.
*/
public boolean extract(Node node, final IExtractListener handler) throws Exception {

List<NodeChannel> channels = configurationService.getChannels();

Expand All @@ -291,50 +285,92 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti
}

List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());

if (batches != null && batches.size() > 0) {
OutgoingBatchHistory history = null;
FileOutgoingTransport fileTransport = null;

if (shouldStreamToFile(batches)) {
fileTransport = new FileOutgoingTransport();
}

ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor,
fileTransport != null ? fileTransport : targetTransport);

extract(node, batches, handler);

copy(fileTransport, targetTransport);

return true;
} else {
return false;
}
}

/**
 * Decide whether the extract should be staged to a local file before being
 * streamed to the target transport. Staging applies only when at least one
 * batch is on the reload (initial load) channel and the stream-to-file
 * feature is enabled.
 *
 * @param batches the outgoing batches about to be extracted
 * @return true if extraction should be written to a file first
 */
protected boolean shouldStreamToFile(List<OutgoingBatch> batches) {
    for (OutgoingBatch outgoingBatch : batches) {
        if (Constants.CHANNEL_RELOAD.equals(outgoingBatch.getChannelId())) {
            // Stop scanning at the first reload batch; the parameter is
            // consulted only in this case, matching the original
            // short-circuit of (found && parameterService.is(...)).
            return parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
        }
    }
    return false;
}

protected void copy(FileOutgoingTransport fileTransport, IOutgoingTransport targetTransport) throws IOException {
if (fileTransport != null) {
fileTransport.close();
FileReader reader = null;
try {
boolean initialized = false;
for (final OutgoingBatch batch : batches) {
history = new OutgoingBatchHistory(batch);
if (!initialized) {
handler.init();
initialized = true;
}
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
handler.endBatch(batch);
history.setStatus(OutgoingBatchHistory.Status.SE);
history.setEndTime(new Date());
outgoingBatchService.insertOutgoingBatchHistory(history);
reader = new FileReader(fileTransport.getFile());
IOUtils.copy(reader, targetTransport.open());
} finally {
IOUtils.closeQuietly(reader);
fileTransport.getFile().delete();
}
}
}

/**
* Allow a handler callback to do the work so we can route the extracted
* data to other types of handlers for processing.
*/
protected void extract(Node node, List<OutgoingBatch> batches, final IExtractListener handler) throws Exception {
OutgoingBatchHistory history = null;
try {
boolean initialized = false;
for (final OutgoingBatch batch : batches) {
history = new OutgoingBatchHistory(batch);
if (!initialized) {
handler.init();
initialized = true;
}
} catch (RuntimeException e) {
SQLException se = unwrapSqlException(e);
if (history != null) {
if (se != null) {
history.setSqlState(se.getSQLState());
history.setSqlCode(se.getErrorCode());
history.setSqlMessage(se.getMessage());
} else {
history.setSqlMessage(e.getMessage());
}
history.setStatus(OutgoingBatchHistory.Status.SE);
history.setEndTime(new Date());
outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
outgoingBatchService.insertOutgoingBatchHistory(history);
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
handler.endBatch(batch);
history.setStatus(OutgoingBatchHistory.Status.SE);
history.setEndTime(new Date());
outgoingBatchService.insertOutgoingBatchHistory(history);
}
} catch (RuntimeException e) {
SQLException se = unwrapSqlException(e);
if (history != null) {
if (se != null) {
history.setSqlState(se.getSQLState());
history.setSqlCode(se.getErrorCode());
history.setSqlMessage(se.getMessage());
} else {
logger.error(
"Could not log the outgoing batch status because the batch history has not been created.",
e);
history.setSqlMessage(e.getMessage());
}
throw e;
} finally {
handler.done();
history.setStatus(OutgoingBatchHistory.Status.SE);
history.setEndTime(new Date());
outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
outgoingBatchService.insertOutgoingBatchHistory(history);
} else {
logger.error("Could not log the outgoing batch status because the batch history has not been created.",
e);
}
return true;
throw e;
} finally {
handler.done();
}
return false;

}

public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
Expand All @@ -346,7 +382,6 @@ public boolean extractBatchRange(IOutgoingTransport transport, String startBatch

public boolean extractBatchRange(final IExtractListener handler, String startBatchId, String endBatchId)
throws Exception {

List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);

if (batches != null && batches.size() > 0) {
Expand Down
Expand Up @@ -22,6 +22,8 @@
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -35,6 +37,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -62,7 +65,9 @@
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.transport.file.FileIncomingTransport;
import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
import org.jumpmind.symmetric.util.AppUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
Expand All @@ -87,7 +92,7 @@ public class DataLoaderService extends AbstractService implements IDataLoaderSer
protected List<IDataLoaderFilter> filters;

protected IStatisticManager statisticManager;

protected INodeService nodeService;

protected Map<String, IColumnFilter> columnFilters = new HashMap<String, IColumnFilter>();
Expand Down Expand Up @@ -169,11 +174,11 @@ public IDataLoaderStatistics loadDataBatch(String batchData) throws IOException
dataLoader.load();
IncomingBatchHistory history = new IncomingBatchHistory(dataLoader.getContext());
history.setValues(dataLoader.getStatistics(), true);
fireBatchComplete(dataLoader, history);
fireBatchComplete(dataLoader, history);
}
} finally {
stats = dataLoader.getStatistics();
dataLoader.close();
dataLoader.close();
}
return stats;
}
Expand All @@ -192,6 +197,9 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
IncomingBatchHistory history = null;
IDataLoader dataLoader = null;
try {
if (shouldStreamToFile(transport)) {
transport = writeToFile(transport);
}
dataLoader = openDataLoader(transport.open());
while (dataLoader.hasNext()) {
status = new IncomingBatch(dataLoader.getContext());
Expand All @@ -202,7 +210,7 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
}
} catch (RegistrationRequiredException ex) {
throw ex;
} catch (ConnectException ex) {
} catch (ConnectException ex) {
statisticManager.getStatistic(StatisticNameConstants.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
throw ex;
} catch (UnknownHostException ex) {
Expand Down Expand Up @@ -249,11 +257,35 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
if (dataLoader != null) {
dataLoader.close();
}
transport.close();
recordStatistics(list);
}
return list;
}

/**
 * Incoming data is staged to a file only when the feature is enabled and
 * this node has not yet completed its initial load.
 *
 * TODO - make sure statistics are captured correctly, and stream to a file
 * whenever the payload exceeds a configurable number of bytes.
 *
 * @param transport the transport about to be read (currently unused; kept
 *            for the future payload-size check)
 * @return true if the payload should be written to a file before loading
 */
protected boolean shouldStreamToFile(IIncomingTransport transport) {
    if (!parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
        return false;
    }
    // Only consult node state when the feature is on, preserving the
    // original short-circuit order of the two service calls.
    return !nodeService.isDataLoadCompleted();
}

/**
 * Drains the entire payload from the given transport into a temporary file
 * so the network connection can be released before the (potentially slow)
 * database load begins. The incoming transport is always closed here.
 *
 * NOTE(review): FileWriter uses the platform default charset; the staged
 * file is read back on this same host with the same default, so leave it
 * unless both the writer and reader move to an explicit charset together.
 *
 * @param transport the open transport to drain
 * @return a file-backed transport over the downloaded payload
 * @throws IOException if the payload could not be copied to the file
 */
protected IIncomingTransport writeToFile(IIncomingTransport transport) throws IOException {
    File file = AppUtils.createTempFile("load");
    FileWriter writer = null;
    boolean copied = false;
    try {
        writer = new FileWriter(file);
        IOUtils.copy(transport.open(), writer);
        copied = true;
    } finally {
        IOUtils.closeQuietly(writer);
        transport.close();
        if (!copied) {
            // Don't leak the temp file when the download fails part way;
            // on success FileIncomingTransport owns its cleanup.
            file.delete();
        }
    }
    return new FileIncomingTransport(file);
}

private void recordStatistics(List<IncomingBatchHistory> list) {
if (list != null) {
statisticManager.getStatistic(StatisticNameConstants.INCOMING_BATCH_COUNT).add(list.size());
Expand Down Expand Up @@ -296,8 +328,7 @@ public void doInTransactionWithoutResult(TransactionStatus transactionstatus) {
history.setValues(dataLoader.getStatistics(), true);
fireBatchComplete(dataLoader, history);
if (status.isPersistable()) {
incomingBatchService
.insertIncomingBatchHistory(history);
incomingBatchService.insertIncomingBatchHistory(history);
}
} catch (IOException e) {
throw new TransportException(e);
Expand Down

0 comments on commit 7364f72

Please sign in to comment.