Skip to content
Permalink
Browse files

SYMMETRICDS-528. Detect distribution keys and set them when reading t…

…he model.
  • Loading branch information...
gwilmer committed Oct 24, 2011
1 parent 0ff9d16 commit e5880f1035b059e7f59ead9232f20f65847d0e4f
@@ -68,7 +68,7 @@

private String jdbcTypeName;

private boolean distributedKey;
private boolean distributionKey;

/**
* Returns the name of the column.
@@ -627,12 +627,13 @@ public void setJdbcTypeName(String jdbcTypeName) {
public String getJdbcTypeName() {
return jdbcTypeName;
}
public boolean isDistributedKey() {
return distributedKey;

public boolean isDistributionKey() {
return distributionKey;
}
public void setDistributedKey(boolean distributedKey) {
this.distributedKey = distributedKey;

public void setDistributionKey(boolean distributionKey) {
this.distributionKey = distributionKey;
}

}
@@ -920,11 +920,11 @@ public boolean hasAutoIncrementColumn() {
return false;
}

public Column[] getDistributedKeyColumns() {
public Column[] getDistributionKeyColumns() {
@SuppressWarnings("unchecked")
Collection<Column> columns = CollectionUtils.select(_columns, new Predicate() {
public boolean evaluate(Object input) {
return ((Column)input).isDistributedKey();
return ((Column)input).isDistributionKey();
}
});

@@ -438,8 +438,8 @@ public Database getDatabase(Connection connection, String name) throws SQLExcept
* @param connection The connection
* @param name The name of the resulting database; <code>null</code> when the default name (the catalog)
* is desired which might be <code>null</code> itself though
* @param catalog The catalog to acess in the database; use <code>null</code> for the default value
* @param schema The schema to acess in the database; use <code>null</code> for the default value
* @param catalog The catalog to access in the database; use <code>null</code> for the default value
* @param schema The schema to access in the database; use <code>null</code> for the default value
* @param tableTypes The table types to process; use <code>null</code> or an empty list for the default ones
* @return The database model
*/
@@ -529,6 +529,7 @@ public int compare(Table obj1, Table obj2)
return collator.compare(obj1.getName().toUpperCase(), obj2.getName().toUpperCase());
}
});

return tables;
}
finally
@@ -566,11 +567,11 @@ protected Table readTable(DatabaseMetaDataWrapper metaData, Map<String,Object> v
table.addForeignKeys(readForeignKeys(metaData, tableName));
table.addIndices(readIndices(metaData, tableName));

Collection primaryKeys = readPrimaryKeyNames(metaData, tableName);
Collection<String> primaryKeys = readPrimaryKeyNames(metaData, tableName);

for (Iterator it = primaryKeys.iterator(); it.hasNext();)
for (Iterator<String> it = primaryKeys.iterator(); it.hasNext();)
{
table.findColumn((String)it.next(), true).setPrimaryKey(true);
table.findColumn(it.next(), true).setPrimaryKey(true);
}

if (getPlatformInfo().isSystemIndicesReturned())
@@ -608,7 +609,7 @@ protected void removeSystemIndices(DatabaseMetaDataWrapper metaData, Table table
protected void removeInternalPrimaryKeyIndex(DatabaseMetaDataWrapper metaData, Table table) throws SQLException
{
Column[] pks = table.getPrimaryKeyColumns();
List columnNames = new ArrayList();
List<String> columnNames = new ArrayList<String>();

for (int columnIdx = 0; columnIdx < pks.length; columnIdx++)
{
@@ -640,7 +641,7 @@ protected void removeInternalPrimaryKeyIndex(DatabaseMetaDataWrapper metaData, T
*/
protected void removeInternalForeignKeyIndex(DatabaseMetaDataWrapper metaData, Table table, ForeignKey fk) throws SQLException
{
List columnNames = new ArrayList();
List<String> columnNames = new ArrayList<String>();
boolean mustBeUnique = !getPlatformInfo().isSystemForeignKeyIndicesAlwaysNonUnique();

for (int columnIdx = 0; columnIdx < fk.getReferenceCount(); columnIdx++)
@@ -1,12 +1,111 @@
package org.jumpmind.symmetric.ddl.platform.greenplum;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.ddl.Platform;
import org.jumpmind.symmetric.ddl.model.Column;
import org.jumpmind.symmetric.ddl.model.Table;
import org.jumpmind.symmetric.ddl.platform.JdbcModelReader;
import org.jumpmind.symmetric.ddl.platform.MetaDataColumnDescriptor;
import org.jumpmind.symmetric.ddl.platform.postgresql.PostgreSqlModelReader;

public class GreenplumModelReader extends PostgreSqlModelReader {

public GreenplumModelReader(Platform platform) {
super(platform);
/** The Log to which logging calls will be made. */
private final Log _log = LogFactory.getLog(JdbcModelReader.class);

public GreenplumModelReader(Platform platform) {
super(platform);
}

protected Collection<Table> readTables(String catalog, String schemaPattern, String[] tableTypes) throws SQLException
{
Collection<Table> tables = super.readTables(catalog, schemaPattern, tableTypes);
setDistributionKeys(tables, schemaPattern);
return tables;
}

protected void setDistributionKeys(Collection<Table> tables, String schema) throws SQLException {

//get the distribution keys for segments
StringBuffer query = new StringBuffer();

query.append("select ");
query.append(" t.relname, ");
query.append(" a.attname ");
query.append("from ");
query.append(" pg_class t, ");
query.append(" pg_namespace n, ");
query.append(" pg_attribute a, ");
query.append(" gp_distribution_policy p ");
query.append("where ");
query.append(" n.oid = t.relnamespace and ");
query.append(" p.localoid = t.oid and ");
query.append(" a.attrelid = t.oid and ");
query.append(" a.attnum = any(p.attrnums) and ");
query.append(" n.nspname = ? ");

PreparedStatement prepStmt = getConnection().prepareStatement(query.toString());

try
{
//set the schema parm in the query
prepStmt.setString(1, schema);
ResultSet rs = prepStmt.executeQuery();

//for every row, find the table and set the distributionKey values
// for the corresponding columns
while (rs.next())
{
Table table = findTable(tables, rs.getString(1).trim(), getPlatform().isDelimitedIdentifierModeOn());
if (table != null) {
Column column = table.findColumn(rs.getString(2).trim(), getPlatform().isDelimitedIdentifierModeOn());
if (column != null) {
column.setDistributionKey(true);
}
}
}
rs.close();
}
finally
{
if (prepStmt != null) {
prepStmt.close();
}
}
}

//TODO: don't like this - talk to eric and chris for alternatives
private Table findTable(Collection<Table> tables, String tableName, boolean caseSensitive) {

for (Iterator<Table> iter = tables.iterator(); iter.hasNext();)
{
Table table = (Table) iter.next();

if (caseSensitive)
{
if (table.getName().equals(tableName))
{
return table;
}
}
else
{
if (table.getName().equalsIgnoreCase(tableName))
{
return table;
}
}
}
return null;
}
}

0 comments on commit e5880f1

Please sign in to comment.
You can’t perform that action at this time.