Skip to content

Commit

Permalink
0002682: Cache outgoing batch sequences
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 17, 2016
1 parent a972593 commit 51f58fe
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 65 deletions.
3 changes: 2 additions & 1 deletion symmetric-client/src/main/resources/symmetric-ext-points.xml
Expand Up @@ -18,5 +18,6 @@
<property name="typeName" value="mongodb" />
</bean>


<bean id="databaseUpgradeListener" class="org.jumpmind.symmetric.db.DatabaseUpgradeListener" />

</beans>
Expand Up @@ -33,19 +33,21 @@ public class Sequence {
private String lastUpdateBy;
private Date lastUpdateTime;
private boolean cycle;

private int cacheSize;

public Sequence() {
}

public Sequence(String sequenceName, long currentValue, int incrementBy, long minValue,
long maxValue, String lastUpdateBy, boolean cycle) {
long maxValue, String lastUpdateBy, boolean cycle, int cacheSize) {
this.sequenceName = sequenceName;
this.currentValue = currentValue;
this.incrementBy = incrementBy;
this.minValue = minValue;
this.maxValue = maxValue;
this.lastUpdateBy = lastUpdateBy;
this.cycle = cycle;
this.cacheSize = cacheSize;
}

public String getSequenceName() {
Expand Down Expand Up @@ -119,4 +121,12 @@ public void setCycle(boolean cycle) {
public boolean isCycle() {
return cycle;
}

public int getCacheSize() {
return cacheSize;
}

public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
}
}
Expand Up @@ -40,6 +40,8 @@
public class SequenceService extends AbstractService implements ISequenceService {

private Map<String, Sequence> sequenceDefinitionCache = new HashMap<String, Sequence>();

private Map<String, CachedRange> sequenceCache = new HashMap<String, CachedRange>();

public SequenceService(IParameterService parameterService, ISymmetricDialect symmetricDialect) {
super(parameterService, symmetricDialect);
Expand All @@ -50,63 +52,78 @@ public SequenceService(IParameterService parameterService, ISymmetricDialect sym
public void init() {
Map<String, Sequence> sequences = getAll();
if (sequences.get(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID) == null) {
initSequence(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID, 1);
initSequence(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID, 1, 0);
}

if (sequences.get(Constants.SEQUENCE_OUTGOING_BATCH) == null) {
long maxBatchId = sqlTemplate.queryForLong(getSql("maxOutgoingBatchSql"));
initSequence(Constants.SEQUENCE_OUTGOING_BATCH, maxBatchId);
initSequence(Constants.SEQUENCE_OUTGOING_BATCH, maxBatchId, 10);
}

if (sequences.get(Constants.SEQUENCE_TRIGGER_HIST) == null) {
long maxTriggerHistId = sqlTemplate.queryForLong(getSql("maxTriggerHistSql"));
initSequence(Constants.SEQUENCE_TRIGGER_HIST, maxTriggerHistId);
initSequence(Constants.SEQUENCE_TRIGGER_HIST, maxTriggerHistId, 0);
}

if (sequences.get(Constants.SEQUENCE_EXTRACT_REQ) == null) {
long maxRequestId = sqlTemplate.queryForLong(getSql("maxExtractRequestSql"));
initSequence(Constants.SEQUENCE_EXTRACT_REQ, maxRequestId);
initSequence(Constants.SEQUENCE_EXTRACT_REQ, maxRequestId, 0);
}
}

private void initSequence(String name, long initialValue) {
private void initSequence(String name, long initialValue, int cacheSize) {
try {
if (initialValue < 1) {
initialValue = 1;
}
create(new Sequence(name, initialValue, 1, 1, 9999999999l,
"system", false));
"system", false, cacheSize));
} catch (UniqueKeyException ex) {
log.debug("Failed to create sequence {}. Must be initialized already.",
name);
}
}

public long nextVal(String name) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
long val = nextVal(transaction, name);
transaction.commit();
return val;
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
if (getSequenceDefinition(name).getCacheSize() > 0) {
return nextValFromCache(null, name);
}
return nextValFromDatabase(name);
}

public long nextVal(ISqlTransaction transaction, String name) {
if (getSequenceDefinition(transaction, name).getCacheSize() > 0) {
return nextValFromCache(transaction, name);
}
return nextValFromDatabase(transaction, name);
}

protected synchronized long nextValFromCache(ISqlTransaction transaction, String name) {
CachedRange range = sequenceCache.get(name);
if (range != null) {
long currentValue = range.getCurrentValue();
if (currentValue < range.getEndValue()) {
range.setCurrentValue(++currentValue);
System.out.println("--- CACHED NEXTVAL " + currentValue);
return currentValue;
} else {
sequenceCache.remove(name);
}
}
return nextValFromDatabase(transaction, name);
}

protected long nextValFromDatabase(final String name) {
return new DoTransaction<Long>() {
public Long execute(ISqlTransaction transaction) {
return nextValFromDatabase(transaction, name);
}
}.execute();
}

protected long nextValFromDatabase(ISqlTransaction transaction, String name) {
if (transaction == null) {
return nextVal(name);
return nextValFromDatabase(name);
} else {
long sequenceTimeoutInMs = parameterService.getLong(
ParameterConstants.SEQUENCE_TIMEOUT_MS, 5000);
Expand All @@ -126,18 +143,7 @@ public long nextVal(ISqlTransaction transaction, String name) {

protected long tryToGetNextVal(ISqlTransaction transaction, String name) {
long currVal = currVal(transaction, name);
Sequence sequence = sequenceDefinitionCache.get(name);
if (sequence == null) {
sequence = get(transaction, name);
if (sequence != null) {
sequenceDefinitionCache.put(name, sequence);
} else {
throw new IllegalStateException(String.format(
"The sequence named %s is not configured in %s", name,
TableConstants.getTableName(getTablePrefix(), TableConstants.SYM_SEQUENCE)));
}
}

Sequence sequence = getSequenceDefinition(transaction, name);
long nextVal = currVal + sequence.getIncrementBy();
if (nextVal > sequence.getMaxValue()) {
if (sequence.isCycle()) {
Expand All @@ -157,45 +163,78 @@ protected long tryToGetNextVal(ISqlTransaction transaction, String name) {
}
}

CachedRange range = null;
if (sequence.getCacheSize() > 0) {
long endVal = nextVal + (sequence.getIncrementBy() * (sequence.getCacheSize() - 1));
range = new CachedRange(nextVal, endVal);
nextVal = endVal;
}

int updateCount = transaction.prepareAndExecute(getSql("updateCurrentValueSql"), nextVal,
name, currVal);
if (updateCount != 1) {
nextVal = -1;
} else if (range != null) {
sequenceCache.put(name, range);
nextVal = range.getCurrentValue();
}

System.out.println("--- NEXTVAL " + nextVal);
return nextVal;
}

protected Sequence getSequenceDefinition(final String name) {
Sequence sequence = sequenceDefinitionCache.get(name);
if (sequence != null) {
return sequence;
}

return new DoTransaction<Sequence>() {
public Sequence execute(ISqlTransaction transaction) {
return getSequenceDefinition(transaction, name);
}
}.execute();
}

protected Sequence getSequenceDefinition(ISqlTransaction transaction, String name) {
Sequence sequence = sequenceDefinitionCache.get(name);
if (sequence == null) {
sequence = get(transaction, name);
if (sequence != null) {
sequenceDefinitionCache.put(name, sequence);
} else {
throw new IllegalStateException(String.format(
"The sequence named %s is not configured in %s", name,
TableConstants.getTableName(getTablePrefix(), TableConstants.SYM_SEQUENCE)));
}
}
return sequence;
}

public long currVal(ISqlTransaction transaction, String name) {
CachedRange range = sequenceCache.get(name);
if (range != null) {
return range.getCurrentValue();
}
return transaction.queryForLong(getSql("getCurrentValueSql"), name);
}

public long currVal(String name) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
long val = currVal(transaction, name);
transaction.commit();
return val;
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
public long currVal(final String name) {
CachedRange range = sequenceCache.get(name);
if (range != null) {
return range.getCurrentValue();
}

return new DoTransaction<Long>() {
public Long execute(ISqlTransaction transaction) {
return currVal(transaction, name);
}
}.execute();
}

public void create(Sequence sequence) {
sqlTemplate.update(getSql("insertSequenceSql"), sequence.getSequenceName(),
sequence.getCurrentValue(), sequence.getIncrementBy(), sequence.getMinValue(),
sequence.getMaxValue(), sequence.isCycle() ? 1 : 0, sequence.getLastUpdateBy());
sequence.getMaxValue(), sequence.isCycle() ? 1 : 0, sequence.getCacheSize(), sequence.getLastUpdateBy());
}

protected Sequence get(ISqlTransaction transaction, String name) {
Expand All @@ -216,6 +255,54 @@ protected Map<String, Sequence> getAll() {
return map;
}

class CachedRange {
long currentValue;
long endValue;

public CachedRange(long currentValue, long endValue) {
this.currentValue = currentValue;
this.endValue = endValue;
}

public long getCurrentValue() {
return currentValue;
}

public void setCurrentValue(long currentValue) {
this.currentValue = currentValue;
}

public long getEndValue() {
return endValue;
}
}

abstract class DoTransaction<T> {
public T execute() {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
T result = execute(transaction);
transaction.commit();
return result;
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
}

abstract public T execute(ISqlTransaction transaction);
}

class SequenceRowMapper implements ISqlRowMapper<Sequence> {
public Sequence mapRow(Row rs) {
Sequence sequence = new Sequence();
Expand All @@ -228,6 +315,7 @@ public Sequence mapRow(Row rs) {
sequence.setMinValue(rs.getLong("min_value"));
sequence.setSequenceName(rs.getString("sequence_name"));
sequence.setCycle(rs.getBoolean("cycle"));
sequence.setCacheSize(rs.getInt("cache_size"));
return sequence;
}
}
Expand Down
Expand Up @@ -32,11 +32,11 @@ public SequenceServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
// @formatter:off
putSql("getSequenceSql",
"select sequence_name,current_value,increment_by,min_value,max_value, " +
"cycle,create_time,last_update_by,last_update_time from $(sequence) where sequence_name=?");
"cycle,cache_size,create_time,last_update_by,last_update_time from $(sequence) where sequence_name=?");

putSql("getAllSequenceSql",
"select sequence_name,current_value,increment_by,min_value,max_value," +
"cycle,create_time,last_update_by,last_update_time from $(sequence)");
"cycle,cache_size,create_time,last_update_by,last_update_time from $(sequence)");

putSql("getCurrentValueSql",
"select current_value from $(sequence) where sequence_name=?");
Expand All @@ -48,8 +48,8 @@ public SequenceServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
putSql("insertSequenceSql",
"insert into $(sequence) " +
" (sequence_name, current_value, increment_by, min_value, max_value, " +
" cycle, create_time, last_update_by, last_update_time) " +
" values(?,?,?,?,?,?,current_timestamp,?,current_timestamp) ");
" cycle, cache_size, create_time, last_update_by, last_update_time) " +
" values(?,?,?,?,?,?,?,current_timestamp,?,current_timestamp) ");

putSql("maxOutgoingBatchSql", "select max(batch_id)+1 from $(outgoing_batch)");

Expand Down
1 change: 1 addition & 0 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -680,6 +680,7 @@
<column name="min_value" type="BIGINT" required="true" default="1" description="Specify the minimum value of the sequence."/>
<column name="max_value" type="BIGINT" required="true" default="9999999999" description="Specify the maximum value the sequence can generate."/>
<column name="cycle" type="BOOLEANINT" size="1" default="0" description="Indicate whether the sequence should automatically cycle once a boundary is hit." />
<column name="cache_size" type="INTEGER" required="true" default="0" description="Specify the number of sequence numbers to acquire and cache when one is requested."/>
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
Expand Down

0 comments on commit 51f58fe

Please sign in to comment.