Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
jumpmind-josh committed Jun 5, 2018
2 parents 2f6bf1c + 1ce77ac commit a894239
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 60 deletions.
29 changes: 24 additions & 5 deletions symmetric-assemble/src/asciidoc/configuration/transforms/types.ad
Expand Up @@ -239,13 +239,32 @@ Some variables are provided to the script:
.Variables
|===

|_COLUMN_NAME_|The variable name is the source column name in uppercase of the row being changed (replace COLUMN_NAME with your column)
|_<COLUMN_NAME>_|The variable name is the source column name in uppercase of the row being changed (replace <COLUMN_NAME> with your column)
|currentValue|The value of the current source column
|oldValue|The old value of the source column for an updated row
|sqlTemplate|a org.jumpmind.db.sql.ISqlTemplate object for querying or updating the database
|channelId|a reference to the channel on which the transformation is happening
|sourceNode|a org.jumpmind.symmetric.model.Node object that represents the node from where the data came
|targetNode|a org.jumpmind.symmetric.model.Node object that represents the node where the data is being loaded.
|sqlTemplate| org.jumpmind.db.sql.ISqlTemplate object for querying or updating the database
|channelId| name of the channel on which the transformation is happening
|sourceNode| org.jumpmind.symmetric.model.Node object that represents the node from where the data came
|sourceNodeId|same as sourceNode.getNodeId()
|sourceNodeGroupId|same as sourceNode.getNodeGroupId()
|sourceNodeExternalId|same as sourceNode.getNodeExternalId()
|targetNode| org.jumpmind.symmetric.model.Node object that represents the node where the data is being loaded.
|targetNodeId|same as targetNode.getNodeId()
|targetNodeGroupId|same as targetNode.getNodeGroupId()
|targetNodeExternalId|same as targetNode.getNodeExternalId()
|transformColumn| org.jumpmind.symmetric.io.data.transform.TransformColumn that is the transform configuration
|includeOn| org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType, same as transformColumn.getIncludeOn(), tells whether column transform is configured for all, insert, update, or delete
|sourceSchemaName | source schema name that the transform matched
|sourceCatalogName | source catalog name that the transform matched
|sourceTableName | source table name that the transform matched
|transformedData | org.jumpmind.symmetric.io.data.transform.TransformedData, the model object representing the outputted transformed data
|sourceDmlType| org.jumpmind.symmetric.io.data.DataEventType that is the source row change type, either insert, update, or delete
|sourceDmlTypeString| same as sourceDmlType.toString(), returning insert, update, or delete
|log | org.slf4j.Logger, write to the log file
|context | org.jumpmind.symmetric.io.data.DataContext containing internal variables and also acts like a Map for sharing variables between transforms for the current sync session
|bshContext | java.util.Map, static map of variables to share between transforms
|engine | org.jumpmind.symmetric.ISymmetricEngine, access to engine functions and services


|===

Expand Down
Expand Up @@ -22,18 +22,23 @@

import static org.apache.commons.lang.StringUtils.isBlank;

import java.io.IOException;
import java.text.ParseException;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.IndexColumn;
import org.jumpmind.db.model.NonUniqueIndex;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.PermissionType;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.SequenceIdentifier;
Expand Down Expand Up @@ -207,6 +212,21 @@ public void createRequiredDatabaseObjects() {
+ " END $(functionName); ";
install(sql, wkt2geom);
}

boolean isNoOrder = parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false);
String seqName = getSequenceName(SequenceIdentifier.DATA).toUpperCase();
String orderFlag = platform.getSqlTemplate().queryForString(
"select order_flag from user_sequences where sequence_name = ?", seqName);
String sql = null;
if (orderFlag != null && orderFlag.equals("N") && !isNoOrder) {
sql = "alter sequence " + seqName + " order";
} else if (orderFlag != null && orderFlag.equals("Y") && isNoOrder) {
sql = "alter sequence " + seqName + " noorder";
}
if (sql != null) {
log.info("DDL applied: " + sql);
platform.getSqlTemplate().update(sql);
}
}

@Override
Expand Down Expand Up @@ -370,4 +390,20 @@ public PermissionType[] getSymTablePermissions() {
PermissionType[] permissions = { PermissionType.CREATE_TABLE, PermissionType.DROP_TABLE, PermissionType.CREATE_TRIGGER, PermissionType.DROP_TRIGGER, PermissionType.EXECUTE};
return permissions;
}

@Override
protected Database readDatabaseFromXml(String resourceName) throws IOException {
Database database = super.readDatabaseFromXml(resourceName);
if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) {
Table table = database.findTable(TableConstants.SYM_DATA);
if (table != null) {
NonUniqueIndex index = new NonUniqueIndex("idx_crt_tm_dt_d");
index.addColumn(new IndexColumn(table.findColumn("create_time")));
index.addColumn(new IndexColumn(table.findColumn("data_id")));
table.addIndex(index);
}
}
return database;
}

}
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -398,12 +399,12 @@ public long getAverageExecutionTimeInMs() {
}

public boolean isCronSchedule() {
String cronSchedule = parameterService.getString(jobDefinition.getCronParameter());
return !StringUtils.isEmpty(cronSchedule);
return !isPeriodicSchedule();
}

public boolean isPeriodicSchedule() {
return !isCronSchedule();
String schedule = getSchedule();
return NumberUtils.isDigits(schedule);
}

public String getSchedule() {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import static org.jumpmind.symmetric.job.JobDefaults.EVERY_10_SECONDS;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

Expand All @@ -41,7 +42,9 @@ public JobDefaults getDefaults() {

@Override
public void doJob(boolean force) throws Exception {
engine.getFileSyncExtractorService().queueWork(force);
if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)) {
engine.getFileSyncExtractorService().queueWork(force);
}
engine.getDataExtractorService().queueWork(force);
}
}
Expand Up @@ -224,6 +224,7 @@ private ParameterConstants() {
public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view";
public final static String DBDIALECT_ORACLE_TEMPLATE_NUMBER_SPEC = "oracle.template.precision";
public final static String DBDIALECT_ORACLE_USE_HINTS = "oracle.use.hints";
public final static String DBDIALECT_ORACLE_SEQUENCE_NOORDER = "oracle.sequence.noorder";

public final static String DBDIALECT_TIBERO_USE_TRANSACTION_VIEW = "tibero.use.transaction.view";
public final static String DBDIALECT_TIBERO_TEMPLATE_NUMBER_SPEC = "tibero.template.precision";
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.Context;
Expand Down Expand Up @@ -151,7 +152,11 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}

if (result instanceof String) {
return new NewAndOldValue((String) result, null);
if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, (String) result);
} else {
return new NewAndOldValue((String) result, null);
}
} else if (result instanceof NewAndOldValue) {
return (NewAndOldValue) result;
} else if (result != null) {
Expand Down
Expand Up @@ -124,6 +124,7 @@ public void beforeRouting() {
queryDataIdMap();
processInfo.setStatus(ProcessStatus.OK);
log.info("Querying data in gaps from database took {} ms", System.currentTimeMillis() - ts);
isAllDataRead = false;
afterRouting();
reset();
log.info("Full gap analysis is done after {} ms", System.currentTimeMillis() - ts);
Expand Down Expand Up @@ -192,7 +193,9 @@ public void afterRouting() {
} else if (lastBusyExpireRunTime != 0) {
setLastBusyExpireRunTime(0);
}


List<DataGap> skippedDataGaps = new ArrayList<>();

try {
long ts = System.currentTimeMillis();
long lastDataId = -1;
Expand Down Expand Up @@ -237,14 +240,7 @@ public void afterRouting() {
expireChecked++;
}
if (isAllDataRead || isGapEmpty) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info("Found a gap in data_id at {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId());
} else {
log.info("Found a gap in data_id from {} to {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"),
dataGap.getStartId(), dataGap.getEndId());
}
skippedDataGaps.add(dataGap);
gapsDeleted.add(dataGap);
gapsAll.remove(dataGap);
}
Expand Down Expand Up @@ -299,6 +295,8 @@ public void afterRouting() {
} catch (RuntimeException ex) {
processInfo.setStatus(ProcessStatus.ERROR);
throw ex;
} finally {
logSkippedDataGaps(skippedDataGaps);
}
}

Expand Down Expand Up @@ -529,7 +527,46 @@ protected void fixOverlappingGaps(List<DataGap> gapsToCheck, ProcessInfo process
throw ex;
}
}


protected void logSkippedDataGaps(List<DataGap> skippedDataGaps) {
if (skippedDataGaps.isEmpty()) {
return;
}

if (log.isDebugEnabled()) {
for (DataGap dataGap : skippedDataGaps) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.debug("Expired data gap at data_id {} create_time {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId(), dataGap.getCreateTime());
} else {
log.debug("Expired data gap between data_id {} and {} create_time {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"),
dataGap.getStartId(), dataGap.getEndId(), dataGap.getCreateTime());
}
}
return;
}

Date minDate = skippedDataGaps.get(0).getCreateTime();
Date maxDate = skippedDataGaps.get(0).getCreateTime();
long minDataId = skippedDataGaps.get(0).getStartId();
long maxDataId = skippedDataGaps.get(0).getEndId();

for (DataGap dataGap : skippedDataGaps) {
if (dataGap.getCreateTime().before(minDate)) {
minDate = dataGap.getCreateTime();
}
if (dataGap.getCreateTime().after(maxDate)) {
maxDate = dataGap.getCreateTime();
}
minDataId = Math.min(minDataId, dataGap.getStartId());
maxDataId = Math.min(maxDataId, dataGap.getEndId());
}

log.info("Expired {} data gap(s) between data_id {} and {} and between create_time {} and {}",
skippedDataGaps.size(), minDataId, maxDataId, minDate, maxDate);

}

public Long mapRow(Row row) {
return row.getLong("data_id");
Expand All @@ -543,6 +580,9 @@ public void addDataIds(List<Long> dataIds) {
this.dataIds.addAll(dataIds);
}

/**
* This method is called for each channel that is routed. Once it is set for a routing pass it should remain set until the routing pass is done.
*/
public void setIsAllDataRead(boolean isAllDataRead) {
this.isAllDataRead &= isAllDataRead;
}
Expand Down
Expand Up @@ -88,7 +88,11 @@ public class DataGapRouteReader implements IDataToRouteReader {
protected long peekAheadSizeInBytes = 0;

protected boolean finishTransactionMode = false;

protected boolean isEachGapQueried;

protected boolean isOracleNoOrder;

protected String lastTransactionId = null;

protected static Map<String, Boolean> lastSelectUsedGreaterThanQueryByEngineName = new HashMap<String, Boolean>();
Expand All @@ -102,6 +106,7 @@ public DataGapRouteReader(ChannelRouterContext context, ISymmetricEngine engine)
this.percentOfHeapToUse = (double)parameterService.getInt(ParameterConstants.ROUTING_PEEK_AHEAD_MEMORY_THRESHOLD)/(double)100;
this.takeTimeout = engine.getParameterService().getInt(
ParameterConstants.ROUTING_WAIT_FOR_DATA_TIMEOUT_SECONDS, 330);
this.isOracleNoOrder = parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false);
if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) {
/* there will not be a separate thread to read a blocked queue so make sure the queue is big enough that it can be filled */
this.dataQueue = new LinkedBlockingQueue<Data>();
Expand Down Expand Up @@ -231,15 +236,28 @@ protected boolean process(Data data) {
if (!finishTransactionMode
|| (lastTransactionId != null && finishTransactionMode && lastTransactionId
.equals(data.getTransactionId()))) {
while (!okToProcess && currentGap != null && dataId >= currentGap.getStartId()) {
if (dataId <= currentGap.getEndId()) {
if (isOracleNoOrder) {
if (isEachGapQueried) {
okToProcess = true;
} else {
// past current gap. move to next gap
if (dataGaps.size() > 0) {
currentGap = dataGaps.remove(0);
for (DataGap gap : dataGaps) {
if (dataId >= gap.getStartId() && dataId <= gap.getEndId()) {
okToProcess = true;
break;
}
}
}
} else {
while (!okToProcess && currentGap != null && dataId >= currentGap.getStartId()) {
if (dataId <= currentGap.getEndId()) {
okToProcess = true;
} else {
currentGap = null;
// past current gap. move to next gap
if (dataGaps.size() > 0) {
currentGap = dataGaps.remove(0);
} else {
currentGap = null;
}
}
}
}
Expand Down Expand Up @@ -276,6 +294,7 @@ protected ISqlReadCursor<Data> prepareCursor() {
useGreaterThanDataId = true;
}

isEachGapQueried = !useGreaterThanDataId && this.dataGaps.size() <= numberOfGapsToQualify;
String channelId = context.getChannel().getChannelId();

String sql = null;
Expand All @@ -301,7 +320,9 @@ protected ISqlReadCursor<Data> prepareCursor() {
}
}

if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) {
if (isOracleNoOrder) {
sql = String.format("%s %s", sql, engine.getRouterService().getSql("orderByCreateTime"));
} else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) {
sql = String.format("%s %s", sql, engine.getRouterService().getSql("orderByDataId"));
}

Expand Down Expand Up @@ -338,7 +359,9 @@ protected ISqlReadCursor<Data> prepareCursor() {
}
}

this.currentGap = dataGaps.remove(0);
if (!isOracleNoOrder) {
this.currentGap = dataGaps.remove(0);
}

ISqlRowMapper<Data> dataMapper = new ISqlRowMapper<Data>() {
public Data mapRow(Row row) {
Expand Down
Expand Up @@ -111,8 +111,14 @@ public BatchAckResult ack(final BatchAck batch) {

boolean isNewError = false;
if (!batch.isOk() && batch.getErrorLine() != 0) {
List<Number> ids = sqlTemplateDirty.query(getSql("selectDataIdSql"),
new NumberMapper(), outgoingBatch.getBatchId());
String sql = getSql("selectDataIdSql");
if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) {
sql += getSql("orderByCreateTime");
} else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) {
sql += getSql("orderByDataId");
}

List<Number> ids = sqlTemplateDirty.query(sql, new NumberMapper(), outgoingBatch.getBatchId());
if (ids.size() >= batch.getErrorLine()) {
long failedDataId = ids.get((int) batch.getErrorLine() - 1).longValue();
if (outgoingBatch.getFailedDataId() == 0 || outgoingBatch.getFailedDataId() != failedDataId) {
Expand Down
Expand Up @@ -29,8 +29,12 @@ public class AcknowledgeServiceSqlMap extends AbstractSqlMap {
public AcknowledgeServiceSqlMap(IDatabasePlatform platform,
Map<String, String> replacementTokens) {
super(platform, replacementTokens);
putSql("selectDataIdSql",
"select data_id from $(data_event) b where batch_id = ? order by data_id ");
putSql("selectDataIdSql", "select data_id from $(data_event) b where batch_id = ?");

putSql("orderByDataId", " order by data_id asc");

putSql("orderByCreateTime", " order by create_time asc, data_id asc ");

}

}

0 comments on commit a894239

Please sign in to comment.