Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0004005: Initial load batches are not purging immediately on incoming
side
  • Loading branch information
erilong committed Jun 13, 2019
1 parent 7c2f34f commit 7561811
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 35 deletions.
Expand Up @@ -63,8 +63,6 @@ public interface IDataService {

public TableReloadStatus getTableReloadStatusByLoadId(long loadId);

public long getTableReloadStatusRowCount(long loadId);

public void updateTableReloadStatusDataLoaded(ISqlTransaction transcation, long loadId, long batchId, int batchCount);

public int updateTableReloadRequestsCancelled(long loadId);
Expand Down
Expand Up @@ -205,8 +205,7 @@ public BatchAckResult ack(final BatchAck batch) {
protected void purgeLoadBatchesFromStaging(OutgoingBatch outgoingBatch) {
long threshold = parameterService.getLong(ParameterConstants.INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS);
if (threshold >= 0 && outgoingBatch.isLoadFlag() && !outgoingBatch.isCommonFlag()) {
long count = engine.getDataService().getTableReloadStatusRowCount(outgoingBatch.getLoadId());
if (count > threshold) {
if (outgoingBatch.getDataRowCount() > threshold) {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
outgoingBatch.getStagedLocation(), outgoingBatch.getBatchId());
if (resource != null) {
Expand Down
Expand Up @@ -41,10 +41,8 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -457,27 +455,13 @@ private void logDataReceivedFromPush(Node sourceNode, List<IncomingBatch> batchL

protected void purgeLoadBatchesFromStaging(List<IncomingBatch> batchList) {
long threshold = parameterService.getLong(ParameterConstants.INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS);
if (threshold >= 0 && batchList != null && batchList.size() > 0) {
Set<Long> loadIds = new HashSet<Long>();
if (threshold >= 0 && batchList != null) {
for (IncomingBatch batch : batchList) {
if (batch.isLoadFlag() && !batch.isCommonFlag()) {
loadIds.add(batch.getLoadId());
}
}
if (loadIds.size() > 0) {
long count = 0;
for (long loadId : loadIds) {
count += engine.getDataService().getTableReloadStatusRowCount(loadId);
}
if (count > threshold) {
for (IncomingBatch batch : batchList) {
if (batch.isLoadFlag() && !batch.isCommonFlag()) {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_INCOMING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
}
}
if (batch.isLoadFlag() && !batch.isCommonFlag() && batch.getDataRowCount() >= threshold) {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_INCOMING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
}
}
}
Expand Down
Expand Up @@ -374,10 +374,6 @@ public List<TableReloadRequest> collapseTableReloadRequestsByLoadId(List<TableRe
return collapsedRequests;
}

public long getTableReloadStatusRowCount(long loadId) {
    // Sum of per-table row counts recorded for this load id, read via the
    // dirty (uncommitted-read) SQL template so in-flight loads are visible.
    final String sql = getSql("countTableReloadStatusRowsByLoadId");
    return sqlTemplateDirty.queryForLong(sql, loadId);
}

public void updateTableReloadStatusDataLoaded(ISqlTransaction transaction, long loadId, long batchId, int batchCount) {
int idType = symmetricDialect.getSqlTypeForIds();
transaction.prepareAndExecute(getSql("updateTableReloadStatusDataLoaded"),
Expand Down
Expand Up @@ -90,8 +90,6 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " from $(table_reload_status) "
+ " where load_id = ?");

putSql("countTableReloadStatusRowsByLoadId", "select sum(rows_count) from $(table_reload_status) where load_id = ?");

putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, processed = 1 where load_id = ?");

putSql("updateTableReloadRequestLoadId", "update $(table_reload_request) set load_id = ?, last_update_time = ? where target_node_id = ? and source_node_id = ? and trigger_id = ? and router_id = ? and create_time = ?");
Expand Down
Expand Up @@ -756,14 +756,14 @@ initial.load.schema.load.command=
# Type: boolean
initial.load.use.estimated.counts=true

# If the number of rows in the load request is greater than or equal to this threshold,
# it will immediately purge the staging file after each batch is successfully loaded.
# If the number of rows in a reload batch is greater than or equal to this threshold,
# the staging file for that batch is purged immediately after the batch is successfully loaded.
# Set this to -1 to disable immediate purging and keep the staging files.
#
# DatabaseOverridable: true
# Tags: load
# Type: integer
initial.load.purge.stage.immediate.threshold.rows=5000
initial.load.purge.stage.immediate.threshold.rows=100

# If tables are created as part of the initial load, it will defer the creation
# of foreign keys and indexes to improve performance.
Expand Down

0 comments on commit 7561811

Please sign in to comment.