Skip to content

Commit

Permalink
0005827: Track expired data gaps and repair any data missed by routing
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 9, 2023
1 parent d969628 commit 005e631
Show file tree
Hide file tree
Showing 18 changed files with 740 additions and 195 deletions.
Expand Up @@ -302,7 +302,7 @@ protected void init() {
this.statisticManager = createStatisticManager();
this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService,
statisticManager);
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService,
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService, dataService, sequenceService,
statisticManager, extensionService, contextService);
this.transformService = new TransformService(this, symmetricDialect);
this.loadFilterService = new LoadFilterService(this, symmetricDialect);
Expand Down
Expand Up @@ -311,10 +311,12 @@ private ParameterConstants() {
public final static String PURGE_REGISTRATION_REQUEST_RETENTION_MINUTES = "purge.registration.request.retention.minutes";
public final static String PURGE_STATS_RETENTION_MINUTES = "purge.stats.retention.minutes";
public final static String PURGE_TRIGGER_HIST_RETENTION_MINUTES = "purge.trigger.hist.retention.minutes";
public final static String PURGE_EXPIRED_DATA_GAP_RETENTION_MINUTES = "purge.expired.data.gap.retention.minutes";
public final static String PURGE_MAX_NUMBER_OF_DATA_IDS = "job.purge.max.num.data.to.delete.in.tx";
public final static String PURGE_MAX_NUMBER_OF_BATCH_IDS = "job.purge.max.num.batches.to.delete.in.tx";
public final static String PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS = "job.purge.max.num.data.event.batches.to.delete.in.tx";
public final static String PURGE_MAX_LINGERING_BATCHES_READ = "job.purge.max.lingering.batches.read";
public final static String PURGE_MAX_EXPIRED_DATA_GAPS_READ = "job.purge.max.data.gaps.read";
public final static String PURGE_FIRST_PASS = "job.purge.first.pass";
public final static String PURGE_FIRST_PASS_OUTSTANDING_BATCHES_THRESHOLD = "job.purge.first.pass.outstanding.batches.threshold";
public final static String JMX_LINE_FEED = "jmx.line.feed";
Expand Down
Expand Up @@ -71,6 +71,7 @@ public class DataGapFastDetector extends DataGapDetector implements ISqlRowMappe
protected Set<DataGap> gapsAll;
protected Set<DataGap> gapsAdded;
protected Set<DataGap> gapsDeleted;
protected Set<DataGap> gapsExpired;
protected boolean detectInvalidGaps;
protected boolean useInMemoryGaps;
protected long routingStartTime;
Expand Down Expand Up @@ -133,6 +134,7 @@ protected void reset() {
gapsAll = new HashSet<DataGap>();
gapsAdded = new HashSet<DataGap>();
gapsDeleted = new HashSet<DataGap>();
gapsExpired = new HashSet<DataGap>();
routingStartTime = System.currentTimeMillis();
supportsTransactionViews = symmetricDialect.supportsTransactionViews();
earliestTransactionTime = 0;
Expand Down Expand Up @@ -176,7 +178,6 @@ public void afterRouting() {
} else if (lastBusyExpireRunTime != 0) {
setLastBusyExpireRunTime(0);
}
List<DataGap> skippedDataGaps = new ArrayList<>();
try {
long ts = System.currentTimeMillis();
long lastDataId = -1;
Expand Down Expand Up @@ -218,8 +219,7 @@ public void afterRouting() {
expireChecked++;
}
if (isAllDataRead || isGapEmpty) {
skippedDataGaps.add(dataGap);
gapsDeleted.add(dataGap);
gapsExpired.add(dataGap);
gapsAll.remove(dataGap);
}
}
Expand Down Expand Up @@ -270,7 +270,7 @@ public void afterRouting() {
processInfo.setStatus(ProcessStatus.ERROR);
throw ex;
} finally {
logSkippedDataGaps(skippedDataGaps);
logExpiredDataGaps();
}
}

Expand Down Expand Up @@ -308,7 +308,7 @@ private void printGapState() {

protected void saveDataGaps() {
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
int totalGapChanges = gapsDeleted.size() + gapsAdded.size();
int totalGapChanges = gapsDeleted.size() + gapsAdded.size() + gapsExpired.size();
if (totalGapChanges > 0) {
ISqlTransaction transaction = null;
gaps = new ArrayList<DataGap>(gapsAll);
Expand Down Expand Up @@ -336,6 +336,7 @@ protected void saveDataGaps() {
dataService.deleteDataGaps(transaction, gapsDeleted);
dataService.insertDataGaps(transaction, gapsAdded);
}
dataService.expireDataGaps(transaction, gapsExpired);
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
Expand Down Expand Up @@ -467,12 +468,12 @@ protected void fixOverlappingGaps(List<DataGap> gapsToCheck, ProcessInfo process
}
}

protected void logSkippedDataGaps(List<DataGap> skippedDataGaps) {
if (skippedDataGaps.isEmpty()) {
protected void logExpiredDataGaps() {
if (gapsExpired.isEmpty()) {
return;
}
if (log.isDebugEnabled()) {
for (DataGap dataGap : skippedDataGaps) {
for (DataGap dataGap : gapsExpired) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.debug("Expired data gap at data_id {} create_time {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId(), dataGap
Expand All @@ -485,22 +486,22 @@ protected void logSkippedDataGaps(List<DataGap> skippedDataGaps) {
}
return;
}
Date minDate = skippedDataGaps.get(0).getCreateTime();
Date maxDate = skippedDataGaps.get(0).getCreateTime();
long minDataId = skippedDataGaps.get(0).getStartId();
long maxDataId = skippedDataGaps.get(0).getEndId();
for (DataGap dataGap : skippedDataGaps) {
if (dataGap.getCreateTime().before(minDate)) {
Date minDate = null;
Date maxDate = null;
long minDataId = Long.MAX_VALUE;
long maxDataId = 0;
for (DataGap dataGap : gapsExpired) {
if (minDate == null || dataGap.getCreateTime().before(minDate)) {
minDate = dataGap.getCreateTime();
}
if (dataGap.getCreateTime().after(maxDate)) {
if (maxDate == null || dataGap.getCreateTime().after(maxDate)) {
maxDate = dataGap.getCreateTime();
}
minDataId = Math.min(minDataId, dataGap.getStartId());
maxDataId = Math.max(maxDataId, dataGap.getEndId());
}
log.info("Expired {} data gap(s) between data_id {} and {} and between create_time {} and {}",
skippedDataGaps.size(), minDataId, maxDataId, minDate, maxDate);
gapsExpired.size(), minDataId, maxDataId, minDate, maxDate);
}

private List<Long> getOracleNextValues() {
Expand Down
Expand Up @@ -197,6 +197,8 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, Trig

public List<DataGap> findDataGapsUnchecked();

public List<DataGap> findDataGapsExpired();

public List<DataGap> findDataGaps();

public Date findCreateTimeOfEvent(long dataId);
Expand Down Expand Up @@ -231,6 +233,8 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, Trig

public void deleteDataGap(DataGap gap);

public void expireDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps);

public void deleteCapturedConfigChannelData();

public boolean fixLastDataGap();
Expand All @@ -250,4 +254,8 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, Trig
public Map<String, Date> getLastDataCaptureByChannel();

public String findNodeIdsByNodeGroupId();

public int resendBatchAsReload(long batchId, String nodeId);

public int resendDataAsReload(long minDataId, long maxDataId);
}
Expand Up @@ -47,6 +47,8 @@
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseInfo;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.DmlStatement.DmlType;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
Expand All @@ -55,6 +57,7 @@
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.db.util.TableRow;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
Expand Down Expand Up @@ -2747,12 +2750,20 @@ public long countDataGaps() {
}

/**
 * Returns the data gaps that are not marked as expired (findDataGapsSql
 * filters on is_expired = 0 and orders by start_id), without any
 * additional validation of the gap list.
 *
 * @return active (non-expired) data gaps ordered by start_id
 */
public List<DataGap> findDataGapsUnchecked() {
    return findDataGaps(false);
}

/**
 * Returns the data gaps that were previously marked as expired
 * (findDataGapsSql filters on is_expired = 1 and orders by start_id).
 *
 * @return expired data gaps ordered by start_id
 */
public List<DataGap> findDataGapsExpired() {
    return findDataGaps(true);
}

protected List<DataGap> findDataGaps(boolean isExpired) {
return sqlTemplate.query(getSql("findDataGapsSql"), new ISqlRowMapper<DataGap>() {
public DataGap mapRow(Row rs) {
return new DataGap(rs.getLong("start_id"), rs.getLong("end_id"), rs
.getDateTime("create_time"));
}
});
}, isExpired ? 1 : 0);
}

public List<DataGap> findDataGaps() {
Expand Down Expand Up @@ -2902,10 +2913,35 @@ public void deleteDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps
}
}

/**
 * Deletes every non-expired row from the data gap table; rows flagged
 * is_expired = 1 are kept (deleteAllDataGapsSql filters on is_expired = 0).
 *
 * @param transaction open transaction to execute the delete in; caller
 *                    owns commit/rollback
 */
@Override
public void deleteAllDataGaps(ISqlTransaction transaction) {
    transaction.prepareAndExecute(getSql("deleteAllDataGapsSql"));
}

/**
 * Marks the given data gaps as expired (is_expired = 1 via expireDataGapSql)
 * in JDBC batch mode, flushing every ROUTING_FLUSH_JDBC_BATCH_SIZE rows and
 * logging progress if the operation runs longer than 30 seconds.
 *
 * Bug fix: totalCount was declared and used in the progress log message but
 * never incremented, so the periodic log always reported "Expired 0 of N
 * gaps"; it is now incremented once per gap.
 *
 * @param transaction open transaction to run the updates in; caller owns
 *                    commit/rollback
 * @param gaps        gaps to mark expired, matched by start_id and end_id
 */
@Override
public void expireDataGaps(ISqlTransaction transaction, Collection<DataGap> gaps) {
    if (gaps.size() > 0) {
        int[] types = new int[] { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() };
        int maxRowsToFlush = engine.getParameterService().getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE);
        long ts = System.currentTimeMillis();
        int flushCount = 0, totalCount = 0;
        transaction.setInBatchMode(true);
        transaction.prepare(getSql("expireDataGapSql"));
        for (DataGap gap : gaps) {
            transaction.addRow(gap, new Object[] { gap.getStartId(), gap.getEndId() }, types);
            totalCount++;
            if (++flushCount >= maxRowsToFlush) {
                transaction.flush();
                flushCount = 0;
            }
            // Periodic progress for long-running expirations
            if (System.currentTimeMillis() - ts > 30000) {
                log.info("Expired {} of {} gaps", totalCount, gaps.size());
                ts = System.currentTimeMillis();
            }
        }
        transaction.flush();
    }
}

/**
 * Returns the create_time recorded for the given data row
 * (findDataEventCreateTimeSql selects create_time from the data table by
 * data_id).
 *
 * @param dataId the data_id to look up
 * @return the row's create time; presumably null when no row matches —
 *         TODO confirm queryForObject's zero-row behavior
 */
public Date findCreateTimeOfEvent(long dataId) {
    return sqlTemplate.queryForObject(getSql("findDataEventCreateTimeSql"), Date.class, dataId);
}
Expand Down Expand Up @@ -2991,6 +3027,10 @@ public Data findData(long dataId) {
return sqlTemplateDirty.queryForObject(getSql("selectData", "whereDataId"), new DataMapper(), dataId);
}

/**
 * Loads the captured data rows whose data_id is between startDataId and
 * endDataId (inclusive, per the "whereDataIdBetween" clause), ordered by
 * data_id, using the dirty-read SQL template.
 *
 * @param startDataId low end of the data_id range (inclusive)
 * @param endDataId   high end of the data_id range (inclusive)
 * @return matching rows in data_id order
 */
public List<Data> findData(long startDataId, long endDataId) {
    return sqlTemplateDirty.query(getSql("selectData", "whereDataIdBetween"), new DataMapper(), startDataId, endDataId);
}

/**
 * Returns a new row mapper that converts result set rows into {@link Data}
 * objects.
 */
public ISqlRowMapper<Data> getDataMapper() {
    return new DataMapper();
}
Expand Down Expand Up @@ -3231,6 +3271,109 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH
}
}

/**
 * Re-sends every data row captured for the given batch to the given node
 * as reload events.  Reads the batch's rows with a cursor, then delegates
 * the conversion and re-insert to resendDataAsReload.
 *
 * @param batchId batch whose rows are resent
 * @param nodeId  node the batch belongs to
 * @return the number of rows that were resent
 */
public int resendBatchAsReload(long batchId, String nodeId) {
    log.info("Resending as reload for batch {}-{}", nodeId, batchId);
    List<Data> capturedRows = new ArrayList<Data>();
    ISqlReadCursor<Data> rowCursor = selectDataFor(batchId, nodeId, true);
    try {
        // Drain the cursor fully before converting, so it is closed promptly.
        for (Data row = rowCursor.next(); row != null; row = rowCursor.next()) {
            capturedRows.add(row);
        }
    } finally {
        rowCursor.close();
    }
    resendDataAsReload(capturedRows, nodeId);
    return capturedRows.size();
}

/**
 * Re-sends all captured data rows in the given data_id range (inclusive)
 * as reload events, with no explicit target node list.
 *
 * @param minDataId low end of the data_id range (inclusive)
 * @param maxDataId high end of the data_id range (inclusive)
 * @return the number of rows found in the range
 */
public int resendDataAsReload(long minDataId, long maxDataId) {
    List<Data> rows = findData(minDataId, maxDataId);
    if (!rows.isEmpty()) {
        resendDataAsReload(rows, null);
    }
    return rows.size();
}

/**
 * Converts the given captured rows into reload events and re-inserts them so
 * routing picks them up again.  For each row whose trigger-history table can
 * be resolved from the platform cache: INSERT and UPDATE rows are rewritten
 * in place as RELOAD events via convertDataToReload; DELETE rows are
 * rewritten as RELOAD only when the row still exists in the source table
 * (COUNT by primary key > 0), otherwise the DELETE is kept as-is but retargeted
 * to the reload channel with routing state cleared.  Rows whose table cannot
 * be resolved (table == null) are re-inserted unchanged.  All rows are then
 * inserted in a single transaction.
 *
 * @param dataList rows to resend; each row is mutated in place before insert
 * @param nodeId   value set as the rows' node list; null when the caller
 *                 supplies no explicit target -- NOTE(review): exact null
 *                 semantics depend on Data.setNodeList/routing; confirm
 */
protected void resendDataAsReload(List<Data> dataList, String nodeId) {
    IDataService dataService = engine.getDataService();
    IDatabasePlatform platform = engine.getDatabasePlatform();
    for (Data data : dataList) {
        TriggerHistory hist = data.getTriggerHistory();
        // Resolve the source table, then narrow it to the columns captured by this trigger history
        Table table = platform.getTableFromCache(hist.getSourceCatalogName(), hist.getSourceSchemaName(), hist.getSourceTableName(), false);
        if (table != null) {
            table = table.copyAndFilterColumns(hist.getParsedColumnNames(), hist.getParsedPkColumnNames(), true);
            if (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE) {
                convertDataToReload(data, table, hist, nodeId);
            } else if (data.getDataEventType() == DataEventType.DELETE) {
                // Only reload a deleted row if it still exists at the source; check by primary key
                String[] pkData = data.getParsedData(CsvData.PK_DATA);
                Object[] values = platform.getObjectValues(engine.getSymmetricDialect().getBinaryEncoding(), pkData, table.getPrimaryKeyColumns());
                DmlStatement st = platform.createDmlStatement(DmlType.COUNT, table.getCatalog(), table.getSchema(),
                        table.getName(), table.getPrimaryKeyColumns(), table.getPrimaryKeyColumns(), DmlStatement.getNullKeyValues(values), null);
                int count = platform.getSqlTemplateDirty().queryForInt(st.getSql(), values);
                if (count > 0) {
                    convertDataToReload(data, table, hist, nodeId);
                } else {
                    // Row is gone at the source: keep the DELETE but clear routing
                    // state and move it to the reload channel
                    data.setNodeList(nodeId);
                    data.setExternalData(null);
                    data.setPreRouted(false);
                    data.setChannelId(Constants.CHANNEL_RELOAD);
                }
            }
        }
    }
    // Insert all converted rows atomically
    ISqlTransaction transaction = null;
    try {
        transaction = engine.getSqlTemplate().startSqlTransaction();
        for (Data data : dataList) {
            dataService.insertData(transaction, data);
        }
        transaction.commit();
    } catch (Error ex) {
        if (transaction != null) {
            transaction.rollback();
        }
        throw ex;
    } catch (RuntimeException ex) {
        if (transaction != null) {
            transaction.rollback();
        }
        throw ex;
    } finally {
        if (transaction != null) {
            transaction.close();
        }
    }
}

/**
 * Rewrites a captured row in place as a RELOAD event.  The row's primary key
 * values are taken from the leading columns of row_data for INSERTs (captured
 * PKs lead the column list there) and from pk_data otherwise, turned into a
 * WHERE clause, and stored as the row data; pk/old data are cleared and the
 * row is moved to the reload channel with routing state reset.
 *
 * @param data   row to mutate into a RELOAD event
 * @param table  source table narrowed to the trigger history's columns
 * @param hist   trigger history that captured the row
 * @param nodeId value set as the row's node list (may be null)
 */
protected void convertDataToReload(Data data, Table table, TriggerHistory hist, String nodeId) {
    IDatabasePlatform platform = engine.getDatabasePlatform();
    BinaryEncoding encoding = engine.getSymmetricDialect().getBinaryEncoding();
    String[] pkNames = hist.getParsedPkColumnNames();
    String[] pkData = null;
    if (data.getDataEventType() == DataEventType.INSERT) {
        // INSERTs carry no pk_data; assumes PK columns lead row_data -- TODO confirm capture column order
        pkData = ArrayUtils.subarray(data.getParsedData(CsvData.ROW_DATA), 0, pkNames.length);
    } else {
        pkData = data.getParsedData(CsvData.PK_DATA);
    }
    // Build a WHERE clause over the primary key and use it as the reload row data
    DmlStatement st = platform.createDmlStatement(DmlType.WHERE, table.getCatalog(), table.getSchema(),
            table.getName(), table.getPrimaryKeyColumns(), table.getPrimaryKeyColumns(), DmlStatement.getNullKeyValues(pkData), null);
    Row row = new Row(pkNames.length);
    Object[] values = platform.getObjectValues(encoding, pkData, table.getPrimaryKeyColumns());
    for (int i = 0; i < pkNames.length; i++) {
        row.put(pkNames[i], values[i]);
    }
    String where = st.buildDynamicSql(encoding, row, false, false);
    data.setPkData(null);
    data.setRowData(where);
    data.setOldData(null);
    data.setNodeList(nodeId);
    data.setExternalData(null);
    data.setPreRouted(false);
    data.setDataEventType(DataEventType.RELOAD);
    data.setChannelId(Constants.CHANNEL_RELOAD);
}

public static class LastCaptureByChannelMapper implements ISqlRowMapper<String> {
private Map<String, Date> captureMap;

Expand Down
Expand Up @@ -217,6 +217,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
"create_time, trigger_hist_id, channel_id, transaction_id, source_node_id, external_data, node_list, '' as router_id, is_prerouted " +
"from $(data) ");
putSql("whereDataId", "where data_id = ?");
putSql("whereDataIdBetween", "where data_id between ? and ? order by data_id");
putSql("whereNewerData", "where table_name = ? and ((event_type = 'I' and row_data like ?) or " +
"(event_type in ('U', 'D') and pk_data like ?)) and create_time >= ? order by create_time desc");
putSql("selectMaxDataEventDataIdSql", ""
Expand All @@ -235,14 +236,15 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ "select create_time from $(data) where data_id=? ");
putSql("findMinDataSql", ""
+ "select min(data_id) from $(data) where data_id >= ?");
putSql("countDataGapsSql", "select count(*) from $(data_gap)");
putSql("countDataGapsSql", "select count(*) from $(data_gap) where is_expired = 0");
putSql("findDataGapsSql",
"select start_id, end_id, create_time from $(data_gap) order by start_id asc");
"select start_id, end_id, create_time from $(data_gap) where is_expired = ? order by start_id asc");
putSql("insertDataGapSql",
"insert into $(data_gap) (last_update_hostname, start_id, end_id, create_time) values(?, ?, ?, ?)");
putSql("deleteDataGapSql",
"delete from $(data_gap) where start_id=? and end_id=? ");
putSql("deleteAllDataGapsSql", "delete from $(data_gap)");
putSql("deleteAllDataGapsSql", "delete from $(data_gap) where is_expired = 0");
putSql("expireDataGapSql", "update $(data_gap) set is_expired = 1 where start_id = ? and end_id = ?");
putSql("selectMaxDataIdSql", "select max(data_id) from $(data) ");
putSql("selectMinDataIdSql", "select min(data_id) from $(data) ");
putSql("deleteCapturedConfigChannelDataSql", "delete from $(data) where channel_id='config'");
Expand Down

0 comments on commit 005e631

Please sign in to comment.