Skip to content

Commit

Permalink
0004227: Data gap detector should use batch mode to insert and delete
Browse files Browse the repository at this point in the history
sym_data_gap
  • Loading branch information
erilong committed Jan 3, 2020
1 parent f705488 commit f3bdd93
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 405 deletions.
Expand Up @@ -21,7 +21,6 @@
package org.jumpmind.symmetric.route;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -292,7 +291,7 @@ public void afterRouting() {
}
}

printStats = saveDataGaps(ts, printStats);
saveDataGaps();
if (gaps.size() > 0) {
lastGap = gaps.get(gaps.size() - 1);
}
Expand Down Expand Up @@ -348,7 +347,7 @@ private void printGapState() {
log.info(buff.toString());
}

protected long saveDataGaps(long ts, long printStats) {
protected void saveDataGaps() {
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
int totalGapChanges = gapsDeleted.size() + gapsAdded.size();
if (totalGapChanges > 0) {
Expand All @@ -364,7 +363,7 @@ protected long saveDataGaps(long ts, long printStats) {
log.info("There are {} data gap changes, which is within the max of {}, so switching to database",
totalGapChanges, maxGapChanges);
useInMemoryGaps = false;
printStats = insertDataGaps(transaction, ts, printStats, gaps);
dataService.insertDataGaps(transaction, gaps);
} else {
if (!useInMemoryGaps) {
log.info("There are {} data gap changes, which exceeds the max of {}, so switching to in-memory",
Expand All @@ -375,8 +374,8 @@ protected long saveDataGaps(long ts, long printStats) {
dataService.insertDataGap(transaction, newGap);
}
} else {
printStats = deleteDataGaps(transaction, ts, printStats);
printStats = insertDataGaps(transaction, ts, printStats, gapsAdded);
dataService.deleteDataGaps(transaction, gapsDeleted);
dataService.insertDataGaps(transaction, gapsAdded);
}
transaction.commit();
} catch (Error ex) {
Expand All @@ -395,35 +394,6 @@ protected long saveDataGaps(long ts, long printStats) {
}
}
}
return printStats;
}

protected long deleteDataGaps(ISqlTransaction transaction, long ts, long printStats) {
int counter = 0;
for (DataGap dataGap : gapsDeleted) {
dataService.deleteDataGap(transaction, dataGap);
counter++;
if (System.currentTimeMillis() - printStats > 30000) {
log.info("The data gap detection has been running for {}ms, deleted {} of {} old gaps", new Object[] {
System.currentTimeMillis() - ts, counter, gapsDeleted.size() });
printStats = System.currentTimeMillis();
}
}
return printStats;
}

protected long insertDataGaps(ISqlTransaction transaction, long ts, long printStats, Collection<DataGap> argGaps) {
int counter = 0;
for (DataGap dataGap : argGaps) {
dataService.insertDataGap(transaction, dataGap);
counter++;
if (System.currentTimeMillis() - printStats > 30000) {
log.info("The data gap detection has been running for {}ms, inserted {} of {} new gaps", new Object[] {
System.currentTimeMillis() - ts, counter, gapsDeleted.size() });
printStats = System.currentTimeMillis();
}
}
return printStats;
}

protected void queryDataIdMap() {
Expand Down
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.service;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -183,9 +184,13 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,
public void insertDataGap(DataGap gap);

public void insertDataGap(ISqlTransaction transaction, DataGap gap);

public void insertDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps);

public void deleteDataGap(ISqlTransaction transaction, DataGap gap);

public void deleteDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps);

public void deleteAllDataGaps(ISqlTransaction transaction);

public void deleteDataGap(DataGap gap);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -2717,7 +2718,33 @@ public void insertDataGap(ISqlTransaction transaction, DataGap gap) {
gap.getCreateTime() }, new int[] {
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP });
}


@Override
public void insertDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps) {
if (gaps.size() > 0) {
int[] types = new int[] { Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP };
int maxRowsToFlush = engine.getParameterService().getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE);
long ts = System.currentTimeMillis();
int flushCount = 0, totalCount = 0;
transaction.setInBatchMode(true);
transaction.prepare(getSql("insertDataGapSql"));
for (DataGap gap : gaps) {
transaction.addRow(gap, new Object[] { engine.getClusterService().getServerId(), gap.getStartId(), gap.getEndId(),
gap.getCreateTime() }, types);
totalCount++;
if (++flushCount >= maxRowsToFlush) {
transaction.flush();
flushCount = 0;
}
if (System.currentTimeMillis() - ts > 30000) {
log.info("Inserted {} of {} new gaps", totalCount, gaps.size());
ts = System.currentTimeMillis();
}
}
transaction.flush();
}
}

@Override
public void deleteDataGap(DataGap gap) {
ISqlTransaction transaction = null;
Expand Down Expand Up @@ -2753,6 +2780,30 @@ public void deleteDataGap(ISqlTransaction transaction, DataGap gap) {
}
}

@Override
public void deleteDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps) {
if (gaps.size() > 0) {
int[] types = new int[] { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() };
int maxRowsToFlush = engine.getParameterService().getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE);
long ts = System.currentTimeMillis();
int flushCount = 0, totalCount = 0;
transaction.setInBatchMode(true);
transaction.prepare(getSql("deleteDataGapSql"));
for (DataGap gap : gaps) {
transaction.addRow(gap, new Object[] { gap.getStartId(), gap.getEndId() }, types);
if (++flushCount >= maxRowsToFlush) {
transaction.flush();
flushCount = 0;
}
if (System.currentTimeMillis() - ts > 30000) {
log.info("Deleted {} of {} old gaps", totalCount, gaps.size());
ts = System.currentTimeMillis();
}
}
transaction.flush();
}
}

public void deleteAllDataGaps(ISqlTransaction transaction) {
transaction.prepareAndExecute(getSql("deleteAllDataGapsSql"));
}
Expand Down

0 comments on commit f3bdd93

Please sign in to comment.