Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
jumpmind-josh committed Oct 26, 2018
2 parents bf74454 + 1737913 commit 43bcd8a
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 28 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/common.gradle
Expand Up @@ -186,7 +186,7 @@ subprojects { subproject ->
jdomVersion = '2.0.5'
junitVersion = '4.11'
log4jVersion = '1.2.17'
slf4jVersion = '1.7.21'
slf4jVersion = '1.7.6'
mockitoVersion = '1.9.5'
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.45'
Expand Down
Expand Up @@ -191,20 +191,20 @@ public void createRequiredDatabaseObjects() {

String wkt2geom = this.parameterService.getTablePrefix() + "_" + "wkt2geom";
if (!installed(SQL_OBJECT_INSTALLED, wkt2geom)) {
String sql = " CREATE OR REPLACE "
+ " FUNCTION $(functionName)( "
+ " clob_in IN CLOB) "
+ " RETURN SDO_GEOMETRY "
+ " AS "
+ " v_out SDO_GEOMETRY := NULL; "
+ " BEGIN "
+ " IF clob_in IS NOT NULL THEN "
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN "
+ " v_out := SDO_GEOMETRY(clob_in); "
+ " END IF; "
+ " END IF; "
+ " RETURN v_out; "
+ " END $(functionName); ";
String sql = " CREATE OR REPLACE FUNCTION $(functionName) ( \r\n"
+ " clob_in IN CLOB, \r\n"
+ " srid_in IN INTEGER) \r\n"
+ " RETURN SDO_GEOMETRY \r\n"
+ " AS \r\n"
+ " v_out SDO_GEOMETRY := NULL; \r\n"
+ " BEGIN \r\n"
+ " IF clob_in IS NOT NULL THEN \r\n"
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN \r\n"
+ " v_out := SDO_GEOMETRY(clob_in, srid_in); \r\n"
+ " END IF; \r\n"
+ " END IF; \r\n"
+ " RETURN v_out; \r\n"
+ " END $(functionName); \r\n";
install(sql, wkt2geom);
}

Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
Expand Down Expand Up @@ -464,7 +465,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
createBy, transactional, transaction);
}
}
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests);
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests, triggerRouters);

String symNodeSecurityReloadChannel = null;
int totalTableCount = 0;
Expand Down Expand Up @@ -585,17 +586,35 @@ private String findChannelFor(TriggerHistory history, List<TriggerRouter> trigge
}

@SuppressWarnings("unchecked")
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests) {
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests, List<TriggerRouter> triggerRouters) {
if (reloadRequests == null) {
return null;
}
Map<String, TableReloadRequest> reloadMap = new CaseInsensitiveMap();
for (TableReloadRequest item : reloadRequests) {
reloadMap.put(item.getIdentifier(), item);
for (TableReloadRequest reloadRequest : reloadRequests) {
validate(reloadRequest, triggerRouters);
reloadMap.put(reloadRequest.getIdentifier(), reloadRequest);
}
return reloadMap;
}

protected void validate(TableReloadRequest reloadRequest, List<TriggerRouter> triggerRouters) {
boolean validMatch = false;
for (TriggerRouter triggerRouter : triggerRouters) {
if (ObjectUtils.equals(triggerRouter.getTriggerId(), reloadRequest.getTriggerId())
&& ObjectUtils.equals(triggerRouter.getRouterId(), reloadRequest.getRouterId())) {
validMatch = true;
break;
}
}

if (!validMatch) {
throw new SymmetricException("Table reload request submitted which does not have a valid trigger/router "
+ "combination in sym_trigger_router. Request trigger id: '" + reloadRequest.getTriggerId() + "' router id: '"
+ reloadRequest.getRouterId() + "' create time: " + reloadRequest.getCreateTime());
}
}

private void callReloadListeners(boolean before, Node targetNode, boolean transactional,
ISqlTransaction transaction, long loadId) {
for (IReloadListener listener : extensionService.getExtensionPointList(IReloadListener.class)) {
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.sql.Types;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.TypeMap;
import org.jumpmind.db.platform.DatabaseInfo;
Expand All @@ -42,7 +43,7 @@ protected void appendColumnParameter(StringBuilder sql, Column column) {
sql.append("TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')")
.append(",");
} else if (isGeometry(column)) {
sql.append("SYM_WKT2GEOM(?)").append(",");
sql.append("SYM_WKT2GEOM(?,").append(buildSRIDSelect(column)).append(")").append(",");
} else if (column.getJdbcTypeName().startsWith("XMLTYPE")) {
sql.append("XMLTYPE(?)").append(",");
} else {
Expand All @@ -57,7 +58,7 @@ protected void appendColumnEquals(StringBuilder sql, Column column) {
.append(" = TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')");
} else if (isGeometry(column)) {
sql.append(quote).append(column.getName()).append(quote).append(" = ")
.append("SYM_WKT2GEOM(?)");
.append("SYM_WKT2GEOM(?,").append(buildSRIDSelect(column)).append(")");
} else if (column.getJdbcTypeName().startsWith("XMLTYPE")) {
sql.append(quote).append(column.getName()).append(quote).append(" = ")
.append("XMLTYPE(?)");
Expand Down Expand Up @@ -97,4 +98,14 @@ protected boolean isGeometry(Column column) {
return false;
}
}

protected String buildSRIDSelect(Column column) {
if (!StringUtils.isEmpty(schemaName)) {
return String.format("(select SRID from all_sdo_geom_metadata where owner = '%s' and table_name = '%s' and column_name = '%s')",
schemaName.toUpperCase(), tableName.toUpperCase(), column.getName().toUpperCase());
} else {
return String.format("(select SRID from user_sdo_geom_metadata where table_name = '%s' and column_name = '%s')",
tableName.toUpperCase(), column.getName().toUpperCase());
}
}
}
11 changes: 10 additions & 1 deletion symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java
Expand Up @@ -53,6 +53,12 @@ public class DmlStatement {
public enum DmlType {
INSERT, UPDATE, DELETE, UPSERT, COUNT, FROM, WHERE, SELECT, SELECT_ALL, UNKNOWN
};

protected String catalogName;

protected String schemaName;

protected String tableName;

protected DmlType dmlType;

Expand Down Expand Up @@ -99,7 +105,10 @@ protected void init(DmlType type, String catalogName, String schemaName, String
Column[] keysColumns, Column[] columns, boolean[] nullKeyValues,
DatabaseInfo databaseInfo, boolean useQuotedIdentifiers, String textColumnExpression,
boolean namedParameters) {


this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
this.namedParameters = namedParameters;
this.databaseInfo = databaseInfo;
this.columns = columns;
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.sql.Timestamp;
import java.sql.Types;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
Expand Down Expand Up @@ -118,8 +119,15 @@ protected boolean isVersionNewer(Conflict conflict, AbstractDatabaseWriter write
DmlStatement stmt = databaseWriter.getPlatform().createDmlStatement(DmlType.FROM, targetTable
, writer.getWriterSettings().getTextColumnExpression());
String sql = stmt.getColumnsSql(new Column[] { targetTable.getColumnWithName(columnName) });
Long existingVersion = databaseWriter.getTransaction()
.queryForObject(sql, Long.class, objectValues);
Long existingVersion = null;

try {
existingVersion = databaseWriter.getTransaction().queryForObject(sql, Long.class, objectValues);
} catch (Exception ex) {
throw new RuntimeException("Failed to execute conflict resolution SQL: \"" +
sql + "\" values: " + Arrays.toString(objectValues), ex);
}

if (existingVersion == null) {
return true;
} else {
Expand Down
Expand Up @@ -165,7 +165,7 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
if (isNotBlank(columnSize)) {
size = Integer.parseInt(columnSize);
}
if (typeName != null) {
if (typeName != null) {
if (typeName.toLowerCase().startsWith("text")) {
return Types.LONGVARCHAR;
} else if ( typeName.toLowerCase().startsWith("ntext")) {
Expand All @@ -182,9 +182,11 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
return Types.LONGNVARCHAR;
} else if ( typeName.toUpperCase().equals("SQL_VARIANT")) {
return Types.BINARY;
} else if (typeName != null && typeName.equalsIgnoreCase("DATETIMEOFFSET")) {
} else if (typeName.equalsIgnoreCase("DATETIMEOFFSET")) {
return MAPPED_TIMESTAMPTZ;
}
} else if (typeName.equalsIgnoreCase("datetime2")) {
return Types.TIMESTAMP;
}
}
return super.mapUnknownJdbcTypeForColumn(values);
}
Expand Down
Expand Up @@ -375,7 +375,7 @@ protected int readPidFromFile(String filename) {
pid = Integer.parseInt(reader.readLine());
reader.close();
} catch (FileNotFoundException e) {
} catch (IOException e) {
} catch (Exception e) {
e.printStackTrace();
}
return pid;
Expand Down

0 comments on commit 43bcd8a

Please sign in to comment.