Navigation Menu

Skip to content

Commit

Permalink
greenplum doesn't support currval.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 31, 2011
1 parent b23c8c1 commit 39d8109
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 5 deletions.
Expand Up @@ -99,7 +99,9 @@ public IDbDialect getObject() throws Exception {
} else if (pf instanceof Oracle10Platform) {
dialect = (AbstractDbDialect) beanFactory.getBean("oracleDialect");
} else if (pf instanceof MSSqlPlatform) {
dialect = (AbstractDbDialect) beanFactory.getBean("msSqlDialect");
dialect = (AbstractDbDialect) beanFactory.getBean("msSqlDialect");
} else if (pf instanceof GreenplumPlatform) {
dialect = (AbstractDbDialect) beanFactory.getBean("greenplumDialect");
} else if (pf instanceof PostgreSqlPlatform) {
dialect = (AbstractDbDialect) beanFactory.getBean("postgresqlDialect");
} else if (pf instanceof DerbyPlatform) {
Expand All @@ -113,9 +115,7 @@ public IDbDialect getObject() throws Exception {
} else if (pf instanceof HsqlDb2Platform) {
dialect = (AbstractDbDialect) beanFactory.getBean("hsqldb2Dialect");
} else if (pf instanceof InformixPlatform) {
dialect = (AbstractDbDialect) beanFactory.getBean("informixDialect");
} else if (pf instanceof GreenplumPlatform) {
dialect = (AbstractDbDialect) beanFactory.getBean("greenplumDialect");
dialect = (AbstractDbDialect) beanFactory.getBean("informixDialect");
} else if (pf instanceof Db2Platform) {
String currentDbProductVersion = PlatformUtils.getDatabaseProductVersion(jdbcTemplate
.getDataSource());
Expand Down
@@ -0,0 +1,60 @@
package org.jumpmind.symmetric.db.postgresql;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.jumpmind.symmetric.db.SequenceIdentifier;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.support.JdbcUtils;

public class GreenplumDbDialect extends PostgreSqlDbDialect {

@Override
public boolean supportsGetGeneratedKeys() {
return false;
}

@Override
public long insertWithGeneratedKey(final JdbcTemplate jdbcTemplate, final String sql,
final SequenceIdentifier sequenceId, final PreparedStatementCallback<Object> callback) {
return jdbcTemplate.execute(new ConnectionCallback<Long>() {
public Long doInConnection(Connection conn) throws SQLException, DataAccessException {
long key = 0;
PreparedStatement ps = null;
try {
Statement st = null;
ResultSet rs = null;
try {
st = conn.createStatement();
rs = st.executeQuery("select nextval('" + getSequenceName(sequenceId)
+ "_seq')");
if (rs.next()) {
key = rs.getLong(1);
}
} finally {
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(st);
}

String replaceSql = sql.replaceFirst("\\(null,", "(" + key + ",");
ps = conn.prepareStatement(replaceSql);
ps.setQueryTimeout(jdbcTemplate.getQueryTimeout());
if (callback != null) {
callback.doInPreparedStatement(ps);
}
ps.executeUpdate();
} finally {
JdbcUtils.closeStatement(ps);
}
return key;
}
});
}

}
@@ -0,0 +1,248 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd" default-lazy-init="true">

<bean id="greenplumDialect" class="org.jumpmind.symmetric.db.postgresql.GreenplumDbDialect"
scope="prototype">
<property name="primaryKeyViolationSqlStates" value="23505"/>
<property name="tablePrefix" value="$[sym.sync.table.prefix]" />
<property name="parameterService" ref="parameterService" />
<property name="defaultSchema" value="$[sym.db.default.schema]" />
<property name="streamingResultsFetchSize" value="$[sym.db.jdbc.streaming.results.fetch.size]" />
<property name="sqlTemplate">
<bean class="org.jumpmind.symmetric.db.SqlTemplate">

<property name="functionInstalledSql">
<value>
<![CDATA[select count(*) from information_schema.routines
where routine_name = '$(functionName)' and specific_schema = '$(defaultSchema)']]>
</value>
</property>
<property name="functionTemplatesToInstall">
<map>
<entry key="triggers_disabled">
<value>
<![CDATA[
CREATE or REPLACE FUNCTION $(defaultSchema)$(functionName)() RETURNS INTEGER AS $$
DECLARE
triggerDisabled INTEGER;
BEGIN
select current_setting('symmetric.triggers_disabled') into triggerDisabled;
return triggerDisabled;
EXCEPTION WHEN OTHERS THEN
return 0;
END;
$$ LANGUAGE plpgsql;
]]>
</value>
</entry>
<entry key="node_disabled">
<value>
<![CDATA[
CREATE or REPLACE FUNCTION $(defaultSchema)$(functionName)() RETURNS VARCHAR AS $$
DECLARE
nodeId VARCHAR(50);
BEGIN
select current_setting('symmetric.node_disabled') into nodeId;
return nodeId;
EXCEPTION WHEN OTHERS THEN
return '';
END;
$$ LANGUAGE plpgsql;
]]>
</value>
</entry>
<entry key="largeobject">
<value>
<![CDATA[
CREATE OR REPLACE FUNCTION $(defaultSchema)$(functionName)(objectId oid) RETURNS text AS $$
DECLARE
encodedBlob text;
encodedBlobPage text;
BEGIN
encodedBlob := '';
FOR encodedBlobPage IN SELECT encode(data, 'escape')
FROM pg_largeobject WHERE loid = objectId ORDER BY pageno LOOP
encodedBlob := encodedBlob || encodedBlobPage;
END LOOP;
RETURN encode(decode(encodedBlob, 'escape'), 'base64');
EXCEPTION WHEN OTHERS THEN
RETURN '';
END
$$ LANGUAGE plpgsql;
]]>
</value>
</entry>
</map>
</property>
<property name="stringColumnTemplate" >
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || replace(replace($(tableAlias)."$(columnName)",$$\$$,$$\\$$),'"',$$\"$$) || '"' end ]]>
</value>
</property>
<property name="arrayColumnTemplate" >
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || replace(replace(cast($(tableAlias)."$(columnName)" as varchar),$$\$$,$$\\$$),'"',$$\"$$) || '"' end ]]>
</value>
</property>
<property name="xmlColumnTemplate" >
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || replace(replace(cast($(tableAlias)."$(columnName)" as varchar),$$\$$,$$\\$$),'"',$$\"$$) || '"' end ]]>
</value>
</property>
<property name="clobColumnTemplate">
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || replace(replace($(tableAlias)."$(columnName)",$$\$$,$$\\$$),'"',$$\"$$) || '"' end ]]>
</value>
</property>
<property name="blobColumnTemplate">
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || encode($(tableAlias)."$(columnName)", 'base64') || '"' end ]]>
</value>
</property>
<property name="wrappedBlobColumnTemplate">
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || $(defaultSchema)$(prefixName)_largeobject($(tableAlias)."$(columnName)") || '"' end ]]>
</value>
</property>
<property name="numberColumnTemplate">
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || cast($(tableAlias)."$(columnName)" as varchar) || '"' end ]]>
</value>
</property>
<property name="booleanColumnTemplate">
<value>
<![CDATA[case when $(tableAlias)."$(columnName)" is null then '' when $(tableAlias)."$(columnName)" then '"1"' else '"0"' end]]>
</value>
</property>
<property name="datetimeColumnTemplate" >
<value>
<![CDATA[ case when $(tableAlias)."$(columnName)" is null then '' else '"' || to_char($(tableAlias)."$(columnName)", 'YYYY-MM-DD HH24:MI:SS.MS') || '"' end ]]>
</value>
</property>
<property name="triggerConcatCharacter" value="||"/>
<property name="newTriggerValue" value="new"/>
<property name="oldTriggerValue" value="old"/>
<property name="sqlTemplates">
<map>
<entry key="insertTriggerTemplate">
<value>
<![CDATA[
create or replace function $(schemaName)f$(triggerName)() returns trigger as $function$
begin
if $(syncOnInsertCondition) and $(syncOnIncomingBatchCondition) then
insert into $(defaultSchema)$(prefixName)_data
(table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time)
values(
'$(targetTableName)',
'I',
$(triggerHistoryId),
$(columns),
'$(channelName)',
$(txIdExpression),
$(defaultSchema)$(prefixName)_node_disabled(),
$(externalSelect),
CURRENT_TIMESTAMP
);
end if;
return null;
end;
$function$ language plpgsql;
]]>
</value>
</entry>
<entry key="insertPostTriggerTemplate">
<value>
<![CDATA[
create trigger $(triggerName) after insert on $(schemaName)$(tableName)
for each row execute procedure $(schemaName)f$(triggerName)();
]]>
</value>
</entry>
<entry key="updateTriggerTemplate">
<value>
<![CDATA[
create or replace function $(schemaName)f$(triggerName)() returns trigger as $function$
begin
if $(syncOnUpdateCondition) and $(syncOnIncomingBatchCondition) then
insert into $(defaultSchema)$(prefixName)_data
(table_name, event_type, trigger_hist_id, pk_data, row_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time)
values(
'$(targetTableName)',
'U',
$(triggerHistoryId),
$(oldKeys),
$(columns),
$(oldColumns),
'$(channelName)',
$(txIdExpression),
$(defaultSchema)$(prefixName)_node_disabled(),
$(externalSelect),
CURRENT_TIMESTAMP
);
end if;
return null;
end;
$function$ language plpgsql;
]]>
</value>
</entry>
<entry key="updatePostTriggerTemplate">
<value>
<![CDATA[
create trigger $(triggerName) after update on $(schemaName)$(tableName)
for each row execute procedure $(schemaName)f$(triggerName)();
]]>
</value>
</entry>
<entry key="deleteTriggerTemplate">
<value>
<![CDATA[
create or replace function $(schemaName)f$(triggerName)() returns trigger as $function$
begin
if $(syncOnDeleteCondition) and $(syncOnIncomingBatchCondition) then
insert into $(defaultSchema)$(prefixName)_data
(table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time)
values(
'$(targetTableName)',
'D',
$(triggerHistoryId),
$(oldKeys),
$(oldColumns),
'$(channelName)',
$(txIdExpression),
$(defaultSchema)$(prefixName)_node_disabled(),
$(externalSelect),
CURRENT_TIMESTAMP
);
end if;
return null;
end;
$function$ language plpgsql;
]]>
</value>
</entry>
<entry key="deletePostTriggerTemplate">
<value>
<![CDATA[
create trigger $(triggerName) after delete on $(schemaName)$(tableName)
for each row execute procedure $(schemaName)f$(triggerName)();
]]>
</value>
</entry>
<entry key="initialLoadSqlTemplate">
<value>
<![CDATA[select $(columns) from $(schemaName)$(tableName) t where $(whereClause)]]>
</value>
</entry>
</map>
</property>
</bean>
</property>
</bean>

</beans>
Expand Up @@ -21,5 +21,6 @@
<import resource="classpath:/org/jumpmind/symmetric/db/informix.xml" />
<import resource="classpath:/org/jumpmind/symmetric/db/sybase.xml" />
<import resource="classpath:/org/jumpmind/symmetric/db/interbase.xml" />
<import resource="classpath:/org/jumpmind/symmetric/db/greenplum.xml" />

</beans>
Expand Up @@ -58,7 +58,7 @@ DB2DialectInitializingError=Error while initializing DB2 dialect
DB2ResettingAutoIncrementColumns=Resetting auto increment columns for %s
DataPullingInReloadMode=Immediate pull requested while in reload mode.
DataPulling=Pull requested for %s
DataPulled=Pull data received for %s. %d rows and %d batches were processed.
DataPulled=Pull data received from %s. %d rows and %d batches were processed.
DataPullingFailed=Pull no data received %s
DataPullingFailedLock=Did not run the pull process because the cluster service has it locked
DataPurgeIncomingRunning=The incoming purge process is about to run.
Expand Down

0 comments on commit 39d8109

Please sign in to comment.