Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
erilong committed Oct 29, 2018
2 parents 3b03c85 + 4da2a5d commit 8d0f682
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 30 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/common.gradle
Expand Up @@ -186,7 +186,7 @@ subprojects { subproject ->
jdomVersion = '2.0.5'
junitVersion = '4.11'
log4jVersion = '1.2.17'
slf4jVersion = '1.7.21'
slf4jVersion = '1.7.6'
mockitoVersion = '1.9.5'
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.45'
Expand Down
Expand Up @@ -191,20 +191,20 @@ public void createRequiredDatabaseObjects() {

String wkt2geom = this.parameterService.getTablePrefix() + "_" + "wkt2geom";
if (!installed(SQL_OBJECT_INSTALLED, wkt2geom)) {
String sql = " CREATE OR REPLACE "
+ " FUNCTION $(functionName)( "
+ " clob_in IN CLOB) "
+ " RETURN SDO_GEOMETRY "
+ " AS "
+ " v_out SDO_GEOMETRY := NULL; "
+ " BEGIN "
+ " IF clob_in IS NOT NULL THEN "
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN "
+ " v_out := SDO_GEOMETRY(clob_in); "
+ " END IF; "
+ " END IF; "
+ " RETURN v_out; "
+ " END $(functionName); ";
String sql = " CREATE OR REPLACE FUNCTION $(functionName) ( \r\n"
+ " clob_in IN CLOB, \r\n"
+ " srid_in IN INTEGER) \r\n"
+ " RETURN SDO_GEOMETRY \r\n"
+ " AS \r\n"
+ " v_out SDO_GEOMETRY := NULL; \r\n"
+ " BEGIN \r\n"
+ " IF clob_in IS NOT NULL THEN \r\n"
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN \r\n"
+ " v_out := SDO_GEOMETRY(clob_in, srid_in); \r\n"
+ " END IF; \r\n"
+ " END IF; \r\n"
+ " RETURN v_out; \r\n"
+ " END $(functionName); \r\n";
install(sql, wkt2geom);
}

Expand Down
@@ -1,6 +1,8 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
Expand All @@ -22,6 +24,7 @@ public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabase
public void start(Batch batch) {
super.start(batch);
if (isFallBackToDefault()) {
getTransaction().setInBatchMode(false);
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using DEFAULT loader");
}else{
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using BULK loader");
Expand Down
Expand Up @@ -34,7 +34,7 @@ protected LoadStatus insert(CsvData data) {

@Override
protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
LoadStatus loadStatus = super.insert(data);
LoadStatus loadStatus = super.update(data, applyChangesOnly, useConflictDetection);
if (loadStatus == LoadStatus.CONFLICT) {
loadStatus = LoadStatus.SUCCESS;
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
Expand Down Expand Up @@ -464,7 +465,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
createBy, transactional, transaction);
}
}
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests);
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests, triggerRouters);

String symNodeSecurityReloadChannel = null;
int totalTableCount = 0;
Expand Down Expand Up @@ -585,17 +586,35 @@ private String findChannelFor(TriggerHistory history, List<TriggerRouter> trigge
}

@SuppressWarnings("unchecked")
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests) {
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests, List<TriggerRouter> triggerRouters) {
if (reloadRequests == null) {
return null;
}
Map<String, TableReloadRequest> reloadMap = new CaseInsensitiveMap();
for (TableReloadRequest item : reloadRequests) {
reloadMap.put(item.getIdentifier(), item);
for (TableReloadRequest reloadRequest : reloadRequests) {
validate(reloadRequest, triggerRouters);
reloadMap.put(reloadRequest.getIdentifier(), reloadRequest);
}
return reloadMap;
}

protected void validate(TableReloadRequest reloadRequest, List<TriggerRouter> triggerRouters) {
boolean validMatch = false;
for (TriggerRouter triggerRouter : triggerRouters) {
if (ObjectUtils.equals(triggerRouter.getTriggerId(), reloadRequest.getTriggerId())
&& ObjectUtils.equals(triggerRouter.getRouterId(), reloadRequest.getRouterId())) {
validMatch = true;
break;
}
}

if (!validMatch) {
throw new SymmetricException("Table reload request submitted which does not have a valid trigger/router "
+ "combination in sym_trigger_router. Request trigger id: '" + reloadRequest.getTriggerId() + "' router id: '"
+ reloadRequest.getRouterId() + "' create time: " + reloadRequest.getCreateTime());
}
}

private void callReloadListeners(boolean before, Node targetNode, boolean transactional,
ISqlTransaction transaction, long loadId) {
for (IReloadListener listener : extensionService.getExtensionPointList(IReloadListener.class)) {
Expand Down
Expand Up @@ -146,6 +146,10 @@ public void trackChanges(boolean force) {
try {
log.debug("Tracking changes for file sync");
Node local = engine.getNodeService().findIdentity();
if (local == null) {
log.warn("Not running file sync trackChanges because the local node is not available yet. It may not be registered yet.");
return;
}
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(local.getNodeId(), null, ProcessType.FILE_SYNC_TRACKER));
boolean useCrc = engine.getParameterService().is(ParameterConstants.FILE_SYNC_USE_CRC);
Expand Down
Expand Up @@ -933,7 +933,11 @@ public AuthenticationStatus getAuthenticationStatus(String nodeId, String securi
if (node == null) {
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
} else if (!syncEnabled(node)) {
retVal = AuthenticationStatus.SYNC_DISABLED;
if(registrationOpen(node)){
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
}else{
retVal = AuthenticationStatus.SYNC_DISABLED;
}
} else if (!isNodeAuthorized(nodeId, securityToken)) {
retVal = AuthenticationStatus.FORBIDDEN;
}
Expand All @@ -946,6 +950,14 @@ protected boolean syncEnabled(Node node) {
syncEnabled = node.isSyncEnabled();
}
return syncEnabled;
}
}

protected boolean registrationOpen(Node node){
NodeSecurity security = findNodeSecurity(node.getNodeId());
if(security != null){
return security.isRegistrationEnabled();
}
return false;
}

}
Expand Up @@ -22,6 +22,7 @@

import java.sql.Types;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.TypeMap;
import org.jumpmind.db.platform.DatabaseInfo;
Expand All @@ -42,7 +43,7 @@ protected void appendColumnParameter(StringBuilder sql, Column column) {
sql.append("TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')")
.append(",");
} else if (isGeometry(column)) {
sql.append("SYM_WKT2GEOM(?)").append(",");
sql.append("SYM_WKT2GEOM(?,").append(buildSRIDSelect(column)).append(")").append(",");
} else if (column.getJdbcTypeName().startsWith("XMLTYPE")) {
sql.append("XMLTYPE(?)").append(",");
} else {
Expand All @@ -57,7 +58,7 @@ protected void appendColumnEquals(StringBuilder sql, Column column) {
.append(" = TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')");
} else if (isGeometry(column)) {
sql.append(quote).append(column.getName()).append(quote).append(" = ")
.append("SYM_WKT2GEOM(?)");
.append("SYM_WKT2GEOM(?,").append(buildSRIDSelect(column)).append(")");
} else if (column.getJdbcTypeName().startsWith("XMLTYPE")) {
sql.append(quote).append(column.getName()).append(quote).append(" = ")
.append("XMLTYPE(?)");
Expand Down Expand Up @@ -97,4 +98,14 @@ protected boolean isGeometry(Column column) {
return false;
}
}

protected String buildSRIDSelect(Column column) {
if (!StringUtils.isEmpty(schemaName)) {
return String.format("(select SRID from all_sdo_geom_metadata where owner = '%s' and table_name = '%s' and column_name = '%s')",
schemaName.toUpperCase(), tableName.toUpperCase(), column.getName().toUpperCase());
} else {
return String.format("(select SRID from user_sdo_geom_metadata where table_name = '%s' and column_name = '%s')",
tableName.toUpperCase(), column.getName().toUpperCase());
}
}
}
11 changes: 10 additions & 1 deletion symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java
Expand Up @@ -53,6 +53,12 @@ public class DmlStatement {
public enum DmlType {
INSERT, UPDATE, DELETE, UPSERT, COUNT, FROM, WHERE, SELECT, SELECT_ALL, UNKNOWN
};

protected String catalogName;

protected String schemaName;

protected String tableName;

protected DmlType dmlType;

Expand Down Expand Up @@ -99,7 +105,10 @@ protected void init(DmlType type, String catalogName, String schemaName, String
Column[] keysColumns, Column[] columns, boolean[] nullKeyValues,
DatabaseInfo databaseInfo, boolean useQuotedIdentifiers, String textColumnExpression,
boolean namedParameters) {


this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
this.namedParameters = namedParameters;
this.databaseInfo = databaseInfo;
this.columns = columns;
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.sql.Timestamp;
import java.sql.Types;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
Expand Down Expand Up @@ -118,8 +119,15 @@ protected boolean isVersionNewer(Conflict conflict, AbstractDatabaseWriter write
DmlStatement stmt = databaseWriter.getPlatform().createDmlStatement(DmlType.FROM, targetTable
, writer.getWriterSettings().getTextColumnExpression());
String sql = stmt.getColumnsSql(new Column[] { targetTable.getColumnWithName(columnName) });
Long existingVersion = databaseWriter.getTransaction()
.queryForObject(sql, Long.class, objectValues);
Long existingVersion = null;

try {
existingVersion = databaseWriter.getTransaction().queryForObject(sql, Long.class, objectValues);
} catch (Exception ex) {
throw new RuntimeException("Failed to execute conflict resolution SQL: \"" +
sql + "\" values: " + Arrays.toString(objectValues), ex);
}

if (existingVersion == null) {
return true;
} else {
Expand Down
Expand Up @@ -165,7 +165,7 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
if (isNotBlank(columnSize)) {
size = Integer.parseInt(columnSize);
}
if (typeName != null) {
if (typeName != null) {
if (typeName.toLowerCase().startsWith("text")) {
return Types.LONGVARCHAR;
} else if ( typeName.toLowerCase().startsWith("ntext")) {
Expand All @@ -182,9 +182,11 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
return Types.LONGNVARCHAR;
} else if ( typeName.toUpperCase().equals("SQL_VARIANT")) {
return Types.BINARY;
} else if (typeName != null && typeName.equalsIgnoreCase("DATETIMEOFFSET")) {
} else if (typeName.equalsIgnoreCase("DATETIMEOFFSET")) {
return MAPPED_TIMESTAMPTZ;
}
} else if (typeName.equalsIgnoreCase("datetime2")) {
return Types.TIMESTAMP;
}
}
return super.mapUnknownJdbcTypeForColumn(values);
}
Expand Down

0 comments on commit 8d0f682

Please sign in to comment.