Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0000803: Check that column names match column values in the abstract …
…router. If not log details.
  • Loading branch information
chenson42 committed Sep 6, 2012
1 parent 16c09cd commit cb47cec
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 65 deletions.
Expand Up @@ -16,7 +16,8 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License. */
* under the License.
*/

package org.jumpmind.symmetric.route;

Expand All @@ -26,6 +27,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.DataMetaData;
Expand All @@ -46,66 +50,68 @@ public abstract class AbstractDataRouter implements IDataRouter {
public void contextCommitted(SimpleRouterContext context) {
}

protected Map<String, String> getDataMap(DataMetaData dataMetaData) {
protected Map<String, String> getDataMap(DataMetaData dataMetaData, ISymmetricDialect symmetricDialect) {
Map<String, String> data = null;
DataEventType dml = dataMetaData.getData().getDataEventType();
switch (dml) {
case UPDATE:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsString(null, dataMetaData));
data.putAll(getOldDataAsString(OLD_, dataMetaData));
break;
case INSERT:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsString(null, dataMetaData));
Map<String, String> map = getNullData(OLD_, dataMetaData);
data.putAll(map);
break;
case DELETE:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getOldDataAsString(null, dataMetaData));
data.putAll(getOldDataAsString(OLD_, dataMetaData));
break;
default:
data = new HashMap<String, String>(1);
break;
case UPDATE:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsString(null, dataMetaData, symmetricDialect));
data.putAll(getOldDataAsString(OLD_, dataMetaData, symmetricDialect));
break;
case INSERT:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsString(null, dataMetaData, symmetricDialect));
Map<String, String> map = getNullData(OLD_, dataMetaData);
data.putAll(map);
break;
case DELETE:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getOldDataAsString(null, dataMetaData, symmetricDialect));
data.putAll(getOldDataAsString(OLD_, dataMetaData, symmetricDialect));
break;
default:
data = new HashMap<String, String>(1);
break;
}

if (data.size() == 0) {
data.putAll(getPkDataAsString(dataMetaData));
data.putAll(getPkDataAsString(dataMetaData, symmetricDialect));
}
data.put("EXTERNAL_DATA", dataMetaData.getData().getExternalData());
return data;
}

protected Map<String, String> getNewDataAsString(String prefix, DataMetaData dataMetaData) {
protected Map<String, String> getNewDataAsString(String prefix, DataMetaData dataMetaData, ISymmetricDialect symmetricDialect) {
String[] rowData = dataMetaData.getData().toParsedRowData();
return getDataAsString(prefix, dataMetaData, rowData);
return getDataAsString(prefix, dataMetaData, symmetricDialect, rowData);
}

protected Map<String, String> getOldDataAsString(String prefix, DataMetaData dataMetaData) {
protected Map<String, String> getOldDataAsString(String prefix, DataMetaData dataMetaData, ISymmetricDialect symmetricDialect) {
String[] rowData = dataMetaData.getData().toParsedOldData();
return getDataAsString(prefix, dataMetaData, rowData);
return getDataAsString(prefix, dataMetaData, symmetricDialect, rowData);
}

protected Map<String, String> getDataAsString(String prefix, DataMetaData dataMetaData,
protected Map<String, String> getDataAsString(String prefix, DataMetaData dataMetaData, ISymmetricDialect symmetricDialect,
String[] rowData) {
String[] columns = dataMetaData.getTriggerHistory().getParsedColumnNames();
Map<String, String> map = new HashMap<String, String>(columns.length);
if (rowData != null) {
testSize(dataMetaData, symmetricDialect, columns, rowData);
for (int i = 0; i < columns.length; i++) {
String columnName = columns[i].toUpperCase();
map.put(prefix != null ? prefix + columnName : columnName, rowData[i]);
}
}
return map;
}
protected Map<String, String> getPkDataAsString(DataMetaData dataMetaData) {

protected Map<String, String> getPkDataAsString(DataMetaData dataMetaData, ISymmetricDialect symmetricDialect) {
String[] columns = dataMetaData.getTriggerHistory().getParsedPkColumnNames();
String[] rowData = dataMetaData.getData().toParsedPkData();
Map<String, String> map = new HashMap<String, String>(columns.length);
if (rowData != null) {
testSize(dataMetaData, symmetricDialect, columns, rowData);
for (int i = 0; i < columns.length; i++) {
String columnName = columns[i].toUpperCase();
map.put(columnName, rowData[i]);
Expand All @@ -114,31 +120,32 @@ protected Map<String, String> getPkDataAsString(DataMetaData dataMetaData) {
return map;
}

protected Map<String, Object> getDataObjectMap(DataMetaData dataMetaData, ISymmetricDialect symmetricDialect) {
protected Map<String, Object> getDataObjectMap(DataMetaData dataMetaData,
ISymmetricDialect symmetricDialect) {
Map<String, Object> data = null;
DataEventType dml = dataMetaData.getData().getDataEventType();
switch (dml) {
case UPDATE:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsObject(null, dataMetaData, symmetricDialect));
data.putAll(getOldDataAsObject(OLD_, dataMetaData, symmetricDialect));
break;
case INSERT:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsObject(null, dataMetaData, symmetricDialect));
Map<String, Object> map = getNullData(OLD_, dataMetaData);
data.putAll(map);
break;
case DELETE:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getOldDataAsObject(null, dataMetaData, symmetricDialect));
data.putAll(getOldDataAsObject(OLD_, dataMetaData, symmetricDialect));
if (data.size() == 0) {
data.putAll(getPkDataAsObject(dataMetaData, symmetricDialect));
}
break;
default:
break;
case UPDATE:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsObject(null, dataMetaData, symmetricDialect));
data.putAll(getOldDataAsObject(OLD_, dataMetaData, symmetricDialect));
break;
case INSERT:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsObject(null, dataMetaData, symmetricDialect));
Map<String, Object> map = getNullData(OLD_, dataMetaData);
data.putAll(map);
break;
case DELETE:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getOldDataAsObject(null, dataMetaData, symmetricDialect));
data.putAll(getOldDataAsObject(OLD_, dataMetaData, symmetricDialect));
if (data.size() == 0) {
data.putAll(getPkDataAsObject(dataMetaData, symmetricDialect));
}
break;
default:
break;
}
return data;
}
Expand Down Expand Up @@ -169,8 +176,10 @@ protected Map<String, Object> getDataAsObject(String prefix, DataMetaData dataMe
if (rowData != null) {
Map<String, Object> data = new HashMap<String, Object>(rowData.length);
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Object[] objects = symmetricDialect.getPlatform().getObjectValues(symmetricDialect.getBinaryEncoding(),
dataMetaData.getTable(), columnNames, rowData);
Object[] objects = symmetricDialect.getPlatform().getObjectValues(
symmetricDialect.getBinaryEncoding(), dataMetaData.getTable(), columnNames,
rowData);
testSize(dataMetaData, symmetricDialect, columnNames, objects);
for (int i = 0; i < columnNames.length; i++) {
String upperCase = columnNames[i].toUpperCase();
data.put(prefix != null ? (prefix + upperCase) : upperCase, objects[i]);
Expand All @@ -180,18 +189,37 @@ protected Map<String, Object> getDataAsObject(String prefix, DataMetaData dataMe
return Collections.emptyMap();
}
}


protected void testSize(DataMetaData dataMetaData, ISymmetricDialect symmetricDialect, String[] columnNames, Object[] values) {
if (columnNames.length != values.length) {
String possibleOracleErrorMessage = "";
if (symmetricDialect.getPlatform().getName().equals(DatabaseNamesConstants.ORACLE)) {
boolean isUseCaptureLobs = dataMetaData.getTriggerRouter().getTrigger().isUseCaptureLobs();
possibleOracleErrorMessage = String.format("One possible cause of this issue is when trigger.use_capture_lobs=0 and the captured row_data size exceeds 4k, captured data will be truncated at 4k. trigger.use_capture_lobs is currently set to %s.", isUseCaptureLobs ? "1" : "0");
}
String message = String.format(
"The number of recorded column names (%d) did not match the number of captured data values (%d). The failed data_id is %d and the event type is %s. %s\ncolumn_names:\n%s\nvalues:\n%s",
columnNames.length, values.length,
dataMetaData.getData().getDataId(),
dataMetaData.getData().getDataEventType().name(),
possibleOracleErrorMessage,
ArrayUtils.toString(columnNames), ArrayUtils.toString(values));
throw new SymmetricException(message);
}
}

protected Map<String, Object> getPkDataAsObject(DataMetaData dataMetaData,
ISymmetricDialect symmetricDialect) {
String[] rowData = dataMetaData.getData().toParsedPkData();
if (rowData != null) {
Map<String, Object> data = new HashMap<String, Object>(rowData.length);
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Object[] objects = symmetricDialect.getPlatform().getObjectValues(symmetricDialect.getBinaryEncoding(),
dataMetaData.getTable(), columnNames, rowData);
Object[] objects = symmetricDialect.getPlatform().getObjectValues(
symmetricDialect.getBinaryEncoding(), dataMetaData.getTable(), columnNames,
rowData);
testSize(dataMetaData, symmetricDialect, columnNames, objects);
for (int i = 0; i < columnNames.length; i++) {
data.put(columnNames[i]
.toUpperCase(), objects[i]);
data.put(columnNames[i].toUpperCase(), objects[i]);
}
return data;
} else {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.SyntaxParsingException;
import org.jumpmind.symmetric.common.TokenConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Router;
Expand Down Expand Up @@ -75,23 +76,26 @@ public class ColumnMatchDataRouter extends AbstractDataRouter implements IDataRo

private static final String NULL_VALUE = "NULL";

private IConfigurationService configurationService;
private IConfigurationService configurationService;

private ISymmetricDialect symmetricDialect;

final static String EXPRESSION_KEY = String.format("%s.Expression.", ColumnMatchDataRouter.class
.getName());

public ColumnMatchDataRouter() {
}

public ColumnMatchDataRouter(IConfigurationService configurationService) {
public ColumnMatchDataRouter(IConfigurationService configurationService, ISymmetricDialect symmetricDialect) {
this.configurationService = configurationService;
this.symmetricDialect = symmetricDialect;
}

public Set<String> routeToNodes(SimpleRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {
Set<String> nodeIds = null;
List<Expression> expressions = getExpressions(dataMetaData.getTriggerRouter().getRouter(), routingContext);
Map<String, String> columnValues = getDataMap(dataMetaData);
Map<String, String> columnValues = getDataMap(dataMetaData, symmetricDialect);

if (columnValues != null) {
for (Expression e : expressions) {
Expand Down
Expand Up @@ -77,7 +77,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
Set<String> nodeIds = null;

// the inbound data
Map<String, String> columnValues = getDataMap(dataMetaData);
Map<String, String> columnValues = getDataMap(dataMetaData, engine.getSymmetricDialect());

Node me = findIdentity();

Expand Down
Expand Up @@ -74,7 +74,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData

params = getParams(router, routingContext);

Map<String, String> dataMap = getDataMap(dataMetaData);
Map<String, String> dataMap = getDataMap(dataMetaData, symmetricDialect);
Map<String, Set<String>> lookupTable = getLookupTable(params, router, routingContext);
String column = params.get(PARAM_KEY_COLUMN);
String keyData = dataMap.get(column);
Expand Down
Expand Up @@ -100,7 +100,7 @@ public RouterService(ISymmetricEngine engine) {
this.routers.put("subselect", new SubSelectDataRouter(symmetricDialect));
this.routers.put("lookuptable", new LookupTableDataRouter(symmetricDialect));
this.routers.put("default", new DefaultDataRouter());
this.routers.put("column", new ColumnMatchDataRouter(engine.getConfigurationService()));
this.routers.put("column", new ColumnMatchDataRouter(engine.getConfigurationService(), engine.getSymmetricDialect()));

setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
Expand Down

0 comments on commit cb47cec

Please sign in to comment.