Skip to content

Commit

Permalink
0002565: Add VoltDB Dialect - fix purge.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Jun 3, 2016
1 parent d5264ad commit de46f63
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 10 deletions.
Expand Up @@ -34,6 +34,8 @@ public class VoltDbSymmetricDialect extends AbstractSymmetricDialect {
public VoltDbSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new VoltDbTriggerTemplate(this);
this.supportsSubselectsInDelete = false;
this.supportsSubselectsInUpdate = false;
}

@Override
Expand Down
Expand Up @@ -99,6 +99,10 @@ abstract public class AbstractSymmetricDialect implements ISymmetricDialect {

protected boolean supportsTransactionViews = false;

protected boolean supportsSubselectsInDelete = true;

protected boolean supportsSubselectsInUpdate = true;

protected Map<String,String> sqlReplacementTokens = new HashMap<String, String>();

public AbstractSymmetricDialect() {
Expand Down Expand Up @@ -555,6 +559,20 @@ public String getProductVersion() {
public boolean supportsTransactionViews() {
return supportsTransactionViews;
}

/*
* Indicates if this dialect supports subselects in delete statements.
*/
public boolean supportsSubselectsInDelete() {
return supportsSubselectsInDelete;
}

/*
* Indicates if this dialect supports subselects in update statements.
*/
public boolean supportsSubselectsInUpdate() {
return supportsSubselectsInUpdate;
}

public long insertWithGeneratedKey(String sql, SequenceIdentifier sequenceId) {
return insertWithGeneratedKey(sql, sequenceId, null, null);
Expand Down
Expand Up @@ -114,6 +114,16 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
* a way to check on pending database transactions.
*/
public boolean supportsTransactionViews();

/*
* Indicates if this dialect supports subselects in delete statements.
*/
public boolean supportsSubselectsInDelete();

/*
* Indicates if this dialect supports subselects in update statements.
*/
public boolean supportsSubselectsInUpdate();

/*
* Implement this if the database has some type of cleanup functionality
Expand Down
Expand Up @@ -23,12 +23,14 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -91,10 +93,16 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
log.info("The outgoing purge process is about to run for data older than {}",
SimpleDateFormat.getDateTimeInstance()
.format(retentionCutoff.getTime()));
rowsPurged += purgeStrandedBatches();
rowsPurged += purgeDataRows(retentionCutoff);
rowsPurged += purgeOutgoingBatch(retentionCutoff);
rowsPurged += purgeExtractRequests();
// VoltDB doesn't support capture, or subselects. So we'll just be purging heartbeats
// by date here.
if (getSymmetricDialect().getName().equalsIgnoreCase(DatabaseNamesConstants.VOLTDB)) {
rowsPurged += purgeOutgoingByRetentionCutoff(retentionCutoff);
} else {
rowsPurged += purgeStrandedBatches();
rowsPurged += purgeDataRows(retentionCutoff);
rowsPurged += purgeOutgoingBatch(retentionCutoff);
rowsPurged += purgeExtractRequests();
}
} finally {
if (!force) {
clusterService.unlock(ClusterConstants.PURGE_OUTGOING);
Expand All @@ -110,6 +118,24 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
return rowsPurged;
}

protected long purgeOutgoingByRetentionCutoff(Calendar retentionCutoff) {
int totalCount = 0;
totalCount += executePurgeDelete(getSql("deleteOutgoingBatchByCreateTimeSql"), retentionCutoff.getTime());
totalCount += executePurgeDelete(getSql("deleteDataEventByCreateTimeSql"), retentionCutoff.getTime());
totalCount += executePurgeDelete(getSql("deleteDataByCreateTimeSql"), retentionCutoff.getTime());
totalCount += executePurgeDelete(getSql("deleteExtractRequestByCreateTimeSql"), retentionCutoff.getTime());

log.info("Done purging {} rows", totalCount);
return totalCount;
}

protected int executePurgeDelete(String deleteSql, Object argument) {
log.debug("Running the following statement: {} with the following arguments: {}", deleteSql, argument);
int count = sqlTemplate.update(deleteSql, argument);
log.debug("Deleted {} rows", count);
return count;
}

private long purgeOutgoingBatch(final Calendar time) {
log.info("Getting range for outgoing batch");
long[] minMax = queryForMinMax(getSql("selectOutgoingBatchRangeSql"),
Expand Down Expand Up @@ -287,10 +313,17 @@ public long purgeIncoming(Calendar retentionCutoff, boolean force) {

private long purgeIncomingError() {
log.info("Purging incoming error rows");
long rowCount = sqlTemplate.update(getSql("deleteIncomingErrorsSql"));
long rowCount = 0;

if (getSymmetricDialect().supportsSubselectsInDelete()) {
rowCount = sqlTemplate.update(getSql("deleteIncomingErrorsSql"));
} else {
rowCount = selectIdsAndDelete(getSql("selectIncomingErrorsBatchIdsSql"),
"batch_id", getSql("deleteIncomingErrorsBatchIdsSql"));
}

log.info("Purged {} incoming error rows", rowCount);
return rowCount;

}

private long purgeIncomingBatch(final Calendar time) {
Expand Down Expand Up @@ -397,6 +430,30 @@ public void purgeAllIncomingEventsForNode(String nodeId) {
log.info("Purged all {} incoming batch for node {}", count, nodeId);
}

protected int selectIdsAndDelete(String selectSql, String fieldName, String deleteSql) {
List<Row> results = sqlTemplate.query(selectSql);
int rowCount = 0;
if (! results.isEmpty()) {
List<Integer> ids = new ArrayList<Integer>(results.size());
for (Row row : results) {
ids.add(row.getInt(fieldName));
}

results = null;

StringBuilder placeHolders = new StringBuilder(ids.size()*2);
for (int i = 0; i < ids.size(); i++) {
placeHolders.append("?,");
}
placeHolders.setLength(placeHolders.length()-1);

String deleteStatement = deleteSql.replace("?", placeHolders);

rowCount = sqlTemplate.update(deleteStatement, ids.toArray());
}
return rowCount;
}



}
Expand Up @@ -100,7 +100,15 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
putSql("purgeNodeHostStatsSql", "delete from $(node_host_stats) where start_time < ?");

putSql("purgeNodeHostJobStatsSql", "delete from $(node_host_job_stats) where start_time < ?");


putSql("selectIncomingErrorsBatchIdsSql", "select distinct e.batch_id as batch_id from sym_incoming_error e LEFT OUTER JOIN sym_incoming_batch i ON e.batch_id = i.batch_id where i.batch_id IS NULL");

putSql("deleteIncomingErrorsBatchIdsSql", "delete from sym_incoming_error where batch_id IN (?)");

putSql("deleteOutgoingBatchByCreateTimeSql", "delete from sym_outgoing_batch where create_time < ?");
putSql("deleteDataEventByCreateTimeSql", "delete from sym_data_event where create_time < ?");
putSql("deleteDataByCreateTimeSql", "delete from sym_data where create_time < ?");
putSql("deleteExtractRequestByCreateTimeSql", "delete from sym_extract_request where create_time < ?");
}

}
Expand Up @@ -47,8 +47,8 @@ public VoltDbDdlBuilder() {
databaseInfo.addNativeTypeMapping(Types.BIT, "TINYINT", Types.TINYINT);
databaseInfo.addNativeTypeMapping(Types.DOUBLE, "DECIMAL", Types.DECIMAL);

databaseInfo.addNativeTypeMapping(Types.CLOB, "VARCHAR", Types.VARCHAR);
databaseInfo.addNativeTypeMapping(Types.LONGVARCHAR, "VARCHAR", Types.VARCHAR);
databaseInfo.addNativeTypeMapping(Types.CLOB, "VARCHAR(100000)", Types.VARCHAR);
databaseInfo.addNativeTypeMapping(Types.LONGVARCHAR, "VARCHAR(100000)", Types.VARCHAR);
databaseInfo.addNativeTypeMapping(Types.CHAR, "VARCHAR", Types.VARCHAR);

databaseInfo.addNativeTypeMapping(Types.BINARY, "VARCHAR", Types.VARCHAR);
Expand Down
Expand Up @@ -46,6 +46,8 @@ public VoltDbDatabasePlatform(DataSource dataSource, SqlTemplateSettings setting
getDatabaseInfo().setDelimitedIdentifiersSupported(false);
getDatabaseInfo().setTriggersSupported(false);
getDatabaseInfo().setForeignKeysSupported(false);
getDatabaseInfo().setHasPrecisionAndScale(Types.DECIMAL, false);
getDatabaseInfo().setHasPrecisionAndScale(Types.FLOAT, false);
}

public static final String JDBC_DRIVER = "org.voltdb.jdbc.Driver";
Expand Down Expand Up @@ -111,7 +113,7 @@ protected VoltDbDdlReader createDdlReader() {
return new VoltDbDdlReader(this);
}

@Override
@Override
protected VoltDbJdbcSqlTemplate createSqlTemplate() {
// TODO
return new VoltDbJdbcSqlTemplate(dataSource, settings, new SymmetricLobHandler(), getDatabaseInfo());
Expand Down

0 comments on commit de46f63

Please sign in to comment.