Skip to content

Commit

Permalink
0003247: Include check for next data_id to ensure within range of
Browse files Browse the repository at this point in the history
sym_data_gap
  • Loading branch information
chenson42 committed Oct 4, 2017
1 parent 2a908da commit 848aade
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 22 deletions.
Expand Up @@ -145,8 +145,6 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,

public List<Data> listData(long batchId, String nodeId, long startDataId, String channelId, int maxRowsToRetrieve);

public void updateDataGap(DataGap gap, DataGap.Status status);

public void insertDataGap(DataGap gap);

public void insertDataGap(ISqlTransaction transaction, DataGap gap);
Expand All @@ -159,6 +157,8 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,

public void deleteCapturedConfigChannelData();

public boolean fixLastDataGap();

public long findMaxDataId();

public Data findData(long dataId);
Expand Down
Expand Up @@ -1986,15 +1986,6 @@ public void insertDataGap(ISqlTransaction transaction, DataGap gap) {
gap.getLastUpdateTime(), gap.getCreateTime() }, new int[] {
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.TIMESTAMP });
}

public void updateDataGap(DataGap gap, DataGap.Status status) {
sqlTemplate.update(
getSql("updateDataGapSql"),
new Object[] { status.name(), AppUtils.getHostName(), gap.getLastUpdateTime(), gap.getStartId(),
gap.getEndId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP,
symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() });
}


@Override
public void deleteDataGap(DataGap gap) {
Expand Down Expand Up @@ -2174,6 +2165,42 @@ public Map<String, Date> getLastDataCaptureByChannel() {
return mapper.getCaptureMap();
}

@Override
public boolean fixLastDataGap() {
boolean fixed = false;
long maxDataId = findMaxDataId();
List<DataGap> gaps = findDataGaps();
if (gaps.size() > 0) {
DataGap lastGap = gaps.get(gaps.size()-1);
if (lastGap.getEndId() < maxDataId) {
fixed = true;
log.warn("The last data id of {} was bigger than the last gap's end_id of {}. Increasing the gap size", maxDataId, lastGap.getEndId());
final long maxDataToSelect = parameterService
.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
deleteDataGap(transaction, lastGap);
insertDataGap(transaction, new DataGap(lastGap.getStartId(), maxDataId+maxDataToSelect));
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
}
}
return fixed;
}

class TableRow {
Table table;
Row row;
Expand Down Expand Up @@ -2254,9 +2281,7 @@ public String getReferenceColumnName() {
public String getFkName() {
return fkName;
}




}

public class DataMapper implements ISqlRowMapper<Data> {
Expand Down
Expand Up @@ -106,10 +106,6 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
""
+ "insert into $(data_gap) (status, last_update_hostname, start_id, end_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?) ");

putSql("updateDataGapSql",
""
+ "update $(data_gap) set status=?, last_update_hostname=?, last_update_time=? where start_id=? and end_id=? ");

putSql("deleteDataGapSql",
"delete from $(data_gap) where start_id=? and end_id=? ");

Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.SyntaxParsingException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
Expand Down Expand Up @@ -123,7 +124,7 @@ public class RouterService extends AbstractService implements IRouterService {

protected boolean syncTriggersBeforeInitialLoadAttempted = false;

protected boolean firstTimeCheckForAbandonedBatches = true;
protected boolean firstTimeCheck = true;

public RouterService(ISymmetricEngine engine) {
super(engine.getParameterService(), engine.getSymmetricDialect());
Expand Down Expand Up @@ -185,9 +186,12 @@ synchronized public long routeData(boolean force) {
if (identity != null) {
if (force || engine.getClusterService().lock(ClusterConstants.ROUTE)) {
try {
if (firstTimeCheckForAbandonedBatches) {
if (firstTimeCheck) {
engine.getOutgoingBatchService().updateAbandonedRoutingBatches();
firstTimeCheckForAbandonedBatches = false;
if (engine.getDataService().fixLastDataGap()) {
engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, Boolean.TRUE.toString());
}
firstTimeCheck = false;
}


Expand Down Expand Up @@ -486,7 +490,7 @@ protected int routeDataForEachChannel() {
processInfo.setStatus(ProcessInfo.ProcessStatus.OK);
} catch (RuntimeException ex) {
processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR);
firstTimeCheckForAbandonedBatches = true;
firstTimeCheck = true;
throw ex;
}
return dataCount;
Expand Down

0 comments on commit 848aade

Please sign in to comment.