Skip to content

Commit

Permalink
Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.10
  • Loading branch information
jumpmind-josh committed Dec 10, 2019
2 parents b248b08 + 2425afa commit b157b0c
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 33 deletions.
5 changes: 1 addition & 4 deletions symmetric-assemble/src/asciidoc/configuration/conflicts.ad
Expand Up @@ -113,10 +113,7 @@ endif::pro[]
Detection Expression:: An expression that provides additional information about the detection mechanism. If the detection
mechanism is use_timestamp or use_version then this expression will be the name of the timestamp or version column.
The detect_expression is also used to exclude certain column names from being used. For example, to exclude column1 and column2,
the expression is:
----
excluded_column_names=column1,column2
----
the expression is "excluded_column_names=column1,column2".

Resolve Changes Only:: Indicates that when applying changes during an update that only data that has changed should be applied.
Otherwise, all the columns will be updated. This really only applies to updates.
Expand Down
Expand Up @@ -387,8 +387,10 @@ private void runJob(CommandLine line, List<String> args) throws Exception{
String jobName = popArg(args, "job name");
if (jobName.equals("pull")) {
getSymmetricEngine().pull();
getSymmetricEngine().getNodeCommunicationService().stop();
} else if (jobName.equals("push")) {
getSymmetricEngine().push();
getSymmetricEngine().getNodeCommunicationService().stop();
} else if (jobName.equals("route")) {
getSymmetricEngine().route();
} else if (jobName.equals("purge")) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -56,18 +57,30 @@ public class MySqlSymmetricDialect extends AbstractSymmetricDialect implements I
static final String SQL_FUNCTION_INSTALLED = "select count(*) from information_schema.routines where routine_name='$(functionName)' and routine_schema in (select database())" ;

private String functionTemplateKeySuffix = null;

private boolean isConvertZeroDateToNull;

public MySqlSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new MySqlTriggerTemplate(this);
this.parameterService = parameterService;


if (parameterService.getString(BasicDataSourcePropertyConstants.DB_POOL_URL).contains("zeroDateTimeBehavior=convertToNull")) {
try {
String sqlMode = platform.getSqlTemplate().queryForString("select @@sql_mode");
isConvertZeroDateToNull = sqlMode == null || (!sqlMode.contains("NO_ZERO_DATE") && !sqlMode.contains("NO_ZERO_IN_DATE"));
if (isConvertZeroDateToNull) {
log.info("Zero dates will be converted to null");
}
} catch (Exception e) {
log.warn("Cannot convert zero dates to null because unable to verify sql_mode: {}", e.getMessage());
}
}
this.triggerTemplate = new MySqlTriggerTemplate(this, isConvertZeroDateToNull);

int[] versions = Version.parseVersion(getProductVersion());
if (getMajorVersion() == 5
&& (getMinorVersion() == 0 || (getMinorVersion() == 1 && versions[2] < 23))) {
if (getMajorVersion() == 5 && (getMinorVersion() == 0 || (getMinorVersion() == 1 && versions[2] < 23))) {
this.functionTemplateKeySuffix = PRE_5_1_23;
} else if (getMajorVersion() == 5
&& (getMinorVersion() < 7 || (getMinorVersion() == 7 && versions[2] < 6))) {
} else if (getMajorVersion() == 5 && (getMinorVersion() < 7 || (getMinorVersion() == 7 && versions[2] < 6))) {
this.functionTemplateKeySuffix = PRE_5_7_6;
} else {
this.functionTemplateKeySuffix = POST_5_7_6;
Expand Down
Expand Up @@ -22,20 +22,19 @@

import java.util.HashMap;

import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.util.SymmetricUtils;

public class MySqlTriggerTemplate extends AbstractTriggerTemplate {

public MySqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
public MySqlTriggerTemplate(ISymmetricDialect symmetricDialect, boolean isConvertZeroDateToNull) {
super(symmetricDialect);
emptyColumnTemplate = "''" ;
stringColumnTemplate = "cast(if($(tableAlias).`$(columnName)` is null,'',concat('\"',replace(replace($(tableAlias).`$(columnName)`,'\\\\','\\\\\\\\'),'\"','\\\\\"'),'\"')) as char)\n" ;
geometryColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',replace(replace(astext($(tableAlias).`$(columnName)`),'\\\\','\\\\\\\\'),'\"','\\\\\"'),'\"'))\n" ;
numberColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',cast($(tableAlias).`$(columnName)` as char),'\"'))\n" ;
datetimeColumnTemplate = "if($(tableAlias).`$(columnName)` is null" + getConvertZeroDateToNull() + ",'',concat('\"',cast($(tableAlias).`$(columnName)` as char),'\"'))\n" ;
datetimeColumnTemplate = "if($(tableAlias).`$(columnName)` is null" + (isConvertZeroDateToNull ? " or $(tableAlias).`$(columnName)` = '0000-00-00'" : "") + ",'',concat('\"',cast($(tableAlias).`$(columnName)` as char),'\"'))\n" ;
clobColumnTemplate = stringColumnTemplate;
blobColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',hex($(tableAlias).`$(columnName)`),'\"'))\n" ;
booleanColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',cast($(tableAlias).`$(columnName)` as unsigned),'\"'))\n" ;
Expand Down Expand Up @@ -163,13 +162,6 @@ public MySqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
sqlTemplates.put("initialLoadSqlTemplate" ,
"select concat($(columns)) from $(schemaName)$(tableName) t where $(whereClause) " );
}

protected String getConvertZeroDateToNull() {
if (symmetricDialect.getParameterService().getString(BasicDataSourcePropertyConstants.DB_POOL_URL).contains("zeroDateTimeBehavior=convertToNull")) {
return " or $(tableAlias).`$(columnName)` = '0000-00-00'";
}
return "";
}

@Override
protected String castDatetimeColumnToString(String columnName) {
Expand Down
Expand Up @@ -953,9 +953,13 @@ public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean
triggerHistories, triggerRoutersByHistoryId,
mapReloadRequests, isFullLoad, symNodeSecurityReloadChannel);

finalizeBatchCount += insertFileSyncBatchForReload(targetNode, loadId, createBy, transactional,
int fileSyncBatches = insertFileSyncBatchForReload(targetNode, loadId, createBy, transactional,
transaction, mapReloadRequests, isFullLoad, processInfo);


if (reloadRequests != null && reloadRequests.size() > 0) {
updateTableReloadStatusTableCount(platform.supportsMultiThreadedTransactions() ? null : transaction, loadId,
totalTableCount + fileSyncBatches);
}

if (isFullLoad) {

Expand Down Expand Up @@ -1409,7 +1413,7 @@ private Map<Integer, ExtractRequest> insertLoadBatchesForReload(Node targetNode,
}

long firstBatchId = 0;

for (TriggerHistory triggerHistory : triggerHistories) {
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());
Expand Down Expand Up @@ -1486,11 +1490,11 @@ private Map<Integer, ExtractRequest> insertLoadBatchesForReload(Node targetNode,
}
}

firstBatchId = firstBatchId > 0 ? firstBatchId : startBatchId;
firstBatchId = firstBatchId == 0 ? startBatchId : firstBatchId;

if (table.getNameLowerCase().startsWith(symmetricDialect.getTablePrefix() + "_" + TableConstants.SYM_FILE_SNAPSHOT)) {
TableReloadStatus reloadStatus = getTableReloadStatusByLoadId(loadId);
startBatchId = reloadStatus.getStartDataBatchId();
firstBatchId = reloadStatus.getStartDataBatchId() > 0 ? reloadStatus.getStartDataBatchId() : firstBatchId;
}

updateTableReloadStatusDataCounts(platform.supportsMultiThreadedTransactions() ? null : transaction,
Expand Down
Expand Up @@ -713,7 +713,7 @@ private boolean isWaitForExtractionRequired(OutgoingBatch currentBatch, IStagedR

if (currentBatch.getStatus() == OutgoingBatch.Status.RQ) {
// if this is a reload that isn't extracted yet, we need to defer to the extract job.
log.info("Batch needs to be extracted by the extact job {}", currentBatch.getNodeBatchId());
log.info("Batch needs to be extracted by the extract job {}", currentBatch.getNodeBatchId());
} else {
// it's possible there was an error and staging was cleared, so we need to re-request extraction here.
log.info("Batch has status of '{}' but is not extracted. Requesting re-extract for batch: {}",
Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.jumpmind.symmetric.model.NodeCommunication;
import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.IClusterService;
Expand Down Expand Up @@ -250,7 +251,8 @@ public List<NodeCommunication> list(CommunicationType communicationType) {

protected List<String> getNodeIdsWithUnsentCount() {
return sqlTemplate.query(getSql("selectNodeIdsWithUnsentBatchsSql"),
new StringMapper());
new StringMapper(), OutgoingBatch.Status.ER, OutgoingBatch.Status.NE, OutgoingBatch.Status.QY,
OutgoingBatch.Status.SE, OutgoingBatch.Status.LD, OutgoingBatch.Status.IG, OutgoingBatch.Status.RS);
}

protected List<NodeCommunication> filterForChannelThreading(List<Node> nodesToCommunicateWith) {
Expand Down Expand Up @@ -609,7 +611,7 @@ public void stop() {
for (CommunicationType communicationType : services) {
try {
ExecutorService service = executors.get(communicationType);
service.shutdownNow();
service.shutdown();
} finally {
executors.remove(communicationType);
}
Expand Down
Expand Up @@ -34,7 +34,7 @@ public NodeCommunicationServiceSqlMap(IDatabasePlatform platform,

putSql("clearLocksOnRestartSql", "update $(node_communication) set lock_time=null where locking_server_id=? and lock_time is not null");

putSql("selectNodeIdsWithUnsentBatchsSql", "select distinct(node_id) from $(outgoing_batch) where status <> 'OK'");
putSql("selectNodeIdsWithUnsentBatchsSql", "select distinct(node_id) from $(outgoing_batch) where status in (?,?,?,?,?,?,?)");

putSql("selectNodeCommunicationSql",
"select * from $(node_communication) where communication_type=? order by node_priority DESC,last_lock_time");
Expand Down
Expand Up @@ -46,7 +46,7 @@
*/
public class InterbaseDdlBuilder extends AbstractDdlBuilder {

public static int SWITCH_TO_LONGVARCHAR_SIZE = 4096;
public static int SWITCH_TO_LONGVARCHAR_SIZE = 3840;

public InterbaseDdlBuilder() {

Expand Down Expand Up @@ -94,8 +94,8 @@ public InterbaseDdlBuilder() {
databaseInfo.setHasSize(Types.BINARY, false);
databaseInfo.setHasSize(Types.VARBINARY, false);

databaseInfo.setNonBlankCharColumnSpacePadded(false);
databaseInfo.setBlankCharColumnSpacePadded(false);
databaseInfo.setNonBlankCharColumnSpacePadded(true);
databaseInfo.setBlankCharColumnSpacePadded(true);
databaseInfo.setCharColumnSpaceTrimmed(false);
databaseInfo.setEmptyStringNulled(false);

Expand Down
Expand Up @@ -202,7 +202,8 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
default:
break;
}


writer.getContext().setLastError(null);
logConflictResolution(conflict, data, writer, resolvedData, lineNumber);
}

Expand Down

0 comments on commit b157b0c

Please sign in to comment.