
0001302: Speed up queries against sym_data, sym_data_gap and sym_outgoing_batch on postgres
chenson42 committed Jun 27, 2013
1 parent a923247 commit bf4cea553dc264206a84e528c152e05ba27e7916
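This commit replaces hard-coded Types.NUMERIC bindings for data_id and batch_id parameters with a dialect hook, getSqlTypeForIds(): the default dialect keeps NUMERIC, while the PostgreSQL dialect returns BIGINT so bound ids match the bigint id columns of sym_data, sym_data_gap and sym_outgoing_batch without numeric casts. A minimal sketch of the hook, using simplified stand-ins for the SymmetricDS classes (only the method name and return values come from this diff):

import java.sql.Types;

// Simplified stand-in for the dialect interface extended in this commit.
interface ISymmetricDialect {
    // JDBC type to use when binding data_id / batch_id parameters.
    int getSqlTypeForIds();
}

// Default behavior: keep the old NUMERIC binding for other databases.
class AbstractSymmetricDialect implements ISymmetricDialect {
    public int getSqlTypeForIds() {
        return Types.NUMERIC;
    }
}

// PostgreSQL override: bind ids as BIGINT so the planner can compare them
// to the bigint id columns directly.
class PostgreSqlSymmetricDialect extends AbstractSymmetricDialect {
    @Override
    public int getSqlTypeForIds() {
        return Types.BIGINT;
    }
}

Call sites then ask the dialect for the id type once (for example, int idSqlType = symmetricDialect.getSqlTypeForIds();) and use it wherever an int[] of JDBC argument types previously hard-coded Types.NUMERIC, as the hunks below show.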
@@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.db.postgresql;

+import java.sql.Types;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.BinaryEncoding;
@@ -239,5 +241,10 @@ public void purgeRecycleBin() {
public BinaryEncoding getBinaryEncoding() {
return BinaryEncoding.BASE64;
}


+@Override
+public int getSqlTypeForIds() {
+return Types.BIGINT;
+}

}
@@ -22,6 +22,7 @@

import java.io.IOException;
import java.io.StringWriter;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -832,4 +833,8 @@ public String getTablePrefix() {
public String getTemplateNumberPrecisionSpec() {
return null;
}

+public int getSqlTypeForIds() {
+return Types.NUMERIC;
+}
}
@@ -211,6 +211,8 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc

public String getTemplateNumberPrecisionSpec();

public Map<String, String> getSqlReplacementTokens();

+public int getSqlTypeForIds();

}
@@ -247,9 +247,10 @@ public Data take() throws InterruptedException {
Object[] args = null;
int[] types = null;

+int dataIdSqlType = engine.getSymmetricDialect().getSqlTypeForIds();
if (useGreaterThanDataId) {
args = new Object[] { channelId, dataGaps.get(0).getStartId() };
-types = new int[] { Types.VARCHAR, Types.NUMERIC };
+types = new int[] { Types.VARCHAR, dataIdSqlType };
} else {
int numberOfArgs = 1 + 2 * (numberOfGapsToQualify < dataGaps.size() ? numberOfGapsToQualify
: dataGaps.size());
@@ -261,7 +262,7 @@ public Data take() throws InterruptedException {
for (int i = 0; i < numberOfGapsToQualify && i < dataGaps.size(); i++) {
DataGap gap = dataGaps.get(i);
args[i * 2 + 1] = gap.getStartId();
-types[i * 2 + 1] = Types.NUMERIC;
+types[i * 2 + 1] = dataIdSqlType;
if ((i + 1) == numberOfGapsToQualify && (i + 1) < dataGaps.size()) {
// there were more gaps than we are going to use in the SQL.
// use
@@ -270,7 +271,7 @@ public Data take() throws InterruptedException {
} else {
args[i * 2 + 2] = gap.getEndId();
}
-types[i * 2 + 2] = Types.NUMERIC;
+types[i * 2 + 2] = dataIdSqlType;
}
}

@@ -1194,14 +1194,14 @@ public void updateDataGap(DataGap gap, DataGap.Status status) {
sqlTemplate.update(
getSql("updateDataGapSql"),
new Object[] { status.name(), AppUtils.getHostName(), gap.getStartId(),
-gap.getEndId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.NUMERIC,
-Types.NUMERIC });
+gap.getEndId() }, new int[] { Types.VARCHAR, Types.VARCHAR, symmetricDialect.getSqlTypeForIds(),
+symmetricDialect.getSqlTypeForIds() });
}

public void deleteDataGap(DataGap gap) {
sqlTemplate.update(getSql("deleteDataGapSql"),
-new Object[] { gap.getStartId(), gap.getEndId() }, new int[] { Types.NUMERIC,
-Types.NUMERIC });
+new Object[] { gap.getStartId(), gap.getEndId() }, new int[] { symmetricDialect.getSqlTypeForIds(),
+symmetricDialect.getSqlTypeForIds() });

}

@@ -1213,16 +1213,6 @@ public Date findCreateTimeOfData(long dataId) {
return sqlTemplate.queryForObject(getSql("findDataCreateTimeSql"), Date.class, dataId);
}

-// public Map<String, String> getRowDataAsMap(Data data) {
-// Map<String, String> map = new HashMap<String, String>();
-// String[] columnNames = CsvUtils.tokenizeCsvData(data.getTriggerHistory().getColumnNames());
-// String[] columnData = CsvUtils.tokenizeCsvData(data.getRowData());
-// for (int i = 0; i < columnNames.length; i++) {
-// map.put(columnNames[i].toLowerCase(), columnData[i]);
-// }
-// return map;
-// }

/**
* Get a list of {@link IHeartbeatListener}s that are ready for a heartbeat
* according to
@@ -264,7 +264,7 @@ public void insertIncomingBatch(IncomingBatch batch) {

public int deleteIncomingBatch(IncomingBatch batch) {
return sqlTemplate.update(getSql("deleteIncomingBatchSql"),
-new Object[] { batch.getBatchId(), batch.getNodeId() }, new int[] { Types.NUMERIC,
+new Object[] { batch.getBatchId(), batch.getNodeId() }, new int[] { symmetricDialect.getSqlTypeForIds(),
Types.VARCHAR });
}

@@ -317,7 +317,7 @@ public int updateIncomingBatch(ISqlTransaction transaction , IncomingBatch batch
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR,
-Types.VARCHAR, Types.TIMESTAMP, Types.NUMERIC, Types.VARCHAR });
+Types.VARCHAR, Types.TIMESTAMP, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}
return count;
}
@@ -143,7 +143,7 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.NUMERIC,
-Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, Types.NUMERIC,
+Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, symmetricDialect.getSqlTypeForIds(),
Types.VARCHAR });
}

@@ -190,7 +190,7 @@ public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
list = (List<OutgoingBatch>) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"),
new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId },
-new int[] { Types.NUMERIC, Types.VARCHAR });
+new int[] { symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
} else {
/*
* Pushing to an older version of symmetric might result in a batch
@@ -199,7 +199,7 @@ public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
list = (List<OutgoingBatch>) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchByIdOnlySql"),
new OutgoingBatchMapper(true, false), new Object[] { batchId },
-new int[] { Types.NUMERIC });
+new int[] { symmetricDialect.getSqlTypeForIds() });
}
if (list != null && list.size() > 0) {
return list.get(0);
@@ -23,6 +23,7 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
@@ -133,7 +134,7 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
log.info("The outgoing purge process has completed");
}
} else {
log.info("Could not get a lock to run an outgoing purge");
log.debug("Could not get a lock to run an outgoing purge");
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
@@ -193,16 +194,18 @@ private long purgeDataRows(final Calendar time) {
}, params);
return minMax;
}

private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, Date retentionTime,
int maxNumtoPurgeinTx) {
long minId = minMax[0];
long purgeUpToId = minMax[1];
long ts = System.currentTimeMillis();
int totalCount = 0;
int totalDeleteStmts = 0;
+int idSqlType = symmetricDialect.getSqlTypeForIds();
Timestamp cutoffTime = new Timestamp(retentionTime.getTime());
log.info("About to purge {}", identifier.toString().toLowerCase());

while (minId <= purgeUpToId) {
totalDeleteStmts++;
long maxId = minId + maxNumtoPurgeinTx;
@@ -219,31 +222,34 @@ private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, Date retent
deleteSql = getSql("deleteDataSql");
args = new Object[] { minId, maxId, cutoffTime, minId, maxId, minId, maxId,
OutgoingBatch.Status.OK.name() };
-argTypes = new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP,
-Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.VARCHAR};
+argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP,
+idSqlType, idSqlType, idSqlType, idSqlType, Types.VARCHAR};
break;
case DATA_EVENT:
deleteSql = getSql("deleteDataEventSql");
args = new Object[] { minId, maxId, OutgoingBatch.Status.OK.name(), minId,
maxId };
-argTypes = new int[] { Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC};
+argTypes = new int[] { idSqlType, idSqlType, Types.VARCHAR, idSqlType, idSqlType};

break;
case OUTGOING_BATCH:
deleteSql = getSql("deleteOutgoingBatchSql");
args = new Object[] { OutgoingBatch.Status.OK.name(), minId, maxId, minId,
maxId };
-argTypes = new int[] {Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC};
+argTypes = new int[] {Types.VARCHAR, idSqlType, idSqlType, idSqlType, idSqlType};

break;
case STRANDED_DATA:
deleteSql = getSql("deleteStrandedData");
args = new Object[] { minId, maxId, cutoffTime, minId, maxId };
-argTypes = new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.NUMERIC, Types.NUMERIC};
+argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP, idSqlType, idSqlType};
break;
}

-totalCount += sqlTemplate.update(deleteSql, args, argTypes);
+log.debug("Running the following statement: {} with the following arguments: {}", deleteSql, Arrays.toString(args));
+int count = sqlTemplate.update(deleteSql, args, argTypes);
+log.debug("Deleted {} rows", count);
+totalCount += count;

if (totalCount > 0
&& (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
@@ -273,7 +279,7 @@ public long purgeIncoming(Calendar retentionCutoff, boolean force) {
}

} else {
log.info("Could not get a lock to run an incoming purge");
log.debug("Could not get a lock to run an incoming purge");
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
@@ -32,8 +32,8 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
// @formatter:off

putSql("selectOutgoingBatchRangeSql" ,
"select min(batch_id) as min_id, max(batch_id) as max_id from $(outgoing_batch) where " +
" create_time < ? and status = ? and batch_id < (select max(batch_id) from $(outgoing_batch)) " );
"select min(batch_id) as min_id, max(batch_id) as max_id from $(outgoing_batch) where " +
" create_time < ? and status = ? and batch_id < (select max(batch_id) from $(outgoing_batch)) " );

putSql("deleteOutgoingBatchSql" ,
"delete from $(outgoing_batch) where status = ? and batch_id between ? " +
@@ -42,7 +42,7 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("deleteDataEventSql" ,
"delete from $(data_event) where batch_id not in (select batch_id from " +
" $(outgoing_batch) where batch_id between ? and ? and status != ?) " +
" $(outgoing_batch) where batch_id between ? and ? and status != ?) " +
" and batch_id between ? and ? " );

putSql("selectDataRangeSql" ,
@@ -55,7 +55,7 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
putSql("deleteStrandedData" ,
"delete from $(data) where " +
" data_id between ? and ? and " +
" data_id < (select min(start_id) from $(data_gap)) and " +
" data_id < (select min(start_id) from $(data_gap)) and " +
" create_time < ? and " +
" data_id not in (select e.data_id from $(data_event) e where " +
" e.data_id between ? and ?) " );
@@ -77,7 +77,7 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("selectIncomingBatchRangeSql" ,
"select node_id, min(batch_id) as min_id, max(batch_id) as max_id from $(incoming_batch) where " +
" create_time < ? and status = ? group by node_id " );
" create_time < ? and status = ? group by node_id " );

putSql("deleteIncomingBatchSql" ,
"delete from $(incoming_batch) where batch_id between ? and ? and node_id = ? and status = ?" );
