Skip to content
Permalink
Browse files

0003842: Immediately purge stage files for large loads

  • Loading branch information...
erilong committed Dec 21, 2018
1 parent f7e60c3 commit c8f47decc8f4c274f64fddb3695cc493fda55bea
@@ -163,6 +163,7 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED = "initial.load.extract.and.send.when.staged";
public final static String INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC = "initial.load.transport.max.bytes.to.sync";
public final static String INITIAL_LOAD_USE_ESTIMATED_COUNTS = "initial.load.use.estimated.counts";
public final static String INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS = "initial.load.purge.stage.immediate.threshold.rows";

public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults";
public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys";
@@ -22,7 +22,6 @@

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -44,7 +43,6 @@
import org.jumpmind.symmetric.model.NetworkedNode;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.TableReloadRequestKey;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
@@ -55,9 +53,6 @@

public static final String ROUTER_TYPE = "configurationChanged";

final String CTX_KEY_TABLE_RELOAD_NEEDED = "Reload.Table."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_RESYNC_NEEDED = "Resync."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

@@ -117,7 +112,6 @@ public ConfigurationChangedDataRouter(ISymmetricEngine engine) {
this.engine = engine;
}

@SuppressWarnings("unchecked")
public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Set<Node> possibleTargetNodes, boolean initialLoad, boolean initialLoadSelectUsed,
TriggerRouter triggerRouter) {
@@ -163,31 +157,14 @@ public ConfigurationChangedDataRouter(ISymmetricEngine engine) {
dataMetaData, possibleTargetNodes, initialLoad);
} else if (tableMatches(dataMetaData, TableConstants.SYM_TABLE_RELOAD_REQUEST)) {
String sourceNodeId = columnValues.get("SOURCE_NODE_ID");
String reloadEnabled = columnValues.get("RELOAD_ENABLED");
if (me.getNodeId().equals(sourceNodeId)) {
if ("1".equals(reloadEnabled)) {
List<TableReloadRequestKey> list = (List<TableReloadRequestKey>) routingContext
.get(CTX_KEY_TABLE_RELOAD_NEEDED);
if (list == null) {
list = new ArrayList<TableReloadRequestKey>();
routingContext.put(CTX_KEY_TABLE_RELOAD_NEEDED, list);
}

String targetNodeId = columnValues.get("TARGET_NODE_ID");
String routerId = columnValues.get("ROUTER_ID");
String triggerId = columnValues.get("TRIGGER_ID");

list.add(new TableReloadRequestKey(targetNodeId, sourceNodeId, triggerId,
routerId, dataMetaData.getData().getSourceNodeId()));
}
} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) {
nodeIds.add(sourceNodeId);
}
String targetNodeId = columnValues.get("TARGET_NODE_ID");
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& (nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId) ||
nodeThatMayBeRoutedTo.getNodeId().equals(targetNodeId))) {
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}
} else {
@@ -56,7 +56,9 @@
public List<TableReloadRequest> getTableReloadRequestToProcess(final String sourceNodeId);

public List<TableReloadRequest> getTableReloadRequestsByLoadId();


public long getTableReloadRequestRowCount(long loadId);

public void updateTableReloadRequestsCounts(ISqlTransaction transaction, long loadId, int batchCount, long rowsCount);

public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transcation, long loadId, int batchCount, long rowsCount);
@@ -36,7 +36,6 @@
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.statistic.RouterStats;
@@ -175,6 +174,7 @@ public BatchAckResult ack(final BatchAck batch) {
engine.getStatisticManager().incrementDataLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getLoadRowCount());
engine.getStatisticManager().incrementDataBytesLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getByteCount());
}
purgeLoadBatchesFromStaging(outgoingBatch);
Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId());
if (channel != null && channel.isFileSyncFlag()){
/* Acknowledge the file_sync in case the file needs deleted. */
@@ -191,6 +191,20 @@ public BatchAckResult ack(final BatchAck batch) {
return result;
}

protected void purgeLoadBatchesFromStaging(OutgoingBatch outgoingBatch) {
long threshold = parameterService.getLong(ParameterConstants.INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS);
if (threshold >= 0 && outgoingBatch.isLoadFlag() && !outgoingBatch.isCommonFlag()) {
long count = engine.getDataService().getTableReloadRequestRowCount(outgoingBatch.getLoadId());
if (count > threshold) {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
outgoingBatch.getStagedLocation(), outgoingBatch.getBatchId());
if (resource != null) {
resource.delete();
}
}
}
}

public List<BatchAckResult> ack(List<BatchAck> batches) {

List<BatchAckResult> results = new ArrayList<BatchAckResult>();
@@ -41,8 +41,10 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -328,6 +330,7 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
}

updateBatchToSendCount(remote, transport);
purgeLoadBatchesFromStaging(list);

} catch (RuntimeException e) {
transferInfo.setStatus(ProcessInfo.ProcessStatus.ERROR);
@@ -407,6 +410,7 @@ public void loadDataFromPush(Node sourceNode, String queue, InputStream in, Outp
transportManager.writeAcknowledgement(out, sourceNode, batchList, local,
security != null ? security.getNodePassword() : null);
transferInfo.setStatus(ProcessInfo.ProcessStatus.OK);
purgeLoadBatchesFromStaging(batchList);
} catch (Exception e) {
transferInfo.setStatus(ProcessInfo.ProcessStatus.ERROR);
if (e instanceof RuntimeException) {
@@ -449,6 +453,35 @@ private void logDataReceivedFromPush(Node sourceNode, List<IncomingBatch> batchL
}
}

protected void purgeLoadBatchesFromStaging(List<IncomingBatch> batchList) {
long threshold = parameterService.getLong(ParameterConstants.INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS);
if (threshold >= 0 && batchList != null && batchList.size() > 0) {
Set<Long> loadIds = new HashSet<Long>();
for (IncomingBatch batch : batchList) {
if (batch.isLoadFlag() && !batch.isCommonFlag()) {
loadIds.add(batch.getLoadId());
}
}
if (loadIds.size() > 0) {
long count = 0;
for (long loadId : loadIds) {
count += engine.getDataService().getTableReloadRequestRowCount(loadId);
}
if (count > threshold) {
for (IncomingBatch batch : batchList) {
if (batch.isLoadFlag() && !batch.isCommonFlag()) {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_INCOMING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
}
}
}
}
}
}
}

public List<IncomingBatch> loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException {
Node local = nodeService.findIdentity();
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote
@@ -367,7 +367,11 @@ public TableReloadRequest mapRow(Row rs) {
}
return collapsedRequests;
}


public long getTableReloadRequestRowCount(long loadId) {
return sqlTemplateDirty.queryForLong(getSql("countTableReloadRequestRowsByLoadId"), loadId);
}

public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transaction, long loadId, int batchCount, long rowsCount) {
transaction.prepareAndExecute(getSql("updateTableReloadRequestLoadedCounts"),
new Object[] { batchCount, rowsCount, new Date(), loadId },
@@ -744,12 +748,10 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
engine.getStatisticManager().incrementNodesLoaded(1);

if (reloadRequests != null && reloadRequests.size() > 0) {
for (TableReloadRequest request : reloadRequests) {
int rowsAffected = transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), new Date(), batchCount, loadId);
if (rowsAffected == 0) {
throw new SymmetricException(String.format("Failed to update a table_reload_request as processed for loadId '%s' ",
loadId));
}
int rowsAffected = transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), new Date(), batchCount, loadId);
if (rowsAffected == 0) {
throw new SymmetricException(String.format("Failed to update a table_reload_request as processed for loadId '%s' ",
loadId));
}
log.info("Table reload request(s) for load id " + loadId + " have been processed.");
}
@@ -63,6 +63,8 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " where load_id = ? "
+ " order by processed, completed, last_update_time desc");

putSql("countTableReloadRequestRowsByLoadId", "select sum(row_count) from $(table_reload_request) where load_id = ?");

putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, batch_count = ?, processed = 1 where load_id = ?");

putSql("updateTableReloadRequestLoadId", "update $(table_reload_request) set load_id = ?, table_count = ?, last_update_time = ?, batch_count = 0, row_count = 0 where target_node_id = ? and source_node_id = ? and trigger_id = ? and router_id = ? and create_time = ?");
@@ -724,6 +724,14 @@ initial.load.schema.load.command=
# Type: boolean
initial.load.use.estimated.counts=true

# If the number of rows in the load request is greater than or equal to this threshold,
# it will immediately purge the staging file after each batch is successfully loaded.
# Set this to -1 to disable and keep the staging files.
#
# DatabaseOverridable: true
# Tags: load
initial.load.purge.stage.immediate.threshold.rows=5000

# If this is true, registration is opened automatically for nodes requesting it.
#
# DatabaseOverridable: true

0 comments on commit c8f47de

Please sign in to comment.
You can’t perform that action at this time.