Skip to content

Commit

Permalink
feature: add db realization for distribute lock (#3487)
Browse files Browse the repository at this point in the history
  • Loading branch information
caohdgege committed Aug 16, 2021
1 parent 4628861 commit e2e5fa6
Show file tree
Hide file tree
Showing 19 changed files with 581 additions and 126 deletions.
10 changes: 10 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public interface DefaultValues {
*/
String DEFAULT_LOCK_DB_TABLE = "lock_table";

/**
* the constant DEFAULT_DISTRIBUTED_LOCK_DB_TABLE
*/
String DEFAULT_DISTRIBUTED_LOCK_DB_TABLE = "distributed_lock";

int DEFAULT_TM_COMMIT_RETRY_COUNT = 5;
int DEFAULT_TM_ROLLBACK_RETRY_COUNT = 5;
int DEFAULT_GLOBAL_TRANSACTION_TIMEOUT = 60000;
Expand Down Expand Up @@ -126,4 +131,9 @@ public interface DefaultValues {
* the constant TCC_ACTION_INTERCEPTOR_ORDER
*/
int TCC_ACTION_INTERCEPTOR_ORDER = Integer.MIN_VALUE + 1000;

/**
* the constant DEFAULT_DISTRIBUTED_LOCK_EXPIRE
*/
int DEFAULT_DISTRIBUTED_LOCK_EXPIRE = 10000;
}
5 changes: 5 additions & 0 deletions common/src/main/java/io/seata/common/util/StringUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ private StringUtils() {
*/
public static final String EMPTY = "";

/**
* Space string
*/
public static final String SPACE = " ";

/**
* Is empty boolean.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import io.seata.common.util.CollectionUtils;
import io.seata.common.util.DurationUtil;
import io.seata.common.util.StringUtils;
import net.sf.cglib.proxy.Enhancer;
Expand Down Expand Up @@ -61,6 +62,22 @@ public static void addConfigListener(String dataId, ConfigurationChangeListener.
}
}

public static void removeConfigListener(String dataId, ConfigurationChangeListener... listeners) {
if (StringUtils.isBlank(dataId)) {
return;
}
synchronized (ConfigurationCache.class) {
final HashSet<ConfigurationChangeListener> listenerSet = getInstance().configListenersMap.get(dataId);
if (CollectionUtils.isNotEmpty(listenerSet)) {
for (ConfigurationChangeListener listener : listeners) {
if (listenerSet.remove(listener)) {
ConfigurationFactory.getInstance().removeConfigListener(dataId, listener);
}
}
}
}
}

public static ConfigurationCache getInstance() {
return ConfigurationCacheInstance.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ public interface ConfigurationKeys {
*/
String STORE_DB_BRANCH_TABLE = STORE_DB_PREFIX + "branchTable";

/**
* The constant DISTRIBUTED_LOCK_DB_TABLE.
*/
String DISTRIBUTED_LOCK_DB_TABLE = STORE_DB_PREFIX + "distributedLockTable";

/**
* The constant STORE_DB_DATASOURCE_TYPE.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,19 @@ public interface ServerTableColumnsName {
* The constant lock_table column name gmt_modified
*/
String LOCK_TABLE_GMT_MODIFIED = "gmt_modified";



/**
* The constant distributed_lock column name lock key
*/
String DISTRIBUTED_LOCK_KEY = "lock_key";
/**
* The constant distributed_lock column name lock value
*/
String DISTRIBUTED_LOCK_VALUE = "lock_value";
/**
* The constant distributed_lock column name expire
*/
String DISTRIBUTED_LOCK_EXPIRE = "expire";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.store.db.sql.distributed.lock;

import io.seata.core.constants.ServerTableColumnsName;

/**
* @author chd
*/
public class BaseDistributedLockSql implements DistributedLockSql {
protected static final String DISTRIBUTED_LOCK_TABLE_PLACE_HOLD = " #distributed_lock_table# ";

protected static final String ALL_COLUMNS = ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "," +
ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "," + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE;

protected static final String SELECT_FOR_UPDATE_SQL = "SELECT " + ALL_COLUMNS + " FROM " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD
+ " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + " = ? FOR UPDATE";

protected static final String INSERT_DISTRIBUTED_LOCK_SQL = "INSERT INTO " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + "("
+ ALL_COLUMNS + ") VALUE (?, ?, ?)";

protected static final String UPDATE_DISTRIBUTED_LOCK_SQL = "UPDATE " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + " SET "
+ ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "=?, " + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE + "=?"
+ " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "=?";


@Override
public String getSelectDistributeForUpdateSql(String distributedLockTable) {
return SELECT_FOR_UPDATE_SQL.replace(DISTRIBUTED_LOCK_TABLE_PLACE_HOLD, distributedLockTable);
}

@Override
public String getInsertSql(String distributedLockTable) {
return INSERT_DISTRIBUTED_LOCK_SQL.replace(DISTRIBUTED_LOCK_TABLE_PLACE_HOLD, distributedLockTable);
}

@Override
public String getUpdateSql(String distributedLockTable) {
return UPDATE_DISTRIBUTED_LOCK_SQL.replace(DISTRIBUTED_LOCK_TABLE_PLACE_HOLD, distributedLockTable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.store.db.sql.distributed.lock;

/**
* @author chd
* @since 1.5.0
*/
public interface DistributedLockSql {
/**
* Get the select distribute lock sql
* @param distributedLockTable the table name of the distribute lock table
* @return the sql
*/
String getSelectDistributeForUpdateSql(String distributedLockTable);

/**
* Get insert distribute lock sql
* @param distributedLockTable the table name of the distribute lock table
* @return the sql
*/
String getInsertSql(String distributedLockTable);

/**
* Get update distribute lock sql
* @param distributedLockTable the table name of the distribute lock table
* @return the sql
*/
String getUpdateSql(String distributedLockTable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.store.db.sql.distributed.lock;

/**
* @author chd
*/
public class DistributedLockSqlFactory {
private static final DistributedLockSql DISTRIBUTED_LOCK_SQL = new BaseDistributedLockSql();

/**
* get the lock store sql
*
* @param dbType the dbType, support mysql/oracle/h2/postgre/oceanbase, it's useless now, but maybe useful later
* @return lock store sql
*/
public static DistributedLockSql getDistributedLogStoreSql(String dbType) {
return DISTRIBUTED_LOCK_SQL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected static boolean canUndo(int state) {
}

protected String buildContext(String serializer, CompressorType compressorType) {
Map<String, String> map = new HashMap<>();
Map<String, String> map = new HashMap<>(2, 1.01f);
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
return CollectionUtils.encodeMap(map);
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
Expand Down
14 changes: 14 additions & 0 deletions script/server/db/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,17 @@ CREATE TABLE IF NOT EXISTS `lock_table`
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
15 changes: 14 additions & 1 deletion script/server/db/oracle.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,17 @@ CREATE TABLE lock_table
PRIMARY KEY (row_key)
);

CREATE INDEX idx_branch_id ON lock_table (branch_id);
CREATE INDEX idx_branch_id ON lock_table (branch_id);

CREATE TABLE distributed_lock (
lock_key VARCHAR2(20) NOT NULL,
lock_value VARCHAR2(20) NOT NULL,
expire DECIMAL(18) NOT NULL,
PRIMARY KEY (lock_key)
);

INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

12 changes: 12 additions & 0 deletions script/server/db/postgresql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ CREATE TABLE IF NOT EXISTS public.lock_table
);

CREATE INDEX idx_branch_id ON public.lock_table (branch_id);

CREATE TABLE distributed_lock (
lock_key VARCHAR(20) NOT NULL,
lock_value VARCHAR(20) NOT NULL,
expire BIGINT NOT NULL,
CONSTRAINT pk_distributed_lock_table PRIMARY KEY (lock_key)
);

INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
17 changes: 9 additions & 8 deletions server/src/main/java/io/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ public static void start(String[] args) {
//Because, here we need to parse the parameters needed for startup.
ParameterParser parameterParser = new ParameterParser(args);


//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(parameterParser.getPort());

//initialize the metrics
MetricsManager.get().init();

Expand All @@ -81,14 +90,6 @@ public static void start(String[] args) {
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());

nettyRemotingServer.init();
}
}
Loading

0 comments on commit e2e5fa6

Please sign in to comment.