Skip to content

Commit

Permalink
Continue to work on the purge process.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Feb 28, 2008
1 parent d150bac commit c7b8ed2
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 110 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
Expand Down Expand Up @@ -67,12 +68,14 @@ public class PurgeService extends AbstractService implements IPurgeService {

private String deleteFromEventDataIdSql;

private String selectDataIdToPurgeSql;
private String deleteFromDataSql;

private String deleteDataSql;
private String selectMinDataIdSql;

private String selectIncomingBatchOrderByCreateTimeSql;

private String selectOutgoingBatchHistoryRangeSql;

private TransactionTemplate transactionTemplate;

private IClusterService clusterService;
Expand All @@ -82,8 +85,8 @@ public void purge() {
Calendar retentionCutoff = Calendar.getInstance();
retentionCutoff.add(Calendar.MINUTE, -retentionInMinutes);

purgeIncoming(retentionCutoff);
purgeOutgoing(retentionCutoff);
purgeIncoming(retentionCutoff);

}

Expand All @@ -94,6 +97,7 @@ private void purgeOutgoing(Calendar retentionCutoff) {
logger.info("The outgoing purge process is about to run.");

purgeBatchesOlderThan(retentionCutoff);
purgeOutgoingBatchHistory(retentionCutoff);
purgeDataRows();

} finally {
Expand All @@ -109,6 +113,24 @@ private void purgeOutgoing(Calendar retentionCutoff) {
}
}

private void purgeOutgoingBatchHistory(Calendar retentionCutoff) {
    // Purges outgoing batch history rows whose event_time is older than the
    // retention cutoff.  Rows are deleted in chunks of
    // maxNumOfDataIdsToPurgeInTx history ids per statement so one huge delete
    // does not escalate locks or overwhelm the transaction log.
    //
    // Find the inclusive [min, max] history_id range that is old enough to purge.
    int[] minMax = (int[]) jdbcTemplate.queryForObject(selectOutgoingBatchHistoryRangeSql,
            new Object[] { retentionCutoff.getTime() }, new RowMapper() {
                public Object mapRow(ResultSet rs, int row) throws SQLException {
                    return new int[] { rs.getInt(1), rs.getInt(2) };
                }
            });
    // min()/max() yield SQL NULL (mapped to 0 by getInt) when no rows qualify,
    // so max == 0 means there is nothing to purge.  NOTE(review): assumes
    // history ids start at 1 -- confirm against the schema.
    if (minMax != null && minMax[1] > 0) {
        // Start one below min so the loop also runs when min == max (a single
        // qualifying row); starting at min itself would skip that row.
        int currentHistoryId = minMax[0] - 1;
        int max = minMax[1];
        while (currentHistoryId < max) {
            // Advance by one chunk, clamped to max so rows newer than the
            // cutoff range are never deleted.
            currentHistoryId = Math.min(currentHistoryId + maxNumOfDataIdsToPurgeInTx, max);
            jdbcTemplate.update(deleteFromOutgoingBatchHistSql, new Object[] { currentHistoryId });
        }
    }
}

private void purgeIncoming(final Calendar retentionCutoff) {
try {
if (clusterService.lock(LockAction.PURGE_INCOMING)) {
Expand All @@ -134,7 +156,8 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
String status = rs.getString(3);
Date createTime = rs.getTimestamp(4);
if (createTime.after(retentionCutoff.getTime())) {
logger.info("Done purging incoming. Purged " + totalRowsPurged + " total batch and hist rows up through " + createTime);
logger.info("Done purging incoming. Purged " + totalRowsPurged
+ " total batch and hist rows up through " + createTime);
break;
}
if ("OK".equals(status)) {
Expand All @@ -143,9 +166,11 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
new Object[] { batchId, nodeId });
}
}

if (totalRowsPurged > 0 && (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
logger.info("Purged " + totalRowsPurged + " total incoming batch and hist rows up through " + createTime);

if (totalRowsPurged > 0
&& (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
logger.info("Purged " + totalRowsPurged
+ " total incoming batch and hist rows up through " + createTime);
ts = System.currentTimeMillis();
}
}
Expand Down Expand Up @@ -180,107 +205,76 @@ public void purgeAllIncomingEventsForNode(String nodeId) {
}

private void purgeDataRows() {
    // Walks the data table upward from the smallest data_id, deleting rows
    // that no longer have a corresponding data_event row.  Deletes run in
    // chunks of maxNumOfDataIdsToPurgeInTx ids per statement so a single
    // massive delete is never held open.
    int minDataId = jdbcTemplate.queryForInt(selectMinDataIdSql);
    int deletedCount = 0;
    long ts = System.currentTimeMillis();
    int totalCount = 0;

    do {
        // Advance the chunk's exclusive upper bound, then delete eligible
        // data rows below it (the SQL takes the bound twice).
        minDataId += maxNumOfDataIdsToPurgeInTx;
        deletedCount = jdbcTemplate.update(deleteFromDataSql, new Object[] { minDataId, minDataId });
        totalCount += deletedCount;
        // Log progress at most every five minutes on long-running purges.
        if (totalCount > 0 && (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
            logger.info("Purged a total of " + totalCount + " data rows so far.");
            ts = System.currentTimeMillis();
        }
        // NOTE(review): the loop stops at the first chunk that deletes
        // nothing, so purgeable rows beyond a fully-retained id range would
        // be left until a later run -- confirm this is the intended tradeoff.
    } while (deletedCount > 0);

    logger.info("Purged " + totalCount + " data rows.");
}

@SuppressWarnings("unchecked")
private void purgeBatchesOlderThan(final Calendar time) {
    // Streams the candidate batch ids with a forward-only, read-only cursor
    // so the full id list is never pulled into memory, then deletes each
    // batch's rows by primary key to prevent lock escalation.
    jdbcTemplate.execute(new ConnectionCallback() {
        public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
            PreparedStatement st = null;
            ResultSet rs = null;
            int eventRowCount = 0;
            int dataIdCount = 0;
            int batchesPurged = 0;
            long ts = System.currentTimeMillis();

            try {
                st = conn.prepareStatement(selectOutgoingBatchIdsToPurgeSql,
                        java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
                // Keep the result set streaming instead of buffered client-side.
                st.setFetchSize(dbDialect.getStreamingResultsFetchSize());
                st.setTimestamp(1, new Timestamp(time.getTime().getTime()));
                rs = st.executeQuery();
                while (rs.next()) {
                    final int batchId = rs.getInt(1);
                    final String nodeId = rs.getString(2);

                    // Delete this batch's data_event rows and the batch row
                    // itself inside a transaction; repeat until no event rows
                    // remain for the batch.
                    do {
                        dataIdCount = (Integer) transactionTemplate.execute(new TransactionCallback() {
                            public Object doInTransaction(final TransactionStatus s) {
                                int eventCount = jdbcTemplate.update(deleteFromEventDataIdSql,
                                        new Object[] { batchId, nodeId });

                                jdbcTemplate.update(deleteFromOutgoingBatchSql, new Object[] { batchId });
                                return eventCount;
                            }
                        });
                        eventRowCount += dataIdCount;

                        // Log progress at most every five minutes.
                        if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5) {
                            logger.info("Purged " + batchesPurged + " batches and " + eventRowCount
                                    + " data_events so far.");
                            ts = System.currentTimeMillis();
                        }
                    } while (dataIdCount > 0);

                    batchesPurged++;
                }

                logger.info("Purged a total of " + batchesPurged + " batches and " + eventRowCount
                        + " data_events.");
            } finally {
                // Always release the cursor and statement, even on failure.
                JdbcUtils.closeResultSet(rs);
                JdbcUtils.closeStatement(st);
            }

            return null;
        }
    });
}
Expand Down Expand Up @@ -317,16 +311,12 @@ public void setDeleteFromEventDataIdSql(String selectDataIdToPurgeSql) {
this.deleteFromEventDataIdSql = selectDataIdToPurgeSql;
}

public void setDeleteDataSql(String deleteDataSql) {
this.deleteDataSql = deleteDataSql;
}

/** Injected by Spring: the template used to run chunked purge deletes transactionally. */
public void setTransactionTemplate(TransactionTemplate template) {
    this.transactionTemplate = template;
}

public void setSelectDataIdToPurgeSql(String selectDataIdToDeleteSql) {
this.selectDataIdToPurgeSql = selectDataIdToDeleteSql;
public void setDeleteFromDataSql(String selectDataIdToDeleteSql) {
this.deleteFromDataSql = selectDataIdToDeleteSql;
}

public void setClusterService(IClusterService clusterService) {
Expand All @@ -341,21 +331,16 @@ public void setDeleteIncomingBatchesByNodeIdSql(String[] deleteIncomingBatchesBy
this.deleteIncomingBatchesByNodeIdSql = deleteIncomingBatchesByNodeIdSql;
}

class BatchForNode {

int batchId;

String nodeId;
public void setSelectIncomingBatchOrderByCreateTimeSql(String selectIncomingBatchOrderByCreateTimeSql) {
this.selectIncomingBatchOrderByCreateTimeSql = selectIncomingBatchOrderByCreateTimeSql;
}

public BatchForNode(int batchId, String nodeId) {
super();
this.batchId = batchId;
this.nodeId = nodeId;
}
public void setSelectOutgoingBatchHistoryRangeSql(String selectOutgoingBatchHistoryRangeSql) {
this.selectOutgoingBatchHistoryRangeSql = selectOutgoingBatchHistoryRangeSql;
}

public void setSelectIncomingBatchOrderByCreateTimeSql(String selectIncomingBatchOrderByCreateTimeSql) {
this.selectIncomingBatchOrderByCreateTimeSql = selectIncomingBatchOrderByCreateTimeSql;
public void setSelectMinDataIdSql(String selectMinDataIdSql) {
this.selectMinDataIdSql = selectMinDataIdSql;
}

}
24 changes: 14 additions & 10 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -732,29 +732,33 @@
<!-- Batches eligible for purge; ordered by batch_id so deletes walk the index forward. -->
<property name="selectOutgoingBatchIdsToPurgeSql">
    <value>
        select batch_id, node_id from ${sync.table.prefix}_outgoing_batch where status='OK' and
        create_time &lt; ? order by batch_id asc
    </value>
</property>
<!-- Chunked range delete: removes all history rows up to and including the given id. -->
<property name="deleteFromOutgoingBatchHistSql">
    <value>delete from ${sync.table.prefix}_outgoing_batch_hist where history_id &lt;= ?</value>
</property>
<property name="deleteFromOutgoingBatchSql">
    <value>delete from ${sync.table.prefix}_outgoing_batch where batch_id=?</value>
</property>
<property name="deleteFromEventDataIdSql">
    <value>delete from ${sync.table.prefix}_data_event where batch_id=? and node_id=?</value>
</property>
<!-- Inclusive [min, max] history_id range old enough to purge. -->
<property name="selectOutgoingBatchHistoryRangeSql">
    <value>select min(history_id),max(history_id) from ${sync.table.prefix}_outgoing_batch_hist where event_time &lt; ?</value>
</property>
<property name="selectMinDataIdSql">
    <value>
        select min(data_id) from ${sync.table.prefix}_data
    </value>
</property>
<!-- Deletes data rows below the bound that have no data_event row.
     NOTE(review): "not in" misbehaves if data_event.data_id can be NULL - confirm the column is non-nullable. -->
<property name="deleteFromDataSql">
    <value>
        delete from ${sync.table.prefix}_data where data_id not in (select data_id from ${sync.table.prefix}_data_event where data_id &lt; ?) and data_id &lt; ?
    </value>
</property>
<property name="selectIncomingBatchOrderByCreateTimeSql">
    <value>select batch_id, node_id, status, create_time from ${sync.table.prefix}_incoming_batch order by create_time asc</value>
</property>
<property name="deleteIncomingBatchesByNodeIdSql">
<list>
Expand Down

0 comments on commit c7b8ed2

Please sign in to comment.