Skip to content

Commit

Permalink
SYMMETRICDS-236 - Leave at least one row in sym_data and sym_outgoing…
Browse files Browse the repository at this point in the history
…_batch so ids don't get reset on systems that get their identities from the max value in the table.
  • Loading branch information
chenson42 committed Mar 23, 2010
1 parent 09e0c83 commit 6d6448e
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 82 deletions.
Expand Up @@ -11,7 +11,7 @@
<entry key="selectOutgoingBatchRangeSql">
<value>
select min(batch_id), max(batch_id) from $[sym.sync.table.prefix]_outgoing_batch where
create_time &lt; ? and status in ('OK','IG')
create_time &lt; ? and status in ('OK','IG') and batch_id &lt; (select max(batch_id) from $[sym.sync.table.prefix]_outgoing_batch)
</value>
</entry>
<entry key="deleteOutgoingBatchSql">
Expand All @@ -36,7 +36,7 @@
</entry>
<entry key="selectDataRangeSql">
<value>
select min(data_id), max(data_id) from $[sym.sync.table.prefix]_data
select min(data_id), max(data_id) from $[sym.sync.table.prefix]_data where data_id &lt; (select max(data_id) from $[sym.sync.table.prefix]_data)
</value>
</entry>
<entry key="updateStrandedBatches">
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class PurgeServiceTest extends AbstractDatabaseTest {
public PurgeServiceTest() throws Exception {
super();
}

@Test
public void testThatPurgeExecutes() {
IPurgeService service = find(Constants.PURGE_SERVICE);
Expand All @@ -48,190 +48,232 @@ public void testThatPurgeExecutes() {

@Test
public void testThatPurgeDoesNotDeleteSuccessfullySentData() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupSentData();
assertCounts(1);
getSymmetricEngine().purge();
assertCounts(1);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

@Test
public void testThatPurgeDeletesSuccessfullySentData() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupSentData();
makeDataOld();
assertCounts(1);
getSymmetricEngine().purge();
assertCounts(0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
}
assertCounts(1);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

private void setupSentData() {
cleanSlate("sym_data", "sym_data_event", "sym_outgoing_batch", "sym_incoming_batch");
assertCounts(0);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false).values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "", "",
getTriggerRouterService().getNewestTriggerHistoryForTrigger(router.getTrigger().getTriggerId()),
TestConstants.TEST_CHANNEL_ID, null, null);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false)
.values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "",
"", getTriggerRouterService().getNewestTriggerHistoryForTrigger(
router.getTrigger().getTriggerId()), TestConstants.TEST_CHANNEL_ID, null,
null);
data.setDataId(1);
getDataService().insertDataAndDataEventAndOutgoingBatch(data, TestConstants.TEST_CLIENT_EXTERNAL_ID,
router.getRouter().getRouterId());
getDataService().insertDataAndDataEventAndOutgoingBatch(data,
TestConstants.TEST_CLIENT_EXTERNAL_ID, router.getRouter().getRouterId());
getOutgoingBatchService().markAllAsSentForNode(TestConstants.TEST_CLIENT_NODE);
}

@Test
public void testThatPurgeDeletesSuccessfullyIgnoredData() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupIgnoredData();
makeDataOld();
assertCounts(1);
getSymmetricEngine().purge();
assertCounts(0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
}

assertCounts(1);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

private void setupIgnoredData() {
cleanSlate("sym_data", "sym_data_event", "sym_outgoing_batch", "sym_incoming_batch");
assertCounts(0);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false).values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "", "",
getTriggerRouterService().getNewestTriggerHistoryForTrigger(router.getTrigger().getTriggerId()),
TestConstants.TEST_CHANNEL_ID, null, null);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false)
.values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "",
"", getTriggerRouterService().getNewestTriggerHistoryForTrigger(
router.getTrigger().getTriggerId()), TestConstants.TEST_CHANNEL_ID, null,
null);
data.setDataId(1);
getDataService().insertDataAndDataEventAndOutgoingBatch(data, TestConstants.TEST_CLIENT_EXTERNAL_ID,
router.getRouter().getRouterId());
getDataService().insertDataAndDataEventAndOutgoingBatch(data,
TestConstants.TEST_CLIENT_EXTERNAL_ID, router.getRouter().getRouterId());
getOutgoingBatchService().markAllAsSentForNode(TestConstants.TEST_CLIENT_NODE);
getJdbcTemplate().update("update sym_outgoing_batch set status=?", new Object[] { Status.IG.name() });
getJdbcTemplate().update("update sym_outgoing_batch set status=?",
new Object[] { Status.IG.name() });
}

@Test
public void testThatPurgeDoesNotDeleteNewData() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupNewData();
assertCounts(1, 0, 0);
getSymmetricEngine().purge();
assertCounts(1, 0, 0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

@Test
public void testThatPurgeDoesNotDeleteNewDataThatIsOld() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupNewData();
makeDataOld();
assertCounts(1, 0, 0);
getSymmetricEngine().purge();
assertCounts(1, 0, 0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

private void setupNewData() {
cleanSlate("sym_data", "sym_data_event", "sym_outgoing_batch", "sym_incoming_batch");
assertCounts(0);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false).values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "", "",
getTriggerRouterService().getNewestTriggerHistoryForTrigger(router.getTrigger().getTriggerId()),
TestConstants.TEST_CHANNEL_ID, null, null);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false)
.values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "",
"", getTriggerRouterService().getNewestTriggerHistoryForTrigger(
router.getTrigger().getTriggerId()), TestConstants.TEST_CHANNEL_ID, null,
null);
data.setDataId(1);
getDataService().insertData(data);
if (getJdbcTemplate().update("update sym_data_ref set ref_data_id=(select max(data_id)-1 from sym_data)") == 0) {
if (getJdbcTemplate().update(
"update sym_data_ref set ref_data_id=(select max(data_id)-1 from sym_data)") == 0) {
Log.info("Inserting into sym_data_ref");
getJdbcTemplate().update("insert into sym_data_ref values(1, current_timestamp)");
}
}

@Test
public void testThatPurgeDoesNotDeleteUnroutedData() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupUnroutedData();
setupUnroutedData(1);
getSymmetricEngine().purge();
assertCounts(1, 1, 0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

@Test
public void testThatPurgeDeletesUnroutedData() throws Exception {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupUnroutedData();
setupUnroutedData(1, 2);
makeDataOld();
assertCounts(1, 1, 0);
assertCounts(2, 2, 0);
getSymmetricEngine().purge();
assertCounts(0, 0, 0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
}
assertCounts(1, 0, 0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

private void setupUnroutedData() {
private void setupUnroutedData(int... id) {
cleanSlate("sym_data", "sym_data_event", "sym_outgoing_batch", "sym_incoming_batch");
assertCounts(0);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false).values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "", "",
getTriggerRouterService().getNewestTriggerHistoryForTrigger(router.getTrigger().getTriggerId()),
TestConstants.TEST_CHANNEL_ID, null, null);
data.setDataId(1);
getDataService().insertData(data);
getDataService().insertDataEvent(data.getDataId(), Constants.UNROUTED_BATCH_ID, Constants.UNKNOWN_ROUTER_ID);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false)
.values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "",
"", getTriggerRouterService().getNewestTriggerHistoryForTrigger(
router.getTrigger().getTriggerId()), TestConstants.TEST_CHANNEL_ID, null,
null);
if (id != null) {
for (int i : id) {
data.setDataId(i);
getDataService().insertData(data);
getDataService().insertDataEvent(data.getDataId(), Constants.UNROUTED_BATCH_ID,
Constants.UNKNOWN_ROUTER_ID);
}
}
}

@Test
public void testThatPurgeDoesNotDeletePartiallySentData() {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupPartiallySentData();
makeDataOld();
assertCounts(1, 3, 3);
getSymmetricEngine().purge();
assertCounts(1, 2, 2);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

@Test
public void testThatPurgeRemovesDataForNodesThatHaveBeenDisabled() {
int oldPurgeRetentionPeriod = getParameterService().getInt(ParameterConstants.PURGE_RETENTION_MINUTES);
int oldPurgeRetentionPeriod = getParameterService().getInt(
ParameterConstants.PURGE_RETENTION_MINUTES);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, 10);
setupDataForDisabledNode();
makeDataOld();
assertCounts(1, 1, 1);
assertCounts(2, 2, 2);
getSymmetricEngine().purge();
assertCounts(0, 0, 0);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES, oldPurgeRetentionPeriod);
assertCounts(1, 1, 1);
getParameterService().saveParameter(ParameterConstants.PURGE_RETENTION_MINUTES,
oldPurgeRetentionPeriod);
}

private void setupPartiallySentData() {
cleanSlate("sym_data", "sym_data_event", "sym_outgoing_batch", "sym_incoming_batch");
assertCounts(0);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false).values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "", "",
getTriggerRouterService().getNewestTriggerHistoryForTrigger(router.getTrigger().getTriggerId()),
TestConstants.TEST_CHANNEL_ID, null, null);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false)
.values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "",
"", getTriggerRouterService().getNewestTriggerHistoryForTrigger(
router.getTrigger().getTriggerId()), TestConstants.TEST_CHANNEL_ID, null,
null);
data.setDataId(1);
getDataService().insertDataAndDataEventAndOutgoingBatch(data, TestConstants.TEST_CLIENT_EXTERNAL_ID,
router.getRouter().getRouterId());
getDataService().insertDataAndDataEventAndOutgoingBatch(data,
TestConstants.TEST_CLIENT_EXTERNAL_ID, router.getRouter().getRouterId());
int dataId = getJdbcTemplate().queryForInt("select max(data_id) from sym_data");
getDataService().insertDataEventAndOutgoingBatch(dataId, data.getChannelId(), "00003", router.getRouter().getRouterId());
getDataService().insertDataEventAndOutgoingBatch(dataId, data.getChannelId(), "00010", router.getRouter().getRouterId());
getDataService().insertDataEventAndOutgoingBatch(dataId, data.getChannelId(), "00003",
router.getRouter().getRouterId());
getDataService().insertDataEventAndOutgoingBatch(dataId, data.getChannelId(), "00010",
router.getRouter().getRouterId());
getOutgoingBatchService().markAllAsSentForNode(TestConstants.TEST_CLIENT_NODE);
}

private void setupDataForDisabledNode() {
cleanSlate("sym_data", "sym_data_event", "sym_outgoing_batch", "sym_incoming_batch");
assertCounts(0);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false).values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "", "",
getTriggerRouterService().getNewestTriggerHistoryForTrigger(router.getTrigger().getTriggerId()),
TestConstants.TEST_CHANNEL_ID, null, null);
TriggerRouter router = getTriggerRouterService().getTriggerRoutersForCurrentNode(false)
.values().iterator().next().get(0);
Data data = new Data(router.getTrigger().getSourceTableName(), DataEventType.INSERT, "",
"", getTriggerRouterService().getNewestTriggerHistoryForTrigger(
router.getTrigger().getTriggerId()), TestConstants.TEST_CHANNEL_ID, null,
null);
data.setDataId(1);
getDataService().insertDataAndDataEventAndOutgoingBatch(data, "00002",
router.getRouter().getRouterId());
data.setDataId(2);
getDataService().insertDataAndDataEventAndOutgoingBatch(data, "00002",
router.getRouter().getRouterId());
}

private void assertCounts(int count) {
assertCounts(count, count, count);
}
Expand All @@ -246,8 +288,10 @@ private void makeDataOld() {
Calendar oldTime = Calendar.getInstance();
oldTime.add(Calendar.DATE, -1);
getJdbcTemplate().update("update sym_data set create_time=?", new Object[] { oldTime });
getJdbcTemplate().update("update sym_data_event set create_time=?", new Object[] { oldTime });
getJdbcTemplate().update("update sym_outgoing_batch set create_time=?", new Object[] { oldTime });
getJdbcTemplate().update("update sym_data_event set create_time=?",
new Object[] { oldTime });
getJdbcTemplate().update("update sym_outgoing_batch set create_time=?",
new Object[] { oldTime });
}

}

0 comments on commit 6d6448e

Please sign in to comment.