0001139: Expose information about the processes that are currently running via an api to be used to inspect what is going on in an engine
chenson42 committed Mar 28, 2013
1 parent 9b7dbee commit 10d2755
Showing 9 changed files with 221 additions and 214 deletions.
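Before the hunks, a minimal sketch of the reporting pattern this commit applies in DataGapDetector (see the last hunk below). Only calls that appear in the diff are used; the wrapper class, its name, and the wiring of IStatisticManager and INodeService are illustrative assumptions.

    // Sketch (assumed wiring): how a background task reports itself through the
    // process-info API this commit extends. Only the calls visible in the diff
    // below are used; the surrounding class is illustrative.
    import org.jumpmind.symmetric.model.ProcessInfo;
    import org.jumpmind.symmetric.model.ProcessInfo.Status;
    import org.jumpmind.symmetric.model.ProcessInfoKey;
    import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
    import org.jumpmind.symmetric.service.INodeService;
    import org.jumpmind.symmetric.statistic.IStatisticManager;

    public class GapDetectReportingSketch {

        private final IStatisticManager statisticManager;
        private final INodeService nodeService;

        public GapDetectReportingSketch(IStatisticManager statisticManager, INodeService nodeService) {
            this.statisticManager = statisticManager;
            this.nodeService = nodeService;
        }

        public void run() {
            // Register this run so it shows up in the engine's list of active processes.
            ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(
                    nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT));
            try {
                processInfo.setStatus(Status.QUERYING);   // about to query for data ids
                // ... run the query ...
                processInfo.setStatus(Status.PROCESSING); // iterating over the results
                // for each row processed:
                processInfo.incrementDataCount();
                processInfo.setStatus(Status.DONE);
            } catch (RuntimeException ex) {
                processInfo.setStatus(Status.ERROR);      // surface the failure to the API
                throw ex;
            }
        }
    }

The same try/catch shape is what the beforeRouting() change below adds, so a failed run is reported as ERROR rather than silently disappearing from the process list.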
@@ -707,10 +707,6 @@ public String preProcessTriggerSqlClause(String sqlClause) {
return sqlClause;
}

public int getRouterDataPeekAheadCount() {
return parameterService.getInt(ParameterConstants.ROUTING_PEEK_AHEAD_WINDOW);
}

public void truncateTable(String tableName) {
String quote = platform.getDdlBuilder().isDelimitedIdentifierModeOn() ? platform
.getDatabaseInfo().getDelimiterToken() : "";
@@ -141,14 +141,7 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
public long insertWithGeneratedKey(final String sql, final SequenceIdentifier identifier, Object... args);

@Deprecated
public Column[] orderColumns(String[] columnNames, Table table);

/*
* Get the max number of data objects to load before processing. This parameter typically comes
* from the {@link ParameterConstants#ROUTING_PEEK_AHEAD_WINDOW} parameter, unless the dialect chooses
* to override how it is retrieved.
*/
public int getRouterDataPeekAheadCount();
public Column[] orderColumns(String[] columnNames, Table table);

public boolean supportsOpenCursorsAcrossCommit();

@@ -30,7 +30,7 @@ public class ProcessInfo implements Serializable, Comparable<ProcessInfo> {
private static final long serialVersionUID = 1L;

public static enum Status {
NEW, EXTRACTING, LOADING, TRANSFERRING, ACKING, DONE, ERROR
NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, DONE, ERROR
};

private ProcessInfoKey key;
@@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
MANUAL_LOAD, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, ROUTER_READER, OUTGOING_PURGE_JOB, INCOMING_PURGE_JOB, TEST
MANUAL_LOAD, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, OUTGOING_PURGE_JOB, INCOMING_PURGE_JOB, TEST
};

private String sourceNodeId;
@@ -30,147 +30,175 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Responsible for managing gaps in data ids to ensure that all captured data is
* routed for delivery to other nodes.
*/
public class DataGapDetector implements IDataToRouteGapDetector {
public class DataGapDetector {

private static final Logger log = LoggerFactory.getLogger(DataGapDetector.class);

private IDataService dataService;
protected IDataService dataService;

private IParameterService parameterService;
protected IParameterService parameterService;

private ISymmetricDialect symmetricDialect;
protected ISymmetricDialect symmetricDialect;

private IRouterService routerService;
protected IRouterService routerService;

protected IStatisticManager statisticManager;

protected INodeService nodeService;

public DataGapDetector() {
}

public DataGapDetector(IDataService dataService, IParameterService parameterService,
ISymmetricDialect symmetricDialect, IRouterService routerService) {
ISymmetricDialect symmetricDialect, IRouterService routerService, IStatisticManager statisticManager, INodeService nodeService) {
this.dataService = dataService;
this.parameterService = parameterService;
this.routerService = routerService;
this.symmetricDialect = symmetricDialect;
this.statisticManager = statisticManager;
this.nodeService = nodeService;
}

/**
* Always make sure sym_data_gap is up to date to make sure that we don't
* dual route data.
*/
public void beforeRouting() {
boolean deleteImmediately = isDeleteFilledGapsImmediately();
long ts = System.currentTimeMillis();
final List<DataGap> gaps = removeAbandonedGaps(dataService.findDataGaps());
long lastDataId = -1;
final int dataIdIncrementBy = parameterService
.getInt(ParameterConstants.DATA_ID_INCREMENT_BY);
final long maxDataToSelect = parameterService
.getInt(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
for (final DataGap dataGap : gaps) {
final boolean lastGap = dataGap.equals(gaps.get(gaps.size() - 1));
String sql = routerService.getSql("selectDistinctDataIdFromDataEventUsingGapsSql");
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
Object[] params = new Object[] { dataGap.getStartId(), dataGap.getEndId() };
lastDataId = -1;
List<Number> ids = sqlTemplate.query(sql, new NumberMapper(), params);
for (Number number : ids) {
long dataId = number.longValue();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
dataService.insertDataGap(new DataGap(dataGap.getStartId(), dataId - 1));
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId
&& lastDataId != dataId) {
// found a gap somewhere in the existing gap
dataService.insertDataGap(new DataGap(lastDataId + 1, dataId - 1));
}
lastDataId = dataId;
}

// if we found data in the gap
if (lastDataId != -1) {
if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
dataService.insertDataGap(new DataGap(lastDataId + dataIdIncrementBy, dataGap
.getEndId()));
ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey(
nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT));
try {
boolean deleteImmediately = isDeleteFilledGapsImmediately();
long ts = System.currentTimeMillis();
final List<DataGap> gaps = removeAbandonedGaps(dataService.findDataGaps());
long lastDataId = -1;
final int dataIdIncrementBy = parameterService
.getInt(ParameterConstants.DATA_ID_INCREMENT_BY);
final long maxDataToSelect = parameterService
.getInt(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
for (final DataGap dataGap : gaps) {
final boolean lastGap = dataGap.equals(gaps.get(gaps.size() - 1));
String sql = routerService.getSql("selectDistinctDataIdFromDataEventUsingGapsSql");
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
Object[] params = new Object[] { dataGap.getStartId(), dataGap.getEndId() };
lastDataId = -1;
processInfo.setStatus(Status.QUERYING);
List<Number> ids = sqlTemplate.query(sql, new NumberMapper(), params);
processInfo.setStatus(Status.PROCESSING);
for (Number number : ids) {
long dataId = number.longValue();
processInfo.incrementDataCount();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
dataService.insertDataGap(new DataGap(dataGap.getStartId(), dataId - 1));
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId
&& lastDataId != dataId) {
// found a gap somewhere in the existing gap
dataService.insertDataGap(new DataGap(lastDataId + 1, dataId - 1));
}
lastDataId = dataId;
}

if (deleteImmediately) {
dataService.deleteDataGap(dataGap);
} else {
dataService.updateDataGap(dataGap, DataGap.Status.OK);
}
// if we found data in the gap
if (lastDataId != -1) {
if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
dataService.insertDataGap(new DataGap(lastDataId + dataIdIncrementBy,
dataGap.getEndId()));
}

// if we did not find data in the gap and it was not the
// last gap
} else if (!lastGap) {
if (dataService.countDataInRange(dataGap.getStartId() - 1, dataGap.getEndId() + 1) == 0) {
if (symmetricDialect.supportsTransactionViews()) {
long transactionViewClockSyncThresholdInMs = parameterService
.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS,
60000);
Date createTime = dataService.findCreateTimeOfData(dataGap.getEndId() + 1);
if (createTime != null
&& !symmetricDialect.areDatabaseTransactionsPendingSince(createTime
.getTime() + transactionViewClockSyncThresholdInMs)) {
if (dataService.countDataInRange(dataGap.getStartId() - 1,
dataGap.getEndId() + 1) == 0) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId(), dataGap.getEndId());
}
if (deleteImmediately) {
dataService.deleteDataGap(dataGap);
} else {
dataService.updateDataGap(dataGap, DataGap.Status.OK);
}

if (deleteImmediately) {
dataService.deleteDataGap(dataGap);
} else {
dataService.updateDataGap(dataGap, DataGap.Status.SK);
// if we did not find data in the gap and it was not the
// last gap
} else if (!lastGap) {
if (dataService.countDataInRange(dataGap.getStartId() - 1,
dataGap.getEndId() + 1) == 0) {
if (symmetricDialect.supportsTransactionViews()) {
long transactionViewClockSyncThresholdInMs = parameterService
.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS,
60000);
Date createTime = dataService
.findCreateTimeOfData(dataGap.getEndId() + 1);
if (createTime != null
&& !symmetricDialect
.areDatabaseTransactionsPendingSince(createTime
.getTime()
+ transactionViewClockSyncThresholdInMs)) {
if (dataService.countDataInRange(dataGap.getStartId() - 1,
dataGap.getEndId() + 1) == 0) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId(), dataGap.getEndId());
}

if (deleteImmediately) {
dataService.deleteDataGap(dataGap);
} else {
dataService.updateDataGap(dataGap, DataGap.Status.SK);
}
}
}
} else if (isDataGapExpired(dataGap.getEndId() + 1)) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because the gap expired",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because the gap expired",
dataGap.getStartId(), dataGap.getEndId());
}
if (deleteImmediately) {
dataService.deleteDataGap(dataGap);
} else {
dataService.updateDataGap(dataGap, DataGap.Status.SK);
}
}
} else if (isDataGapExpired(dataGap.getEndId() + 1)) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because the gap expired",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because the gap expired",
dataGap.getStartId(), dataGap.getEndId());
}
if (deleteImmediately) {
dataService.deleteDataGap(dataGap);
} else {
dataService.updateDataGap(dataGap, DataGap.Status.SK);
}
} else {
dataService.checkForAndUpdateMissingChannelIds(dataGap.getStartId() - 1,
dataGap.getEndId() + 1);
}
} else {
dataService.checkForAndUpdateMissingChannelIds(dataGap.getStartId() - 1,
dataGap.getEndId() + 1);
}
}
}

if (lastDataId != -1) {
dataService.insertDataGap(new DataGap(lastDataId + 1, lastDataId + maxDataToSelect));
}
if (lastDataId != -1) {
dataService
.insertDataGap(new DataGap(lastDataId + 1, lastDataId + maxDataToSelect));
}

long updateTimeInMs = System.currentTimeMillis() - ts;
if (updateTimeInMs > 10000) {
log.info("Detecting gaps took {} ms", updateTimeInMs);
long updateTimeInMs = System.currentTimeMillis() - ts;
if (updateTimeInMs > 10000) {
log.info("Detecting gaps took {} ms", updateTimeInMs);
}
processInfo.setStatus(Status.DONE);
} catch (RuntimeException ex) {
processInfo.setStatus(Status.ERROR);
throw ex;
}

}
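The commit message says the information is exposed "via an api", but the hunks shown here only cover the producer side. The sketch below illustrates how the inspection side might look, assuming the statistic manager keeps the ProcessInfo records it hands out and exposes them through an accessor such as getProcessInfos(); that accessor name and the getKey()/getStatus() getters are assumptions for illustration and are not part of the diff above.

    import java.util.List;

    import org.jumpmind.symmetric.model.ProcessInfo;
    import org.jumpmind.symmetric.statistic.IStatisticManager;

    public class EngineProcessInspectionSketch {

        // Dumps whatever the engine is currently doing. getProcessInfos(),
        // getKey() and getStatus() are assumed accessors for this sketch; only
        // IStatisticManager, ProcessInfo and the Status values appear in the diff.
        public static void dumpRunningProcesses(IStatisticManager statisticManager) {
            List<ProcessInfo> processes = statisticManager.getProcessInfos();
            for (ProcessInfo info : processes) {
                System.out.println(info.getKey() + " is " + info.getStatus());
            }
        }
    }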

