SYMMETRICDS-167: support oid blob data type on postgresql
erilong committed Jan 12, 2010
1 parent 799f69e commit 3f09980
Showing 5 changed files with 151 additions and 8 deletions.
30 changes: 23 additions & 7 deletions symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java
@@ -30,6 +30,7 @@
import org.apache.ddlutils.model.Table;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.db.mssql.MsSqlDbDialect;
import org.jumpmind.symmetric.db.postgresql.PostgreSqlDbDialect;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Trigger;
@@ -87,6 +88,8 @@ public void setDateColumnTemplate(String dateColumnTemplate) {

private String blobColumnTemplate;

private String wrappedBlobColumnTemplate;

private String booleanColumnTemplate;

private String triggerConcatCharacter;
@@ -103,7 +106,7 @@ public String createInitalLoadSql(Node node, IDbDialect dialect, TriggerRouter t
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);

Column[] columns = trig.orderColumnsForTable(metaData);
-        String columnsText = buildColumnString(dialect.getInitialLoadTableAlias(), dialect.getInitialLoadTableAlias(),
+        String columnsText = buildColumnString(dialect, dialect.getInitialLoadTableAlias(), dialect.getInitialLoadTableAlias(),
"", columns, dialect, DataEventType.INSERT).columnString;
sql = AppUtils.replace("columns", columnsText, sql);
sql = AppUtils.replace("whereClause", StringUtils.isBlank(trig.getInitialLoadSelect()) ? "1=1" : trig.getInitialLoadSelect(), sql);
@@ -129,7 +132,7 @@ public String createCsvDataSql(IDbDialect dialect, Trigger trig, Table metaData,
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);

Column[] columns = trig.orderColumnsForTable(metaData);
-        String columnsText = buildColumnString(dialect.getInitialLoadTableAlias(), dialect.getInitialLoadTableAlias(),
+        String columnsText = buildColumnString(dialect, dialect.getInitialLoadTableAlias(), dialect.getInitialLoadTableAlias(),
"", columns, dialect, DataEventType.INSERT).columnString;
sql = AppUtils.replace("columns", columnsText, sql);

@@ -146,7 +149,7 @@ public String createCsvPrimaryKeySql(IDbDialect dialect, Trigger trig, Table met
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);

Column[] columns = metaData.getPrimaryKeyColumns();
-        String columnsText = buildColumnString(dialect.getInitialLoadTableAlias(), dialect.getInitialLoadTableAlias(),
+        String columnsText = buildColumnString(dialect, dialect.getInitialLoadTableAlias(), dialect.getInitialLoadTableAlias(),
"", columns, dialect, DataEventType.INSERT).columnString;
sql = AppUtils.replace("columns", columnsText, sql);

@@ -239,11 +242,11 @@ public String replaceTemplateVariables(IDbDialect dialect, DataEventType dml, Tr
ddl = AppUtils.replace("origTableAlias", ORIG_TABLE_ALIAS, ddl);

Column[] columns = trigger.orderColumnsForTable(metaData);
-        ColumnString columnString = buildColumnString(ORIG_TABLE_ALIAS, newTriggerValue, newColumnPrefix, columns, dialect, dml);
+        ColumnString columnString = buildColumnString(dialect, ORIG_TABLE_ALIAS, newTriggerValue, newColumnPrefix, columns, dialect, dml);
ddl = AppUtils.replace("columns", columnString.columnString, ddl);
ddl = AppUtils.replace("virtualOldNewTable", buildVirtualTableSql(dialect, oldColumnPrefix, newColumnPrefix, metaData.getColumns()),
ddl);
-        ddl = AppUtils.replace("oldColumns", buildColumnString(ORIG_TABLE_ALIAS, oldTriggerValue, oldColumnPrefix, columns, dialect, dml).columnString, ddl);
+        ddl = AppUtils.replace("oldColumns", buildColumnString(dialect, ORIG_TABLE_ALIAS, oldTriggerValue, oldColumnPrefix, columns, dialect, dml).columnString, ddl);
ddl = eval(columnString.isBlobClob, "containsBlobClobColumns", ddl);

// some column templates need tableName and schemaName
@@ -254,7 +257,7 @@
+ "." : "")), ddl);

columns = metaData.getPrimaryKeyColumns();
-        ddl = AppUtils.replace("oldKeys", buildColumnString(ORIG_TABLE_ALIAS, oldTriggerValue, oldColumnPrefix, columns, dialect, dml).columnString, ddl);
+        ddl = AppUtils.replace("oldKeys", buildColumnString(dialect, ORIG_TABLE_ALIAS, oldTriggerValue, oldColumnPrefix, columns, dialect, dml).columnString, ddl);
ddl = AppUtils.replace("oldNewPrimaryKeyJoin", aliasedPrimaryKeyJoin(oldTriggerValue, newTriggerValue, columns), ddl);
ddl = AppUtils.replace("tableNewPrimaryKeyJoin", aliasedPrimaryKeyJoin(ORIG_TABLE_ALIAS, newTriggerValue, columns), ddl);
ddl = AppUtils.replace("primaryKeyWhereString", getPrimaryKeyWhereString(dml == DataEventType.DELETE ? oldTriggerValue : newTriggerValue, columns), ddl);
@@ -414,7 +417,7 @@ private String getPrimaryKeyWhereString(String alias, Column[] columns) {
return b.toString();
}

-    private ColumnString buildColumnString(String origTableAlias, String tableAlias, String columnPrefix, Column[] columns, IDbDialect dbDialect, DataEventType dml) {
+    private ColumnString buildColumnString(IDbDialect dialect, String origTableAlias, String tableAlias, String columnPrefix, Column[] columns, IDbDialect dbDialect, DataEventType dml) {
String columnsText = "";
boolean isBlobClob = false;
for (Column column : columns) {
@@ -441,6 +444,11 @@ private ColumnString buildColumnString(String origTableAlias, String tableAlias,
isBlobClob = true;
break;
case Types.BLOB:
if (dialect instanceof PostgreSqlDbDialect) {
templateToUse = wrappedBlobColumnTemplate;
isBlobClob = true;
break;
}
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
@@ -656,6 +664,14 @@ public String getBlobColumnTemplate() {
public void setBlobColumnTemplate(String blobColumnTemplate) {
this.blobColumnTemplate = blobColumnTemplate;
}

public String getWrappedBlobColumnTemplate() {
return wrappedBlobColumnTemplate;
}

public void setWrappedBlobColumnTemplate(String wrappedBlobColumnTemplate) {
this.wrappedBlobColumnTemplate = wrappedBlobColumnTemplate;
}

public void setFunctionInstalledSql(String functionInstalledSql) {
this.functionInstalledSql = functionInstalledSql;
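For context on the SqlTemplate change above: the new `case Types.BLOB` branch in `buildColumnString` only selects the wrapped template when the dialect is PostgreSQL; for every other dialect it intentionally falls through to the plain binary cases. A minimal sketch of that selection pattern follows (names are simplified and hypothetical; the real method works on `Column` metadata and the templates held by `SqlTemplate`):

```java
// Illustrative sketch only, not the actual SymmetricDS code: dialect-specific
// template selection with an intentional switch fall-through, mirroring the
// shape of the new case Types.BLOB branch in buildColumnString.
public class TemplateSelectionSketch {

    static String pickTemplate(int typeCode, boolean isPostgreSql,
                               String wrappedBlobTemplate, String blobTemplate) {
        switch (typeCode) {
            case java.sql.Types.BLOB:
                if (isPostgreSql) {
                    // oid-backed blobs are rendered through the large-object helper
                    return wrappedBlobTemplate;
                }
                // intentional fall-through: other dialects keep the plain blob handling
            case java.sql.Types.BINARY:
            case java.sql.Types.VARBINARY:
            case java.sql.Types.LONGVARBINARY:
                return blobTemplate;
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(pickTemplate(java.sql.Types.BLOB, true, "wrapped", "plain"));  // wrapped
        System.out.println(pickTemplate(java.sql.Types.BLOB, false, "wrapped", "plain")); // plain
    }
}
```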
@@ -23,7 +23,10 @@
import java.sql.Types;
import java.util.Map;

import javax.sql.rowset.serial.SerialBlob;

import org.apache.commons.lang.StringUtils;
import org.apache.ddlutils.model.Column;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractDbDialect;
@@ -68,10 +71,29 @@ protected Integer overrideJdbcTypeForColumn(Map values) {
String typeName = (String) values.get("TYPE_NAME");
if (typeName != null && typeName.equalsIgnoreCase("ABSTIME")) {
return Types.TIMESTAMP;
} else if (typeName != null && typeName.equalsIgnoreCase("OID")) {
return Types.BLOB;
} else {
return super.overrideJdbcTypeForColumn(values);
}
}

@Override
public Object[] getObjectValues(BinaryEncoding encoding, String[] values,
Column[] orderedMetaData) {

Object[] objectValues = super.getObjectValues(encoding, values, orderedMetaData);
for (int i = 0; i < orderedMetaData.length; i++) {
if (orderedMetaData[i] != null && orderedMetaData[i].getTypeCode() == Types.BLOB) {
try {
objectValues[i] = new SerialBlob((byte[]) objectValues[i]);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
return objectValues;
}

@Override
protected boolean doesTriggerExistOnPlatform(String catalogName, String schema, String tableName, String triggerName) {
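On the load side, the overridden `getObjectValues` above takes the byte array produced by the parent dialect (the CSV value already decoded according to the batch's binary encoding) and wraps it in a `SerialBlob`, so the PostgreSQL JDBC driver can store it as a large object behind the `oid` column. A rough standalone sketch of that wrapping step, assuming base64 input purely for illustration:

```java
// Standalone sketch of the SerialBlob wrapping done in getObjectValues.
// java.util.Base64 is used here only for the example; the real code has
// already decoded the CSV value before this point.
import java.util.Base64;
import javax.sql.rowset.serial.SerialBlob;

public class BlobValueSketch {
    public static void main(String[] args) throws Exception {
        String csvValue = "dGVzdCAxIDIgMw=="; // base64 of "test 1 2 3"
        byte[] decoded = Base64.getDecoder().decode(csvValue);

        // Wrapping the bytes lets PreparedStatement.setBlob(...) bind them, which
        // the PostgreSQL driver turns into a large object referenced by an oid column.
        SerialBlob blob = new SerialBlob(decoded);
        System.out.println(new String(blob.getBytes(1, (int) blob.length()))); // test 1 2 3
    }
}
```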
@@ -55,6 +55,23 @@
]]>
</value>
</entry>
<entry key="fn_sym_largeobject">
<value>
<![CDATA[
CREATE OR REPLACE FUNCTION $(defaultSchema)fn_sym_largeobject(objectId oid) RETURNS text AS $$
DECLARE
encodedBlob text;
BEGIN
select encode(data, 'base64') into encodedBlob
from pg_largeobject where loid = objectId;
RETURN encodedBlob;
EXCEPTION WHEN OTHERS THEN
RETURN '';
END
$$ LANGUAGE plpgsql;
]]>
</value>
</entry>
</map>
</property>
<property name="stringColumnTemplate" >
@@ -69,7 +86,12 @@
</property>
<property name="blobColumnTemplate">
<value>
- <![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || replace(replace(encode($(tableAlias)."$(columnName)", 'base64'),E'\\',E'\\\\'),'"',E'\\"') || '"' end ||','||]]>
+ <![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || encode($(tableAlias)."$(columnName)", 'base64') || '"' end ||','||]]>
</value>
</property>
<property name="wrappedBlobColumnTemplate">
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || fn_sym_largeobject($(tableAlias)."$(columnName)") || '"' end ||','||]]>
</value>
</property>
<property name="numberColumnTemplate">
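The `wrappedBlobColumnTemplate` above calls the new `fn_sym_largeobject` function, which looks the `oid` up in `pg_largeobject` and returns the data base64-encoded (or an empty string if the object cannot be read). As a hypothetical check, the capture could be reproduced over JDBC with the table and column names used by the tests in this commit; connection settings below are placeholders, not part of the commit:

```java
// Hypothetical verification query (not part of the commit): read an oid column
// through fn_sym_largeobject and print the base64 text the trigger would capture.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LargeObjectReadSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost/symmetric", "user", "password"); // placeholder settings
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                 "select fn_sym_largeobject(binary_data) from test_postgres_binary_types")) {
            while (rs.next()) {
                // e.g. dGVzdCAxIDIgMw== for a large object containing "test 1 2 3"
                System.out.println(rs.getString(1));
            }
        }
    }
}
```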
@@ -35,6 +35,7 @@
import org.apache.ddlutils.model.Table;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.db.BinaryEncoding;
import org.jumpmind.symmetric.db.mssql.MsSqlDbDialect;
import org.jumpmind.symmetric.db.mysql.MySqlDbDialect;
import org.jumpmind.symmetric.db.oracle.OracleDbDialect;
@@ -330,6 +331,39 @@ public boolean isAutoRegister() {
Assert.assertEquals(1, getJdbcTemplate().queryForInt("select count(*) from TEST_CHANGING_COLUMN_NAME"));
}

@Test
public void testBinaryColumnTypesForPostgres() throws Exception {
if (getDbDialect() instanceof PostgreSqlDbDialect) {
getJdbcTemplate().update("drop table if exists test_postgres_binary_types");
getJdbcTemplate().update("create table test_postgres_binary_types (binary_data oid)");

String tableName = "test_postgres_binary_types";
String[] keys = {"binary_data"};
String[] columns = {"binary_data"};
String[] values = {"dGVzdCAxIDIgMw=="};

ByteArrayOutputStream out = new ByteArrayOutputStream();
CsvWriter writer = getWriter(out);
writer.writeRecord(new String[] {CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID});
writeTable(writer, tableName, keys, columns);
String nextBatchId = getNextBatchId();
writer.writeRecord(new String[] {CsvConstants.BATCH, nextBatchId});
writer.writeRecord(new String[] {CsvConstants.BINARY, BinaryEncoding.BASE64.name()});
writer.write(CsvConstants.INSERT);
writer.writeRecord(values, true);
writer.writeRecord(new String[] {CsvConstants.COMMIT, nextBatchId});
writer.close();
load(out);

String result = (String) getJdbcTemplate().queryForObject(
"select data from pg_largeobject where loid in (select binary_data from test_postgres_binary_types)",
String.class);
// clean up the object from pg_largeobject, otherwise it becomes abandoned on subsequent runs
getJdbcTemplate().queryForList("select lo_unlink(binary_data) from test_postgres_binary_types");
assertEquals(result, "test 1 2 3");
}
}

@Test
public void testBenchmark() throws Exception {
ZipInputStream in = new ZipInputStream(getClass().getResourceAsStream("/test-data-loader-benchmark.zip"));
@@ -20,13 +20,19 @@

package org.jumpmind.symmetric.service.impl;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Calendar;
import java.util.List;

import javax.sql.rowset.serial.SerialBlob;

import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.db.db2.Db2DbDialect;
import org.jumpmind.symmetric.db.oracle.OracleDbDialect;
import org.jumpmind.symmetric.db.postgresql.PostgreSqlDbDialect;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.Trigger;
@@ -36,6 +42,8 @@
import org.jumpmind.symmetric.test.TestConstants;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;

public class TriggerRouterServiceTest extends AbstractDatabaseTest {
@@ -46,6 +54,11 @@ public class TriggerRouterServiceTest extends AbstractDatabaseTest {
public final static String INSERT_ORACLE_BINARY_TYPE_1 = "insert into test_oracle_binary_types values('1', 2.04299998, 5.2212)";
public final static String EXPECTED_INSERT_ORALCE_BINARY_TYPE_1 = "\"1\",\"2.04299998\",\"5.2212\"";

public final static String CREATE_POSTGRES_BINARY_TYPE = "create table test_postgres_binary_types (id integer, binary_data oid, primary key(id))";
public final static String INSERT_POSTGRES_BINARY_TYPE_1 = "insert into test_postgres_binary_types values(47, ?)";
public final static String EXPECTED_INSERT_POSTGRES_BINARY_TYPE_1 = "\"47\",\"dGVzdCAxIDIgMw==\"";
public final static String DROP_POSTGRES_BINARY_TYPE = "drop table if exists test_postgres_binary_types";

public final static String INSERT = "insert into "
+ TEST_TRIGGERS_TABLE
+ " (string_One_Value,string_Two_Value,long_String_Value,time_Value,date_Value,boolean_Value,bigInt_Value,decimal_Value) "
@@ -228,6 +241,42 @@ public void testBinaryColumnTypesForOracle() {
}
}

@SuppressWarnings("unchecked")
@Test
public void testBinaryColumnTypesForPostgres() {
IDbDialect dialect = getDbDialect();
if (dialect instanceof PostgreSqlDbDialect) {
getJdbcTemplate().update(DROP_POSTGRES_BINARY_TYPE);
getJdbcTemplate().update(CREATE_POSTGRES_BINARY_TYPE);
TriggerRouter trouter = new TriggerRouter();
Trigger trigger = trouter.getTrigger();
trigger.setSourceTableName("test_postgres_binary_types");
trigger.setChannelId(TestConstants.TEST_CHANNEL_ID);
Router router = trouter.getRouter();
router.setSourceNodeGroupId(TestConstants.TEST_ROOT_NODE_GROUP);
router.setTargetNodeGroupId(TestConstants.TEST_ROOT_NODE_GROUP);
getTriggerRouterService().saveTriggerRouter(trouter);

ITriggerRouterService triggerService = getTriggerRouterService();
triggerService.syncTriggers();
Assert.assertEquals("Some triggers must have failed to build.", 0, triggerService.getFailedTriggers()
.size());

getJdbcTemplate().execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(INSERT_POSTGRES_BINARY_TYPE_1);
ps.setBlob(1, new SerialBlob("test 1 2 3".getBytes()));
ps.executeUpdate();
conn.commit();
return null;
}
});
String csvString = getNextDataRow();
Assert.assertEquals(EXPECTED_INSERT_POSTGRES_BINARY_TYPE_1, csvString);
}
}

protected static int[] filterTypes(int[] types, IDbDialect dbDialect) {
boolean isBooleanSupported = !((dbDialect instanceof OracleDbDialect) || (dbDialect instanceof Db2DbDialect));
int[] filteredTypes = new int[types.length];
