Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support oracle and postgresql multi primary key #4863

Merged
merged 23 commits into from Aug 20, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8e954f6
future:support oracle and postgresql multi primary key
tuwenlin Mar 10, 2022
007cbfc
future:support oracle and postgresql multi primary key
tuwenlin Mar 11, 2022
fb9d0cf
future:support oracle and postgresql multi primary key
tuwenlin Mar 11, 2022
ac23a1a
Merge branch 'develop' into oracle_multi_pk
tuwenlin Mar 11, 2022
03d01a5
Merge branch 'develop' into oracle_multi_pk
tuwenlin Mar 16, 2022
95cfe37
Merge branch 'develop' into oracle_multi_pk
tuwenlin Mar 18, 2022
7d6b819
Merge remote-tracking branch 'origin/develop' into oracle_multi_pk
tuwenlin Mar 22, 2022
210d366
Merge branch 'develop' into oracle_multi_pk
tuwenlin Mar 22, 2022
9610674
Merge remote-tracking branch 'origin/oracle_multi_pk' into oracle_mul…
tuwenlin Mar 22, 2022
b852b17
optimize the format
tuwenlin Mar 22, 2022
bdc6c37
optimize the code
tuwenlin Mar 22, 2022
c711b4d
Merge branch 'develop' into oracle_multi_pk
tuwenlin Apr 16, 2022
234a24c
Merge branch 'seata:develop' into oracle_multi_pk
tuwenlin May 5, 2022
00a0678
Merge remote-tracking branch 'origin/oracle_multi_pk' into oracle_mul…
tuwenlin Aug 12, 2022
c51d95f
changes
tuwenlin Aug 12, 2022
810cbaa
changes
tuwenlin Aug 12, 2022
16a67f7
disable multi pk not support test
tuwenlin Aug 12, 2022
237c8a4
Merge branch 'develop' into oracle_multi_pk
tuwenlin Aug 15, 2022
51765ec
overload method
tuwenlin Aug 15, 2022
bbb43df
Merge remote-tracking branch 'origin/oracle_multi_pk' into oracle_mul…
tuwenlin Aug 15, 2022
d730ca8
code
tuwenlin Aug 15, 2022
457ae51
Merge branch 'develop' into oracle_multi_pk
jsbxyyx Aug 19, 2022
4770c88
Merge branch 'develop' into oracle_multi_pk
jsbxyyx Aug 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/develop.md
Expand Up @@ -4,6 +4,7 @@ Add changes here for all PR submitted to the develop branch.

### feature:
- [[#4802](https://github.com/seata/seata/pull/4802)] dockerfile support arm64
- [[#4863](https://github.com/seata/seata/pull/4863)] support oracle and postgresql multi primary key
- [[#4649](https://github.com/seata/seata/pull/4649)] seata-server support multiple registry


Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/develop.md
Expand Up @@ -4,6 +4,7 @@

### feature:
- [[#4802](https://github.com/seata/seata/pull/4802)] dockerfile 支持 arm64
- [[#4863](https://github.com/seata/seata/pull/4863)] support oracle and postgresql multi primary key
- [[#4649](https://github.com/seata/seata/pull/4649)] seata-server支持多注册中心


Expand Down
Expand Up @@ -25,15 +25,13 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.seata.common.exception.NotSupportYetException;
import io.seata.common.util.CollectionUtils;
import io.seata.rm.datasource.AbstractConnectionProxy;
import io.seata.rm.datasource.ConnectionContext;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,9 +91,6 @@ public T doExecute(Object... args) throws Throwable {
* @throws Exception the exception
*/
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import io.seata.common.exception.NotSupportYetException;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.rm.datasource.ColumnUtils;
import io.seata.rm.datasource.PreparedStatementProxy;
import io.seata.rm.datasource.StatementProxy;
Expand Down Expand Up @@ -223,6 +224,7 @@ protected Map<String, List<Object>> parsePkValuesFromStatement() {
* @return
* @throws SQLException
*/
@Deprecated
public List<Object> getGeneratedKeys() throws SQLException {
// PK is just auto generated
ResultSet genKeys = statementProxy.getGeneratedKeys();
Expand All @@ -242,13 +244,39 @@ public List<Object> getGeneratedKeys() throws SQLException {
return pkValues;
}

/**
* default get generated keys.
* @param pkKey the pk key
* @return
* @throws SQLException
*/
public List<Object> getGeneratedKeys(String pkKey) throws SQLException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议改成方法重载方式,避免涉及太多改动

// PK is just auto generated
ResultSet genKeys = statementProxy.getGeneratedKeys();
List<Object> pkValues = new ArrayList<>();
while (genKeys.next()) {
Object v = StringUtils.isEmpty(pkKey) ? genKeys.getObject(1) : genKeys.getObject(pkKey);
pkValues.add(v);
}
if (pkValues.isEmpty()) {
throw new NotSupportYetException(String.format("not support sql [%s]", sqlRecognizer.getOriginalSQL()));
}
try {
genKeys.beforeFirst();
} catch (SQLException e) {
LOGGER.warn("Fail to reset ResultSet cursor. can not get primary key value");
}
return pkValues;
}

/**
* the modify for test
*
* @param expr the expr
* @return the pk values by sequence
* @throws SQLException the sql exception
*/
@Deprecated
protected List<Object> getPkValuesBySequence(SqlSequenceExpr expr) throws SQLException {
List<Object> pkValues = null;
try {
Expand Down Expand Up @@ -277,6 +305,42 @@ protected List<Object> getPkValuesBySequence(SqlSequenceExpr expr) throws SQLExc
}
}

/**
* the modify for test
*
* @param expr the expr
* @param pkKey the pk key
* @return the pk values by sequence
* @throws SQLException the sql exception
*/
protected List<Object> getPkValuesBySequence(SqlSequenceExpr expr, String pkKey) throws SQLException {
List<Object> pkValues = null;
try {
pkValues = getGeneratedKeys(pkKey);
} catch (NotSupportYetException | SQLException ignore) {
}

if (!CollectionUtils.isEmpty(pkValues)) {
return pkValues;
}

Sequenceable sequenceable = (Sequenceable) this;
final String sql = sequenceable.getSequenceSql(expr);
LOGGER.warn("Fail to get auto-generated keys, use '{}' instead. Be cautious, statement could be polluted. Recommend you set the statement to return generated keys.", sql);

Connection conn = statementProxy.getConnection();
try (Statement ps = conn.createStatement();
ResultSet genKeys = ps.executeQuery(sql)) {

pkValues = new ArrayList<>();
while (genKeys.next()) {
Object v = genKeys.getObject(1);
pkValues.add(v);
}
return pkValues;
}
}

/**
* check pk values for multi Pk
* At most one null per row.
Expand Down
Expand Up @@ -199,13 +199,21 @@ else if (!pkValues.isEmpty() && pkValues.get(0) instanceof Null) {
return pkValuesMap;
}

@Deprecated
@SuppressWarnings("lgtm[java/database-resource-leak]")
@Override
public List<Object> getPkValuesByDefault() throws SQLException {
// mysql default keyword the logic not support. (sample: insert into test(id, name) values(default, 'xx'))
throw new NotSupportYetException();
}

@SuppressWarnings("lgtm[java/database-resource-leak]")
@Override
public List<Object> getPkValuesByDefault(String pkKey) throws SQLException {
// mysql default keyword the logic not support. (sample: insert into test(id, name) values(default, 'xx'))
throw new NotSupportYetException();
}

protected Map<String, List<Object>> autoGeneratePks(BigDecimal cursor, String autoColumnName, Integer updateCount) throws SQLException {
BigDecimal step = BigDecimal.ONE;
String resourceId = statementProxy.getConnectionProxy().getDataSourceProxy().getResourceId();
Expand Down Expand Up @@ -245,4 +253,5 @@ protected boolean canAutoIncrement(Map<String, ColumnMeta> primaryKeyMap) {
}
return false;
}

}
Expand Up @@ -15,7 +15,6 @@
*/
package io.seata.rm.datasource.exec.oracle;

import io.seata.common.exception.NotSupportYetException;
import io.seata.common.loader.LoadLevel;
import io.seata.common.loader.Scope;
import io.seata.rm.datasource.StatementProxy;
Expand All @@ -31,9 +30,9 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* The type Oracle insert executor.
Expand All @@ -58,40 +57,33 @@ public OracleInsertExecutor(StatementProxy statementProxy, StatementCallback sta
}

@Override
public Map<String,List<Object>> getPkValues() throws SQLException {
Map<String,List<Object>> pkValuesMap = null;
boolean isContainsPk = containsPK();
//when there is only one pk in the table
if (isContainsPk) {
pkValuesMap = getPkValuesByColumn();
} else if (containsColumns()) {
String columnName = getTableMeta().getPrimaryKeyOnlyName().get(0);
pkValuesMap = Collections.singletonMap(columnName, getGeneratedKeys());
} else {
pkValuesMap = getPkValuesByColumn();
}
return pkValuesMap;
public Map<String, List<Object>> getPkValues() throws SQLException {
return getPkValuesByColumn();
}

@Override
public Map<String,List<Object>> getPkValuesByColumn() throws SQLException {
Map<String,List<Object>> pkValuesMap = parsePkValuesFromStatement();
String pkKey = pkValuesMap.keySet().iterator().next();
List<Object> pkValues = pkValuesMap.get(pkKey);

if (!pkValues.isEmpty() && pkValues.get(0) instanceof SqlSequenceExpr) {
pkValuesMap.put(pkKey, getPkValuesBySequence((SqlSequenceExpr) pkValues.get(0)));
} else if (pkValues.size() == 1 && pkValues.get(0) instanceof SqlMethodExpr) {
pkValuesMap.put(pkKey, getGeneratedKeys());
} else if (pkValues.size() == 1 && pkValues.get(0) instanceof Null) {
throw new NotSupportYetException("oracle not support null");
public Map<String, List<Object>> getPkValuesByColumn() throws SQLException {
Map<String, List<Object>> pkValuesMap = parsePkValuesFromStatement();
Set<String> keySet = pkValuesMap.keySet();
for (String pkKey : keySet) {
List<Object> pkValues = pkValuesMap.get(pkKey);
for (int i = 0; i < pkValues.size(); i++) {
if (!pkKey.isEmpty() && pkValues.get(i) instanceof SqlSequenceExpr) {
pkValues.set(i, getPkValuesBySequence((SqlSequenceExpr) pkValues.get(i), pkKey).get(0));
} else if (!pkKey.isEmpty() && pkValues.get(i) instanceof SqlMethodExpr) {
pkValues.set(i, getGeneratedKeys(pkKey).get(0));
} else if (!pkKey.isEmpty() && pkValues.get(i) instanceof Null) {
pkValues.set(i, getGeneratedKeys(pkKey).get(0));
}
}
pkValuesMap.put(pkKey, pkValues);
}

return pkValuesMap;
}

@Override
public String getSequenceSql(SqlSequenceExpr expr) {
return "SELECT " + expr.getSequence() + ".currval FROM DUAL";
}

}
Expand Up @@ -34,9 +34,9 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* The type Postgresql insert executor.
Expand All @@ -61,34 +61,27 @@ public PostgresqlInsertExecutor(StatementProxy statementProxy, StatementCallback
}

@Override
public Map<String,List<Object>> getPkValues() throws SQLException {
Map<String,List<Object>> pkValuesMap = null;
boolean isContainsPk = containsPK();
//when there is only one pk in the table
if (isContainsPk) {
pkValuesMap = getPkValuesByColumn();
} else if (containsColumns()) {
String columnName = getTableMeta().getPrimaryKeyOnlyName().get(0);
pkValuesMap = Collections.singletonMap(columnName, getGeneratedKeys());
} else {
pkValuesMap = getPkValuesByColumn();
}
return pkValuesMap;
public Map<String, List<Object>> getPkValues() throws SQLException {
return getPkValuesByColumn();
}

@Override
public Map<String,List<Object>> getPkValuesByColumn() throws SQLException {
Map<String,List<Object>> pkValuesMap = parsePkValuesFromStatement();
String pkKey = pkValuesMap.keySet().iterator().next();
List<Object> pkValues = pkValuesMap.get(pkKey);
if (!pkValues.isEmpty() && pkValues.get(0) instanceof SqlSequenceExpr) {
pkValuesMap.put(pkKey, getPkValuesBySequence((SqlSequenceExpr) pkValues.get(0)));
} else if (!pkValues.isEmpty() && pkValues.get(0) instanceof SqlMethodExpr) {
pkValuesMap.put(pkKey, getGeneratedKeys());
} else if (!pkValues.isEmpty() && pkValues.get(0) instanceof SqlDefaultExpr) {
pkValuesMap.put(pkKey, getPkValuesByDefault());
public Map<String, List<Object>> getPkValuesByColumn() throws SQLException {
Map<String, List<Object>> pkValuesMap = parsePkValuesFromStatement();
Set<String> keySet = pkValuesMap.keySet();
for (String pkKey : keySet) {
List<Object> pkValues = pkValuesMap.get(pkKey);
for (int i = 0; i < pkValues.size(); i++) {
if (!pkKey.isEmpty() && pkValues.get(i) instanceof SqlSequenceExpr) {
pkValues.set(i, getPkValuesBySequence((SqlSequenceExpr) pkValues.get(i), pkKey).get(0));
} else if (!pkKey.isEmpty() && pkValues.get(i) instanceof SqlMethodExpr) {
pkValues.set(i, getGeneratedKeys(pkKey).get(0));
} else if (!pkValues.isEmpty() && pkValues.get(i) instanceof SqlDefaultExpr) {
pkValues.set(i, getPkValuesByDefault(pkKey).get(0));
}
}
pkValuesMap.put(pkKey, pkValues);
}

return pkValuesMap;
}

Expand All @@ -98,6 +91,7 @@ public Map<String,List<Object>> getPkValuesByColumn() throws SQLException {
* @throws SQLException
*/
@Override
@Deprecated
public List<Object> getPkValuesByDefault() throws SQLException {
// current version 1.2 only support postgresql.
Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
Expand All @@ -112,8 +106,31 @@ public List<Object> getPkValuesByDefault() throws SQLException {
return getPkValuesBySequence(new SqlSequenceExpr("'" + seq + "'", function));
}

/**
* get primary key values by default
*
* @param pkKey the pk key
* @return
* @throws SQLException
*/
@Override
public List<Object> getPkValuesByDefault(String pkKey) throws SQLException {
// current version 1.2 only support postgresql.
Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
ColumnMeta pkMeta = pkMetaMap.values().iterator().next();
String columnDef = pkMeta.getColumnDef();
// sample: nextval('test_id_seq'::regclass)
String seq = org.apache.commons.lang.StringUtils.substringBetween(columnDef, "'", "'");
String function = org.apache.commons.lang.StringUtils.substringBetween(columnDef, "", "(");
if (StringUtils.isBlank(seq)) {
throw new ShouldNeverHappenException("get primary key value failed, cause columnDef is " + columnDef);
}
return getPkValuesBySequence(new SqlSequenceExpr("'" + seq + "'", function), pkKey);
}

@Override
public String getSequenceSql(SqlSequenceExpr expr) {
return "SELECT currval(" + expr.getSequence() + ")";
}

}
Expand Up @@ -16,7 +16,6 @@
package io.seata.rm.datasource.exec;


import io.seata.common.exception.NotSupportYetException;
import io.seata.rm.datasource.ConnectionContext;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.PreparedStatementProxy;
Expand All @@ -28,6 +27,7 @@
import io.seata.sqlparser.util.JdbcConstants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
Expand Down Expand Up @@ -110,7 +110,8 @@ public void testLockRetryPolicyNotRollbackOnConflict() throws Throwable {
}

@Test
public void testOnlySupportMysqlWhenUseMultiPk(){
@Disabled
public void testOnlySupportMysqlWhenUseMultiPk() throws Exception {
Mockito.when(connectionProxy.getContext())
.thenReturn(new ConnectionContext());
PreparedStatementProxy statementProxy = Mockito.mock(PreparedStatementProxy.class);
Expand All @@ -123,7 +124,7 @@ public void testOnlySupportMysqlWhenUseMultiPk(){
Mockito.when(executor.getDbType()).thenReturn(JdbcConstants.ORACLE);
Mockito.doReturn(tableMeta).when(executor).getTableMeta();
Mockito.when(tableMeta.getPrimaryKeyOnlyName()).thenReturn(Arrays.asList("id","userCode"));
Assertions.assertThrows(NotSupportYetException.class,()-> executor.executeAutoCommitFalse(null));
executor.executeAutoCommitFalse(null);
}


Expand Down