
Commit

Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git into 3.9
jumpmind-josh committed Jul 25, 2018
2 parents ad2e739 + fe8f043 commit 5cf93c7
Showing 13 changed files with 136 additions and 83 deletions.
@@ -373,6 +373,8 @@ protected static SqlTemplateSettings createSqlTemplateSettings(TypedProperties p
        settings.setReadStringsAsBytes(properties.is(ParameterConstants.JDBC_READ_STRINGS_AS_BYTES, false));
        settings.setTreatBinaryAsLob(properties.is(ParameterConstants.TREAT_BINARY_AS_LOB_ENABLED, true));
        settings.setRightTrimCharValues(properties.is(ParameterConstants.RIGHT_TRIM_CHAR_VALUES, false));
+       settings.setAllowUpdatesWithResults(properties.is(ParameterConstants.ALLOW_UPDATES_WITH_RESULTS, false));
+
        LogSqlBuilder logSqlBuilder = new LogSqlBuilder();
        logSqlBuilder.setLogSlowSqlThresholdMillis(properties.getInt(ParameterConstants.LOG_SLOW_SQL_THRESHOLD_MILLIS, 20000));
        logSqlBuilder.setLogSqlParametersInline(properties.is(ParameterConstants.LOG_SQL_PARAMETERS_INLINE, true));
@@ -22,23 +22,18 @@

import static org.apache.commons.lang.StringUtils.isBlank;

import java.io.IOException;
import java.text.ParseException;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.IndexColumn;
import org.jumpmind.db.model.NonUniqueIndex;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.PermissionType;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.SequenceIdentifier;
@@ -390,20 +385,5 @@ public PermissionType[] getSymTablePermissions() {
        PermissionType[] permissions = { PermissionType.CREATE_TABLE, PermissionType.DROP_TABLE, PermissionType.CREATE_TRIGGER, PermissionType.DROP_TRIGGER, PermissionType.EXECUTE};
        return permissions;
    }
-
-   @Override
-   protected Database readDatabaseFromXml(String resourceName) throws IOException {
-       Database database = super.readDatabaseFromXml(resourceName);
-       if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) {
-           Table table = database.findTable(TableConstants.SYM_DATA);
-           if (table != null) {
-               NonUniqueIndex index = new NonUniqueIndex("idx_crt_tm_dt_d");
-               index.addColumn(new IndexColumn(table.findColumn("create_time")));
-               index.addColumn(new IndexColumn(table.findColumn("data_id")));
-               table.addIndex(index);
-           }
-       }
-       return database;
-   }

}
@@ -435,6 +435,8 @@ private ParameterConstants() {

    public final static String RIGHT_TRIM_CHAR_VALUES = "right.trim.char.values";

+   public final static String ALLOW_UPDATES_WITH_RESULTS = "allow.updates.with.results";
+
    public final static String NODE_LOAD_ONLY = "load.only";

    public final static String MYSQL_TINYINT_DDL_TO_BOOLEAN = "mysql.tinyint.ddl.to.boolean";
@@ -43,11 +43,13 @@
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.stage.IStagedResource;
+ import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.FileTrigger;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.Node;
+ import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.IFileSyncService;
import org.jumpmind.symmetric.service.INodeService;
@@ -71,14 +73,16 @@ public class FileSyncZipDataWriter implements IDataWriter {
    protected DataContext context;
    protected INodeService nodeService;
    protected IExtensionService extensionService;
+   protected IConfigurationService configurationService;

    public FileSyncZipDataWriter(long maxBytesToSync, IFileSyncService fileSyncService,
-           INodeService nodeService, IStagedResource stagedResource, IExtensionService extensionService) {
+           INodeService nodeService, IStagedResource stagedResource, IExtensionService extensionService, IConfigurationService configurationService) {
        this.maxBytesToSync = maxBytesToSync;
        this.fileSyncService = fileSyncService;
        this.stagedResource = stagedResource;
        this.nodeService = nodeService;
        this.extensionService = extensionService;
+       this.configurationService = configurationService;
    }

    public void open(DataContext context) {
@@ -107,13 +111,18 @@ public boolean start(Table table) {

    public void write(CsvData data) {
        DataEventType eventType = data.getDataEventType();

-       if (eventType == DataEventType.INSERT || eventType == DataEventType.UPDATE) {
-           if (eventType == DataEventType.INSERT) {
-               statistics.get(this.batch).increment(DataWriterStatisticConstants.INSERTCOUNT);
-           }
-           else {
-               statistics.get(this.batch).increment(DataWriterStatisticConstants.UPDATECOUNT);
-           }
+       if (filterInitialLoad(data)) {
+           return;
+       }
+
+       if (eventType == DataEventType.INSERT) {
+           statistics.get(this.batch).increment(DataWriterStatisticConstants.INSERTCOUNT);
+       }
+       else {
+           statistics.get(this.batch).increment(DataWriterStatisticConstants.UPDATECOUNT);
+       }
        Map<String, String> columnData = data.toColumnNameValuePairs(
                snapshotTable.getColumnNames(), CsvData.ROW_DATA);
        Map<String, String> oldColumnData = data.toColumnNameValuePairs(
@@ -306,4 +315,29 @@ protected boolean isCClient(String nodeId) {
        return cclient;
    }
+
+   protected boolean filterInitialLoad(CsvData data) {
+       Channel channel = configurationService.getChannel(batch.getChannelId());
+       if (channel.isReloadFlag()) {
+           List<FileTriggerRouter> fileTriggerRouters = fileSyncService
+                   .getFileTriggerRoutersForCurrentNode(false);
+           Map<String, String> columnData = data.toColumnNameValuePairs(
+                   snapshotTable.getColumnNames(), CsvData.ROW_DATA);
+           String triggerId = columnData.get("TRIGGER_ID");
+           String routerId = columnData.get("ROUTER_ID");
+
+           for (FileTriggerRouter fileTriggerRouter : fileTriggerRouters) {
+               if (fileTriggerRouter.getTriggerId().equals(triggerId)
+                       && fileTriggerRouter.getRouterId().equals(routerId)) {
+                   if (!fileTriggerRouter.isEnabled() || !fileTriggerRouter.isInitialLoadEnabled()) {
+                       return true;
+                   } else {
+                       return false;
+                   }
+               }
+           }
+       }
+
+       return false;
+   }

}
@@ -103,7 +103,7 @@ public void mergeInjectedBatchStatistics(Statistics statistics) {

    public void setNodeBatchId(String value) {
        if (value != null) {
-           int splitIndex = value.indexOf("-");
+           int splitIndex = value.lastIndexOf("-");
            if (splitIndex > 0) {
                setNodeId(value.substring(0, splitIndex));
                setBatchId(Long.parseLong(value.substring(splitIndex + 1)));
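The indexOf-to-lastIndexOf change above matters because a node-batch id has the form "nodeId-batchId", and node IDs can presumably contain hyphens themselves. A minimal sketch with a made-up id (illustrative only, not SymmetricDS code):

// Save as NodeBatchIdSketch.java; the id below is hypothetical.
public class NodeBatchIdSketch {
    public static void main(String[] args) {
        String value = "store-001-12345";
        // indexOf("-") would split at index 5, yielding node id "store" and the
        // unparseable batch id "001-12345"; lastIndexOf("-") splits correctly.
        int splitIndex = value.lastIndexOf("-");
        String nodeId = value.substring(0, splitIndex);                  // "store-001"
        long batchId = Long.parseLong(value.substring(splitIndex + 1));  // 12345
        System.out.println(nodeId + " / " + batchId);
    }
}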
@@ -1825,7 +1825,7 @@ protected void updateExtractRequestStatus(ISqlTransaction transaction, long extr

    protected boolean canProcessExtractRequest(ExtractRequest request, CommunicationType communicationType) {
        Trigger trigger = this.triggerRouterService.getTriggerById(request.getTriggerId());
-       if (trigger != null && !trigger.getSourceTableName().equalsIgnoreCase(TableConstants.getTableName(tablePrefix,
+       if (trigger == null || !trigger.getSourceTableName().equalsIgnoreCase(TableConstants.getTableName(tablePrefix,
                TableConstants.SYM_FILE_SNAPSHOT))) {
            return true;
        } else {
@@ -2246,41 +2246,43 @@ public CsvData next() {
        Trigger trigger = triggerRouterService.getTriggerById(
                triggerHistory.getTriggerId(), false);
        boolean isFileParserRouter = triggerHistory.getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER);
-       if (trigger != null || isFileParserRouter) {
-           if (lastTriggerHistory == null || lastTriggerHistory
-                   .getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() ||
-                   lastRouterId == null || !lastRouterId.equals(routerId)) {
-
-               this.sourceTable = columnsAccordingToTriggerHistory.lookup(
-                       routerId, triggerHistory, false, !isFileParserRouter);
-
-               this.targetTable = columnsAccordingToTriggerHistory.lookup(
-                       routerId, triggerHistory, true, false);
-
-               if (trigger != null && trigger.isUseStreamLobs() || (data.getRowData() != null && hasLobsThatNeedExtract(sourceTable, data))) {
-                   this.requiresLobSelectedFromSource = true;
-               } else {
-                   this.requiresLobSelectedFromSource = false;
-               }
-           }
-
-           data.setNoBinaryOldData(requiresLobSelectedFromSource
-                   || symmetricDialect.getName().equals(
-                           DatabaseNamesConstants.MSSQL2000)
-                   || symmetricDialect.getName().equals(
-                           DatabaseNamesConstants.MSSQL2005)
-                   || symmetricDialect.getName().equals(
-                           DatabaseNamesConstants.MSSQL2008));
-
-           outgoingBatch.incrementExtractRowCount();
-           outgoingBatch.incrementExtractRowCount(data.getDataEventType());
-       } else {
-           log.error(
-                   "Could not locate a trigger with the id of {} for {}. It was recorded in the hist table with a hist id of {}",
+       if (trigger == null && !isFileParserRouter) {
+           log.warn(
+                   "Could not locate a trigger with the id of {} for table {} (data id {} with trigger hist id {}). It's possible this trigger was deleted before the batch could be extracted.",
                    new Object[] { triggerHistory.getTriggerId(),
                            triggerHistory.getSourceTableName(),
                            data.getDataId(),
                            triggerHistory.getTriggerHistoryId() });
+       }
+
+       if (lastTriggerHistory == null || lastTriggerHistory
+               .getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() ||
+               lastRouterId == null || !lastRouterId.equals(routerId)) {
+
+           this.sourceTable = columnsAccordingToTriggerHistory.lookup(
+                   routerId, triggerHistory, false, !isFileParserRouter);
+
+           this.targetTable = columnsAccordingToTriggerHistory.lookup(
+                   routerId, triggerHistory, true, false);
+
+           if (trigger != null && trigger.isUseStreamLobs() || (data.getRowData() != null && hasLobsThatNeedExtract(sourceTable, data))) {
+               this.requiresLobSelectedFromSource = true;
+           } else {
+               this.requiresLobSelectedFromSource = false;
+           }
+       }
+
+       data.setNoBinaryOldData(requiresLobSelectedFromSource
+               || symmetricDialect.getName().equals(
+                       DatabaseNamesConstants.MSSQL2000)
+               || symmetricDialect.getName().equals(
+                       DatabaseNamesConstants.MSSQL2005)
+               || symmetricDialect.getName().equals(
+                       DatabaseNamesConstants.MSSQL2008));
+
+       outgoingBatch.incrementExtractRowCount();
+       outgoingBatch.incrementExtractRowCount(data.getDataEventType());
+
        if (data.getDataEventType() == DataEventType.CREATE && StringUtils.isBlank(data.getCsvData(CsvData.ROW_DATA))) {

            boolean excludeDefaults = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_DEFAULTS, false);
@@ -128,7 +128,7 @@ protected IDataWriter buildWriter(long memoryThresholdInBytes) {
                .getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);

        FileSyncZipDataWriter fileSyncWriter = new FileSyncZipDataWriter(maxBytesToSync, fileSyncService,
-               nodeService, stagedResource, extensionService) {
+               nodeService, stagedResource, extensionService, configurationService) {
            @Override
            public void close() {
                super.finish();
@@ -588,7 +588,7 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
                Constants.STAGING_CATEGORY_OUTGOING, processInfo.getSourceNodeId(),
                targetNode.getNodeId(), "filesync.zip");
        dataWriter = new FileSyncZipDataWriter(maxBytesToSync, this,
-               engine.getNodeService(), stagedResource, engine.getExtensionService());
+               engine.getNodeService(), stagedResource, engine.getExtensionService(), engine.getConfigurationService());
        }
        log.debug("Extracting batch {} for filesync.", currentBatch.getNodeBatchId());

@@ -2233,6 +2233,14 @@ treat.binary.as.lob.enabled=true
# Tags: other
right.trim.char.values=false

+ # When executing DML statements during data load, this controls whether executeUpdate or execute is used on the PreparedStatement. executeUpdate
+ # is used by default. execute() allows for unusual situations, such as when an application trigger generates a result set during an
+ # update statement.
+ #
+ # DatabaseOverridable: true
+ # Type: boolean
+ # Tags: other
+ allow.updates.with.results=false

# This is the location the staging directory will be put. If it isn't set the staging directory will be located according to java.io.tmpdir.

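For context on the new allow.updates.with.results setting: per the JDBC spec, PreparedStatement.executeUpdate() must fail if the statement unexpectedly produces a ResultSet, while execute() lets the caller inspect whether a result set or an update count came back. A minimal standalone sketch of the execute() path (illustrative only; the H2 URL and the item table are hypothetical stand-ins, not SymmetricDS code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class UpdateWithResultsSketch {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection("jdbc:h2:mem:demo")) {
            try (Statement ddl = con.createStatement()) {
                ddl.execute("create table item (id int primary key, price decimal(10,2))");
                ddl.execute("insert into item values (42, 1.00)");
            }
            try (PreparedStatement ps =
                    con.prepareStatement("update item set price = 9.99 where id = 42")) {
                // Default path (allow.updates.with.results=false): a plain row count,
                // which the JDBC spec says must fail if a ResultSet is returned.
                //   int rows = ps.executeUpdate();

                // execute() path (allow.updates.with.results=true): tolerates a trigger
                // that makes the update produce a result set.
                boolean hasResultSet = ps.execute();
                if (hasResultSet) {
                    try (ResultSet rs = ps.getResultSet()) {
                        while (rs.next()) {
                            // consume rows produced as a side effect
                        }
                    }
                } else {
                    System.out.println("updated " + ps.getUpdateCount() + " rows");
                }
            }
        }
    }
}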
@@ -31,6 +31,7 @@ public class SqlTemplateSettings {
    protected int overrideIsolationLevel = -1;
    protected int resultSetType = java.sql.ResultSet.TYPE_FORWARD_ONLY;
    protected LogSqlBuilder logSqlBuilder;
+   protected boolean allowUpdatesWithResults = false;

    public SqlTemplateSettings() {
    }
@@ -107,4 +108,12 @@ public void setRightTrimCharValues(boolean rightTrimCharValues) {
        this.rightTrimCharValues = rightTrimCharValues;
    }

+   public boolean isAllowUpdatesWithResults() {
+       return allowUpdatesWithResults;
+   }
+
+   public void setAllowUpdatesWithResults(boolean allowUpdatesWithResults) {
+       this.allowUpdatesWithResults = allowUpdatesWithResults;
+   }
+
}
@@ -166,12 +166,13 @@ public void write(CsvData data) {
        }

        for (TransformTable transformation : transformTables) {
-           if (eventType == DataEventType.INSERT && transformation.isUpdateFirst()) {
-               eventType = DataEventType.UPDATE;
+           DataEventType localEventType = eventType;
+           if (localEventType == DataEventType.INSERT && transformation.isUpdateFirst()) {
+               localEventType = DataEventType.UPDATE;
            }

            List<TransformedData> dataThatHasBeenTransformed =
-                   transform(eventType, context, transformation, sourceKeyValues, oldSourceValues, sourceValues);
+                   transform(localEventType, context, transformation, sourceKeyValues, oldSourceValues, sourceValues);

            for (TransformedData transformedData : dataThatHasBeenTransformed) {
                Table transformedTable = transformedData.buildTargetTable();
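The localEventType change above fixes a loop-aliasing bug: reassigning the shared eventType on the first update-first transformation could leak UPDATE semantics into every later transformation of the same row, while a loop-local copy keeps each transformation's decision independent. A standalone sketch of the pattern (hypothetical enum and flags, not SymmetricDS code):

// Save as LocalCopySketch.java; EventType stands in for DataEventType, and the
// boolean flags stand in for each transformation's isUpdateFirst() setting.
enum EventType { INSERT, UPDATE }

public class LocalCopySketch {
    public static void main(String[] args) {
        EventType eventType = EventType.INSERT;
        boolean[] updateFirstFlags = { true, false };

        for (boolean updateFirst : updateFirstFlags) {
            // Before the fix, reassigning eventType itself here would leave it as
            // UPDATE for the second transformation as well; the local copy keeps
            // each iteration's decision independent.
            EventType localEventType = eventType;
            if (localEventType == EventType.INSERT && updateFirst) {
                localEventType = EventType.UPDATE;
            }
            System.out.println("transform sees " + localEventType);
            // prints: UPDATE (first transformation), then INSERT (second)
        }
    }
}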
