Skip to content

Commit

Permalink
GEODE-4894: Changes are made to support case sensitivity between regi…
Browse files Browse the repository at this point in the history
…on, table name and pdx field, column name (#1663)

* meta-data is now used to compute the column name
* reads will now consult the pdx registry for an existing pdx type to use its field name.
* Convert field name to lowercase only when column name is uppercase.
* Table name lookup is changes to look for exact match first, than case insensitive match.
* Field to column name lookup is changed to find exact column name or case insensitive match in the metadata.
  • Loading branch information
agingade committed Mar 23, 2018
1 parent 6367925 commit da51fce
Show file tree
Hide file tree
Showing 28 changed files with 725 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.geode.annotations.Experimental;
import org.apache.geode.connectors.jdbc.JdbcConnectorException;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;

@Experimental
public class RegionMapping implements Serializable {
Expand All @@ -28,37 +35,45 @@ public class RegionMapping implements Serializable {
private final String tableName;
private final String connectionConfigName;
private final Boolean primaryKeyInValue;
private final Map<String, String> fieldToColumnMap;
private final Map<String, String> columnToFieldMap;
private final ConcurrentMap<String, String> fieldToColumnMap;
private final ConcurrentMap<String, String> columnToFieldMap;

private final Map<String, String> configuredFieldToColumnMap;
private final Map<String, String> configuredColumnToFieldMap;

public RegionMapping(String regionName, String pdxClassName, String tableName,
String connectionConfigName, Boolean primaryKeyInValue,
Map<String, String> fieldToColumnMap) {
Map<String, String> configuredFieldToColumnMap) {
this.regionName = regionName;
this.pdxClassName = pdxClassName;
this.tableName = tableName;
this.connectionConfigName = connectionConfigName;
this.primaryKeyInValue = primaryKeyInValue;
this.fieldToColumnMap =
fieldToColumnMap == null ? null : Collections.unmodifiableMap(fieldToColumnMap);
this.columnToFieldMap = createReverseMap(fieldToColumnMap);
this.fieldToColumnMap = new ConcurrentHashMap<>();
this.columnToFieldMap = new ConcurrentHashMap<>();
if (configuredFieldToColumnMap != null) {
this.configuredFieldToColumnMap =
Collections.unmodifiableMap(new HashMap<>(configuredFieldToColumnMap));
this.configuredColumnToFieldMap =
Collections.unmodifiableMap(createReverseMap(configuredFieldToColumnMap));
} else {
this.configuredFieldToColumnMap = null;
this.configuredColumnToFieldMap = null;
}
}

private static Map<String, String> createReverseMap(Map<String, String> fieldToColumnMap) {
if (fieldToColumnMap == null) {
return null;
}
Map<String, String> reverseMap = new HashMap<>();
for (Map.Entry<String, String> entry : fieldToColumnMap.entrySet()) {
String reverseMapKey = entry.getValue().toLowerCase();
private static Map<String, String> createReverseMap(Map<String, String> input) {
Map<String, String> output = new HashMap<>();
for (Map.Entry<String, String> entry : input.entrySet()) {
String reverseMapKey = entry.getValue();
String reverseMapValue = entry.getKey();
if (reverseMap.containsKey(reverseMapKey)) {
if (output.containsKey(reverseMapKey)) {
throw new IllegalArgumentException(
"The field " + reverseMapValue + " can not be mapped to more than one column.");
}
reverseMap.put(reverseMapKey, reverseMapValue);
output.put(reverseMapKey, reverseMapValue);
}
return Collections.unmodifiableMap(reverseMap);
return output;
}

public String getConnectionConfigName() {
Expand Down Expand Up @@ -88,32 +103,136 @@ public String getRegionToTableName() {
return tableName;
}

public String getColumnNameForField(String fieldName) {
String columnName = null;
if (fieldToColumnMap != null) {
columnName = fieldToColumnMap.get(fieldName);
private String getConfiguredColumnNameForField(String fieldName) {
String result = fieldName;
if (configuredFieldToColumnMap != null) {
String mapResult = configuredFieldToColumnMap.get(fieldName);
if (mapResult != null) {
result = mapResult;
}
}
return result;
}

public String getColumnNameForField(String fieldName, TableMetaDataView tableMetaDataView) {
String columnName = fieldToColumnMap.get(fieldName);
if (columnName == null) {
String configuredColumnName = getConfiguredColumnNameForField(fieldName);
Set<String> columnNames = tableMetaDataView.getColumnNames();
if (columnNames.contains(configuredColumnName)) {
// exact match
columnName = configuredColumnName;
} else {
for (String candidate : columnNames) {
if (candidate.equalsIgnoreCase(configuredColumnName)) {
if (columnName != null) {
throw new JdbcConnectorException(
"The SQL table has at least two columns that match the PDX field: " + fieldName);
}
columnName = candidate;
}
}
}

if (columnName == null) {
columnName = configuredColumnName;
}
fieldToColumnMap.put(fieldName, columnName);
columnToFieldMap.put(columnName, fieldName);
}
return columnName != null ? columnName : fieldName;
return columnName;
}

public String getFieldNameForColumn(String columnName) {
String canonicalColumnName = columnName.toLowerCase();
String fieldName = null;
if (this.columnToFieldMap != null) {
fieldName = columnToFieldMap.get(canonicalColumnName);
private String getConfiguredFieldNameForColumn(String columnName) {
String result = columnName;
if (configuredColumnToFieldMap != null) {
String mapResult = configuredColumnToFieldMap.get(columnName);
if (mapResult != null) {
result = mapResult;
}
}
return fieldName != null ? fieldName : canonicalColumnName;
return result;
}

public Map<String, String> getFieldToColumnMap() {
return fieldToColumnMap;
public String getFieldNameForColumn(String columnName, TypeRegistry typeRegistry) {
String fieldName = columnToFieldMap.get(columnName);
if (fieldName == null) {
String configuredFieldName = getConfiguredFieldNameForColumn(columnName);
if (getPdxClassName() == null) {
if (configuredFieldName.equals(configuredFieldName.toUpperCase())) {
fieldName = configuredFieldName.toLowerCase();
} else {
fieldName = configuredFieldName;
}
} else {
Set<PdxType> pdxTypes = getPdxTypesForClassName(typeRegistry);
fieldName = findExactMatch(configuredFieldName, pdxTypes);
if (fieldName == null) {
fieldName = findCaseInsensitiveMatch(columnName, configuredFieldName, pdxTypes);
}
}
assert fieldName != null;
fieldToColumnMap.put(fieldName, columnName);
columnToFieldMap.put(columnName, fieldName);
}
return fieldName;
}

private Set<PdxType> getPdxTypesForClassName(TypeRegistry typeRegistry) {
Set<PdxType> pdxTypes = typeRegistry.getPdxTypesForClassName(getPdxClassName());
if (pdxTypes.isEmpty()) {
throw new JdbcConnectorException(
"The class " + getPdxClassName() + " has not been pdx serialized.");
}
return pdxTypes;
}

/**
* Given a column name and a set of pdx types, find the field name in those types that match,
* ignoring case, the column name.
*
* @return the matching field name or null if no match
* @throws JdbcConnectorException if no fields match
* @throws JdbcConnectorException if more than one field matches
*/
private String findCaseInsensitiveMatch(String columnName, String configuredFieldName,
Set<PdxType> pdxTypes) {
HashSet<String> matchingFieldNames = new HashSet<>();
for (PdxType pdxType : pdxTypes) {
for (String existingFieldName : pdxType.getFieldNames()) {
if (existingFieldName.equalsIgnoreCase(configuredFieldName)) {
matchingFieldNames.add(existingFieldName);
}
}
}
if (matchingFieldNames.isEmpty()) {
throw new JdbcConnectorException("The class " + getPdxClassName()
+ " does not have a field that matches the column " + columnName);
} else if (matchingFieldNames.size() > 1) {
throw new JdbcConnectorException(
"Could not determine what pdx field to use for the column name " + columnName
+ " because the pdx fields " + matchingFieldNames + " all match it.");
}
return matchingFieldNames.iterator().next();
}

/**
* For unit tests
* Given a column name, search the given pdxTypes for a field whose name exactly matches the
* column name.
*
* @return the matching field name or null if no match
*/
Map<String, String> getColumnToFieldMap() {
return this.columnToFieldMap;
private String findExactMatch(String columnName, Set<PdxType> pdxTypes) {
for (PdxType pdxType : pdxTypes) {
if (pdxType.getPdxField(columnName) != null) {
return columnName;
}
}
return null;
}

public Map<String, String> getFieldToColumnMap() {
return configuredFieldToColumnMap;
}

@Override
Expand Down Expand Up @@ -144,8 +263,10 @@ public boolean equals(Object o) {
: that.connectionConfigName != null) {
return false;
}
return fieldToColumnMap != null ? fieldToColumnMap.equals(that.fieldToColumnMap)
: that.fieldToColumnMap == null;

return (configuredFieldToColumnMap != null
? configuredFieldToColumnMap.equals(that.configuredFieldToColumnMap)
: that.configuredFieldToColumnMap == null);
}

@Override
Expand All @@ -155,7 +276,8 @@ public int hashCode() {
result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
result = 31 * result + (connectionConfigName != null ? connectionConfigName.hashCode() : 0);
result = 31 * result + (primaryKeyInValue ? 1 : 0);
result = 31 * result + (fieldToColumnMap != null ? fieldToColumnMap.hashCode() : 0);
result = 31 * result
+ (configuredFieldToColumnMap != null ? configuredFieldToColumnMap.hashCode() : 0);
return result;
}

Expand All @@ -164,6 +286,6 @@ public String toString() {
return "RegionMapping{" + "regionName='" + regionName + '\'' + ", pdxClassName='" + pdxClassName
+ '\'' + ", tableName='" + tableName + '\'' + ", connectionConfigName='"
+ connectionConfigName + '\'' + ", primaryKeyInValue=" + primaryKeyInValue
+ ", fieldToColumnMap=" + fieldToColumnMap + '}';
+ ", fieldToColumnMap=" + configuredFieldToColumnMap + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ public <K, V> PdxInstance read(Region<K, V> region, K key) throws SQLException {
RegionMapping regionMapping = getMappingForRegion(region.getName());
ConnectionConfiguration connectionConfig =
getConnectionConfig(regionMapping.getConnectionConfigName());
String tableName = regionMapping.getRegionToTableName();
PdxInstance result;
try (Connection connection = getConnection(connectionConfig)) {
TableMetaDataView tableMetaData = this.tableMetaDataManager.getTableMetaDataView(connection,
regionMapping.getRegionToTableName());
String realTableName = tableMetaData.getTableName();
List<ColumnValue> columnList =
getColumnToValueList(connection, regionMapping, key, null, Operation.GET);
getColumnToValueList(tableMetaData, regionMapping, key, null, Operation.GET);
try (PreparedStatement statement =
getPreparedStatement(connection, columnList, tableName, Operation.GET)) {
getPreparedStatement(connection, columnList, realTableName, Operation.GET)) {
try (ResultSet resultSet = executeReadQuery(statement, columnList)) {
String keyColumnName = getKeyColumnName(connection, tableName);
String keyColumnName = tableMetaData.getKeyColumnName();
InternalCache cache = (InternalCache) region.getRegionService();
SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
new SqlToPdxInstanceCreator(cache, regionMapping, resultSet, keyColumnName);
Expand All @@ -84,7 +86,6 @@ private ResultSet executeReadQuery(PreparedStatement statement, List<ColumnValue
return statement.executeQuery();
}


private RegionMapping getMappingForRegion(String regionName) {
RegionMapping regionMapping = this.configService.getMappingForRegion(regionName);
if (regionMapping == null) {
Expand All @@ -104,10 +105,6 @@ private ConnectionConfiguration getConnectionConfig(String connectionConfigName)
return connectionConfig;
}

private String getKeyColumnName(Connection connection, String tableName) {
return this.tableMetaDataManager.getTableMetaDataView(connection, tableName).getKeyColumnName();
}

private void setValuesInStatement(PreparedStatement statement, List<ColumnValue> columnList)
throws SQLException {
int index = 0;
Expand All @@ -134,14 +131,15 @@ public <K, V> void write(Region<K, V> region, Operation operation, K key, PdxIns
ConnectionConfiguration connectionConfig =
getConnectionConfig(regionMapping.getConnectionConfigName());

String tableName = regionMapping.getRegionToTableName();

try (Connection connection = getConnection(connectionConfig)) {
TableMetaDataView tableMetaData = this.tableMetaDataManager.getTableMetaDataView(connection,
regionMapping.getRegionToTableName());
String realTableName = tableMetaData.getTableName();
List<ColumnValue> columnList =
getColumnToValueList(connection, regionMapping, key, value, operation);
getColumnToValueList(tableMetaData, regionMapping, key, value, operation);
int updateCount = 0;
try (PreparedStatement statement =
getPreparedStatement(connection, columnList, tableName, operation)) {
getPreparedStatement(connection, columnList, realTableName, operation)) {
updateCount = executeWriteStatement(statement, columnList);
} catch (SQLException e) {
if (operation.isDestroy()) {
Expand All @@ -157,7 +155,7 @@ public <K, V> void write(Region<K, V> region, Operation operation, K key, PdxIns
if (updateCount <= 0) {
Operation upsertOp = getOppositeOperation(operation);
try (PreparedStatement upsertStatement =
getPreparedStatement(connection, columnList, tableName, upsertOp)) {
getPreparedStatement(connection, columnList, realTableName, upsertOp)) {
updateCount = executeWriteStatement(upsertStatement, columnList);
}
}
Expand Down Expand Up @@ -197,11 +195,8 @@ private String getSqlString(String tableName, List<ColumnValue> columnList, Oper
}
}

<K> List<ColumnValue> getColumnToValueList(Connection connection, RegionMapping regionMapping,
K key, PdxInstance value, Operation operation) {
String tableName = regionMapping.getRegionToTableName();
TableMetaDataView tableMetaData =
this.tableMetaDataManager.getTableMetaDataView(connection, tableName);
<K> List<ColumnValue> getColumnToValueList(TableMetaDataView tableMetaData,
RegionMapping regionMapping, K key, PdxInstance value, Operation operation) {
String keyColumnName = tableMetaData.getKeyColumnName();
ColumnValue keyColumnValue =
new ColumnValue(true, keyColumnName, key, tableMetaData.getColumnDataType(keyColumnName));
Expand All @@ -210,17 +205,17 @@ <K> List<ColumnValue> getColumnToValueList(Connection connection, RegionMapping
return Collections.singletonList(keyColumnValue);
}

List<ColumnValue> result =
createColumnValueList(tableMetaData, regionMapping, value, keyColumnName);
List<ColumnValue> result = createColumnValueList(tableMetaData, regionMapping, value);
result.add(keyColumnValue);
return result;
}

private List<ColumnValue> createColumnValueList(TableMetaDataView tableMetaData,
RegionMapping regionMapping, PdxInstance value, String keyColumnName) {
RegionMapping regionMapping, PdxInstance value) {
final String keyColumnName = tableMetaData.getKeyColumnName();
List<ColumnValue> result = new ArrayList<>();
for (String fieldName : value.getFieldNames()) {
String columnName = regionMapping.getColumnNameForField(fieldName);
String columnName = regionMapping.getColumnNameForField(fieldName, tableMetaData);
if (columnName.equalsIgnoreCase(keyColumnName)) {
continue;
}
Expand Down
Loading

0 comments on commit da51fce

Please sign in to comment.