Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add db realization for distribute lock #3487

Merged
merged 44 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
aea3f5d
the simple version
caohdgege Jan 12, 2021
c473d34
modify
caohdgege Jan 23, 2021
9f34d65
add copyright & pg create table sql
caohdgege Jan 23, 2021
d2fc7f1
add the init size of hash map
caohdgege Jan 23, 2021
273550a
make select & insert & update in one local tx
caohdgege Jan 23, 2021
b115d5f
remove useless code
caohdgege Jan 23, 2021
06e657c
fix table name
caohdgege Jan 23, 2021
4d72f29
optimize
caohdgege Jan 31, 2021
e9815c9
optimize
caohdgege Jan 31, 2021
ec10348
rename the do
caohdgege Feb 9, 2021
63fe9c1
fix will not commit when auto commit is false
caohdgege Feb 25, 2021
cfca72e
fix will not commit when auto commit is false
caohdgege Feb 25, 2021
45a2529
Merge branch 'develop' into feature-distribute-lock-db
caohdgege Feb 25, 2021
c8ad5fb
ci
caohdgege Feb 25, 2021
2853c16
Merge remote-tracking branch 'chd/feature-distribute-lock-db' into fe…
caohdgege Feb 25, 2021
4f44875
ci
caohdgege Feb 25, 2021
9afaff1
Merge branch 'develop' into feature-distribute-lock-db
caohdgege Mar 28, 2021
0f5a5f0
Merge branch 'github-develop' into feature-distribute-lock-db
caohdgege Jun 23, 2021
3c5b352
modify
caohdgege Jun 23, 2021
72523e3
Merge branch 'github-develop' into feature-distribute-lock-db
caohdgege Jun 23, 2021
0d76f35
Merge branch 'github-develop' into feature-distribute-lock-db
caohdgege Jul 6, 2021
a192950
lock
caohdgege Jul 7, 2021
352238a
lock
caohdgege Jul 11, 2021
50800b1
lock
caohdgege Jul 18, 2021
44c342e
lock
caohdgege Jul 18, 2021
6ac727c
modify
caohdgege Jul 27, 2021
92daa32
optimize duplicate code
caohdgege Jul 27, 2021
44a7827
Merge branch 'develop' into feature-distribute-lock-db
caohdgege Jul 27, 2021
2d7bc80
remove test
caohdgege Jul 27, 2021
2631c08
Merge remote-tracking branch 'origin/feature-distribute-lock-db' into…
caohdgege Jul 27, 2021
c171db0
fix ci
caohdgege Jul 27, 2021
c333cd6
init xid
caohdgege Jul 27, 2021
6918575
init xid
caohdgege Jul 27, 2021
4922731
optimize
caohdgege Jul 27, 2021
b53d313
ConfigurationCache#removeConfigListener
caohdgege Jul 27, 2021
dae7c69
ci
caohdgege Jul 27, 2021
c69442f
ci
caohdgege Jul 27, 2021
6ee86e8
fix add -> remove
caohdgege Jul 28, 2021
d4e3057
optimize import
caohdgege Jul 28, 2021
9fd64cd
Merge branch 'develop' into feature-distribute-lock-db
jsbxyyx Aug 6, 2021
7d4df52
rm druid package
caohdgege Aug 15, 2021
0805ec7
Merge branch 'develop' into feature-distribute-lock-db
caohdgege Aug 15, 2021
2c9ce7b
Merge remote-tracking branch 'origin/feature-distribute-lock-db' into…
caohdgege Aug 15, 2021
663e08d
distribute -> distributed
caohdgege Aug 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public interface DefaultValues {
*/
String DEFAULT_LOCK_DB_TABLE = "lock_table";

/**
* the constant DEFAULT_DISTRIBUTE_LOCK_DB_TABLE
*/
String DEFAULT_DISTRIBUTE_LOCK_DB_TABLE = "distribute_lock";

int DEFAULT_TM_COMMIT_RETRY_COUNT = 5;
int DEFAULT_TM_ROLLBACK_RETRY_COUNT = 5;
int DEFAULT_GLOBAL_TRANSACTION_TIMEOUT = 60000;
Expand Down Expand Up @@ -126,4 +131,9 @@ public interface DefaultValues {
* the constant TCC_ACTION_INTERCEPTOR_ORDER
*/
int TCC_ACTION_INTERCEPTOR_ORDER = Integer.MIN_VALUE + 1000;

/**
* the constant DEFAULT_DISTRIBUTE_LOCK_EXPIRE
*/
int DEFAULT_DISTRIBUTE_LOCK_EXPIRE = 10000;
}
5 changes: 5 additions & 0 deletions common/src/main/java/io/seata/common/util/StringUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ private StringUtils() {
*/
public static final String EMPTY = "";

/**
* Space string
*/
public static final String SPACE = " ";

/**
* Is empty boolean.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import io.seata.common.util.CollectionUtils;
import io.seata.common.util.DurationUtil;
import io.seata.common.util.StringUtils;
import net.sf.cglib.proxy.Enhancer;
Expand Down Expand Up @@ -61,6 +62,22 @@ public static void addConfigListener(String dataId, ConfigurationChangeListener.
}
}

public static void removeConfigListener(String dataId, ConfigurationChangeListener... listeners) {
if (StringUtils.isBlank(dataId)) {
return;
}
synchronized (ConfigurationCache.class) {
final HashSet<ConfigurationChangeListener> listenerSet = getInstance().configListenersMap.get(dataId);
if (CollectionUtils.isNotEmpty(listenerSet)) {
for (ConfigurationChangeListener listener : listeners) {
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
if (listenerSet.remove(listener)) {
ConfigurationFactory.getInstance().addConfigListener(dataId, listener);
}
}
}
}
}

public static ConfigurationCache getInstance() {
return ConfigurationCacheInstance.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ public interface ConfigurationKeys {
*/
String STORE_DB_BRANCH_TABLE = STORE_DB_PREFIX + "branchTable";

/**
* The constant DISTRIBUTE_LOCK_DB_TABLE.
*/
String DISTRIBUTE_LOCK_DB_TABLE = STORE_DB_PREFIX + "distributeLockTable";

/**
* The constant STORE_DB_DATASOURCE_TYPE.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,19 @@ public interface ServerTableColumnsName {
* The constant lock_table column name gmt_modified
*/
String LOCK_TABLE_GMT_MODIFIED = "gmt_modified";



/**
* The constant distribute_lock column name lock key
*/
String DISTRIBUTE_LOCK_KEY = "lock_key";
/**
* The constant distribute_lock column name lock value
*/
String DISTRIBUTE_LOCK_VALUE = "lock_value";
/**
* The constant distribute_lock column name expire
*/
String DISTRIBUTE_LOCK_EXPIRE = "expire";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.store.db.sql.distribute.lock;

import io.seata.core.constants.ServerTableColumnsName;

/**
* @author chd
*/
public class BaseDistributeLockSql implements DistributeLockSql {
protected static final String DISTRIBUTE_LOCK_TABLE_PLACE_HOLD = " #distribute_lock_table# ";

protected static final String ALL_COLUMNS = ServerTableColumnsName.DISTRIBUTE_LOCK_KEY + "," +
ServerTableColumnsName.DISTRIBUTE_LOCK_VALUE + "," + ServerTableColumnsName.DISTRIBUTE_LOCK_EXPIRE;

protected static final String SELECT_FOR_UPDATE_SQL = "SELECT " + ALL_COLUMNS + " FROM " + DISTRIBUTE_LOCK_TABLE_PLACE_HOLD
+ " WHERE " + ServerTableColumnsName.DISTRIBUTE_LOCK_KEY + " = ? FOR UPDATE";

protected static final String INSERT_DISTRIBUTE_LOCK_SQL = "INSERT INTO " + DISTRIBUTE_LOCK_TABLE_PLACE_HOLD + "("
+ ALL_COLUMNS + ") VALUE (?, ?, ?)";

protected static final String UPDATE_DISTRIBUTE_LOCK_SQL = "UPDATE " + DISTRIBUTE_LOCK_TABLE_PLACE_HOLD + " SET "
+ ServerTableColumnsName.DISTRIBUTE_LOCK_VALUE + "=?, " + ServerTableColumnsName.DISTRIBUTE_LOCK_EXPIRE + "=?"
+ " WHERE " + ServerTableColumnsName.DISTRIBUTE_LOCK_KEY + "=?";


@Override
public String getSelectDistributeForUpdateSql(String distributeLockTable) {
return SELECT_FOR_UPDATE_SQL.replace(DISTRIBUTE_LOCK_TABLE_PLACE_HOLD, distributeLockTable);
}

@Override
public String getInsertSql(String distributeLockTable) {
return INSERT_DISTRIBUTE_LOCK_SQL.replace(DISTRIBUTE_LOCK_TABLE_PLACE_HOLD, distributeLockTable);
}

@Override
public String getUpdateSql(String distributeLockTable) {
return UPDATE_DISTRIBUTE_LOCK_SQL.replace(DISTRIBUTE_LOCK_TABLE_PLACE_HOLD, distributeLockTable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.store.db.sql.distribute.lock;

/**
* @author chd
* @since 1.5.0
*/
public interface DistributeLockSql {
/**
* Get the select distribute lock sql
* @param distributeLockTable the table name of the distribute lock table
* @return the sql
*/
String getSelectDistributeForUpdateSql(String distributeLockTable);

/**
* Get insert distribute lock sql
* @param distributeLockTable the table name of the distribute lock table
* @return the sql
*/
String getInsertSql(String distributeLockTable);

/**
* Get update distribute lock sql
* @param distributeLockTable the table name of the distribute lock table
* @return the sql
*/
String getUpdateSql(String distributeLockTable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.store.db.sql.distribute.lock;

/**
* @author chd
*/
public class DistributeLockSqlFactory {
private static final DistributeLockSql DISTRIBUTE_LOCK_SQL = new BaseDistributeLockSql();

/**
* get the lock store sql
*
* @param dbType the dbType, support mysql/oracle/h2/postgre/oceanbase, it's useless now, but maybe useful later
* @return lock store sql
*/
public static DistributeLockSql getDistributeLogStoreSql(String dbType) {
return DISTRIBUTE_LOCK_SQL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected static boolean canUndo(int state) {
}

protected String buildContext(String serializer, CompressorType compressorType) {
Map<String, String> map = new HashMap<>();
Map<String, String> map = new HashMap<>(2, 1.01f);
caohdgege marked this conversation as resolved.
Show resolved Hide resolved
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
return CollectionUtils.encodeMap(map);
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributeLockTable=distribute_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
Expand Down
14 changes: 14 additions & 0 deletions script/server/db/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,17 @@ CREATE TABLE IF NOT EXISTS `lock_table`
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `distribute_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

INSERT INTO `distribute_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distribute_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distribute_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distribute_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
15 changes: 14 additions & 1 deletion script/server/db/oracle.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,17 @@ CREATE TABLE lock_table
PRIMARY KEY (row_key)
);

CREATE INDEX idx_branch_id ON lock_table (branch_id);
CREATE INDEX idx_branch_id ON lock_table (branch_id);

CREATE TABLE distribute_lock (
lock_key VARCHAR2(20) NOT NULL,
lock_value VARCHAR2(20) NOT NULL,
expire DECIMAL(18) NOT NULL,
PRIMARY KEY (lock_key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否需要创建时间这个字段?如果出现什么问题,能知道这个记录什么时候insert进去的。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个我觉得应该不需要。

  1. expire + 配置的distributedLockExpireTime是可以反推出来插入时间的
  2. 不同公司的对于插入时间的字段名一般也不太一样,如果他们有需要可以随时自行加一个default current_timestamp的插入时间

);

INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

12 changes: 12 additions & 0 deletions script/server/db/postgresql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ CREATE TABLE IF NOT EXISTS public.lock_table
);

CREATE INDEX idx_branch_id ON public.lock_table (branch_id);

CREATE TABLE distribute_lock (
lock_key VARCHAR(20) NOT NULL,
lock_value VARCHAR(20) NOT NULL,
expire BIGINT NOT NULL,
CONSTRAINT pk_distribute_lock_table PRIMARY KEY (lock_key)
caohdgege marked this conversation as resolved.
Show resolved Hide resolved
);

INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distribute_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
17 changes: 9 additions & 8 deletions server/src/main/java/io/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ public static void start(String[] args) {
//Because, here we need to parse the parameters needed for startup.
ParameterParser parameterParser = new ParameterParser(args);


//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
caohdgege marked this conversation as resolved.
Show resolved Hide resolved
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(parameterParser.getPort());

//initialize the metrics
MetricsManager.get().init();

Expand All @@ -81,14 +90,6 @@ public static void start(String[] args) {
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());

nettyRemotingServer.init();
}
}
Loading